How nodes connect
Every node in this SDK is connected to its upstream through a single keyword: inputs=. That one keyword covers every topology (linear chains, fan-out, fan-in, branching, and branch-then-merge) without any operator overloading or special syntax.
This page explains exactly how that works and how to express every common shape.
1. The basic rule
Building node b with inputs=a creates an edge from node a to node b. Edges are not declared separately; they are inferred from inputs=.
inputs= accepts:
- A single Node: inputs=upstream
- A named output handle: inputs=branch_node.out("true")
- A list of either: inputs=[a, b, c]
- None (or omitted): typical for source nodes that have no upstream
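A toy model of the rule above, for illustration only: it shows how an inputs= value (a node, a named handle, a list, or None) could be normalized into edges. Node, OutputHandle, and edges_from_inputs are hypothetical stand-ins written for this page, not the SDK's own classes; the default port names "output" and "input" are the ones described under Ports.

```python
from dataclasses import dataclass
from typing import List, Optional, Tuple, Union


@dataclass(eq=False)
class Node:
    """Hypothetical stand-in for an SDK node: only the name matters here."""
    name: str

    def out(self, port: str) -> "OutputHandle":
        return OutputHandle(self, port)


@dataclass(eq=False)
class OutputHandle:
    """A reference to a named (non-default) output port of a node."""
    node: Node
    port: str


Upstream = Union[Node, OutputHandle]
Edge = Tuple[str, str, str, str]  # (src_node, src_port, dst_node, dst_port)


def edges_from_inputs(dst: Node, inputs: Optional[Union[Upstream, List[Upstream]]]) -> List[Edge]:
    """Normalize an inputs= value into edges ending at dst's default "input" port."""
    if inputs is None:
        return []  # source node: no upstream, no edges
    items = inputs if isinstance(inputs, list) else [inputs]
    edges: List[Edge] = []
    for item in items:
        if isinstance(item, OutputHandle):
            edges.append((item.node.name, item.port, dst.name, "input"))
        else:  # a bare Node reads from its default "output" port
            edges.append((item.name, "output", dst.name, "input"))
    return edges


a, b, gate = Node("a"), Node("b"), Node("gate")
print(edges_from_inputs(b, [a, gate.out("true")]))
# [('a', 'output', 'b', 'input'), ('gate', 'true', 'b', 'input')]
```

The point of the sketch is that nothing beyond inputs= is needed: the edge list falls out of what each node names as its upstream.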
2. Ports
Every node has one default output port called "output" and one default input port called "input". When node b is built with inputs=a, the SDK creates an edge from a.output → b.input.
Branch nodes are the one exception: they have two named output ports, "true" and "false". Use node.out("port") to reference a non-default port:
gate = branch("g", inputs=rows, condition="$input.data.get('x') > 0")
output("yes", inputs=gate.out("true")) # consumes the true side
output("no", inputs=gate.out("false")) # consumes the false side
If you accidentally write inputs=gate (without .out(...)), both downstreams read from the default "output" port, which a branch node doesn't have. The validator catches this.
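A minimal sketch, assuming a simple table of which output ports each node type exposes, of the kind of port check described above. OUTPUT_PORTS and check_ports are hypothetical names; the real validator's rules and messages may differ.

```python
from typing import Dict, List, Set, Tuple

# Hypothetical table: which output ports each node type exposes.
OUTPUT_PORTS: Dict[str, Set[str]] = {
    "source": {"output"},
    "transform": {"output"},
    "branch": {"true", "false"},
}

Edge = Tuple[str, str, str]  # (src_node, src_port, dst_node)


def check_ports(node_types: Dict[str, str], edges: List[Edge]) -> List[str]:
    """Return an error for every edge that reads from a port its upstream lacks."""
    errors: List[str] = []
    for src, port, dst in edges:
        available = OUTPUT_PORTS[node_types[src]]
        if port not in available:
            errors.append(f"{dst}: {src!r} has no {port!r} port (has {sorted(available)})")
    return errors


types = {"rows": "source", "gate": "branch", "yes": "transform", "no": "transform"}
# inputs=gate without .out(...) resolves to the default "output" port:
wrong = [("rows", "output", "gate"), ("gate", "output", "yes"), ("gate", "output", "no")]
for err in check_ports(types, wrong):
    print(err)
```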
3. Linear
load = postgres("load", operation="select", query="...", connection={...})
enrich = transform("enrich", inputs=load, code="...")
classify = ai_tagging("classify", inputs=enrich, agent_url="...")
postgres("save", inputs=classify, operation="insert", table="results", connection={...})
Each node names its upstream. Because a node must exist before it can be passed to inputs=, the order of statements in the file follows the order of the DAG.
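Python itself enforces this ordering: a node can only be passed to inputs= after it has been created. The hypothetical registry below (not part of the SDK) records nodes in construction order and confirms that each one appears after the upstream it names.

```python
from typing import List, Optional

# Hypothetical registry that records nodes in the order they are constructed.
REGISTRY: List["Node"] = []


class Node:
    def __init__(self, name: str, inputs: Optional["Node"] = None) -> None:
        self.name = name
        self.inputs = inputs
        REGISTRY.append(self)


def definition_order_is_topological(nodes: List[Node]) -> bool:
    """True if every node appears after the upstream it names."""
    position = {id(n): i for i, n in enumerate(nodes)}
    return all(n.inputs is None or position[id(n.inputs)] < position[id(n)] for n in nodes)


load = Node("load")
enrich = Node("enrich", inputs=load)
classify = Node("classify", inputs=enrich)
save = Node("save", inputs=classify)
print([n.name for n in REGISTRY], definition_order_is_topological(REGISTRY))
```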
4. Fan-out (implicit)
Pass the same upstream to inputs= on several downstream nodes. No special syntax is needed: the engine duplicates the row stream to each consumer.
candidates = postgres("pg_read", operation="select", query="...", connection={...})
score_methodology = ai_tagging("score_methodology", inputs=candidates, agent_url="...")
score_novelty = ai_tagging("score_novelty", inputs=candidates, agent_url="...")
score_clinical = ai_tagging("score_clinical", inputs=candidates, agent_url="...")
score_citations = ai_tagging("score_citations", inputs=candidates, agent_url="...")
All four downstream nodes see the same row stream. They don't depend on each other, so the engine runs them in parallel.
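A sketch of the duplication step, assuming each consumer receives an independent deep copy of the batch so one scorer's edits can't leak into another's. Whether the engine copies deeply or shallowly isn't stated on this page; fan_out is a hypothetical helper and the rows are made-up toy data.

```python
import copy
from typing import Any, Dict, List

Row = Dict[str, Any]


def fan_out(rows: List[Row], consumers: List[str]) -> Dict[str, List[Row]]:
    """Give every consumer its own deep copy of the upstream batch (an assumption)."""
    return {name: copy.deepcopy(rows) for name in consumers}


candidates = [{"id": 1, "title": "Trial A"}, {"id": 2, "title": "Trial B"}]
batches = fan_out(candidates, ["score_methodology", "score_novelty"])
batches["score_methodology"][0]["methodology"] = 0.9  # one consumer edits its copy
print(batches["score_novelty"][0])  # the other consumer's copy is unchanged
```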
5. Fan-in (merge)
merge waits for all of its upstreams to complete, then emits one combined batch.
merged = merge(
"merge_scores",
inputs=[score_a, score_b, score_c],
on="id", # column to align rows on (join mode only)
how="join", # or "concat"
)
Two strategies:
| how | Behavior |
|---|---|
| "join" | Align rows across upstreams by the on= column. One output row per input id. |
| "concat" | Stack rows from all upstreams. No alignment; on= is ignored. |
Use "join" when each upstream emits one row per source id (typical for parallel agents producing scores). Use "concat" when each upstream emits independent rows (typical for branch+merge unioning two paths).
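The two strategies can be approximated in plain Python. This merge_batches function is a hypothetical sketch, not the engine's code: "join" here behaves like an outer join on the on= column (an id seen in any upstream produces a row), and when two upstreams share a column name the later one wins. The real engine's handling of missing ids and column clashes may differ.

```python
from typing import Any, Dict, List

Row = Dict[str, Any]


def merge_batches(batches: List[List[Row]], how: str = "join", on: str = "id") -> List[Row]:
    """Toy version of merge(): "join" aligns rows by `on`, "concat" stacks them."""
    if how == "concat":
        return [row for batch in batches for row in batch]  # `on` is ignored
    if how != "join":
        raise ValueError(f"unknown how={how!r}")
    merged: Dict[Any, Row] = {}
    for batch in batches:
        for row in batch:
            # Outer-join style: any id seen in any upstream gets a row;
            # on a column-name clash the later upstream wins.
            merged.setdefault(row[on], {}).update(row)
    return list(merged.values())


methodology = [{"id": 1, "methodology": 0.9}, {"id": 2, "methodology": 0.4}]
novelty = [{"id": 1, "novelty": 0.7}, {"id": 2, "novelty": 0.8}]
print(merge_batches([methodology, novelty], how="join"))
print(len(merge_batches([methodology, novelty], how="concat")))  # 4
```

With two scorers emitting one row per id, "join" yields two wide rows, while "concat" yields four narrow ones.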
6. Branch
A branch routes each row to one of two downstreams based on a condition.
gate = branch("gate", inputs=rows, condition="$input.data.get('score') > 0.8")
output("alerts", inputs=gate.out("true"))
output("queue", inputs=gate.out("false"))
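The condition is an expression string, not Python code: the syntax looks Python-like, but the engine (not your script) evaluates it, once per row. $input.data is the current row dict; use .get(...) so that rows missing the key are handled safely.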
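The engine evaluates the condition string itself, so it can't be reproduced exactly in Python. The sketch below substitutes a Python callable for the expression and shows the per-row split; route and high_score are hypothetical names, and treating a missing score as false is an assumption.

```python
from typing import Any, Callable, Dict, List, Tuple

Row = Dict[str, Any]


def route(rows: List[Row], condition: Callable[[Row], bool]) -> Tuple[List[Row], List[Row]]:
    """Split a batch into (true_rows, false_rows), evaluating the condition once per row."""
    true_rows: List[Row] = []
    false_rows: List[Row] = []
    for row in rows:
        (true_rows if condition(row) else false_rows).append(row)
    return true_rows, false_rows


def high_score(data: Row) -> bool:
    # Python stand-in for "$input.data.get('score') > 0.8";
    # a row with no score is sent to the false side (an assumption).
    score = data.get("score")
    return score is not None and score > 0.8


alerts, queue = route([{"id": 1, "score": 0.95}, {"id": 2, "score": 0.3}, {"id": 3}], high_score)
print([r["id"] for r in alerts], [r["id"] for r in queue])  # [1] [2, 3]
```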
Branch + merge (union the paths)
gate = branch("gate", inputs=rows, condition="...")
yes_path = transform("for_yes", inputs=gate.out("true"), code="...")
no_path = transform("for_no", inputs=gate.out("false"), code="...")
combined = merge("combined", inputs=[yes_path, no_path], how="concat")
Use how="concat" here because the two paths produce independent rows, so there's no key to align on.
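A plain-Python sketch of the same shape, with list comprehensions standing in for the branch, the two transforms, and merge(how="concat"). The threshold and the "path" column the toy transforms add are hypothetical. Because the branch sends every row to exactly one side, concatenating the two paths returns each input row exactly once.

```python
from typing import Any, Dict, List

Row = Dict[str, Any]


def branch_then_concat(rows: List[Row], threshold: float) -> List[Row]:
    """Toy version of branch -> two transforms -> merge(how="concat")."""
    # branch: each row goes to exactly one side
    yes = [r for r in rows if r.get("score", 0) > threshold]
    no = [r for r in rows if not r.get("score", 0) > threshold]
    # hypothetical per-path transforms: record which path each row took
    yes_path = [{**r, "path": "for_yes"} for r in yes]
    no_path = [{**r, "path": "for_no"} for r in no]
    # merge(how="concat"): stack the two disjoint sets; nothing to align on
    return yes_path + no_path


rows = [{"id": 1, "score": 0.9}, {"id": 2, "score": 0.1}, {"id": 3, "score": 0.85}]
for r in branch_then_concat(rows, threshold=0.8):
    print(r)
```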
7. Multi-way routing
branch is two-way. For routing three or more ways, use a transform to compute a route column, then chain branches that test that column for string equality:
classified = transform("route", inputs=rows, code='''
def run(inputs):
    out = []
    for r in inputs.get("data", []):
        score = r.get("score", 0)  # rows without a score fall through to route C
        r["route"] = "A" if score > 0.8 else "B" if score > 0.4 else "C"
        out.append(r)
    return {"data": out}
''')
is_a = branch("is_a", inputs=classified, condition="$input.data.get('route') == 'A'")
output("path_A", inputs=is_a.out("true"))
is_b = branch("is_b", inputs=is_a.out("false"),
              condition="$input.data.get('route') == 'B'")
output("path_B", inputs=is_b.out("true"))
output("path_C", inputs=is_b.out("false"))
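To check the routing logic without the engine, the sketch below runs a standalone copy of a run function with the same thresholds as the route transform, then mimics the two chained branches with list filters. three_way is a hypothetical helper; the filters stand in for the is_a and is_b conditions.

```python
from typing import Any, Dict, List

Row = Dict[str, Any]


def run(inputs):
    # Same thresholds as the "route" transform above.
    out = []
    for r in inputs.get("data", []):
        score = r.get("score", 0)
        r["route"] = "A" if score > 0.8 else "B" if score > 0.4 else "C"
        out.append(r)
    return {"data": out}


def three_way(rows: List[Row]) -> Dict[str, List[Row]]:
    """Mimic is_a and is_b: two chained two-way branches give three paths."""
    classified = run({"data": rows})["data"]
    path_a = [r for r in classified if r.get("route") == "A"]  # is_a true
    rest = [r for r in classified if r.get("route") != "A"]    # is_a false
    path_b = [r for r in rest if r.get("route") == "B"]        # is_b true
    path_c = [r for r in rest if r.get("route") != "B"]        # is_b false
    return {"path_A": path_a, "path_B": path_b, "path_C": path_c}


paths = three_way([{"id": 1, "score": 0.9}, {"id": 2, "score": 0.5}, {"id": 3, "score": 0.1}])
print({name: [r["id"] for r in rs] for name, rs in paths.items()})
# {'path_A': [1], 'path_B': [2], 'path_C': [3]}
```

Each extra route adds one more chained branch; N routes need N-1 branches.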
8. The full picture (a real example)
From examples/09_split_agents_merge_branch.py:
pg_read
  │
  ├──> score_methodology ──┐
  ├──> score_novelty       │
  ├──> score_stats         ├──> merge_scores ──> branch_threshold
  ├──> score_clinical      │                       │ true     │ false
  └──> score_citation    ──┘                       ▼          ▼
                                               pg_upsert  out_review
Postgres source → 5-way fan-out across parallel AI agents → fan-in via merge(how="join", on="id") → branch on a combined-score threshold → two terminal sinks.
The whole thing is ~100 lines of Python. Every connection is just inputs=.
9. Scopes: name-prefix isolation
When the same helper is called multiple times (e.g. screening two different drugs), node names collide. Use wf.scope("prefix") to isolate them:
def fetch_evidence(*, topic):
    papers = pubmed("search", query=topic, max_results=200)
    return filter("recent", inputs=papers,
                  conditions=[{"field": "year", "op": "gte", "value": 2021}])
with Workflow("multi-topic") as wf:
    with wf.scope("onco"):
        onco = fetch_evidence(topic="glioblastoma CAR-T")
        # nodes become: onco.search, onco.recent
    with wf.scope("immuno"):
        immuno = fetch_evidence(topic="checkpoint inhibitors")
        # nodes become: immuno.search, immuno.recent
    merge("combined", inputs=[onco, immuno], how="concat")
Nested scopes are joined with a dot: a node created inside scope("b") nested within scope("a") is named a.b.<name>.
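A minimal model of name-prefix scoping, assuming scopes behave like a stack of prefixes joined with dots. ToyWorkflow and add_node are hypothetical; the real Workflow class builds actual nodes rather than just recording names.

```python
from contextlib import contextmanager
from typing import Iterator, List


class ToyWorkflow:
    """Hypothetical stand-in that only records node names under scope prefixes."""

    def __init__(self) -> None:
        self._prefixes: List[str] = []
        self.node_names: List[str] = []

    @contextmanager
    def scope(self, prefix: str) -> Iterator[None]:
        self._prefixes.append(prefix)  # enter: push the prefix
        try:
            yield
        finally:
            self._prefixes.pop()  # exit: pop it, even on error

    def add_node(self, name: str) -> str:
        full = ".".join(self._prefixes + [name])
        if full in self.node_names:
            raise ValueError(f"duplicate node name: {full}")
        self.node_names.append(full)
        return full


def fetch_evidence(wf: ToyWorkflow) -> None:
    wf.add_node("search")
    wf.add_node("recent")


wf = ToyWorkflow()
with wf.scope("onco"):
    fetch_evidence(wf)
with wf.scope("immuno"):
    fetch_evidence(wf)
with wf.scope("a"):
    with wf.scope("b"):
        wf.add_node("x")
print(wf.node_names)
# ['onco.search', 'onco.recent', 'immuno.search', 'immuno.recent', 'a.b.x']
```

Calling fetch_evidence twice without a scope would raise the duplicate-name error, which is exactly the collision scopes exist to prevent.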
10. What the engine does at run time
You declare the graph; the engine handles:
- Topological order: runs nodes in dependency order. Independent nodes run in parallel.
- Stream duplication: when a node has multiple consumers (fan-out), each gets a copy of the upstream's row batch.
- Merge waiting: a merge node blocks until all its upstreams complete.
- Branch routing: evaluates the condition per row and routes each row to the true or false consumers.
- Variable resolution: {{ name }} references are replaced from wf.variables at run time.
You don't write any of this orchestration yourself. The whole point of declaring the graph with inputs= is to give the engine enough information to do it for you.
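The scheduler itself isn't documented here, but dependency ordering can be illustrated with Python's standard-library graphlib (3.9+). The sketch groups nodes into waves: every node in a wave has all of its upstreams in earlier waves, so nodes within a wave are independent and could run in parallel. The graph is the one from section 8; execution_waves is a hypothetical helper, and ports are ignored in this toy.

```python
from graphlib import TopologicalSorter
from typing import Dict, List, Set


def execution_waves(upstreams: Dict[str, Set[str]]) -> List[List[str]]:
    """Group nodes into waves whose upstreams all finished in earlier waves."""
    sorter = TopologicalSorter(upstreams)
    sorter.prepare()  # raises graphlib.CycleError if the graph has a cycle
    waves: List[List[str]] = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())  # nodes whose upstreams are all done
        waves.append(ready)
        sorter.done(*ready)
    return waves


scorers = ["score_methodology", "score_novelty", "score_stats", "score_clinical", "score_citation"]
upstreams = {name: {"pg_read"} for name in scorers}
upstreams["merge_scores"] = set(scorers)  # merge waits for every scorer
upstreams["branch_threshold"] = {"merge_scores"}
upstreams["pg_upsert"] = {"branch_threshold"}
upstreams["out_review"] = {"branch_threshold"}

for i, wave in enumerate(execution_waves(upstreams)):
    print(i, wave)
```

The five scorers land in a single wave (the fan-out), and merge_scores cannot appear until that whole wave is done (merge waiting).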
11. Validating connections
wf.validate() checks:
- Every non-source node has an upstream
- Every branch's true and false ports are wired (or explicitly skipped)
- No cycles
- No orphan nodes (built but not connected to anything)
- Required kwargs are present for each node type
Always call it before wf.run().
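A rough sketch of several of these checks over a simple edge list: missing upstreams, orphans, unwired branch ports, and cycles (via graphlib's CycleError). toy_validate is a hypothetical function, not wf.validate(); it does not model explicitly skipped ports or per-type required kwargs.

```python
from graphlib import CycleError, TopologicalSorter
from typing import Dict, List, Set, Tuple

Edge = Tuple[str, str, str]  # (src_node, src_port, dst_node)


def toy_validate(node_types: Dict[str, str], edges: List[Edge]) -> List[str]:
    """Hypothetical checks: missing upstream, orphans, unwired branch ports, cycles."""
    errors: List[str] = []
    has_upstream = {dst for _, _, dst in edges}
    has_downstream = {src for src, _, _ in edges}
    for name, kind in node_types.items():
        if kind != "source" and name not in has_upstream:
            errors.append(f"{name}: no upstream")
        if name not in has_upstream and name not in has_downstream:
            errors.append(f"{name}: orphan node")
        if kind == "branch":
            wired = {port for src, port, _ in edges if src == name}
            errors += [f"{name}: {p!r} port not wired" for p in ("true", "false") if p not in wired]
    graph: Dict[str, Set[str]] = {name: set() for name in node_types}
    for src, _, dst in edges:
        graph.setdefault(dst, set()).add(src)
    try:
        TopologicalSorter(graph).prepare()  # raises CycleError on a cycle
    except CycleError:
        errors.append("graph contains a cycle")
    return errors


types = {"rows": "source", "gate": "branch", "yes": "sink", "no": "sink"}
print(toy_validate(types, [("rows", "output", "gate"), ("gate", "true", "yes")]))
```

Forgetting to wire the false side shows up twice here: as an unwired port on the branch and as an orphan "no" sink.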
Next
- Building a workflow: step-by-step build
- Vibe coding: let an agent generate the connections for you
- Reference: every helper, every kwarg