Building a workflow¶
A workflow is a directed acyclic graph (DAG) of nodes. You build it in pure Python — no config files, no YAML — and the SDK serializes it for the engine to run.
This page walks you through building a workflow end-to-end. By the end you'll have a runnable Python file that pulls data, transforms it, sends it to an AI agent, branches on the result, and writes outputs.
1. Install¶
For local development, install the SDK from a local checkout of this repo.
2. The empty workflow¶
Every workflow file starts the same way:
from athena_sdk_lite import Workflow
with Workflow("my-pipeline", description="What this does, briefly") as wf:
    pass  # nodes go here

if __name__ == "__main__":
    print(wf.visualize())
    print("validation:", wf.validate() or "ok")
with Workflow(...) as wf: sets the active workflow for function-first helpers, which is what every example in this repo uses.
3. Add a source¶
Sources have no upstream — they emit rows.
from athena_sdk_lite.nodes import postgres
with Workflow("orders-pipeline") as wf:
    orders = postgres(
        "load-orders",
        operation="select",
        query="SELECT id, customer_id, total FROM orders WHERE status='active'",
        connection={
            "host": "db.internal",
            "database": "orders",
            "user": "reader",
            "password": "redacted",
        },
    )
Other source helpers:
- pubmed(name, query=..., max_results=100) — biomedical literature
- s3(name, bucket=..., operation="read", file_path=...) — object storage
- http(name, url=..., method="GET") — generic HTTP
Or use wf.add_node(...) for engine node types not in the starter set (twitter, edgar, snowflake, etc.).
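For example, the http helper wires in like any other source (a sketch; the node name and URL are placeholders):

from athena_sdk_lite.nodes import http

with Workflow("orders-pipeline") as wf:
    raw = http(
        "fetch-orders",                        # node name
        url="https://api.example.com/orders",  # placeholder endpoint
        method="GET",
    )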
4. Transform the data¶
transform runs your Python on the row batch. inputs is a dict with the row stream at inputs["data"].
from athena_sdk_lite.nodes import transform
ENRICH_CODE = """
def run(inputs):
    rows = inputs.get("data", []) or []
    out = []
    for row in rows:
        row = dict(row)
        row["high_value"] = row.get("total", 0) > 1000
        out.append(row)
    return {"data": out}
"""
with Workflow("orders-pipeline") as wf:
    orders = postgres(...)

    enriched = transform(
        "enrich",
        inputs=orders,    # wires this node to the upstream
        code=ENRICH_CODE,
        mode="function",  # default; defines run(inputs)
    )
The wire is the inputs=orders kwarg. That single keyword is how every node in the SDK declares its upstream.
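For instance, a second transform downstream of the first is wired the same way (a sketch; FILTER_CODE is a hypothetical code string shaped like ENRICH_CODE):

filtered = transform(
    "keep-high-value",
    inputs=enriched,   # upstream is the enrich node
    code=FILTER_CODE,  # hypothetical: another run(inputs) -> {"data": [...]} string
)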
5. Call an AI agent¶
from athena_sdk_lite.nodes import ai_tagging
tagged = ai_tagging(
    "classify",
    inputs=enriched,
    agent_url="https://your-athena-host.example/workspace/WS/agent/APP/AGENT/CHAT",
    response_type="json",
    input_keys={  # map prompt variables to row columns
        "total": "total",
        "high_value": "high_value",
    },
)
input_keys is how the agent sees fields from your row. Don't hardcode the URL — use variables for that.
6. Branch on the result¶
from athena_sdk_lite.nodes import branch, output
gate = branch(
    "is-fraud",
    inputs=tagged,
    condition="$input.data.get('fraud_score') > 0.7",
)
output("alerts", inputs=gate.out("true"), format="json")
output("queue", inputs=gate.out("false"), format="json")
The condition is not Python — it's an expression string evaluated per row by the engine. $input.data is the current row dict.
The helper is named branch because if is a Python reserved word; the underlying engine node type is "if".
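Another condition in the same expression form, gating on a column straight from the source (a sketch):

review = branch(
    "is-high-total",
    inputs=tagged,
    condition="$input.data.get('total') > 1000",  # same $input.data row-dict form
)
output("review-queue", inputs=review.out("true"), format="json")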
7. Variables and credentials¶
Never inline credentials in node configs. Use workflow-scoped variables and {{ name }} templating:
import os
with Workflow("orders-pipeline") as wf:
    wf.set_variable("db_password", os.environ["DB_PASSWORD"])
    wf.set_variable("agent_url", os.environ["AGENT_URL"])

    orders = postgres(
        "load",
        operation="select",
        query="...",
        connection={
            "host": "db.internal",
            "database": "orders",
            "user": "reader",
            "password": "{{ db_password }}",  # ← templated
        },
    )

    tagged = ai_tagging(
        "classify",
        inputs=orders,
        agent_url="{{ agent_url }}",  # ← templated
    )
The engine resolves {{ name }} at run time from the workflow's variables dict.
8. Validate and inspect¶
Always run these two before you run the workflow:
print(wf.visualize()) # ASCII diagram of the DAG
issues = wf.validate() # list of structural issues; [] if good
assert issues == [], issues
validate() is pure — no I/O, no side effects. Run it in CI on every workflow file. See tests/test_examples_validate.py for a reference implementation.
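A minimal sketch of such a CI check, assuming your workflow files live under workflows/ and each exposes a module-level wf:

import glob
import importlib

import pytest

@pytest.mark.parametrize("path", sorted(glob.glob("workflows/*.py")))
def test_workflow_validates(path):
    module_name = path[:-3].replace("/", ".")  # workflows/foo.py -> workflows.foo
    wf = importlib.import_module(module_name).wf
    assert wf.validate() == []                 # pure check: no I/O, no side effects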
9. Run¶
The engine handles execution order, fan-out, and merge waits. You don't write loops, scheduling, or threading.
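Kicking off a run is a single call; one common placement is under the same __main__ guard used earlier (a sketch):

if __name__ == "__main__":
    result = wf.run()  # executes the DAG; returns outputs keyed by node name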
Inspect a node's output¶
result is a plain dict keyed by node name. Pull any node's output by name:
result = wf.run()
print(result["classify"]) # full output dict for the classify node
print(result["classify"]["data"]) # just the rows
print(result["classify"]["data"][0]) # the first row
Most node types (sources, transforms, ai_tagging) emit {"data": [...rows...], "metadata": {...}}. To see every node at once:
for name, output in result.items():
    print(f"\n── {name} ──")
    rows = (output or {}).get("data", []) if isinstance(output, dict) else None
    if isinstance(rows, list):
        for i, row in enumerate(rows[:3]):
            print(f" [{i}] {row}")
        if len(rows) > 3:
            print(f" ... +{len(rows) - 3} more")
    else:
        print(f" {output}")
Every example in examples/ has a commented print(result["<node>"]) recipe at the bottom showing which node to inspect for that workflow.
10. Test¶
Two patterns. Pick one or both.
Structural test (fast, pure):
def test_orders_pipeline_validates():
    from workflows.orders_pipeline import wf
    assert wf.validate() == []
    assert "load-orders" in wf
    assert "classify" in wf
Subprocess test (catches example/doc drift; see tests/test_examples_validate.py):
import subprocess, sys
def test_workflow_runs_cleanly():
    result = subprocess.run(
        [sys.executable, "workflows/orders_pipeline.py"],
        capture_output=True, text=True,
    )
    assert result.returncode == 0
    assert "validation: ok" in result.stdout
The full workflow¶
Putting steps 1–9 together:
import os
from athena_sdk_lite import Workflow
from athena_sdk_lite.nodes import postgres, transform, ai_tagging, branch, output
ENRICH_CODE = """
def run(inputs):
    rows = inputs.get("data", []) or []
    out = []
    for row in rows:
        row = dict(row)
        row["high_value"] = row.get("total", 0) > 1000
        out.append(row)
    return {"data": out}
"""
with Workflow("orders-pipeline",
              description="Triage active orders for fraud review") as wf:
    wf.set_variable("db_password", os.environ.get("DB_PASSWORD", "redacted"))
    wf.set_variable("agent_url", os.environ.get(
        "AGENT_URL",
        "https://your-athena-host.example/workspace/WS/agent/APP/AGENT/CHAT",
    ))

    orders = postgres(
        "load-orders",
        operation="select",
        query="SELECT id, customer_id, total FROM orders WHERE status='active'",
        connection={
            "host": "db.internal", "database": "orders",
            "user": "reader", "password": "{{ db_password }}",
        },
    )

    enriched = transform("enrich", inputs=orders, code=ENRICH_CODE)

    tagged = ai_tagging(
        "classify", inputs=enriched,
        agent_url="{{ agent_url }}",
        response_type="json",
        input_keys={"total": "total", "high_value": "high_value"},
    )

    gate = branch(
        "is-fraud", inputs=tagged,
        condition="$input.data.get('fraud_score') > 0.7",
    )

    output("alerts", inputs=gate.out("true"), format="json")
    output("queue", inputs=gate.out("false"), format="json")

if __name__ == "__main__":
    print(wf.visualize())
    print()
    issues = wf.validate()
    print("validation:", "ok" if not issues else issues)
Save as workflows/orders_pipeline.py, run python workflows/orders_pipeline.py, see the DAG, and ship it.
Next¶
- How nodes connect → inputs=, ports, fan-out, fan-in, branch+merge
- Vibe coding → let an AI agent author workflows in this SDK
- Reference → every helper, every kwarg, every error