athena-sdk-lite — Technical Reference

Version: 0.1.0

Audience: engineers who are about to write or maintain a workflow with this SDK. Use this as a reference, not a tutorial; examples live under examples/.

For the big picture, read overview.md. For internals, architecture.md.


1. Install

pip install athena-sdk-lite

Heavy node-family dependencies are opt-in extras:

pip install 'athena-sdk-lite[nodes-google,nodes-snowflake]'

During local dev, an editable install from the repo:

pip install -e ./athena-sdk-lite

2. The two coding styles

Both produce identical workflows. Pick one per file; don't mix.

Method-first

from athena_sdk_lite import Workflow

wf = Workflow("name")
a = wf.pubmed("search", query="...")
b = wf.ai_tagging("classify", inputs=a, agent_url="...")
wf.output("results", inputs=b, format="json")

Function-first (used by every example in this repo)

from athena_sdk_lite import Workflow
from athena_sdk_lite.nodes import pubmed, ai_tagging, output

with Workflow("name") as wf:
    a = pubmed("search", query="...")
    b = ai_tagging("classify", inputs=a, agent_url="...")
    output("results", inputs=b, format="json")

Function-first helpers raise RuntimeError if called outside a with Workflow(...): block.

3. The 11 starter helpers

Every helper takes a name (string) as its first positional argument and inputs= as the upstream connection. Other kwargs are helper-specific.

pubmed — biomedical literature

pubmed("search", query="glioblastoma CAR-T", max_results=100)

Kwarg         Default     Notes
query         (required)
max_results   100         Soft cap on returned records.
inputs        None        Sources rarely need an upstream.

postgres — DB read/write

# select
postgres("load", operation="select",
         query="SELECT id, total FROM orders",
         connection={"host": ..., "database": ..., "user": ..., "password": ...})

# insert
postgres("write", inputs=rows, operation="insert", table="results", connection={...})

# upsert
postgres("save", inputs=rows, operation="upsert",
         table="results", upsert_key="id", skip_existing=False,
         batch_size=200, connection={...})

Validation enforced at build time:

  • select / update require query=
  • insert / upsert require table=
  • upsert requires upsert_key=

s3 — object storage

s3("export", inputs=rows, bucket="my-bucket",
   operation="write", file_path="orders.csv", file_format="csv",
   region="us-east-1", prefix="daily/")

Credentials fall back to standard AWS env vars (AWS_ACCESS_KEY_ID, ...) if access_key_id= / secret_access_key= are not supplied.

local_file — read CSV / JSON / Excel from disk

rows = local_file("load", path="/data/orders.csv")          # auto-detect
rows = local_file("load", path="/data/orders.json", format="json")
rows = local_file("load", path="/data/q4.xlsx", format="excel", sheet="Q4-2025")

Kwarg    Default      Notes
path     (required)   Absolute or workflow-relative.
format   "auto"       "csv", "json", "excel", or "auto" (infers from .csv / .json / .xlsx / .xls extension).
sheet    none         Excel-only. Defaults to first sheet.

Emits rows in the standard {"data": [...], "metadata": {...}} shape. JSON files may be a top-level array or {"data": [...]}. Excel requires pip install openpyxl.
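
As a rough illustration of the two accepted JSON layouts, the sketch below coerces either one into the standard shape. The function name normalize_json_payload and the contents of metadata are assumptions made for this example; the engine's own loader may behave differently.

```python
import json


def normalize_json_payload(text: str) -> dict:
    """Hypothetical helper: coerce either accepted JSON layout into the standard shape."""
    payload = json.loads(text)
    if isinstance(payload, list):
        rows = payload                      # top-level array
    elif isinstance(payload, dict) and isinstance(payload.get("data"), list):
        rows = payload["data"]              # {"data": [...]} wrapper
    else:
        raise ValueError('expected a top-level array or an object with a "data" list')
    # Assumption: the reference only says a metadata dict exists, not what it holds.
    return {"data": rows, "metadata": {"row_count": len(rows)}}


print(normalize_json_payload('[{"id": 1}, {"id": 2}]')["data"])
print(normalize_json_payload('{"data": [{"id": 3}]}')["data"])
```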

The engine reads the file at execution time — don't try open() inside a transform; the python_transform sandbox blocks file I/O. local_file is the supported alternative.

http — generic request

http("call", url="{{ base_url }}/v1/items",
     method="GET",
     headers={"Accept": "application/json"},
     bearer_token="{{ api_token }}",
     timeout=30)

bearer_token= injects Authorization: Bearer <value>. An explicit Authorization in headers= wins. {{ name }} references workflow variables (see §6).
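
The precedence rule can be pictured with a small stand-alone sketch. build_headers is a hypothetical name, not part of the SDK, and the case-insensitive match on the header name is an assumption (HTTP header names are case-insensitive, but the engine's exact comparison is not documented here).

```python
def build_headers(headers=None, bearer_token=None):
    """Hypothetical sketch: merge bearer_token= into headers=, letting an explicit Authorization win."""
    explicit = dict(headers or {})
    # Assumption: the header name is matched case-insensitively.
    has_auth = any(key.lower() == "authorization" for key in explicit)
    merged = {}
    if bearer_token is not None and not has_auth:
        merged["Authorization"] = f"Bearer {bearer_token}"
    merged.update(explicit)
    return merged


print(build_headers({"Accept": "application/json"}, bearer_token="abc"))
print(build_headers({"Authorization": "Basic xyz"}, bearer_token="abc"))
```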

ai_tagging — Athena agent / classification

ai_tagging("classify", inputs=rows,
           agent_url="https://your-athena-host.example/workspace/WS/agent/APP/AGENT/CHAT",
           response_type="json",
           input_keys={"title": "title", "abstract": "abstract"},
           output_column="score",
           rate_limit_rps=2,
           max_concurrent_requests=10)

input_keys maps the prompt-template names to columns in the upstream row. output_column is the row column the model's response is written to.
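
To make the direction of the mapping concrete, here is a minimal sketch in plain Python. Both function names are hypothetical, and how the engine treats a missing column is not stated in this reference; the sketch simply passes None.

```python
def build_prompt_inputs(row, input_keys):
    """Hypothetical sketch: input_keys is {template_name: upstream_column}."""
    # Assumption: a column missing from the row yields None.
    return {template_name: row.get(column) for template_name, column in input_keys.items()}


def write_response(row, response, output_column):
    """Return a copy of the row with the model's response stored under output_column."""
    updated = dict(row)
    updated[output_column] = response
    return updated


row = {"title": "CAR-T in glioblastoma", "abstract": "Background ...", "year": 2023}
print(build_prompt_inputs(row, {"title": "title", "abstract": "abstract"}))
print(write_response(row, {"relevance": 0.9}, "score"))
```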

filter — row filter

filter("recent", inputs=rows,
       conditions=[{"field": "year", "op": "gte", "value": 2021},
                   {"field": "status", "op": "eq", "value": "active"}])

Conditions are AND-ed. Supported ops: eq, ne, gt, gte, lt, lte, contains, in.
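
The AND semantics can be reproduced in a few lines of plain Python, which is handy for unit-testing a conditions list before wiring it into a workflow. This is a sketch, not the engine's implementation: the reading of contains and in, and the choice to reject rows that lack the field, are assumptions.

```python
import operator

# Assumed semantics for each op name listed above.
OPS = {
    "eq": operator.eq,
    "ne": operator.ne,
    "gt": operator.gt,
    "gte": operator.ge,
    "lt": operator.lt,
    "lte": operator.le,
    "contains": lambda field_value, expected: expected in field_value,
    "in": lambda field_value, expected: field_value in expected,
}


def row_matches(row, conditions):
    """Return True only if every condition holds (conditions are AND-ed)."""
    for cond in conditions:
        if cond["field"] not in row:
            return False  # assumption: a missing field never matches
        if not OPS[cond["op"]](row[cond["field"]], cond["value"]):
            return False
    return True


rows = [
    {"year": 2022, "status": "active"},
    {"year": 2019, "status": "active"},
    {"year": 2023, "status": "retired"},
]
conditions = [
    {"field": "year", "op": "gte", "value": 2021},
    {"field": "status", "op": "eq", "value": "active"},
]
print([r for r in rows if row_matches(r, conditions)])
```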

transform — user-supplied Python

transform("enrich", inputs=rows,
          code=ENRICH_CODE,    # string of Python
          mode="function",     # or "script" / "transform"
          function_name="run")

Three execution modes:

Mode          What the engine does
"function"    Defines def <function_name>(inputs): ...; calls it.
"script"      Executes top-to-bottom; reads output_variable as result.
"transform"   Mutates inputs dict in place.

inputs is a dict; the row stream from upstream lives at inputs["data"] as a list of dicts. Return the same shape ({"data": [...]}). See examples/10_custom_logic.py for a worked example.
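
Because the code is just a string, it can be exercised locally before it goes anywhere near the engine. The harness below is a sketch that only imitates "function" mode: call_transform_locally is a hypothetical helper, it applies none of the sandbox restrictions, and the title_length column is invented for the example.

```python
ENRICH_CODE = '''
def run(inputs):
    out = []
    for row in inputs.get("data", []):
        enriched = dict(row)                      # copy so the upstream row is not mutated
        enriched["title_length"] = len(row.get("title", ""))
        out.append(enriched)
    return {"data": out}
'''


def call_transform_locally(code, inputs, function_name="run"):
    """Hypothetical test harness: exec the code string and call the named function.

    Not sandboxed; intended for unit tests on trusted code only.
    """
    namespace = {}
    exec(code, namespace)
    return namespace[function_name](inputs)


result = call_transform_locally(ENRICH_CODE, {"data": [{"title": "abc"}]})
print(result)
```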

output — terminal sink

output("results", inputs=rows, format="json")   # or "csv", "text"

Whatever reaches output is what wf.run() returns for this node.

branch — two-way conditional (engine node type: "if")

gate = branch("by-score", inputs=rows,
              condition="$input.data.get('score') > 0.8")
output("publish", inputs=gate.out("true"))
output("review",  inputs=gate.out("false"))

condition= is an expression string evaluated per row by the engine. $input.data is the current row; standard dict access works.

The helper is named branch because if is a Python keyword. The serialized engine type is still "if".

For multi-way routing, set a route column with transform, then branch on its value:

classified = transform("classify-route", inputs=rows, code="""
def run(inputs):
    out = []
    for r in inputs.get("data", []):
        r["route"] = "A" if r["score"] > 0.8 else "B" if r["score"] > 0.4 else "C"
        out.append(r)
    return {"data": out}
""")
g1 = branch("is-A", inputs=classified, condition="$input.data.get('route') == 'A'")
# chain further branches on g1.out("false") as needed

merge — fan-in

combined = merge("combine", inputs=[branch_a, branch_b, branch_c],
                 on="id",        # column to align on (join mode only)
                 how="join")     # or "concat"
  • how="join" — wait for all upstream branches, then align rows by on=.
  • how="concat" — stack rows from all branches; on= ignored.
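
One plausible reading of the two modes, written as plain Python over lists of row dicts. This is a sketch under loud assumptions: the reference does not say whether join keeps keys missing from some branches or how column collisions resolve, so the code keeps only keys present in every branch and lets later branches overwrite earlier ones. merge_rows is a hypothetical name.

```python
def merge_rows(branches, how="join", on=None):
    """Hypothetical sketch of merge semantics over a list of row lists."""
    if how == "concat":
        # Stack rows from all branches; on= is ignored.
        return [row for rows in branches for row in rows]
    if how != "join":
        raise ValueError(f"unsupported how={how!r}")
    if on is None:
        raise ValueError("how='join' needs on=")

    merged = {}    # key value -> combined row
    seen_in = {}   # key value -> number of branches containing it
    for rows in branches:
        keys_here = set()
        for row in rows:
            key = row[on]
            merged.setdefault(key, {}).update(row)  # assumption: later branches win on collisions
            keys_here.add(key)
        for key in keys_here:
            seen_in[key] = seen_in.get(key, 0) + 1
    # Assumption: inner-join behaviour, i.e. the key must appear in every branch.
    return [row for key, row in merged.items() if seen_in[key] == len(branches)]


branch_a = [{"id": 1, "score_a": 0.9}, {"id": 2, "score_a": 0.1}]
branch_b = [{"id": 1, "score_b": "high"}]
print(merge_rows([branch_a, branch_b], how="join", on="id"))
print(len(merge_rows([branch_a, branch_b], how="concat")))
```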

4. Topology patterns

Linear

a = pubmed(...)
b = transform("enrich", inputs=a, ...)
c = ai_tagging("classify", inputs=b, ...)
output("results", inputs=c)

Fan-out (implicit)

List the same node in multiple inputs=:

src = postgres("read", ...)
score_a = ai_tagging("score_a", inputs=src, ...)
score_b = ai_tagging("score_b", inputs=src, ...)
score_c = ai_tagging("score_c", inputs=src, ...)

The engine duplicates src's row stream to each downstream consumer.

Fan-in

combined = merge("combine", inputs=[score_a, score_b, score_c], on="id", how="join")

Branch + merge

gate = branch("gate", inputs=rows, condition="...")
yes_path = transform("for_yes", inputs=gate.out("true"), code=...)
no_path  = transform("for_no",  inputs=gate.out("false"), code=...)
combined = merge("union", inputs=[yes_path, no_path], how="concat")

5. Inspection

print(wf.visualize())             # ascii DAG
issues = wf.validate()            # [] if good; otherwise list of issue strings
result = wf.run()                 # local execution; returns dict[node_name, output]
print(repr(wf))                   # Workflow('name', N nodes, M edges)
print(len(wf), list(n.name for n in wf))   # node count + iteration in topo order
node = wf["search"]               # KeyError with typo hint if missing

wf.validate() is pure (no I/O). Always call it before run() and in CI.
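
A small guard makes the validate-then-run habit hard to skip. The sketch relies only on validate() and run() as described above; run_if_valid itself is a hypothetical helper, not something the SDK ships.

```python
def run_if_valid(wf):
    """Hypothetical guard: refuse to run a workflow that reports validation issues."""
    issues = wf.validate()
    if issues:
        details = "\n".join(f"- {issue}" for issue in issues)
        raise ValueError(f"workflow failed validation:\n{details}")
    return wf.run()
```

In CI, the same function can be called with run() replaced by a no-op, so that only the pure validation step executes.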

5.1 Inspecting a specific node's output

wf.run() returns a plain dict[node_name, output]. To see what any node emitted:

result = wf.run()
print(result["classify"])              # full output dict for the `classify` node
print(result["classify"]["data"])      # just the rows the node emitted
print(result["classify"]["data"][0])   # the first row

The output shape is whatever the node type produces — most sources / transforms / ai_tagging nodes emit {"data": [...rows...], "metadata": {...}}. output nodes return whatever reached them.

To dump every node's output in one pass:

import json
for name, output in result.items():
    print(f"\n── {name} ──")
    print(json.dumps(output, indent=2, default=str))   # default=str handles datetime / Decimal / UUID

To see only the first few rows per node (sane when batches are big):

for name, output in result.items():
    rows = (output or {}).get("data", []) if isinstance(output, dict) else None
    if isinstance(rows, list):
        print(f"\n── {name} ({len(rows)} rows) ──")
        for i, row in enumerate(rows[:3]):
            print(f"  [{i}] {row}")
        if len(rows) > 3:
            print(f"  ... +{len(rows) - 3} more")

For mid-run streaming (printing as data flows rather than after the fact), use a pass-through transform as a "tap" node, or register an on_node_complete lifecycle hook (see §9.3).

6. Workflow-scoped variables

import os

wf.set_variable("api_token", os.environ["MY_API_TOKEN"])
wf.set_variable("base_url", "https://api.example.com")

http("call",
     url="{{ base_url }}/v1/items",
     headers={"Authorization": "Bearer {{ api_token }}"})

{{ name }} references are resolved by the engine at run time. Use this for credentials and per-environment values; do not hardcode them in node configs.
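
For intuition about what resolution involves, the sketch below substitutes {{ name }} placeholders from a dict. It is not the engine's resolver: the render function, the accepted whitespace inside the braces, and the KeyError on an undefined name are all assumptions.

```python
import re

# Matches {{ name }} with optional whitespace inside the braces (assumed syntax).
_VAR = re.compile(r"\{\{\s*(\w+)\s*\}\}")


def render(template, variables):
    """Hypothetical sketch: replace each {{ name }} with the matching variable."""
    def replace(match):
        name = match.group(1)
        if name not in variables:
            # Assumption: an undefined variable is an error rather than left as-is.
            raise KeyError(f"undefined workflow variable: {name}")
        return str(variables[name])

    return _VAR.sub(replace, template)


variables = {"base_url": "https://api.example.com", "api_token": "s3cr3t"}
print(render("{{ base_url }}/v1/items", variables))
print(render("Bearer {{api_token}}", variables))
```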

wf.variables is a read-only copy:

snapshot = wf.variables   # dict
snapshot["x"] = 1         # does NOT affect the workflow

7. Scopes — name-prefix isolation

When a helper (yours or someone else's) is called multiple times, scopes keep node names unique without renaming everything by hand:

with wf.scope("onco"):
    fetch_evidence(topic="glioblastoma CAR-T")   # → onco.search, onco.recent, ...

with wf.scope("immuno"):
    fetch_evidence(topic="checkpoint inhibitors") # → immuno.search, immuno.recent, ...

Nested scopes join with ".": opening scope("b") inside scope("a") produces a.b.<name>.
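
The prefixing behaviour can be modelled with a stack and a context manager. ScopeStack is a toy stand-in written for this example; it shows the naming rule only and says nothing about how Workflow implements scopes internally.

```python
from contextlib import contextmanager


class ScopeStack:
    """Toy model of scope prefixes: nested scopes join with '.'."""

    def __init__(self):
        self._parts = []

    @contextmanager
    def scope(self, name):
        self._parts.append(name)
        try:
            yield self
        finally:
            self._parts.pop()  # leaving the block restores the outer prefix

    def resolve(self, node_name):
        return ".".join([*self._parts, node_name])


scopes = ScopeStack()
with scopes.scope("a"):
    with scopes.scope("b"):
        print(scopes.resolve("search"))
    print(scopes.resolve("search"))
print(scopes.resolve("search"))
```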

8. Escape hatch — reach any engine node type

The 11 helpers cover the common case. For anything else (edgar, twitter, snowflake, ...), use add_node:

filings = wf.add_node(
    name="edgar-pull",
    type="edgar",                       # any engine node type
    category="healthcare_research",     # its category
    config={"company": "PFE", "form_type": "8-K", "limit": 25},
)

add_node is a Workflow method (no function-first twin); the whole point is to reach beyond the typed helpers.

9. Wrapper-authoring (extensions.py)

If you're writing a domain package (pharma-workflows, marketing-pipelines, ...) on top of lite, you have three tools.

9.1 Register a helper

@Workflow.register_helper
def pharma_pubmed_etl(self, name, *, drug, indication, model_url, inputs=None):
    papers = self.pubmed(f"{name}-search", inputs=inputs,
                         query=f"{drug} {indication}", max_results=200)
    recent = self.filter(f"{name}-recent", inputs=papers,
                         conditions=[{"field": "year", "op": "gte", "value": 2021}])
    return self.ai_tagging(f"{name}-classify", inputs=recent, agent_url=model_url)

After registration, every Workflow instance exposes wf.pharma_pubmed_etl(...). The body uses self.<helper> because self is the workflow. End users call it as a one-liner; they don't see the three nodes inside.

Trigger registration on package import:

# pharma_workflows/__init__.py
from .helpers import pharma_pubmed_etl   # decorator runs at import time

9.2 Sub-workflow composition with include

def screening_pattern():
    with Workflow("screening-template") as sub:
        papers = pubmed("search", query="<placeholder>", max_results=200)
        ai_tagging("classify", inputs=papers,
                   agent_url="https://placeholder.example/workspace/_/agent/_/_/_")
    return sub

wf.include(screening_pattern(), name_prefix="onco",
           overrides={"search": {"query": "glioblastoma CAR-T"}})

include deep-copies the source workflow's nodes and internal edges into wf under the prefix. overrides= patches node config after copy. The source workflow is untouched and can be included multiple times.
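
The copy, prefix, then override sequence can be sketched with plain dicts standing in for nodes. Everything here is illustrative: include_nodes, the {"config": ..., "inputs": [...]} node layout, and the "." separator between prefix and name are assumptions, not the SDK's data model.

```python
import copy


def include_nodes(target, source, name_prefix, overrides=None):
    """Hypothetical sketch: copy source nodes into target under a prefix, then patch config."""
    overrides = overrides or {}
    for name, node in source.items():
        cloned = copy.deepcopy(node)                       # source stays untouched
        cloned.setdefault("config", {}).update(overrides.get(name, {}))
        # Internal edges are rewritten to point at the prefixed copies.
        cloned["inputs"] = [f"{name_prefix}.{up}" for up in cloned.get("inputs", [])]
        target[f"{name_prefix}.{name}"] = cloned
    return target


template = {
    "search": {"config": {"query": "<placeholder>", "max_results": 200}, "inputs": []},
    "classify": {"config": {}, "inputs": ["search"]},
}
wf_nodes = include_nodes({}, template, "onco",
                         overrides={"search": {"query": "glioblastoma CAR-T"}})
print(sorted(wf_nodes))
print(wf_nodes["onco.search"]["config"])
```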

9.3 Lifecycle hooks

@Workflow.before_run
def authenticate(workflow):
    refresh_token()

@Workflow.after_run
def log_completion(workflow, result):
    log.info("workflow %s produced %d outputs", workflow.name, len(result))

@Workflow.on_node_complete
def emit_metric(workflow, node, output):
    metrics.incr(f"node.{node.node_type}.completed")

@Workflow.on_error
def page_oncall(workflow, exc):
    pager.notify(workflow.name, exc)

Class-level hooks fire for every workflow run. For per-instance hooks:

wf.add_hook("before_run", lambda w: ...)

A hook that raises is logged and skipped; it does not break the run.
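
The log-and-skip policy amounts to wrapping each hook call in a try/except. The dispatcher below is a sketch of that pattern, not the SDK's code; fire_hooks, the logger name, and the returned failure count are invented for illustration.

```python
import logging

log = logging.getLogger("hooks_sketch")


def fire_hooks(hooks, *args):
    """Hypothetical dispatcher: call every hook, logging and skipping any that raise."""
    failures = 0
    for hook in hooks:
        try:
            hook(*args)
        except Exception:
            # The traceback is logged; remaining hooks and the run itself continue.
            log.exception("hook %r raised; skipping", getattr(hook, "__name__", hook))
            failures += 1
    return failures


calls = []


def good_hook(workflow):
    calls.append(workflow)


def bad_hook(workflow):
    raise RuntimeError("token refresh failed")


failed = fire_hooks([bad_hook, good_hook], "my-workflow")
print(failed, calls)
```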

10. Testing your workflows

Two test patterns to add to your suite (the SDK itself uses both):

Structural (pure, fast):

def test_pipeline_validates():
    wf = build_my_pipeline()
    assert wf.validate() == []
    data = wf.to_workflow_data()
    assert {n["node_name"] for n in data["nodes"]} == {"load", "enrich", "save"}

Examples regression gate:

If you ship example scripts, run them in CI to catch breakage. See tests/test_examples_validate.py for a reference implementation (parametrized subprocess runner that asserts each example exits cleanly and validates).
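
A minimal version of such a runner, using only the standard library. This is a sketch and not the contents of tests/test_examples_validate.py: the function names, the *.py glob, and the 120-second timeout are assumptions.

```python
import subprocess
import sys
from pathlib import Path


def run_example(path, timeout=120):
    """Run one example script in a fresh interpreter and capture its output."""
    return subprocess.run(
        [sys.executable, str(path)],
        capture_output=True,
        text=True,
        timeout=timeout,
    )


def failing_examples(examples_dir):
    """Return {script_name: stderr} for every example that exits non-zero."""
    failures = {}
    for script in sorted(Path(examples_dir).glob("*.py")):
        result = run_example(script)
        if result.returncode != 0:
            failures[script.name] = result.stderr
    return failures
```

With pytest, each script can become its own test case by parametrizing over the glob and asserting that run_example(path).returncode == 0.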

11. Things to know about errors

  • Workflow.add_node raises ValueError if the resolved (scoped) node name is already in use.
  • Required-arg checks happen at build time, not run time. postgres(operation="select") without query= raises immediately.
  • wf["typo"] raises KeyError with a "Did you mean ...?" hint based on Levenshtein distance.
  • wf.iter_topo() raises RuntimeError if the graph has a cycle (for example, calling wf.connect(a, b) when a path b → a already exists).
  • Function-first helpers raise RuntimeError("No current workflow...") if called outside a with Workflow(...): block.
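
For readers curious how a Levenshtein-based "Did you mean ...?" hint can work, here is a self-contained sketch. The lookup function, the message wording, and the max_distance=2 cutoff are assumptions; the SDK's actual threshold and text may differ.

```python
def levenshtein(a, b):
    """Classic dynamic-programming edit distance (insert, delete, substitute)."""
    previous = list(range(len(b) + 1))
    for i, char_a in enumerate(a, start=1):
        current = [i]
        for j, char_b in enumerate(b, start=1):
            current.append(min(
                previous[j] + 1,                        # deletion
                current[j - 1] + 1,                     # insertion
                previous[j - 1] + (char_a != char_b),   # substitution (free if equal)
            ))
        previous = current
    return previous[-1]


def lookup(nodes, name, max_distance=2):
    """Hypothetical sketch: dict lookup that suggests the closest known name on a miss."""
    if name in nodes:
        return nodes[name]
    closest = min(nodes, key=lambda candidate: levenshtein(name, candidate), default=None)
    if closest is not None and levenshtein(name, closest) <= max_distance:
        raise KeyError(f"{name!r} not found. Did you mean {closest!r}?")
    raise KeyError(f"{name!r} not found.")


nodes = {"search": "pubmed node", "classify": "ai_tagging node"}
try:
    lookup(nodes, "serach")
except KeyError as exc:
    print(exc)
```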

12. Conventions for shipping to clients

If you're handing this SDK to a downstream team:

  • Pin the version in their pyproject.toml / requirements.txt. Pre-1.0, a minor bump may introduce breaking changes.
  • Don't ship examples inside the package. Keep them in a separate docs repo or this examples/ dir; importing athena_sdk_lite.examples is not supported.
  • Workflow files live in their repo, organized however they prefer. A common convention: workflows/<team>/<name>.py.
  • CI gate: run wf.validate() on every workflow file at commit time — keeps broken DAGs out of main.
  • Credentials come from os.environ, never inlined. The SDK does not autoload .env; users add python-dotenv themselves if needed.