athena-sdk-lite — Technical Reference¶
Version: 0.1.0
Audience: engineers who are about to write or maintain a workflow with this SDK. Use as a reference, not a tutorial — examples live under examples/.
For the big picture, read overview.md. For internals, architecture.md.
1. Install¶
Heavy node-family dependencies are opt-in extras:
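A minimal sketch — the extra names below are illustrative, not confirmed; check the package's pyproject.toml for the extras actually defined:
pip install athena-sdk-lite                        # core only
pip install "athena-sdk-lite[postgres,excel,s3]"   # hypothetical extras for heavy node families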
During local dev, an editable install from the repo:
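For example, from a checkout of the repo (the [dev] extra is an assumption; plain -e . also works):
pip install -e .
pip install -e ".[dev]"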
2. The two coding styles¶
Both produce identical workflows. Pick one per file; don't mix.
Method-first¶
from athena_sdk_lite import Workflow
wf = Workflow("name")
a = wf.pubmed("search", query="...")
b = wf.ai_tagging("classify", inputs=a, agent_url="...")
wf.output("results", inputs=b, format="json")
Function-first (used by every example in this repo)¶
from athena_sdk_lite import Workflow
from athena_sdk_lite.nodes import pubmed, ai_tagging, output
with Workflow("name") as wf:
    a = pubmed("search", query="...")
    b = ai_tagging("classify", inputs=a, agent_url="...")
    output("results", inputs=b, format="json")
Function-first helpers raise RuntimeError if called outside a with Workflow(...): block.
3. The 11 starter helpers¶
Every helper takes a name (string) as its first positional argument and inputs= as the upstream connection. Other kwargs are helper-specific.
pubmed — biomedical literature¶
| Kwarg | Default | Notes |
|---|---|---|
| query | — | Required. |
| max_results | 100 | Soft cap on returned records. |
| inputs | None | Sources rarely need an upstream. |
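A minimal call using the kwargs above (query value illustrative):
papers = pubmed("search", query="glioblastoma CAR-T", max_results=200)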
postgres — DB read/write¶
# select
postgres("load", operation="select",
query="SELECT id, total FROM orders",
connection={"host": ..., "database": ..., "user": ..., "password": ...})
# insert
postgres("write", inputs=rows, operation="insert", table="results", connection={...})
# upsert
postgres("save", inputs=rows, operation="upsert",
table="results", upsert_key="id", skip_existing=False,
batch_size=200, connection={...})
Validation enforced at build time:
- select / update require query=
- insert / upsert require table=
- upsert requires upsert_key=
s3 — object storage¶
s3("export", inputs=rows, bucket="my-bucket",
operation="write", file_path="orders.csv", file_format="csv",
region="us-east-1", prefix="daily/")
Credentials fall back to standard AWS env vars (AWS_ACCESS_KEY_ID, ...) if access_key_id= / secret_access_key= are not supplied.
local_file — read CSV / JSON / Excel from disk¶
rows = local_file("load", path="/data/orders.csv") # auto-detect
rows = local_file("load", path="/data/orders.json", format="json")
rows = local_file("load", path="/data/q4.xlsx", format="excel", sheet="Q4-2025")
| Kwarg | Default | Notes |
|---|---|---|
| path | required | Absolute or workflow-relative. |
| format | "auto" | "csv", "json", "excel", or "auto" (infers from .csv / .json / .xlsx / .xls extension). |
| sheet | none | Excel-only. Defaults to first sheet. |
Emits rows in the standard {"data": [...], "metadata": {...}} shape. JSON files may be a top-level array or {"data": [...]}. Excel requires pip install openpyxl.
The engine reads the file at execution time — don't try open() inside a transform; the python_transform sandbox blocks file I/O. local_file is the supported alternative.
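A sketch of the supported pattern (file path illustrative; ENRICH_CODE as in the transform section below):
# instead of calling open("/data/orders.csv") inside transform code (blocked by the sandbox):
rows = local_file("load", path="/data/orders.csv")          # the engine reads the file
enriched = transform("enrich", inputs=rows, code=ENRICH_CODE,
                     mode="function", function_name="run")   # the code sees the rows at inputs["data"]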
http — generic request¶
http("call", url="{{ base_url }}/v1/items",
method="GET",
headers={"Accept": "application/json"},
bearer_token="{{ api_token }}",
timeout=30)
bearer_token= injects Authorization: Bearer <value>. An explicit Authorization in headers= wins. {{ name }} references workflow variables (see §6).
ai_tagging — Athena agent / classification¶
ai_tagging("classify", inputs=rows,
agent_url="https://your-athena-host.example/workspace/WS/agent/APP/AGENT/CHAT",
response_type="json",
input_keys={"title": "title", "abstract": "abstract"},
output_column="score",
rate_limit_rps=2,
max_concurrent_requests=10)
input_keys maps the prompt-template names to columns in the upstream row. output_column is the row column the model's response is written to.
filter — row filter¶
filter("recent", inputs=rows,
conditions=[{"field": "year", "op": "gte", "value": 2021},
{"field": "status", "op": "eq", "value": "active"}])
Conditions are AND-ed. Supported ops: eq, ne, gt, gte, lt, lte, contains, in.
transform — user-supplied Python¶
transform("enrich", inputs=rows,
code=ENRICH_CODE, # string of Python
mode="function", # or "script" / "transform"
function_name="run")
Three execution modes:
| Mode | What the engine does |
|---|---|
| "function" | Defines def <function_name>(inputs): ...; calls it. |
| "script" | Executes top-to-bottom; reads output_variable as result. |
| "transform" | Mutates inputs dict in place. |
inputs is a dict; the row stream from upstream lives at inputs["data"] as a list of dicts. Return the same shape ({"data": [...]} ). See examples/10_custom_logic.py for a worked example.
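A minimal ENRICH_CODE for "function" mode (the derived column is illustrative):
ENRICH_CODE = """
def run(inputs):
    rows = inputs.get("data", [])
    for r in rows:
        r["title_length"] = len(r.get("title", ""))   # illustrative derived column
    return {"data": rows}
"""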
output — terminal sink¶
Whatever reaches output is what wf.run() returns for this node.
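A minimal sketch tying the two together (imports as in §2; file path illustrative):
with Workflow("demo") as wf:
    rows = local_file("load", path="/data/orders.csv")
    output("results", inputs=rows, format="json")
result = wf.run()
print(result["results"])   # exactly what reached the `results` node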
branch — two-way conditional (engine node type: "if")¶
gate = branch("by-score", inputs=rows,
condition="$input.data.get('score') > 0.8")
output("publish", inputs=gate.out("true"))
output("review", inputs=gate.out("false"))
condition= is an expression string evaluated per row by the engine. $input.data is the current row; standard dict access works.
The helper is named branch because if is a Python keyword. The serialized engine type is still "if".
For multi-way routing, set a route column with transform, then branch on its value:
classified = transform("classify-route", inputs=rows, code="""
def run(inputs):
    out = []
    for r in inputs.get("data", []):
        r["route"] = "A" if r["score"] > 0.8 else "B" if r["score"] > 0.4 else "C"
        out.append(r)
    return {"data": out}
""")
g1 = branch("is-A", inputs=classified, condition="$input.data.get('route') == 'A'")
# chain further branches on g1.out("false") as needed
merge — fan-in¶
combined = merge("combine", inputs=[branch_a, branch_b, branch_c],
on="id", # column to align on (join mode only)
how="join") # or "concat"
how="join"— wait for all upstream branches, then align rows byon=.how="concat"— stack rows from all branches;on=ignored.
4. Topology patterns¶
Linear¶
a = pubmed(...)
b = transform("enrich", inputs=a, ...)
c = ai_tagging("classify", inputs=b, ...)
output("results", inputs=c)
Fan-out (implicit)¶
List the same node in multiple inputs=:
src = postgres("read", ...)
score_a = ai_tagging("score_a", inputs=src, ...)
score_b = ai_tagging("score_b", inputs=src, ...)
score_c = ai_tagging("score_c", inputs=src, ...)
The engine duplicates src's row stream to each downstream consumer.
Fan-in¶
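List multiple upstream nodes in a single inputs= — typically into merge. A minimal sketch (queries illustrative):
us = pubmed("us-search", query="...")
eu = pubmed("eu-search", query="...")
all_papers = merge("all-papers", inputs=[us, eu], how="concat")
output("results", inputs=all_papers)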
Branch + merge¶
gate = branch("gate", inputs=rows, condition="...")
yes_path = transform("for_yes", inputs=gate.out("true"), code=...)
no_path = transform("for_no", inputs=gate.out("false"), code=...)
combined = merge("union", inputs=[yes_path, no_path], how="concat")
5. Inspection¶
print(wf.visualize()) # ascii DAG
issues = wf.validate() # [] if good; otherwise list of issue strings
result = wf.run() # local execution; returns dict[node_name, output]
print(repr(wf)) # Workflow('name', N nodes, M edges)
print(len(wf), list(n.name for n in wf)) # node count + iteration in topo order
node = wf["search"] # KeyError with typo hint if missing
wf.validate() is pure (no I/O). Always call it before run() and in CI.
5.1 Inspecting a specific node's output¶
wf.run() returns a plain dict[node_name, output]. To see what any node emitted:
result = wf.run()
print(result["classify"]) # full output dict for the `classify` node
print(result["classify"]["data"]) # just the rows the node emitted
print(result["classify"]["data"][0]) # the first row
The output shape is whatever the node type produces — most sources / transforms / ai_tagging nodes emit {"data": [...rows...], "metadata": {...}}. output nodes return whatever reached them.
To dump every node's output in one pass:
import json
for name, output in result.items():
    print(f"\n── {name} ──")
    print(json.dumps(output, indent=2, default=str))  # default=str handles datetime / Decimal / UUID
To see only the first few rows per node (sane when batches are big):
for name, output in result.items():
    rows = (output or {}).get("data", []) if isinstance(output, dict) else None
    if isinstance(rows, list):
        print(f"\n── {name} ({len(rows)} rows) ──")
        for i, row in enumerate(rows[:3]):
            print(f" [{i}] {row}")
        if len(rows) > 3:
            print(f" ... +{len(rows) - 3} more")
For mid-run streaming (printing as data flows rather than after the fact), use a pass-through transform as a "tap" node, or register an on_node_complete lifecycle hook — see §9 below.
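A sketch of the hook route (signature as in §9.3; node.name mirrors the iteration in §5 and is an assumption for the hook's node argument):
@Workflow.on_node_complete
def print_progress(workflow, node, output):
    rows = output.get("data", []) if isinstance(output, dict) else []
    print(f"[{workflow.name}] {node.name}: {len(rows)} rows")   # node.name assumed; node.node_type is shown in §9.3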
6. Workflow-scoped variables¶
wf.set_variable("api_token", os.environ["MY_API_TOKEN"])
wf.set_variable("base_url", "https://api.example.com")
http("call",
url="{{ base_url }}/v1/items",
headers={"Authorization": "Bearer {{ api_token }}"})
{{ name }} references are resolved by the engine at run time. Use this for credentials and per-environment values; do not hardcode them in node configs.
wf.variables is a read-only copy:
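For example (values illustrative):
wf.set_variable("base_url", "https://api.example.com")
print(wf.variables)                # {'base_url': 'https://api.example.com', ...}
wf.variables["base_url"] = "x"     # mutates only the copy; the workflow's variables are unchanged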
7. Scopes — name-prefix isolation¶
When a helper (yours or someone else's) is called multiple times, scopes keep node names unique without renaming everything by hand:
with wf.scope("onco"):
fetch_evidence(topic="glioblastoma CAR-T") # → onco.search, onco.recent, ...
with wf.scope("immuno"):
fetch_evidence(topic="checkpoint inhibitors") # → immuno.search, immuno.recent, ...
Nested scopes join with . — scope("a").scope("b") produces a.b.<name>.
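For example, assuming nesting via nested with blocks (chained wf.scope("a").scope("b") may also work):
with wf.scope("a"):
    with wf.scope("b"):
        wf.pubmed("search", query="...")   # resolved node name: a.b.search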
8. Escape hatch — reach any engine node type¶
The 11 helpers cover the common case. For anything else (edgar, twitter, snowflake, ...), use add_node:
filings = wf.add_node(
name="edgar-pull",
type="edgar", # any engine node type
category="healthcare_research", # its category
config={"company": "PFE", "form_type": "8-K", "limit": 25},
)
add_node is a Workflow method (no function-first twin); the whole point is to reach beyond the typed helpers.
9. Wrapper-authoring (extensions.py)¶
If you're writing a domain package (pharma-workflows, marketing-pipelines, ...) on top of lite, you have three tools.
9.1 Register a helper¶
@Workflow.register_helper
def pharma_pubmed_etl(self, name, *, drug, indication, model_url, inputs=None):
    papers = self.pubmed(f"{name}-search", inputs=inputs,
                         query=f"{drug} {indication}", max_results=200)
    recent = self.filter(f"{name}-recent", inputs=papers,
                         conditions=[{"field": "year", "op": "gte", "value": 2021}])
    return self.ai_tagging(f"{name}-classify", inputs=recent, agent_url=model_url)
After registration, every Workflow instance exposes wf.pharma_pubmed_etl(...). The body uses self.<helper> because self is the workflow. End users call it as a one-liner; they don't see the three nodes inside.
Trigger registration on package import:
# pharma_workflows/__init__.py
from .helpers import pharma_pubmed_etl # decorator runs at import time
9.2 Sub-workflow composition with include¶
def screening_pattern():
    with Workflow("screening-template") as sub:
        papers = pubmed("search", query="<placeholder>", max_results=200)
        ai_tagging("classify", inputs=papers,
                   agent_url="https://placeholder.example/workspace/_/agent/_/_/_")
    return sub
wf.include(screening_pattern(), name_prefix="onco",
           overrides={"search": {"query": "glioblastoma CAR-T"}})
include deep-copies the source workflow's nodes and internal edges into wf under the prefix. overrides= patches node config after copy. The source workflow is untouched and can be included multiple times.
9.3 Lifecycle hooks¶
@Workflow.before_run
def authenticate(workflow):
    refresh_token()
@Workflow.after_run
def log_completion(workflow, result):
    log.info("workflow %s produced %d outputs", workflow.name, len(result))
@Workflow.on_node_complete
def emit_metric(workflow, node, output):
    metrics.incr(f"node.{node.node_type}.completed")
@Workflow.on_error
def page_oncall(workflow, exc):
    pager.notify(workflow.name, exc)
Class-level hooks fire for every workflow run. For per-instance hooks:
A misbehaving hook (one that raises) is logged and skipped — it does not break the run.
10. Testing your workflows¶
Two test patterns to add to your suite (the SDK itself uses both):
Structural (pure, fast):
def test_pipeline_validates():
    wf = build_my_pipeline()
    assert wf.validate() == []
    data = wf.to_workflow_data()
    assert {n["node_name"] for n in data["nodes"]} == {"load", "enrich", "save"}
Examples regression gate:
If you ship example scripts, run them in CI to catch breakage. See tests/test_examples_validate.py for a reference implementation (parametrized subprocess runner that asserts each example exits cleanly and validates).
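A minimal sketch of that pattern (the examples/ layout and timeout are assumptions — the shipped test file is the source of truth):
import pathlib
import subprocess
import sys
import pytest
EXAMPLES = sorted(pathlib.Path("examples").glob("*.py"))
@pytest.mark.parametrize("script", EXAMPLES, ids=lambda p: p.name)
def test_example_runs_cleanly(script):
    proc = subprocess.run([sys.executable, str(script)],
                          capture_output=True, text=True, timeout=300)
    assert proc.returncode == 0, f"{script} failed:\n{proc.stderr}"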
11. Things to know about errors¶
- Workflow.add_node raises ValueError if the resolved (scoped) node name is already in use.
- Required-arg checks happen at build time, not run time. postgres(operation="select") without query= raises immediately.
- wf["typo"] raises KeyError with a "Did you mean ...?" hint based on Levenshtein distance.
- wf.iter_topo() raises RuntimeError if the graph has a cycle (created via wf.connect(a, b) after both already exist with b → a somewhere).
- Function-first helpers raise RuntimeError("No current workflow...") if called outside a with Workflow(...): block.
12. Conventions for shipping to clients¶
If you're handing this SDK to a downstream team:
- Pin the version in their pyproject.toml / requirements.txt. Pre-1.0, a minor bump may break.
- Don't ship examples inside the package. Keep them in a separate docs repo or this examples/ dir; importing athena_sdk_lite.examples is not supported.
- Workflow files live in their repo, organized however they prefer. A common convention: workflows/<team>/<name>.py.
- CI gate: run wf.validate() on every workflow file at commit time — keeps broken DAGs out of main.
- Credentials come from os.environ, never inlined. The SDK does not autoload .env; users add python-dotenv themselves if needed.