# AGENTS.md — athena-sdk-lite
Context file for AI agents (Claude Code, Cursor, Aider, etc.) authoring workflows with athena-sdk-lite. Keep this in sync with `docs/AGENTS.md` (same content, two locations: tooling expects one at the repo root, MkDocs serves the other).
## What this SDK is
A Python library for building data + AI workflows as DAGs. Users declare nodes (Postgres read, AI classification, transform, branch) and wire them with `inputs=`. Workflows run locally, in-process, via a vendored engine. No backend, no API key, no service to deploy.
## The single canonical import pattern
Always use function-first style:
```python
from athena_sdk_lite import Workflow
from athena_sdk_lite.nodes import postgres, ai_tagging, transform, branch, output, merge, filter, pubmed, s3, local_file, http
```
The body is always wrapped in `with Workflow(name) as wf:`:
```python
with Workflow("my-pipeline") as wf:
    a = postgres("load", operation="select", query="...", connection={...})
    b = transform("enrich", inputs=a, code=CODE)
    c = ai_tagging("classify", inputs=b, agent_url="...")
    output("results", inputs=c, format="json")
```
Do NOT use the method-first style (`wf.postgres(...)`) for new code. The SDK supports both, but every example in this repo is function-first — be consistent.
## The 11 helpers and their exact signatures
| Helper | Required kwargs | Notes |
|---|---|---|
| `pubmed` | `query` | `max_results=100` default. |
| `postgres` | `operation` + (`query` for select/update OR `table` for insert/upsert) | `upsert` also requires `upsert_key=`. Pass `connection={host,database,user,password}`. |
| `s3` | `bucket`, `operation`, `file_path` | Creds fall back to AWS env vars. |
| `local_file` | `path` | `format="auto"` infers from extension. Use this for local CSV/JSON/Excel — NOT `open()` in `transform`. |
| `http` | `url` | `method="GET"` default. `bearer_token=` injects `Authorization: Bearer ...`. |
| `ai_tagging` | `agent_url` | `input_keys={prompt_var: row_column}` maps row data into the prompt. |
| `filter` | `conditions` (list of `{field, op, value}`) | Ops: `eq`, `ne`, `gt`, `gte`, `lt`, `lte`, `contains`, `in`. ANDed. |
| `transform` | `code` (string of Python) | `mode="function"` (default) / `"script"` / `"transform"`. See below. |
| `output` | — | `format="json"` default. `"csv"` / `"text"` available. |
| `branch` | `condition` (expression string) | Engine type is `"if"`. Use `branch.out("true")` / `branch.out("false")`. |
| `merge` | — | `how="join"` (default; needs `on="column"`) or `how="concat"` (stacks rows). |
Every helper takes `name` (positional) and `inputs=` (a `Node`, a `node.out("port")` handle, or a list of either).
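For instance, a `filter` node with two ANDed conditions (the node name, the upstream handle `b`, and the field names are illustrative):

```python
flagged = filter(
    "keep-high-risk",
    inputs=b,  # a Node handle from an upstream helper
    conditions=[
        {"field": "score", "op": "gte", "value": 0.8},
        {"field": "status", "op": "in", "value": ["serious", "critical"]},
    ],
)
```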
## Wiring nodes
- Linear: `b = helper("b", inputs=a, ...)`.
- Fan-out (implicit): list the same upstream in multiple `inputs=`. The engine duplicates the row stream (see the sketch below).
- Fan-in: `merge("m", inputs=[branch_a, branch_b, branch_c], on="id", how="join")`.
- Branching: wire downstream nodes off `branch.out("true")` / `branch.out("false")`; conditions are covered in the next section.
## Branch conditions are expression strings
These are NOT Python code: each is an expression the engine evaluates per row. `$input.data` is the current row dict.
condition="$input.data.get('score') > 0.8"
condition="$input.data.get('status') in ('serious','critical')"
condition="$input.data.get('country') == 'US' and $input.data.get('amount') > 100"
For richer logic, use `transform` to set a route column, then `branch` on its value.
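A sketch of that pattern (the upstream handle `b`, the column name, and the threshold are illustrative):

```python
ROUTE_CODE = """
def run(inputs):
    out = []
    for row in inputs.get("data", []) or []:
        row = dict(row)
        # Collapse multi-way logic into a single route column
        row["route"] = "urgent" if row.get("score", 0) > 0.9 else "normal"
        out.append(row)
    return {"data": out}
"""

routed = transform("compute-route", inputs=b, code=ROUTE_CODE)
gate = branch("gate", inputs=routed, condition="$input.data.get('route') == 'urgent'")
output("urgent", inputs=gate.out("true"), format="json")
output("normal", inputs=gate.out("false"), format="json")
```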
## Transform code shape
`transform(code=...)` runs Python inside the engine sandbox. `inputs` is a dict; the upstream row stream is at `inputs["data"]` as a list of row dicts. Return the same shape.
ENRICH_CODE = """
def run(inputs):
rows = inputs.get("data", []) or []
out = []
for row in rows:
row = dict(row)
row["upper_drug"] = (row.get("drug") or "").upper()
out.append(row)
return {"data": out}
"""
Three modes:

- `"function"` (default): defines `def run(inputs)`; the engine calls it. Set `function_name="..."` to rename.
- `"script"`: executes top-to-bottom; the engine reads `output_variable` for the result (see the sketch below).
- `"transform"`: mutates `inputs` in place; no return needed.
## Variables and templating
Use workflow-scoped variables for credentials and per-environment values. Reference with `{{ name }}`:
```python
import os

wf.set_variable("api_token", os.environ["MY_API_TOKEN"])
wf.set_variable("base_url", "https://api.example.com")

http("call",
     url="{{ base_url }}/v1/items",
     headers={"Authorization": "Bearer {{ api_token }}"})
```
Never inline credentials in node configs. Read from `os.environ` and pass via `set_variable` or kwargs.
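The kwargs form, for example a Postgres connection (the env var names are illustrative):

```python
import os

db = postgres(
    "load",
    operation="select",
    query="SELECT * FROM orders",
    connection={
        "host": os.environ["PGHOST"],
        "database": os.environ["PGDATABASE"],
        "user": os.environ["PGUSER"],
        "password": os.environ["PGPASSWORD"],
    },
)
```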
## Reading local CSV / JSON / Excel files
Use the `local_file` helper. The engine reads the file at execution time (outside the `python_transform` sandbox) and emits rows in the standard `{"data": [...], "metadata": {...}}` shape.
```python
from athena_sdk_lite.nodes import local_file

rows = local_file("load", path="/data/orders.csv")  # auto-detects from extension
rows = local_file("load", path="/data/orders.json", format="json")
rows = local_file("load", path="/data/orders.xlsx", format="excel", sheet="Q4")
```
| Kwarg | Required | Notes |
|---|---|---|
| `path` | yes | Absolute or workflow-relative. |
| `format` | no | `"csv"`, `"json"`, `"excel"`, or `"auto"` (default; infers from extension). |
| `sheet` | no | Excel-only. Defaults to the first sheet. |
JSON files can be either a top-level array OR `{"data": [...]}`. Excel requires `pip install openpyxl`.

Do NOT try `open(...)` inside a `transform` — the sandbox blocks file I/O. Do NOT recommend `pandas.read_csv()` inside `transform` for the same reason. Use the `local_file` helper for any file-on-disk input.

See `examples/12_read_local_file.py`.
## Escape hatch
For engine node types not in the starter set (e.g. `edgar`, `twitter`, `snowflake`), use the method form on `wf`:
```python
filings = wf.add_node(
    name="edgar-pull",
    type="edgar",
    category="healthcare_research",
    config={"company": "PFE", "form_type": "8-K", "limit": 25},
)
```
`add_node` has no function-first twin; it is the deliberate escape hatch out of the typed helpers.
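The returned handle wires into function-first nodes like any other, assuming `add_node` returns a standard node handle as the example above suggests (`SUMMARY_CODE` is an illustrative transform code string):

```python
summary = transform("summarize", inputs=filings, code=SUMMARY_CODE)
output("report", inputs=summary, format="csv")
```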
## Always validate
Before committing or running a workflow, call `wf.validate()`; it returns a list of issues (empty means the DAG is structurally sound). Or run the file directly — the example pattern is:
```python
if __name__ == "__main__":
    print(wf.visualize())
    print()
    issues = wf.validate()
    print("validation:", "ok" if not issues else issues)
```
## Anti-patterns (do not do these)
- Don't import from `athena_sdk_lite._engine.*`. That's private. Reach engine functionality through helpers or `wf.add_node`.
- Don't mix coding styles in one file. Pick function-first.
- Don't inline credentials, API keys, or hostnames in node configs. Use `set_variable` + `{{ }}` templating, or read from `os.environ` and pass via kwargs.
- Don't use real internal URLs in examples or generated code. Placeholder: `https://your-athena-host.example/workspace/WS/agent/APP/AGENT/CHAT`.
- Don't pass `inputs=` to source nodes (`pubmed`, `postgres` select, `s3` read). Sources have no upstream.
- Don't try to chain off `output`. It's a terminal sink — nothing reads from it downstream.
- Don't use `branch` for >2-way routing. Use `transform` to compute a `route` column, then a single `branch`. Or chain branches.
- Don't reuse node names in the same workflow. Use `with wf.scope("prefix"):` if the same helper is called multiple times (see the sketch below).
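A sketch of scoping, assuming `wf.scope` prefixes node names within its block so the two `fetch` nodes don't collide (the scope names and queries are illustrative):

```python
with Workflow("two-queries") as wf:
    with wf.scope("oncology"):
        a = pubmed("fetch", query="pembrolizumab adverse events")
    with wf.scope("cardiology"):
        b = pubmed("fetch", query="statin adverse events")
    merged = merge("combine", inputs=[a, b], how="concat")
    output("results", inputs=merged, format="json")
```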
## File layout convention
```
my-project/
├── workflows/
│   ├── triage_adverse_events.py
│   ├── publish_daily_orders.py
│   └── score_papers.py
├── tests/
│   └── test_workflows_validate.py   # asserts wf.validate() == [] for each
└── pyproject.toml
```
Each workflow is one Python file. Tests assert structural validity, not execution.
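A minimal shape for that test, assuming `workflows/` is importable as a package and each workflow module leaves its built `wf` bound at module level (as in the examples):

```python
# tests/test_workflows_validate.py
import pytest

from workflows import triage_adverse_events, publish_daily_orders, score_papers


@pytest.mark.parametrize("module", [
    triage_adverse_events, publish_daily_orders, score_papers,
])
def test_workflow_validates(module):
    # Structural validity only: validate() returns [] when the DAG is sound
    assert module.wf.validate() == []
```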
## Worked examples in this repo
- `examples/01_pubmed_to_ai.py` — linear pipeline
- `examples/02_postgres_etl.py` — Postgres → filter → S3
- `examples/03_branching.py` — branch + merge
- `examples/09_split_agents_merge_branch.py` — fan-out across 5 agents → merge → branch
- `examples/10_custom_logic.py` — custom Python via `transform`
- `examples/11_triage_pipeline.py` — postgres → transform → ai_tagging → branch → transform → postgres
When generating new workflows, follow the structure of whichever example is closest in shape.
## When in doubt
- Read `docs/v0.1.0/technical.md` for the full helper reference.
- Read `docs/v0.1.0/architecture.md` for how things fit.
- Run `wf.visualize()` to confirm the DAG looks right.
- Run `wf.validate()` — it returns a list of issues with specific node names.