Building a workflow

A workflow is a directed acyclic graph (DAG) of nodes. You build it in pure Python — no config files, no YAML — and the SDK serializes it for the engine to run.

This page walks you through building a workflow end-to-end. By the end you'll have a runnable Python file that pulls data, transforms it, sends it to an AI agent, branches on the result, and writes outputs.


1. Install

pip install athena-sdk-lite

For local development from this repo:

pip install -e ./athena-sdk-lite

2. The empty workflow

Every workflow file starts the same way:

from athena_sdk_lite import Workflow

with Workflow("my-pipeline", description="What this does, briefly") as wf:
    pass   # nodes go here

if __name__ == "__main__":
    print(wf.visualize())
    print("validation:", wf.validate() or "ok")

The with Workflow(...) as wf: block sets the active workflow, which is the workflow that function-first helpers such as postgres(...) and transform(...) attach their nodes to. Every example in this repo uses this pattern.

3. Add a source

Sources have no upstream — they emit rows.

from athena_sdk_lite.nodes import postgres

with Workflow("orders-pipeline") as wf:
    orders = postgres(
        "load-orders",
        operation="select",
        query="SELECT id, customer_id, total FROM orders WHERE status='active'",
        connection={
            "host": "db.internal",
            "database": "orders",
            "user": "reader",
            "password": "redacted",
        },
    )

Other source helpers:

  • pubmed(name, query=..., max_results=100) — biomedical literature
  • s3(name, bucket=..., operation="read", file_path=...) — object storage
  • http(name, url=..., method="GET") — generic HTTP

Or use wf.add_node(...) for engine node types not in the starter set (twitter, edgar, snowflake, etc.).

4. Transform the data

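transform runs your Python code on each batch of rows. The inputs argument is a dict, with the upstream rows available as a list at inputs["data"]. In the example below, run returns a dict of the same shape.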

from athena_sdk_lite.nodes import transform

ENRICH_CODE = """
def run(inputs):
    rows = inputs.get("data", []) or []
    out = []
    for row in rows:
        row = dict(row)
        row["high_value"] = row.get("total", 0) > 1000
        out.append(row)
    return {"data": out}
"""

with Workflow("orders-pipeline") as wf:
    orders = postgres(...)
    enriched = transform(
        "enrich",
        inputs=orders,        # wires this node to the upstream
        code=ENRICH_CODE,
        mode="function",      # default; defines run(inputs)
    )

The wire is the inputs=orders kwarg. That single keyword is how every node in the SDK declares its upstream.
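
Because the transform body is a plain string that defines run(inputs), you can exercise it locally before wiring it into a workflow. The sketch below is one way to do that with exec. The load_run helper is hypothetical and not part of the SDK, and the engine may load the code string differently.

```python
ENRICH_CODE = """
def run(inputs):
    rows = inputs.get("data", []) or []
    out = []
    for row in rows:
        row = dict(row)
        row["high_value"] = row.get("total", 0) > 1000
        out.append(row)
    return {"data": out}
"""


def load_run(code):
    """Execute a transform code string and return its run() function.

    Hypothetical local helper for testing; not an SDK API.
    """
    namespace = {}
    exec(code, namespace)
    return namespace["run"]


run = load_run(ENRICH_CODE)

# Two sample rows: one above the 1000 threshold, one below.
sample = {"data": [{"id": 1, "total": 1500}, {"id": 2, "total": 20}]}
enriched_rows = run(sample)["data"]
print(enriched_rows)
```

Since run copies each row with dict(row) before adding high_value, the sample rows are left unmodified, which makes the function easy to assert against in a unit test.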

5. Call an AI agent

from athena_sdk_lite.nodes import ai_tagging

tagged = ai_tagging(
    "classify",
    inputs=enriched,
    agent_url="https://your-athena-host.example/workspace/WS/agent/APP/AGENT/CHAT",
    response_type="json",
    input_keys={                   # map prompt variables to row columns
        "total": "total",
        "high_value": "high_value",
    },
)

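input_keys controls which row fields the agent sees: each key is a prompt variable, and each value is the row column that fills it. The URL is hardcoded here only for illustration; in a real workflow, supply it through a workflow variable (see step 7).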
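
To make the mapping direction concrete, here is a minimal sketch in plain Python of how one row could be projected onto prompt variables. It assumes keys are prompt variable names and values are row columns, as the comment in the example suggests. The build_prompt_variables function and the order_total variable name are hypothetical, and the engine's actual behavior may differ.

```python
def build_prompt_variables(row, input_keys):
    """Return {prompt_variable: row[column]} for every entry in input_keys.

    Illustrative only; missing columns become None in this sketch.
    """
    return {
        variable: row.get(column)
        for variable, column in input_keys.items()
    }


row = {"id": 7, "total": 1500, "high_value": True}

# Same names on both sides, as in the example above.
same_names = build_prompt_variables(
    row, {"total": "total", "high_value": "high_value"}
)

# A renamed prompt variable: the agent would see "order_total".
renamed = build_prompt_variables(row, {"order_total": "total"})

print(same_names)
print(renamed)
```

Columns not listed in input_keys, such as id here, are left out of the result in this sketch.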

6. Branch on the result

from athena_sdk_lite.nodes import branch, output

gate = branch(
    "is-fraud",
    inputs=tagged,
    condition="$input.data.get('fraud_score') > 0.7",
)

output("alerts", inputs=gate.out("true"),  format="json")
output("queue",  inputs=gate.out("false"), format="json")

The condition is not Python, even though it resembles it: it is an expression string that the engine evaluates once per row, with $input.data bound to the current row dict.

The helper is named branch because if is a Python reserved word; the underlying engine node type is "if".
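
The per-row routing can be pictured with a plain-Python sketch that splits rows into a "true" list and a "false" list. This is an illustration of the idea only, not the engine's implementation: split_rows and is_fraud are hypothetical names, and sending rows with no fraud_score to the "false" side is a choice made for this sketch.

```python
def split_rows(rows, predicate):
    """Partition rows by a predicate, mimicking a two-port branch."""
    ports = {"true": [], "false": []}
    for row in rows:
        ports["true" if predicate(row) else "false"].append(row)
    return ports


def is_fraud(row):
    """Stand-in for the condition string fraud_score > 0.7."""
    score = row.get("fraud_score")
    # Assumption of this sketch: a missing score is treated as not fraud.
    return score is not None and score > 0.7


rows = [
    {"id": 1, "fraud_score": 0.91},
    {"id": 2, "fraud_score": 0.12},
    {"id": 3},
]
ports = split_rows(rows, is_fraud)
print(ports["true"])
print(ports["false"])
```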

7. Variables and credentials

Never inline credentials in node configs. Use workflow-scoped variables and {{ name }} templating:

import os

with Workflow("orders-pipeline") as wf:
    wf.set_variable("db_password", os.environ["DB_PASSWORD"])
    wf.set_variable("agent_url",   os.environ["AGENT_URL"])

    orders = postgres(
        "load",
        operation="select",
        query="...",
        connection={
            "host": "db.internal",
            "database": "orders",
            "user": "reader",
            "password": "{{ db_password }}",     # ← templated
        },
    )
    tagged = ai_tagging(
        "classify",
        inputs=orders,
        agent_url="{{ agent_url }}",             # ← templated
    )

The engine resolves {{ name }} at run time from the workflow's variables dict.
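
As a mental model for the substitution, the sketch below resolves {{ name }} placeholders in a nested config using a regular expression. It is an assumption-laden illustration rather than the engine's resolver: the render function is hypothetical, and raising KeyError on an unknown variable is a choice made here, not documented engine behavior.

```python
import re

# Matches {{ name }} with optional whitespace inside the braces.
_PLACEHOLDER = re.compile(r"\{\{\s*(\w+)\s*\}\}")


def render(value, variables):
    """Recursively replace {{ name }} in strings, dicts, and lists."""
    if isinstance(value, str):
        return _PLACEHOLDER.sub(
            lambda match: str(variables[match.group(1)]), value
        )
    if isinstance(value, dict):
        return {key: render(item, variables) for key, item in value.items()}
    if isinstance(value, list):
        return [render(item, variables) for item in value]
    return value  # numbers, booleans, None pass through unchanged


connection = {
    "host": "db.internal",
    "database": "orders",
    "user": "reader",
    "password": "{{ db_password }}",
}
resolved = render(connection, {"db_password": "example-secret"})
print(resolved["password"])
```

The point of the pattern is that the serialized workflow only ever contains the placeholder, so the secret itself stays out of the node config.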

8. Validate and inspect

Always run these two before you run the workflow:

print(wf.visualize())   # ASCII diagram of the DAG
issues = wf.validate()  # list of structural issues; [] if good
assert issues == [], issues

validate() is pure — no I/O, no side effects. Run it in CI on every workflow file. See tests/test_examples_validate.py for a reference implementation.

9. Run

result = wf.run()      # local, in-process; returns dict[node_name, output]

The engine handles execution order, fan-out, and merge waits. You don't write loops, scheduling, or threading.
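
To see why no manual scheduling is needed, note that a DAG always admits an order in which every node comes after its upstreams. The sketch below derives such an order for the orders pipeline with the standard library's graphlib (Python 3.9+). It illustrates the concept only; how the engine actually schedules nodes is not specified here.

```python
from graphlib import TopologicalSorter

# Each node maps to the set of nodes it depends on (its upstreams).
upstream = {
    "load-orders": set(),
    "enrich": {"load-orders"},
    "classify": {"enrich"},
    "is-fraud": {"classify"},
    "alerts": {"is-fraud"},
    "queue": {"is-fraud"},
}

# static_order() yields every node after all of its upstreams.
order = list(TopologicalSorter(upstream).static_order())
print(order)
```

The two output nodes both depend only on is-fraud, so their relative order is not fixed; an engine is free to run them in either order or in parallel.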

Inspect a node's output

result is a plain dict keyed by node name. Pull any node's output by name:

result = wf.run()
print(result["classify"])              # full output dict for the classify node
print(result["classify"]["data"])      # just the rows
print(result["classify"]["data"][0])   # the first row

Most node types (sources, transforms, ai_tagging) emit {"data": [...rows...], "metadata": {...}}. To see every node at once:

for name, output in result.items():
    print(f"\n── {name} ──")
    rows = (output or {}).get("data", []) if isinstance(output, dict) else None
    if isinstance(rows, list):
        for i, row in enumerate(rows[:3]):
            print(f"  [{i}] {row}")
        if len(rows) > 3:
            print(f"  ... +{len(rows) - 3} more")
    else:
        print(f"  {output}")

Every example in examples/ has a commented print(result["<node>"]) recipe at the bottom showing which node to inspect for that workflow.

10. Test

Two patterns. Pick one or both.

Structural test (fast, pure):

def test_orders_pipeline_validates():
    from workflows.orders_pipeline import wf
    assert wf.validate() == []
    assert "load-orders" in wf
    assert "classify" in wf

Subprocess test (catches example/doc drift; see tests/test_examples_validate.py):

import subprocess
import sys


def test_workflow_runs_cleanly():
    result = subprocess.run(
        [sys.executable, "workflows/orders_pipeline.py"],
        capture_output=True,
        text=True,
    )
    assert result.returncode == 0
    assert "validation: ok" in result.stdout

The full workflow

Putting steps 2–8 together:

import os
from athena_sdk_lite import Workflow
from athena_sdk_lite.nodes import postgres, transform, ai_tagging, branch, output

ENRICH_CODE = """
def run(inputs):
    rows = inputs.get("data", []) or []
    out = []
    for row in rows:
        row = dict(row)
        row["high_value"] = row.get("total", 0) > 1000
        out.append(row)
    return {"data": out}
"""

with Workflow("orders-pipeline",
              description="Triage active orders for fraud review") as wf:

    wf.set_variable("db_password", os.environ.get("DB_PASSWORD", "redacted"))
    wf.set_variable("agent_url", os.environ.get(
        "AGENT_URL",
        "https://your-athena-host.example/workspace/WS/agent/APP/AGENT/CHAT",
    ))

    orders = postgres(
        "load-orders",
        operation="select",
        query="SELECT id, customer_id, total FROM orders WHERE status='active'",
        connection={
            "host": "db.internal", "database": "orders",
            "user": "reader", "password": "{{ db_password }}",
        },
    )

    enriched = transform("enrich", inputs=orders, code=ENRICH_CODE)

    tagged = ai_tagging(
        "classify", inputs=enriched,
        agent_url="{{ agent_url }}",
        response_type="json",
        input_keys={"total": "total", "high_value": "high_value"},
    )

    gate = branch(
        "is-fraud", inputs=tagged,
        condition="$input.data.get('fraud_score') > 0.7",
    )

    output("alerts", inputs=gate.out("true"),  format="json")
    output("queue",  inputs=gate.out("false"), format="json")


if __name__ == "__main__":
    print(wf.visualize())
    print()
    issues = wf.validate()
    print("validation:", "ok" if not issues else issues)

Save this as workflows/orders_pipeline.py, then run python workflows/orders_pipeline.py to print the DAG and the validation result.
