How nodes connect

Every node in this SDK is connected to its upstream through a single keyword: inputs=. That one keyword covers every topology — linear, fan-out, fan-in, branching, and merging — without any operator overloading or special syntax.

This page explains exactly how that works and how to express every common shape.


1. The basic rule

b = helper("b", inputs=a, ...)

This creates an edge from node a to node b. Edges are not declared separately — they are inferred from inputs=.

inputs= accepts:

  • A single Node: inputs=upstream
  • A named output handle: inputs=branch_node.out("true")
  • A list of either: inputs=[a, b, c]
  • None (or omitted) — typical for source nodes that have no upstream
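To make the edge-inference rule concrete, here is a minimal sketch in plain Python of how a helper might normalize those four `inputs=` forms into edges. This is an illustration, not the SDK's internals; the `Node` class, the `connect` function, and the port names are assumptions.

```python
# Sketch (not the real SDK): normalizing inputs= into edges.
from dataclasses import dataclass, field

@dataclass
class Node:
    name: str
    # edges stored as (src_name, src_port, dst_name, dst_port)
    edges: list = field(default_factory=list)

    def out(self, port):
        return (self, port)  # a named output handle

def connect(downstream, inputs=None):
    """Infer edges on `downstream` from the inputs= argument."""
    if inputs is None:
        return downstream                          # source node: no upstream
    items = inputs if isinstance(inputs, list) else [inputs]
    for item in items:
        src, port = item if isinstance(item, tuple) else (item, "output")
        downstream.edges.append((src.name, port, downstream.name, "input"))
    return downstream

a = Node("a")
b = connect(Node("b"), inputs=a)              # single Node
c = connect(Node("c"), inputs=[a, b])         # list of nodes
d = connect(Node("d"), inputs=a.out("true"))  # named output handle
```

Each form resolves to the same edge record; only the source port varies.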

2. Ports

Every node has one default output port called "output" and one default input port called "input". When you write inputs=a, the SDK creates an edge from a.output → b.input.

Branch nodes are the one exception: they have two named output ports, "true" and "false". Use node.out("port") to reference a non-default port:

gate = branch("g", inputs=rows, condition="$input.data.x > 0")
output("yes", inputs=gate.out("true"))    # consumes the true side
output("no",  inputs=gate.out("false"))   # consumes the false side

If you accidentally write inputs=gate (without .out(...)), both downstreams read from the default "output" port, which a branch node doesn't have. The validator catches this.

3. Linear

   load ──> enrich ──> classify ──> save
load = postgres("load", operation="select", query="...", connection={...})
enrich = transform("enrich", inputs=load, code="...")
classify = ai_tagging("classify", inputs=enrich, agent_url="...")
postgres("save", inputs=classify, operation="insert", table="results", connection={...})

Each node names its upstream. Order in the file matches order in the DAG.

4. Fan-out (implicit)

List the same upstream in multiple downstream inputs=. No special syntax — the engine duplicates the row stream to each consumer.

                ┌──> score_methodology
                ├──> score_novelty
   candidates ──┤
                ├──> score_clinical
                └──> score_citations
candidates = postgres("pg_read", operation="select", query="...", connection={...})

score_methodology = ai_tagging("score_methodology", inputs=candidates, agent_url="...")
score_novelty     = ai_tagging("score_novelty",     inputs=candidates, agent_url="...")
score_clinical    = ai_tagging("score_clinical",    inputs=candidates, agent_url="...")
score_citations   = ai_tagging("score_citations",   inputs=candidates, agent_url="...")

All four downstream nodes see the same row stream. The engine handles parallel execution.

5. Fan-in (merge)

merge waits for all of its upstreams to complete, then emits one combined batch.

   score_a ──┐
   score_b ──┼──> merge_scores
   score_c ──┘
merged = merge(
    "merge_scores",
    inputs=[score_a, score_b, score_c],
    on="id",          # column to align rows on (join mode only)
    how="join",       # or "concat"
)

Two strategies:

how        Behavior
"join"     Align rows across upstreams by the on= column. One output row per input id.
"concat"   Stack rows from all upstreams. No alignment; on= is ignored.

Use "join" when each upstream emits one row per source id (typical for parallel agents producing scores). Use "concat" when each upstream emits independent rows (typical for branch+merge unioning two paths).
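The two strategies can be sketched in plain Python over row batches (lists of dicts). This is an illustration of the semantics described above, not the engine's implementation:

```python
# Sketch of merge semantics: "join" aligns by key, "concat" stacks.
def merge_rows(batches, how, on=None):
    if how == "concat":
        # stack all rows from all upstreams, no alignment
        return [row for batch in batches for row in batch]
    if how == "join":
        combined = {}
        for batch in batches:
            for row in batch:
                # align rows sharing the same on= value into one dict
                combined.setdefault(row[on], {}).update(row)
        return list(combined.values())
    raise ValueError(f"unknown how: {how}")

meth  = [{"id": 1, "meth": 0.9}, {"id": 2, "meth": 0.4}]
novel = [{"id": 1, "novel": 0.7}, {"id": 2, "novel": 0.2}]

joined  = merge_rows([meth, novel], how="join", on="id")   # 2 rows, columns combined
stacked = merge_rows([meth, novel], how="concat")          # 4 rows, unchanged
```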

6. Branch

A branch routes each row to one of two downstreams based on a condition.

                ┌── true ──> alerts
   rows ──> gate
                └── false ─> queue
gate = branch("gate", inputs=rows, condition="$input.data.get('score') > 0.8")

output("alerts", inputs=gate.out("true"))
output("queue",  inputs=gate.out("false"))

The condition is an expression string, not Python. The engine evaluates it per row. $input.data is the current row dict; use .get(...) for safety.
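A minimal sketch of what per-row evaluation might look like, assuming (this is an assumption, not the engine's actual mechanism) that $input.data is bound to the current row dict before the expression is evaluated:

```python
# Sketch only: binds $input.data to each row dict, evaluates the
# condition, and routes the row to the true or false side.
def route(rows, condition):
    true_side, false_side = [], []
    # substitute the placeholder with a local name, then compile once
    compiled = compile(condition.replace("$input.data", "row"), "<cond>", "eval")
    for row in rows:
        side = true_side if eval(compiled, {"row": row}) else false_side
        side.append(row)
    return true_side, false_side

rows = [{"score": 0.9}, {"score": 0.3}]
yes, no = route(rows, "$input.data.get('score') > 0.8")
```

Note why .get(...) matters: a row missing the key would make a bare ["score"] lookup raise, while .get returns None.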

Branch + merge (union the paths)

                ┌── true ──> for_yes ──┐
   rows ──> gate                       ├──> combined
                └── false ─> for_no ───┘
gate = branch("gate", inputs=rows, condition="...")
yes_path = transform("for_yes", inputs=gate.out("true"),  code="...")
no_path  = transform("for_no",  inputs=gate.out("false"), code="...")
combined = merge("combined", inputs=[yes_path, no_path], how="concat")

how="concat" because the two paths produce independent rows — there's no alignment key.

7. Multi-way routing

branch is two-way. For 3+ way routing, use transform to compute a route column, then chain branches (or branch on string equality):

classified = transform("route", inputs=rows, code='''
def run(inputs):
    out = []
    for r in inputs.get("data", []):
        r["route"] = "A" if r["score"] > 0.8 else "B" if r["score"] > 0.4 else "C"
        out.append(r)
    return {"data": out}
''')

is_a = branch("is_a", inputs=classified, condition="$input.data.get('route') == 'A'")
output("path_A", inputs=is_a.out("true"))

is_b = branch("is_b", inputs=is_a.out("false"),
              condition="$input.data.get('route') == 'B'")
output("path_B", inputs=is_b.out("true"))
output("path_C", inputs=is_b.out("false"))

8. The full picture (a real example)

From examples/09_split_agents_merge_branch.py:

   pg_read
      ├──> score_methodology ──┐
      ├──> score_novelty       │
      ├──> score_stats         ├──> merge_scores ──> branch_threshold
      ├──> score_clinical      │                       │ true     │ false
      └──> score_citation     ─┘                       ▼          ▼
                                                  pg_upsert   out_review

Postgres source → 5-way fan-out across parallel AI agents → fan-in via merge(how="join", on="id") → branch on a combined-score threshold → two terminal sinks.

The whole thing is ~100 lines of Python. Every connection is just inputs=.

9. Scopes — name-prefix isolation

When the same helper is called multiple times (e.g. screening two different drugs), node names collide. Use wf.scope("prefix") to isolate them:

def fetch_evidence(*, topic):
    papers = pubmed("search", query=topic, max_results=200)
    return filter("recent", inputs=papers,
                  conditions=[{"field": "year", "op": "gte", "value": 2021}])

with Workflow("multi-topic") as wf:
    with wf.scope("onco"):
        onco = fetch_evidence(topic="glioblastoma CAR-T")
        # nodes become: onco.search, onco.recent

    with wf.scope("immuno"):
        immuno = fetch_evidence(topic="checkpoint inhibitors")
        # nodes become: immuno.search, immuno.recent

    merge("combined", inputs=[onco, immuno], how="concat")

Nested scopes join with a dot: .scope("a").scope("b") produces a.b.<name>.
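The prefix-joining behavior can be sketched with a small stack (names only; the Workflow and scope API themselves belong to the SDK, and this helper is hypothetical):

```python
# Sketch of name-prefix scoping: nested prefixes join with a dot.
class ScopeStack:
    def __init__(self):
        self.parts = []

    def push(self, prefix):
        self.parts.append(prefix)

    def pop(self):
        self.parts.pop()

    def qualify(self, name):
        # full node name = all active prefixes + the local name
        return ".".join(self.parts + [name])

s = ScopeStack()
s.push("a")
s.push("b")
nested = s.qualify("search")   # "a.b.search"
s.pop()
outer = s.qualify("search")    # "a.search"
```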

10. What the engine does at run time

You declare the graph; the engine handles:

  • Topological order: runs nodes in dependency order. Independent branches run in parallel.
  • Stream duplication: when fan-out has multiple consumers, each gets a copy of the upstream's row batch.
  • Merge waiting: a merge node blocks until all its upstreams complete.
  • Branch routing: evaluates the condition per row; routes to true or false consumers.
  • Variable resolution: {{ name }} references are replaced from wf.variables at run time.

You don't write any of this orchestration yourself. The whole point of declaring the graph with inputs= is to give the engine enough information to do it for you.
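Dependency-ordered execution is a standard topological sort over the inputs= graph. A sketch using the standard library (not the engine's actual scheduler):

```python
# Sketch: the linear pipeline from section 3 as a dependency graph.
# graphlib is in the Python standard library (3.9+).
from graphlib import TopologicalSorter

deps = {                      # node -> its upstreams (from inputs=)
    "enrich": {"load"},
    "classify": {"enrich"},
    "save": {"classify"},
}
order = list(TopologicalSorter(deps).static_order())
# predecessors always come first; in a fan-out, nodes with no
# dependency between them become "ready" together and can run in parallel
```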

11. Validating connections

wf.validate() checks:

  • Every non-source node has an upstream
  • Every branch's true and false ports are wired (or explicitly skipped)
  • No cycles
  • No orphan nodes (built but not connected to anything)
  • Required kwargs present per node type

Always call it before wf.run():

issues = wf.validate()
assert issues == [], issues
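Two of those checks, cycles and orphan nodes, can be sketched over a plain edge list. This is an illustration of what a validator might do, not the SDK's actual implementation:

```python
# Sketch: detect orphan nodes and cycles in a (nodes, edges) graph.
from graphlib import TopologicalSorter, CycleError

def validate(nodes, edges):
    issues = []
    # orphan check: a node that appears in no edge at all
    connected = {n for edge in edges for n in edge}
    issues += [f"orphan node: {n}" for n in nodes if n not in connected]
    # cycle check: topological sort fails iff the graph has a cycle
    deps = {n: set() for n in nodes}
    for src, dst in edges:
        deps[dst].add(src)
    try:
        list(TopologicalSorter(deps).static_order())
    except CycleError:
        issues.append("cycle detected")
    return issues
```

Usage mirrors the wf.validate() pattern above: an empty list means the graph is wirable.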
