athena-sdk-lite — Architecture¶
Version: 0.1.0
Audience: engineers who need to reason about how the parts fit together — before extending, debugging, or deploying.
For the elevator pitch, read overview.md first.
1. Layered view¶
┌─────────────────────────────────────────────────────────────────────────┐
│ USER LAYER │
│ ──────────────────────────────────────────────── │
│ • End-user scripts: `from athena_sdk_lite import Workflow` │
│ • Wrapper packages (e.g. pharma-workflows) registering helpers / hooks │
└────────────────────────────────┬────────────────────────────────────────┘
│
┌────────────────────────────────▼────────────────────────────────────────┐
│ PUBLIC API (this package — src/athena_sdk_lite/) │
│ ──────────────────────────────────────────────── │
│ workflow.py Workflow, Node, scope, validate, visualize, run, │
│ add_node, connect, set_variable, iter_topo │
│ nodes.py 11 typed helpers (mixin + module-level twins) │
│ extensions.py register_helper, lifecycle hooks, include │
│ _compat.py Workflow → engine workflow_data dict │
│ _runner.py workflow_data → vendored WorkflowEngine.run() │
└────────────────────────────────┬────────────────────────────────────────┘
│ (engine schema is the boundary)
┌────────────────────────────────▼────────────────────────────────────────┐
│ VENDORED ENGINE (src/athena_sdk_lite/_engine/) │
│ ──────────────────────────────────────────────── │
│ • Copied from athena-sdk; treated as a private dependency. │
│ • Knows how to execute each node type. │
│ • Resolves templated variables and evaluates branch conditions. │
└────────────────────────────────┬────────────────────────────────────────┘
│
┌──────────────────┼──────────────────┐
▼ ▼ ▼
┌──────────┐ ┌──────────┐ ┌──────────┐
│ Postgres │ │ S3 │ │ Athena │
└──────────┘ └──────────┘ │ agent │
└──────────┘
Three things to internalize from this picture:
- The public API surface is small. Two files (`workflow.py`, `nodes.py`) are enough to understand it; a third (`extensions.py`) is needed only by wrapper authors.
- The boundary with the engine is a single dict shape — `workflow_data`. `_compat.py` is the only file that knows that shape; if the engine schema changes, that's the only file that needs updating.
- External services are reached only through nodes. The SDK itself does no I/O. Postgres / S3 / HTTP / Athena calls happen inside the engine when a node runs.
2. Module map¶
| File | Role | Read it when |
|---|---|---|
| `workflow.py` | `Workflow` and `Node` dataclasses, edges, scopes, validation, visualization, run. | You're learning the SDK or adding core behavior. |
| `nodes.py` | Typed builders for the 11 starter helpers; method and function forms. | You're using or adding a node helper. |
| `extensions.py` | `register_helper`, `include`, lifecycle hooks (`before_run`, `after_run`, etc.). | You're writing a wrapper package. |
| `_compat.py` | `Workflow` → `workflow_data` dict conversion (the engine's wire format). | The engine's schema changed. |
| `_runner.py` | Bridges `workflow_data` to the vendored engine's `WorkflowEngine.run()`. | You're debugging execution. |
| `_engine/` | Private; curated subset of the full athena-sdk engine. See §2.1 for what's in it. | You're tracing execution or auditing what runs. |
2.1 What's inside _engine/¶
The engine is not the full athena-sdk runtime — it's a curated subset sized to lite's three concerns: orchestration, connectors, and in-process logging. Subsystems that only matter in a hosted-execution / nexus-backend deployment (triggers, scheduler, streaming, telemetry, memory profilers, DB-backed workflow loading, feature flags) have been removed. What remains:
_engine/ ~ 1.1 MB, 101 .py files, 24K LoC
├─ applogger/ ← in-process logging (no DB destinations)
├─ config/ ← config loader
├─ database/ ← async DB connection layer (for postgres node)
├─ services/file_storage/ ← S3 service (for s3 node)
├─ utils/ ← env var substitution
└─ workflow/
├─ engine.py, planner.py, model.py ← orchestration brain
├─ dataflow/ ← context, data_manager, storage strategies
├─ events/ ← event bus (powers `on_node_complete` hooks)
├─ executors/ ← per-category node executors
├─ expression/ ← branch conditions + `{{ }}` resolution
├─ workflowhandler/ ← default handler
├─ utils/ ← resource detector
└─ nodes/
├─ actions/ ← 8 connectors: postgres, pubmed, ai_tagging, s3,
│ local_file, api, edgar, output
├─ controls/ ← if (branch) + merge
├─ transforms/ ← filter + python_transform
└─ placeholders/ ← `{{ name }}` resolution
The mental model in one phrase: engine + connectors + applogger.
This curation means: if you write wf.add_node(type="snowflake", ...), the factory will raise ValueError: Unknown action type because the snowflake module isn't shipped. The full athena-sdk distribution has it; lite intentionally doesn't.
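A minimal sketch of that fail-fast behavior — the action names below are copied from the §2.1 listing, but the function and registry shape are illustrative only; the real factory lives in `_engine/workflow/nodes/`:

```python
# Actions shipped with lite, per the §2.1 tree (illustrative registry).
SHIPPED_ACTIONS = {"postgres", "pubmed", "ai_tagging", "s3",
                   "local_file", "api", "edgar", "output"}

def make_action(node_type: str) -> str:
    """Mimic the factory's lookup: an unshipped type fails immediately."""
    if node_type not in SHIPPED_ACTIONS:
        raise ValueError(f"Unknown action type: {node_type}")
    return node_type  # the real factory returns an executor instance
```

The point is that the error surfaces at node-construction time, not midway through a run.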
3. Lifecycle of a workflow¶
Python script
│
│ (1) build
▼
┌──────────────┐
│ Workflow │ in-memory graph (Nodes + Edges + Variables)
└──────┬───────┘
│
│ (2) validate → wf.validate() [returns list of issues; [] if ok]
│ (2') visualize → wf.visualize() [ascii DAG]
│
│ (3) serialize → to_workflow_data() ← _compat.py
▼
┌──────────────────────┐
│ workflow_data dict │ {id, name, description, variables, nodes, connections}
└──────────┬───────────┘
│
│ (4) run → WorkflowEngine.run(workflow_data) ← _runner.py
▼
┌──────────────────────┐
│ result dict │ {node_name: output, ...}
└──────────────────────┘
- Steps 1, 2, 3 are pure — no I/O. Tests can stop at step 3 and assert on the dict shape (see `tests/test_roundtrip.py`).
- Step 4 is the only step that touches external services. CI typically skips it; production code uses it.
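A sketch of what a step-3 assertion looks like — the key names come from the diagram above; a real test would produce the dict via `to_workflow_data()` rather than by hand:

```python
# Hand-built stand-in for the serializer's output (keys from the diagram).
workflow_data = {
    "id": "wf-demo",
    "name": "demo",
    "description": None,
    "variables": {},
    "nodes": [],
    "connections": [],
}

# CI can check the wire-format shape without touching any external service.
EXPECTED_KEYS = {"id", "name", "description", "variables", "nodes", "connections"}
assert set(workflow_data) == EXPECTED_KEYS
```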
validate() is more than a smoke check. It catches the silent-footgun shapes:
- `branch` nodes with only one port consumed (rows on the other side would be silently dropped)
- `output` nodes with a downstream consumer (output is terminal; chaining past it is a mistake)
- `merge` nodes with fewer than 2 inputs (fan-in needs ≥ 2)
- Plus the original checks: duplicate names, dangling edges, no terminal node, cycles
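One of these checks can be sketched in a few lines — the function name and the node/edge shapes are assumptions for illustration, not the SDK's internals:

```python
def check_merge_fanin(node_types: dict, edges: list) -> list[str]:
    """Flag merge nodes with fewer than two incoming edges (fan-in needs >= 2)."""
    issues = []
    for name, node_type in node_types.items():
        if node_type != "merge":
            continue
        fan_in = sum(1 for e in edges if e["target"] == name)
        if fan_in < 2:
            issues.append(f"merge node '{name}' has {fan_in} input(s); needs >= 2")
    return issues
```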
Run it before every commit. A pre-commit hook that walks workflows/ and asserts wf.validate() == [] keeps structural mistakes out of main.
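Such a hook can be sketched as a plain script — the layout is an assumption (each file under `workflows/` exposes a `build() -> Workflow` function); adapt the discovery to your repo:

```python
# Hypothetical pre-commit check: validate every workflow module in a directory.
import importlib.util
import pathlib

def check_workflows(root: str) -> list[str]:
    """Return 'file: issue' strings; an empty list means main stays clean."""
    failures = []
    for path in sorted(pathlib.Path(root).glob("*.py")):
        spec = importlib.util.spec_from_file_location(path.stem, path)
        module = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(module)
        build = getattr(module, "build", None)
        if build is None:
            continue  # not a workflow module; skip it
        for issue in build().validate():
            failures.append(f"{path.name}: {issue}")
    return failures
```

Wire it into pre-commit as an ordinary `python` hook that exits non-zero on a non-empty list.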
4. The Workflow object¶
class Workflow:
name: str
description: str | None
_nodes: dict[str, Node] # name → Node, insertion-ordered
_connections: list[Connection] # source/target + named ports
_variables: dict[str, Any] # for {{ name }} templating
_scope_stack: list[str] # current scope prefix
_instance_hooks: dict[str, list] # per-instance lifecycle hooks
A workflow is a mutable graph builder during construction, then a frozen blueprint when serialized. There is no separate "compile" step — validate() and to_workflow_data() can be called at any point.
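The `_variables` dict feeds `{{ name }}` templating. The real resolution happens in the engine's expression module; a minimal stand-in for the substitution idea (function name and error behavior are assumptions):

```python
import re

def resolve_templates(text: str, variables: dict) -> str:
    """Replace each {{ name }} with the variable's value; unknown names fail loudly."""
    def substitute(match: re.Match) -> str:
        key = match.group(1).strip()
        if key not in variables:
            raise KeyError(f"undefined workflow variable: {key}")
        return str(variables[key])
    return re.sub(r"\{\{(.*?)\}\}", substitute, text)
```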
Nodes and ports¶
@dataclass(frozen=True)
class Node:
name: str # scoped name, e.g. "step1.search"
node_type: str # engine type, e.g. "pubmed", "if", "python_transform"
node_category: str # engine category, e.g. "healthcare_research"
configuration: dict[str, Any]
A node has one default output port ("output"). Branch nodes additionally expose named ports ("true", "false") via node.out("true"). Edges in the underlying engine carry source_output and target_input strings — that's how the engine routes data from a branch's true side to one downstream and false side to another.
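A hedged sketch of what those port-carrying edges look like once serialized — the node names are invented, while the `source_output` / `target_input` fields come from the paragraph above:

```python
# Two edges leaving the same branch node on different ports.
edges = [
    {"source": "check", "source_output": "true",
     "target": "enrich", "target_input": "input"},
    {"source": "check", "source_output": "false",
     "target": "log_skip", "target_input": "input"},
]

# Routing the true side is just filtering on the port name.
true_targets = [e["target"] for e in edges if e["source_output"] == "true"]
```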
Edges¶
Edges are not declared explicitly. When you pass inputs=upstream_node to any helper, the SDK creates an edge from upstream_node's output port to the new node's "input" port. Listing the same upstream in multiple downstream inputs= is how fan-out is expressed — there is no separate fan-out concept; it's just multiple edges from the same source.
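A toy model of that implicit edge creation — the function name and edge shape are assumptions for illustration, not SDK API:

```python
def edges_for(new_node: str, inputs) -> list[dict]:
    """One edge per upstream: each inputs= entry feeds the new node's 'input' port."""
    if inputs is None:
        return []
    if not isinstance(inputs, (list, tuple)):
        inputs = [inputs]
    return [{"source": up, "source_output": "output",
             "target": new_node, "target_input": "input"}
            for up in inputs]

# Fan-out falls out for free: name the same upstream for two downstream nodes.
fan_out = edges_for("summarize", "search") + edges_for("archive", "search")
```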
5. Function-first vs method-first¶
Both styles produce identical workflows. The function-first form uses a ContextVar set by Workflow.__enter__ so module-level helpers find the active workflow:
# method-first
wf = Workflow("x")
a = wf.pubmed("search", query="...")
# function-first (identical result)
from athena_sdk_lite.nodes import pubmed
with Workflow("x") as wf:
a = pubmed("search", query="...")
The function-first form raises RuntimeError if called outside a with Workflow(...): block — there is no silent auto-workflow creation. Wrapper authors should still prefer the method form on the workflow object (self.pubmed(...)) since their helpers receive self explicitly.
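The ContextVar mechanism can be sketched in isolation — `ToyWorkflow` and `current_workflow` are stand-ins for the pattern, not the SDK's classes:

```python
from contextvars import ContextVar

_active: ContextVar = ContextVar("active_workflow", default=None)

class ToyWorkflow:
    """Stand-in showing the pattern Workflow.__enter__ uses."""
    def __enter__(self):
        self._token = _active.set(self)
        return self

    def __exit__(self, *exc_info):
        _active.reset(self._token)

def current_workflow():
    """What a module-level helper would call first."""
    wf = _active.get()
    if wf is None:
        raise RuntimeError("no active Workflow; call inside 'with Workflow(...)'")
    return wf
```

`ContextVar` (rather than a plain global) keeps nested and concurrent contexts independent.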
6. Extension points¶
The SDK offers exactly three extension surfaces. Wrapper authors should be able to do everything with these; no monkey-patching, no subclassing tricks.
6.1 Workflow.register_helper¶
Adds a new method to every Workflow instance. The decorated function must take self as its first argument.
@Workflow.register_helper
def pharma_pubmed_etl(self, name, *, drug, indication, model_url, inputs=None):
papers = self.pubmed(f"{name}-search", inputs=inputs, query=f"{drug} {indication}")
recent = self.filter(f"{name}-recent", inputs=papers, conditions=[...])
return self.ai_tagging(f"{name}-classify", inputs=recent, agent_url=model_url)
# now every workflow has it:
wf.pharma_pubmed_etl("nivo", drug="nivolumab", indication="melanoma", model_url="...")
Registration is idempotent for the same callable; re-registering a different function with the same name is rejected to prevent silent shadowing.
6.2 Workflow.include¶
Deep-copies another Workflow into this one with a name prefix, preserving internal edges. Useful for shipping reusable patterns:
sub = screening_pattern() # returns a Workflow object
wf.include(sub, name_prefix="onco",
overrides={"search": {"query": "glioblastoma"}})
Source is left untouched; the same template can be included multiple times. overrides lets the caller patch node configs after the copy.
6.3 Lifecycle hooks¶
Four events, registered at class level (fire for every workflow) or instance level:
| Event | Signature |
|---|---|
| `before_run` | `(workflow) -> None` |
| `after_run` | `(workflow, result) -> None` |
| `on_node_complete` | `(workflow, node, output) -> None` |
| `on_error` | `(workflow, exc) -> None` |
@Workflow.before_run
def authenticate(workflow): refresh_token()
wf.add_hook("after_run", lambda w, r: metrics.emit(w.name, len(r)))
A misbehaving hook (raises an exception) is logged and skipped — it does not break the workflow run.
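That isolation behavior can be sketched as follows — the dispatch function name is an assumption; the real dispatch lives inside the SDK:

```python
import logging

def fire_hooks(hooks, *args) -> None:
    """Call each hook in order; log and skip any that raises."""
    for hook in hooks:
        try:
            hook(*args)
        except Exception:
            logging.exception("lifecycle hook %r failed; continuing", hook)
```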
7. Trust boundaries and execution model¶
- User code is trusted. Workflows are authored by your engineers; the SDK does not sandbox them. The one exception is `transform` node code, which runs inside the engine's `python_transform` sandbox.
- External services are not trusted. All I/O happens inside engine nodes, with explicit credentials passed through configuration (Postgres connection dict, S3 keys, agent URL).
- No global state in the SDK. Workflow instances are independent. The exceptions — registered helpers and class-level hooks — live on the `Workflow` class object and are explicit on import (a wrapper package's `__init__.py` registers them at import time).
- No async. Execution is in-process, synchronous, in topological order. The engine internally may parallelize independent branches (e.g. five agents fanning out from one upstream); the user does not see threads.
8. Versioning posture¶
0.1.0 — pre-1.0. The public API listed in nodes.py's __all__ and the helpers on Workflow are intended to remain stable; the engine wire format (_compat.py's output) is locked against the vendored engine version. Breaking changes go through a CHANGELOG entry and a minor-version bump until 1.0, then strict SemVer.
9. What lives outside this repo¶
- Real production execution — the full `athena-sdk` + `nexus-backend` ship hosted execution, scheduling, registry, and a UI. This lite SDK is the authoring surface, not the deployment substrate.
- Domain-specific node types — wrappers like `pharma-workflows` ship those.
- The Athena agent itself — reached via `agent_url=` in `ai_tagging`; it lives on a separate service.