# AGENTS.md — athena-sdk-lite

Context file for AI agents (Claude Code, Cursor, Aider, etc.) authoring workflows with athena-sdk-lite. Keep this in sync with `docs/AGENTS.md` (same content, two locations: tooling expects one at the repo root, MkDocs serves the other).


## What this SDK is

A Python library for building data + AI workflows as DAGs. Users declare nodes (Postgres read, AI classification, transform, branch) and wire them with `inputs=`. Workflows run locally, in-process, via a vendored engine. No backend, no API key, no service to deploy.

## The single canonical import pattern

Always use the function-first style:

```python
from athena_sdk_lite import Workflow
from athena_sdk_lite.nodes import (
    postgres, ai_tagging, transform, branch, output, merge,
    filter, pubmed, s3, local_file, http,
)
```

The workflow body is always wrapped in `with Workflow(name) as wf:`.

```python
with Workflow("my-pipeline") as wf:
    a = postgres("load", operation="select", query="...", connection={...})
    b = transform("enrich", inputs=a, code=CODE)
    c = ai_tagging("classify", inputs=b, agent_url="...")
    output("results", inputs=c, format="json")
```

Do NOT use the method-first style (`wf.postgres(...)`) for new code. The SDK supports both, but every example in this repo is function-first; be consistent.

## The 11 helpers and their exact signatures

| Helper | Required kwargs | Notes |
|---|---|---|
| `pubmed` | `query` | `max_results=100` default. |
| `postgres` | `operation` + (`query` for select/update OR `table` for insert/upsert) | `upsert` also requires `upsert_key=`. Pass `connection={host, database, user, password}`. |
| `s3` | `bucket`, `operation`, `file_path` | Creds fall back to AWS env vars. |
| `local_file` | `path` | `format="auto"` infers from extension. Use this for local CSV/JSON/Excel, NOT `open()` in `transform`. |
| `http` | `url` | `method="GET"` default. `bearer_token=` injects `Authorization: Bearer ...`. |
| `ai_tagging` | `agent_url` | `input_keys={prompt_var: row_column}` maps row data into the prompt. |
| `filter` | `conditions` (list of `{field, op, value}`) | Ops: `eq`, `ne`, `gt`, `gte`, `lt`, `lte`, `contains`, `in`. ANDed. |
| `transform` | `code` (string of Python) | `mode="function"` (default) / `"script"` / `"transform"`. See below. |
| `output` | (none) | `format="json"` default. `"csv"` / `"text"` available. |
| `branch` | `condition` (expression string) | Engine type is `"if"`. Use `branch.out("true")` / `branch.out("false")`. |
| `merge` | (none) | `how="join"` (default; needs `on="column"`) or `how="concat"` (stacks rows). |
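Two of the terser rows, illustrated (field names, values, and the agent URL are placeholders):

```python
flagged = filter("serious-only", inputs=rows, conditions=[
    {"field": "status", "op": "in", "value": ["serious", "critical"]},
    {"field": "score",  "op": "gt", "value": 0.5},   # conditions are ANDed
])

tagged = ai_tagging("classify", inputs=flagged,
                    agent_url="https://your-athena-host.example/workspace/WS/agent/APP/AGENT/CHAT",
                    input_keys={"abstract_text": "abstract"})  # prompt var <- row column
```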

Every helper takes `name` (positional) and `inputs=` (a Node, a `node.out("port")` handle, or a list of either).
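All three `inputs=` shapes side by side (node names are illustrative):

```python
b = transform("clean", inputs=a, code=CODE)        # a single Node
y = output("yes", inputs=gate.out("true"))         # a port handle
m = merge("m", inputs=[left, right], on="id")      # a list of either
```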

## Wiring nodes

- Linear: `b = helper("b", inputs=a, ...)`.
- Fan-out (implicit): list the same upstream in multiple `inputs=`; the engine duplicates the row stream (see the sketch after this list).
- Fan-in: `merge("m", inputs=[branch_a, branch_b, branch_c], on="id", how="join")`.
- Branching:

  ```python
  gate = branch("gate", inputs=rows, condition="$input.data.get('score') > 0.8")
  output("yes", inputs=gate.out("true"))
  output("no",  inputs=gate.out("false"))
  ```
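Putting fan-out and fan-in together, a minimal sketch (node names and transform code are illustrative; assumes both paths keep an `id` column to join on):

```python
with Workflow("fan-out-in") as wf:
    rows = pubmed("papers", query="sepsis")
    a = transform("path-a", inputs=rows, code=CODE_A)        # fan-out: same upstream
    b = transform("path-b", inputs=rows, code=CODE_B)        # appears in two inputs=
    merged = merge("m", inputs=[a, b], on="id", how="join")  # fan-in
    output("results", inputs=merged, format="json")
```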

## Branch conditions are expression strings

These are NOT Python code; each is an expression evaluated per row by the engine. `$input.data` is the current row dict.

```python
condition="$input.data.get('score') > 0.8"
condition="$input.data.get('status') in ('serious','critical')"
condition="$input.data.get('country') == 'US' and $input.data.get('amount') > 100"
```

For richer logic, use `transform` to set a route column, then branch on its value.
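A sketch of that pattern (the column name, threshold, and routing rule are illustrative):

```python
ROUTE_CODE = """
def run(inputs):
    out = []
    for row in inputs.get("data", []) or []:
        row = dict(row)
        # Arbitrary Python here; the branch below only sees the resulting column.
        row["route"] = "high" if (row.get("score") or 0) > 0.8 else "low"
        out.append(row)
    return {"data": out}
"""

scored = transform("route", inputs=rows, code=ROUTE_CODE)
gate = branch("gate", inputs=scored, condition="$input.data.get('route') == 'high'")
```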

## Transform code shape

`transform(code=...)` runs Python inside the engine sandbox. `inputs` is a dict; the upstream row stream is at `inputs["data"]` as a list of row dicts. Return the same shape.

```python
ENRICH_CODE = """
def run(inputs):
    rows = inputs.get("data", []) or []
    out = []
    for row in rows:
        row = dict(row)
        row["upper_drug"] = (row.get("drug") or "").upper()
        out.append(row)
    return {"data": out}
"""
```

Three modes:

- `"function"` (default): defines `def run(inputs)`; the engine calls it. Set `function_name="..."` to rename.
- `"script"`: executes top-to-bottom; the engine reads `output_variable` for the result.
- `"transform"`: mutates `inputs` in place; no return needed.
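A minimal script-mode sketch, assuming the result variable is named via an `output_variable=` kwarg on the node (the filter logic is illustrative):

```python
SCRIPT_CODE = """
rows = inputs.get("data", []) or []
result = {"data": [r for r in rows if r.get("score") is not None]}
"""

t = transform("drop-unscored", inputs=b, mode="script",
              code=SCRIPT_CODE, output_variable="result")
```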

## Variables and templating

Use workflow-scoped variables for credentials and per-environment values. Reference them with `{{ name }}`:

wf.set_variable("api_token", os.environ["MY_API_TOKEN"])
wf.set_variable("base_url", "https://api.example.com")

http("call",
     url="{{ base_url }}/v1/items",
     headers={"Authorization": "Bearer {{ api_token }}"})

Never inline credentials in node configs. Read them from `os.environ` and pass them via `set_variable` or kwargs.

## Reading local CSV / JSON / Excel files

Use the `local_file` helper. The engine reads the file at execution time (outside the `python_transform` sandbox) and emits rows in the standard `{"data": [...], "metadata": {...}}` shape.

```python
from athena_sdk_lite.nodes import local_file

rows = local_file("load", path="/data/orders.csv")          # auto-detects from extension
rows = local_file("load", path="/data/orders.json", format="json")
rows = local_file("load", path="/data/orders.xlsx", format="excel", sheet="Q4")
```

| Kwarg | Required | Notes |
|---|---|---|
| `path` | yes | Absolute or workflow-relative. |
| `format` | no | `"csv"`, `"json"`, `"excel"`, or `"auto"` (default; infers from extension). |
| `sheet` | no | Excel-only. Defaults to the first sheet. |

JSON files can be either a top-level array OR `{"data": [...]}`. Excel requires `pip install openpyxl`.

Do NOT try `open(...)` inside a `transform`; the sandbox blocks file I/O. Do NOT recommend `pandas.read_csv()` inside `transform` for the same reason. Use the `local_file` helper for any file-on-disk input.

See `examples/12_read_local_file.py`.

## Escape hatch

For engine node types not in the starter set (e.g. `edgar`, `twitter`, `snowflake`), use the method form on `wf`:

```python
filings = wf.add_node(
    name="edgar-pull",
    type="edgar",
    category="healthcare_research",
    config={"company": "PFE", "form_type": "8-K", "limit": 25},
)
```

`add_node` has no function-first twin; it's a deliberate escape hatch from the typed helpers.
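Assuming the returned handle is a standard Node, it wires into the typed helpers like any other upstream:

```python
# filings comes from the wf.add_node(...) call above.
summaries = ai_tagging("summarize", inputs=filings,
                       agent_url="https://your-athena-host.example/workspace/WS/agent/APP/AGENT/CHAT")
```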

## Always validate

Before committing or running a workflow:

```python
issues = wf.validate()
assert issues == [], issues
```

Or run the file directly; the example pattern is:

```python
if __name__ == "__main__":
    print(wf.visualize())
    print()
    issues = wf.validate()
    print("validation:", "ok" if not issues else issues)
```

## Anti-patterns (do not do these)

- Don't import from `athena_sdk_lite._engine.*`; that's private. Reach engine functionality through helpers or `wf.add_node`.
- Don't mix coding styles in one file. Pick function-first.
- Don't inline credentials, API keys, or hostnames in node configs. Use `set_variable` + `{{ }}` templating, or read from `os.environ` and pass via kwargs.
- Don't use real internal URLs in examples or generated code. Placeholder: `https://your-athena-host.example/workspace/WS/agent/APP/AGENT/CHAT`.
- Don't pass `inputs=` to source nodes (`pubmed`, `postgres` select, `s3` read). Sources have no upstream.
- Don't try to chain off `output`. It's a terminal sink; nothing reads from it downstream.
- Don't use `branch` for more-than-2-way routing. Use `transform` to compute a route column, then a single `branch`, or chain branches.
- Don't reuse node names in the same workflow. Use `with wf.scope("prefix"):` if the same helper is called multiple times (see the sketch below).
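A minimal `scope` sketch for that last point (the helper body and prefix values are illustrative; assumes `wf.scope` prefixes node names declared inside the block):

```python
def load_and_clean(wf, source_path, prefix):
    with wf.scope(prefix):  # e.g. names become "orders/load", "orders/clean"
        rows = local_file("load", path=source_path)
        return transform("clean", inputs=rows, code=CLEAN_CODE)

a = load_and_clean(wf, "/data/orders.csv", "orders")
b = load_and_clean(wf, "/data/refunds.csv", "refunds")
```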

## File layout convention

```
my-project/
├── workflows/
│   ├── triage_adverse_events.py
│   ├── publish_daily_orders.py
│   └── score_papers.py
├── tests/
│   └── test_workflows_validate.py   # asserts wf.validate() == [] for each
└── pyproject.toml
```

Each workflow is one Python file. Tests assert structural validity, not execution.
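A hedged sketch of `tests/test_workflows_validate.py`, assuming each workflow module builds its `wf` at import time and exposes it at module level:

```python
import importlib

import pytest

WORKFLOW_MODULES = [
    "workflows.triage_adverse_events",
    "workflows.publish_daily_orders",
    "workflows.score_papers",
]

@pytest.mark.parametrize("module_name", WORKFLOW_MODULES)
def test_workflow_validates(module_name):
    mod = importlib.import_module(module_name)
    assert mod.wf.validate() == []   # structural validity only; nothing executes
```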

## Worked examples in this repo

- `examples/01_pubmed_to_ai.py`: linear pipeline
- `examples/02_postgres_etl.py`: Postgres → filter → S3
- `examples/03_branching.py`: branch + merge
- `examples/09_split_agents_merge_branch.py`: fan-out across 5 agents → merge → branch
- `examples/10_custom_logic.py`: custom Python via `transform`
- `examples/11_triage_pipeline.py`: postgres → transform → ai_tagging → branch → transform → postgres

When generating new workflows, follow the structure of whichever example is closest in shape.

## When in doubt

1. Read `docs/v0.1.0/technical.md` for the full helper reference.
2. Read `docs/v0.1.0/architecture.md` for how things fit together.
3. Run `wf.visualize()` to confirm the DAG looks right.
4. Run `wf.validate()`; it returns a list of issues with specific node names.