Workflow patterns¶
The 11 helpers compose into a small set of shapes that cover ~95% of real workflows. This page is the vocabulary — once you see these shapes, you can mix and match them for whatever you're building.
Every pattern below is a runnable snippet. The full set under examples/ shows variations.
1. ETL — DB to DB¶
The bread-and-butter case. Pull rows from one Postgres table, shape them, write to another.
from athena_sdk_lite import Workflow
from athena_sdk_lite.nodes import postgres, transform, filter

with Workflow("etl") as wf:
    rows = postgres("load", operation="select", query="SELECT ... FROM source", connection={...})
    enriched = transform("shape", inputs=rows, code=SHAPE_CODE)
    kept = filter("keep-recent", inputs=enriched,
                  conditions=[{"field": "year", "op": "gte", "value": 2024}])
    postgres("save", inputs=kept, operation="upsert",
             table="target", upsert_key="id", connection={...})
When to use: straight transformations between SQL tables.
2. Source enrichment with AI¶
Pull data, normalize it in Python, classify each row with an Athena agent, dump results.
from athena_sdk_lite import Workflow
from athena_sdk_lite.nodes import pubmed, transform, ai_tagging, output

with Workflow("enrich-and-classify") as wf:
    papers = pubmed("search", query="checkpoint inhibitors", max_results=200)
    enriched = transform("extract-features", inputs=papers, code=FEATURES_CODE)
    tagged = ai_tagging("classify", inputs=enriched, agent_url="{{ agent_url }}",
                        input_keys={"title": "title", "abstract": "abstract"})
    output("results", inputs=tagged, format="json")
When to use: any "data → enrich → AI verdict → store" pipeline. Swap pubmed for postgres, s3, or http to read from a different source; the rest of the pipeline stays the same.
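For instance, a sketch of the same pipeline reading from an HTTP API instead of PubMed; the URL and the api_token template variable are placeholder assumptions, and the downstream nodes are unchanged:

```python
# Hypothetical source swap: http in place of pubmed, downstream nodes unchanged.
articles = http("search", url="https://api.example.com/articles",
                headers={"Authorization": "Bearer {{ api_token }}"}, timeout=60)
enriched = transform("extract-features", inputs=articles, code=FEATURES_CODE)
```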
3. Multi-source ingest¶
Pull from several sources, concat them, treat as one stream.
from athena_sdk_lite import Workflow
from athena_sdk_lite.nodes import pubmed, http, postgres, merge, transform

with Workflow("multi-source") as wf:
    biomed = pubmed("biomed", query="...")
    web = http("web", url="https://api.example.com/articles")
    db = postgres("db", operation="select", query="SELECT ... FROM articles",
                  connection={...})
    combined = merge("union", inputs=[biomed, web, db], how="concat")
    transform("normalize", inputs=combined, code=NORMALIZE_CODE)
When to use: you want to apply the same downstream pipeline to records from multiple origins.
4. Multi-agent scoring panel (fan-out + fan-in)¶
Send the same rows to N different AI agents in parallel, join their scores back into one row per record. This is the canonical use of fan-out + merge(how="join").
from athena_sdk_lite import Workflow
from athena_sdk_lite.nodes import postgres, ai_tagging, merge, output

with Workflow("panel-grading") as wf:
    rows = postgres("load", operation="select", query="...", connection={...})
    # Fan-out — same upstream listed in multiple inputs= makes the engine
    # duplicate the row stream to each agent in parallel.
    score_a = ai_tagging("score_a", inputs=rows, agent_url="...", output_column="score_a")
    score_b = ai_tagging("score_b", inputs=rows, agent_url="...", output_column="score_b")
    score_c = ai_tagging("score_c", inputs=rows, agent_url="...", output_column="score_c")
    # Fan-in — wait for all three, align on `id`, one output row per input.
    panel = merge("panel", inputs=[score_a, score_b, score_c], on="id", how="join")
    output("scored", inputs=panel, format="json")
When to use: parallel AI calls where each agent produces a different score column for the same row. See examples/09_split_agents_merge_branch.py for a 5-agent variant.
5. Triage + escalation (branch with two distinct sinks)¶
Classify rows, route critical ones to a webhook/escalation table, route the rest to a queue.
from athena_sdk_lite import Workflow
from athena_sdk_lite.nodes import postgres, ai_tagging, branch, transform, http

with Workflow("triage") as wf:
    events = postgres("load", operation="select", query="...", connection={...})
    classified = ai_tagging("classify", inputs=events, agent_url="...")
    gate = branch("is-critical", inputs=classified,
                  condition="$input.data.get('severity') == 'critical'")
    # TRUE side — page someone
    alert = transform("build-alert", inputs=gate.out("true"), code=BUILD_ALERT_CODE)
    http("page", inputs=alert, url="https://hooks.slack.com/...",
         method="POST", bearer_token="{{ slack_token }}")
    # FALSE side — log to a routine queue
    postgres("queue", inputs=gate.out("false"), operation="insert",
             table="triage_queue", connection={...})
When to use: any "critical vs routine" routing. The escalation can be Slack/PagerDuty (via http), a database table, or another output. See examples/11_triage_pipeline.py.
6. Validation + dead-letter routing¶
Run rows through a structural check; valid rows go to the main path, invalid rows go to a quarantine sink so nothing is silently dropped.
from athena_sdk_lite import Workflow
from athena_sdk_lite.nodes import postgres, transform, branch, output

with Workflow("validate-and-route") as wf:
    rows = postgres("load", operation="select", query="...", connection={...})
    # Tag each row with a validity flag (write your own logic in `transform`)
    checked = transform("validate", inputs=rows, code=VALIDATE_CODE)
    gate = branch("is-valid", inputs=checked,
                  condition="$input.data.get('valid') is True")
    # Valid → continue downstream
    postgres("save", inputs=gate.out("true"), operation="upsert",
             table="processed", upsert_key="id", connection={...})
    # Invalid → dead-letter queue for manual review
    output("quarantine", inputs=gate.out("false"), format="json")
When to use: when you can't afford to silently drop bad rows. The dead-letter sink keeps them visible.
7. AI-classify and persist (S3 → Postgres)¶
Read files from S3, classify each with an agent, write structured results to Postgres.
from athena_sdk_lite import Workflow
from athena_sdk_lite.nodes import s3, transform, ai_tagging, postgres

with Workflow("s3-classify-store") as wf:
    docs = s3("read", bucket="my-bucket", operation="read",
              file_path="incoming/*.json", region="us-east-1")
    normalized = transform("normalize", inputs=docs, code=NORMALIZE_CODE)
    classified = ai_tagging("classify", inputs=normalized, agent_url="...")
    postgres("store", inputs=classified, operation="upsert",
             table="classified_docs", upsert_key="doc_id", connection={...})
When to use: anything that reads files and persists structured AI output. S3 can be swapped for http (poll an API) or postgres (cursor a table).
8. Local file as input (CSV / JSON / Excel)¶
Use the local_file helper for any file-on-disk input. The engine reads the file at execution time and emits rows in the standard source-node shape.
from athena_sdk_lite import Workflow
from athena_sdk_lite.nodes import local_file, filter, ai_tagging, output

with Workflow("from-csv") as wf:
    rows = local_file("load", path="/data/orders.csv")  # auto-detect by extension
    # Or: local_file("load", path="...", format="json")
    # Or: local_file("load", path="...", format="excel", sheet="Q4")
    kept = filter("keep-large", inputs=rows,
                  conditions=[{"field": "total", "op": "gt", "value": 100}])
    classified = ai_tagging("classify", inputs=kept, agent_url="...")
    output("results", inputs=classified, format="json")
| Kwarg | Default | Notes |
|---|---|---|
| path | required | Absolute or workflow-relative file path. |
| format | "auto" | "csv", "json", "excel", or "auto" (infers from extension). |
| sheet | none | Excel-only. Defaults to first sheet. |

- JSON files may be a top-level array OR {"data": [...]} — both are accepted.
- Excel requires pip install openpyxl. The helper raises a clear error if it's missing.
- CSV uses csv.DictReader — UTF-8 by default.
Don't call open() inside a transform. The python_transform sandbox hard-blocks file I/O. The whole reason local_file exists is to read files outside that sandbox at execution time.
Full worked example: examples/12_read_local_file.py.
9. Periodic API poll, transform, sink¶
Hit an HTTP endpoint, project + reshape, drop to S3 or Postgres. Triggering is your problem (cron, scheduler) — the SDK is the body of the work.
import os

from athena_sdk_lite import Workflow
from athena_sdk_lite.nodes import http, transform, s3

with Workflow("api-snapshot") as wf:
    wf.set_variable("api_token", os.environ["API_TOKEN"])
    raw = http("fetch",
               url="https://api.example.com/v1/snapshots",
               headers={"Authorization": "Bearer {{ api_token }}"},
               timeout=60)
    clean = transform("project", inputs=raw, code=PROJECTION_CODE)
    s3("archive", inputs=clean,
       bucket="snapshots", operation="write",
       file_path="snapshots/{{ today }}.json",
       file_format="json", region="us-east-1")
When to use: scheduled pulls that need transformation before storage.
Combining patterns¶
Real workflows mix these. Some examples from the repo:
- Pattern 2 + Pattern 5 — enrich, classify, then triage on the classifier output. See examples/11_triage_pipeline.py.
- Pattern 4 + Pattern 5 — multi-agent scoring, then branch on a combined-score threshold. See examples/09_split_agents_merge_branch.py.
- Pattern 3 + Pattern 6 — multi-source ingest with validation routing.
There is no special syntax for combining patterns — every connection is still just inputs=. Read top-to-bottom in the workflow file; the shape declares itself.
What these patterns can't cleanly express¶
Worth being honest about the edges. If you find yourself fighting the SDK to get one of these, see the workarounds.
Multi-way switch (3+ destinations)¶
branch is two-way only. For 3+ ways:
# Option A: chain branches
is_a = branch("is_a", inputs=rows, condition="$input.data.get('route') == 'A'")
is_b = branch("is_b", inputs=is_a.out("false"),
condition="$input.data.get('route') == 'B'")
output("path_A", inputs=is_a.out("true"))
output("path_B", inputs=is_b.out("true"))
output("path_C", inputs=is_b.out("false"))
# Option B: push routing into transform, then a single branch per destination
classified = transform("route", inputs=rows, code='''
def run(inputs):
out = []
for r in inputs.get("data", []):
r["route"] = "A" if r["score"] > 0.8 else "B" if r["score"] > 0.4 else "C"
out.append(r)
return {"data": out}
''')
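The two options also compose: Option B computes the route field, and Option A's chain is then the natural way to consume it, since each chained branch uses both of its ports. A minimal sketch of the hand-off:

```python
# Feed Option B's routed stream into the Option A chain.
is_a = branch("is_a", inputs=classified, condition="$input.data.get('route') == 'A'")
# ...then continue exactly as in Option A above.
```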
Retry on failure¶
No native retry primitive. Wrap inside transform:
RETRY_CODE = """
import time
def run(inputs):
out = []
for row in inputs.get("data", []):
for attempt in range(3):
try:
row["enriched"] = do_external_call(row)
break
except Exception:
if attempt == 2: raise
time.sleep(2 ** attempt)
out.append(row)
return {"data": out}
"""
Loop over a non-row collection¶
The engine already loops over row batches implicitly. If you need to loop over something else (e.g. "for each of these 50 URLs, fetch and combine"), materialize the collection as a row stream first, then let the engine iterate:
URLS_AS_ROWS = """
def run(inputs):
return {"data": [{"url": u} for u in MY_URL_LIST]}
"""
with Workflow("iterate-urls") as wf:
seed = transform("urls", code=URLS_AS_ROWS, mode="function")
fetched = http("fetch-each", inputs=seed, url="{{ row.url }}")
...
Streaming / windowed operations¶
This SDK is batch-only by design. For streaming, you want the full athena-sdk + nexus-backend.
Validation will catch many of the silent-footgun shapes¶
wf.validate() now flags:
- branch nodes with only one port consumed (false-path or true-path silently dropped)
- output nodes with downstream consumers (output is terminal — can't chain off it)
- merge nodes with fewer than 2 inputs (fan-in is meaningless with 1)
- Plus the original checks: duplicate names, dangling edges, no terminal, cycles
Always run wf.validate() before wf.run(), and put it in your pre-commit hook so structural mistakes can't get into main.
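The habit is two lines at the end of the workflow script; a minimal sketch, since where exactly you call them depends on how you structure the file:

```python
wf.validate()  # fails fast on the structural mistakes listed above
wf.run()
```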
Next¶
- Building a workflow → — step-by-step
- How nodes connect → — inputs=, ports, topology
- Reference → — every helper, every kwarg