athena-sdk-lite — Architecture

Version: 0.1.0
Audience: engineers who need to reason about how the parts fit together before extending, debugging, or deploying.

For the elevator pitch, read overview.md first.


1. Layered view

┌─────────────────────────────────────────────────────────────────────────┐
│  USER LAYER                                                             │
│  ────────────────────────────────────────────────                       │
│  • End-user scripts: `from athena_sdk_lite import Workflow`             │
│  • Wrapper packages (e.g. pharma-workflows) registering helpers / hooks │
└────────────────────────────────┬────────────────────────────────────────┘
┌────────────────────────────────▼────────────────────────────────────────┐
│  PUBLIC API   (this package — src/athena_sdk_lite/)                     │
│  ────────────────────────────────────────────────                       │
│  workflow.py    Workflow, Node, scope, validate, visualize, run,        │
│                 add_node, connect, set_variable, iter_topo              │
│  nodes.py       11 typed helpers (mixin + module-level twins)           │
│  extensions.py  register_helper, lifecycle hooks, include               │
│  _compat.py     Workflow  →  engine workflow_data dict                  │
│  _runner.py     workflow_data  →  vendored WorkflowEngine.run()         │
└────────────────────────────────┬────────────────────────────────────────┘
                                 │  (engine schema is the boundary)
┌────────────────────────────────▼────────────────────────────────────────┐
│  VENDORED ENGINE   (src/athena_sdk_lite/_engine/)                       │
│  ────────────────────────────────────────────────                       │
│  • Copied from athena-sdk; treated as a private dependency.             │
│  • Knows how to execute each node type.                                 │
│  • Resolves templated variables and evaluates branch conditions.        │
└────────────────────────────────┬────────────────────────────────────────┘
              ┌──────────────────┼──────────────────┐
              ▼                  ▼                  ▼
        ┌──────────┐       ┌──────────┐       ┌──────────┐
        │ Postgres │       │   S3     │       │  Athena  │
        └──────────┘       └──────────┘       │  agent   │
                                              └──────────┘

Three things to internalize from this picture:

  1. The public API surface is small. Two files (workflow.py, nodes.py) are enough to understand it; a third (extensions.py) is needed only by wrapper authors.
  2. The boundary with the engine is a single dict shape, workflow_data. _compat.py is the only file that knows that shape, so if the engine schema changes, it is the only file that needs updating.
  3. External services are reached only through nodes. The SDK itself does no I/O. Postgres / S3 / HTTP / Athena calls happen inside the engine when a node runs.

2. Module map

| File | Role | Read it when |
| --- | --- | --- |
| workflow.py | Workflow and Node dataclasses, edges, scopes, validation, visualization, run. | You're learning the SDK or adding core behavior. |
| nodes.py | Typed builders for the 11 starter helpers; method and function forms. | You're using or adding a node helper. |
| extensions.py | register_helper, include, lifecycle hooks (before_run, after_run, etc.). | You're writing a wrapper package. |
| _compat.py | Workflow → workflow_data dict conversion (the engine's wire format). | The engine's schema changed. |
| _runner.py | Bridges workflow_data to the vendored engine's WorkflowEngine.run(). | You're debugging execution. |
| _engine/ | Private; curated subset of the full athena-sdk engine. See §2.1 for what's in it. | You're tracing execution or auditing what runs. |

2.1 What's inside _engine/

The engine is not the full athena-sdk runtime. It is a curated subset sized to lite's three concerns: orchestration, connectors, and in-process logging. Subsystems that only matter in a hosted-execution / nexus-backend deployment (triggers, scheduler, streaming, telemetry, memory profilers, DB-backed workflow loading, feature flags) have been removed. What remains:

_engine/   ~ 1.1 MB, 101 .py files, 24K LoC
├─ applogger/                  ← in-process logging (no DB destinations)
├─ config/                     ← config loader
├─ database/                   ← async DB connection layer (for postgres node)
├─ services/file_storage/      ← S3 service (for s3 node)
├─ utils/                      ← env var substitution
└─ workflow/
   ├─ engine.py, planner.py, model.py    ← orchestration brain
   ├─ dataflow/                ← context, data_manager, storage strategies
   ├─ events/                  ← event bus (powers `on_node_complete` hooks)
   ├─ executors/               ← per-category node executors
   ├─ expression/              ← branch conditions + `{{ }}` resolution
   ├─ workflowhandler/         ← default handler
   ├─ utils/                   ← resource detector
   └─ nodes/
      ├─ actions/      ← 8 connectors: postgres, pubmed, ai_tagging, s3,
      │                  local_file, api, edgar, output
      ├─ controls/     ← if (branch) + merge
      ├─ transforms/   ← filter + python_transform
      └─ placeholders/ ← `{{ name }}` resolution

The mental model in one phrase: engine + connectors + applogger.

This curation means: if you write wf.add_node(type="snowflake", ...), the factory will raise ValueError: Unknown action type because the snowflake module isn't shipped. The full athena-sdk distribution has it; lite intentionally doesn't.
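
The failure mode above can be pictured with a small registry-backed factory. This is a sketch under assumptions, not the vendored engine's code: `ACTION_REGISTRY`, `register_action`, `create_action`, and the `local_file` handler body are hypothetical names chosen for illustration; only the "Unknown action type" message comes from the description above.

```python
# Hypothetical sketch of a registry-backed node factory.
# None of these names are the vendored engine's real API.
from typing import Callable

ACTION_REGISTRY: dict[str, Callable[[dict], dict]] = {}


def register_action(node_type: str):
    """Decorator that records a handler under its node type."""
    def decorator(fn: Callable[[dict], dict]) -> Callable[[dict], dict]:
        ACTION_REGISTRY[node_type] = fn
        return fn
    return decorator


@register_action("local_file")
def run_local_file(config: dict) -> dict:
    # Illustrative handler: echoes the configured path.
    return {"path": config["path"]}


def create_action(node_type: str) -> Callable[[dict], dict]:
    """Look up a handler; types that were never shipped fail loudly."""
    try:
        return ACTION_REGISTRY[node_type]
    except KeyError:
        raise ValueError(f"Unknown action type: {node_type}") from None
```

Under this model, a connector that was never registered (because its module was never shipped) cannot be looked up, so the error surfaces at node creation rather than partway through a run.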

3. Lifecycle of a workflow

   Python script
        │  (1) build
  ┌──────────────┐
  │  Workflow    │   in-memory graph (Nodes + Edges + Variables)
  └──────┬───────┘
         │  (2) validate  →  wf.validate()      [returns list of issues; [] if ok]
         │  (2') visualize →  wf.visualize()    [ascii DAG]
         │  (3) serialize  →  to_workflow_data()  ← _compat.py
  ┌──────────────────────┐
  │  workflow_data dict  │   {id, name, description, variables, nodes, connections}
  └──────────┬───────────┘
             │  (4) run     →  WorkflowEngine.run(workflow_data)  ← _runner.py
  ┌──────────────────────┐
  │  result dict         │   {node_name: output, ...}
  └──────────────────────┘
  • Steps 1, 2, 3 are pure — no I/O. Tests can stop at step 3 and assert on the dict shape (see tests/test_roundtrip.py).
  • Step 4 is the only step that touches external services. CI typically skips it; production code uses it.
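
Because steps 1 to 3 are pure, a test can assert on the serialized dict without any services running. The sketch below uses a hypothetical `MiniWorkflow` stand-in rather than the SDK's own class; the six top-level keys come from the diagram above, while deriving `id` from the name is purely an assumption.

```python
from dataclasses import dataclass, field
from typing import Any, Optional


@dataclass
class MiniWorkflow:
    """Hypothetical stand-in for Workflow; not the SDK's class."""
    name: str
    description: Optional[str] = None
    variables: dict[str, Any] = field(default_factory=dict)
    nodes: list[dict[str, Any]] = field(default_factory=list)
    connections: list[dict[str, str]] = field(default_factory=list)

    def to_workflow_data(self) -> dict[str, Any]:
        # Pure conversion: builds a plain dict and performs no I/O.
        return {
            "id": self.name,  # assumption: the real id may be derived differently
            "name": self.name,
            "description": self.description,
            "variables": dict(self.variables),
            "nodes": [dict(n) for n in self.nodes],
            "connections": [dict(c) for c in self.connections],
        }


def test_dict_shape() -> None:
    wf = MiniWorkflow("demo", variables={"drug": "nivolumab"})
    wf.nodes.append({"name": "search", "type": "pubmed"})
    data = wf.to_workflow_data()
    expected = {"id", "name", "description", "variables", "nodes", "connections"}
    assert set(data) == expected
    assert data["nodes"][0]["name"] == "search"
```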

validate() is more than a smoke check. It catches the silent-footgun shapes:

  • branch nodes with only one port consumed (rows on the other side would be silently dropped)
  • output nodes with a downstream consumer (output is terminal, so anything chained after it is a wiring mistake)
  • merge nodes with fewer than 2 inputs (fan-in needs ≥2)
  • Plus the original checks: duplicate names, dangling edges, no terminal node, cycles

Run it before every commit. A pre-commit hook that walks workflows/ and asserts wf.validate() == [] keeps structural mistakes out of main.
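
To make the structural checks concrete, here is a minimal sketch of how a few of them could be written against a toy graph. It is not the SDK's `validate()`: the graph encoding (a name-to-type dict plus `(source, source_port, target)` tuples) and the issue strings are assumptions made for illustration.

```python
from graphlib import CycleError, TopologicalSorter


def validate(nodes: dict[str, str], edges: list[tuple[str, str, str]]) -> list[str]:
    """Return a list of issue strings; an empty list means the graph passed."""
    issues: list[str] = []

    # Dangling edges: an endpoint that is not a known node.
    for src, _port, dst in edges:
        for end in (src, dst):
            if end not in nodes:
                issues.append(f"dangling edge: unknown node {end!r}")

    for name, node_type in nodes.items():
        out_ports = {port for src, port, _dst in edges if src == name}
        in_count = sum(1 for _src, _port, dst in edges if dst == name)
        if node_type == "if" and len(out_ports) == 1:
            used = next(iter(out_ports))
            issues.append(f"{name}: only port {used!r} is consumed")
        if node_type == "output" and out_ports:
            issues.append(f"{name}: output node has a downstream consumer")
        if node_type == "merge" and in_count < 2:
            issues.append(f"{name}: merge needs at least 2 inputs, got {in_count}")

    # Cycle check: TopologicalSorter raises CycleError on a cyclic graph.
    graph: dict[str, set[str]] = {name: set() for name in nodes}
    for src, _port, dst in edges:
        if src in nodes and dst in nodes:
            graph[dst].add(src)
    try:
        tuple(TopologicalSorter(graph).static_order())
    except CycleError:
        issues.append("cycle detected")
    return issues
```

A pre-commit hook along the lines described above would then reduce to building each workflow and failing when the returned list is non-empty.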

4. The Workflow object

class Workflow:
    name: str
    description: str | None
    _nodes:        dict[str, Node]         # name → Node, insertion-ordered
    _connections: list[Connection]         # source/target + named ports
    _variables:   dict[str, Any]           # for {{ name }} templating
    _scope_stack: list[str]                # current scope prefix
    _instance_hooks: dict[str, list]       # per-instance lifecycle hooks

A workflow is a mutable graph builder during construction, then a frozen blueprint when serialized. There is no separate "compile" step — validate() and to_workflow_data() can be called at any point.

Nodes and ports

from dataclasses import dataclass
from typing import Any

@dataclass(frozen=True)
class Node:
    name: str           # scoped name, e.g. "step1.search"
    node_type: str      # engine type, e.g. "pubmed", "if", "python_transform"
    node_category: str  # engine category, e.g. "healthcare_research"
    configuration: dict[str, Any]

A node has one default output port ("output"). Branch nodes additionally expose named ports ("true", "false") via node.out("true"). Edges in the underlying engine carry source_output and target_input strings — that's how the engine routes data from a branch's true side to one downstream and false side to another.

Edges

Edges are not declared explicitly. When you pass inputs=upstream_node to any helper, the SDK creates an edge from upstream_node's output port to the new node's "input" port. Listing the same upstream in multiple downstream inputs= is how fan-out is expressed — there is no separate fan-out concept; it's just multiple edges from the same source.
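
The implicit-edge rule can be sketched with a toy builder. `MiniGraph`, `MiniNode`, and `Port` are hypothetical names, and accepting a list for `inputs` is an assumption; the port names ("output", "input", "true", "false") and the `out()` accessor follow the description above.

```python
from dataclasses import dataclass, field


@dataclass(frozen=True)
class Port:
    """A named output port on a node (hypothetical helper type)."""
    node: str
    name: str = "output"


@dataclass(frozen=True)
class MiniNode:
    name: str

    def out(self, port: str) -> Port:
        return Port(self.name, port)


@dataclass
class MiniGraph:
    # Each edge is (source, source_output, target, target_input).
    edges: list[tuple[str, str, str, str]] = field(default_factory=list)

    def add(self, name: str, inputs=None) -> MiniNode:
        if inputs is None:
            sources = []
        elif isinstance(inputs, (list, tuple)):
            sources = list(inputs)
        else:
            sources = [inputs]
        for src in sources:
            # A bare node means its default "output" port.
            port = src if isinstance(src, Port) else src.out("output")
            self.edges.append((port.node, port.name, name, "input"))
        return MiniNode(name)


g = MiniGraph()
search = g.add("search")
gate = g.add("gate", inputs=search)
g.add("audit", inputs=search)              # fan-out: second edge from "search"
g.add("keep", inputs=gate.out("true"))     # branch routing via named ports
g.add("drop", inputs=gate.out("false"))
```

In this sketch fan-out needs no dedicated construct: `search` simply appears as the source of two edges.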

5. Function-first vs method-first

Both styles produce identical workflows. The function-first form uses a ContextVar set by Workflow.__enter__ so module-level helpers find the active workflow:

from athena_sdk_lite import Workflow
from athena_sdk_lite.nodes import pubmed

# method-first
wf = Workflow("x")
a = wf.pubmed("search", query="...")

# function-first  (identical result)
with Workflow("x") as wf:
    a = pubmed("search", query="...")

The function-first form raises RuntimeError if called outside a with Workflow(...): block — there is no silent auto-workflow creation. Wrapper authors should still prefer the method form on the workflow object (self.pubmed(...)) since their helpers receive self explicitly.
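
The ContextVar mechanism described above can be sketched in a few lines. `MiniWorkflow`, `_active`, and the error message are hypothetical; the sketch only illustrates the pattern (set on enter, reset on exit, fail loudly when nothing is active), not the SDK's actual implementation.

```python
from contextvars import ContextVar

# Holds the workflow that is currently active, or None outside a `with` block.
_active = ContextVar("active_workflow", default=None)


class MiniWorkflow:
    """Hypothetical stand-in for Workflow."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.nodes: list[tuple[str, str, dict]] = []
        self._token = None

    def __enter__(self) -> "MiniWorkflow":
        self._token = _active.set(self)
        return self

    def __exit__(self, *exc_info) -> bool:
        # Restore whatever was active before this block.
        _active.reset(self._token)
        return False

    def pubmed(self, name: str, **config) -> str:
        self.nodes.append((name, "pubmed", config))
        return name


def pubmed(name: str, **config) -> str:
    """Module-level twin: delegates to the active workflow's method."""
    wf = _active.get()
    if wf is None:
        raise RuntimeError("pubmed() must be called inside a `with Workflow(...)` block")
    return wf.pubmed(name, **config)
```

Using `reset()` with the saved token, rather than setting the variable back to `None`, keeps nested `with` blocks working: leaving the inner block restores the outer workflow.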

6. Extension points

The SDK offers exactly three extension surfaces. Wrapper authors should be able to do everything with these; no monkey-patching, no subclassing tricks.

6.1 Workflow.register_helper

Adds a new method to every Workflow instance. The decorated function must take self as its first argument.

@Workflow.register_helper
def pharma_pubmed_etl(self, name, *, drug, indication, model_url, inputs=None):
    papers = self.pubmed(f"{name}-search", inputs=inputs, query=f"{drug} {indication}")
    recent = self.filter(f"{name}-recent", inputs=papers, conditions=[...])
    return self.ai_tagging(f"{name}-classify", inputs=recent, agent_url=model_url)

# now every workflow has it:
wf.pharma_pubmed_etl("nivo", drug="nivolumab", indication="melanoma", model_url="...")

Registration is idempotent for the same callable; re-registering a different function with the same name is rejected to prevent silent shadowing.
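
The idempotent-but-no-shadowing rule could look roughly like the sketch below. `MiniWorkflow` and its `_helpers` registry are hypothetical, and raising `ValueError` on a conflict is an assumption; the source only says the second registration is rejected.

```python
class MiniWorkflow:
    """Hypothetical stand-in for Workflow."""

    _helpers: dict = {}  # helper name -> registered callable

    def __init__(self, name: str) -> None:
        self.name = name

    @classmethod
    def register_helper(cls, fn):
        existing = cls._helpers.get(fn.__name__)
        if existing is fn:
            return fn  # same callable again: nothing to do
        if existing is not None or hasattr(cls, fn.__name__):
            # A different function, or a built-in method, already owns this name.
            raise ValueError(f"helper name already taken: {fn.__name__}")
        cls._helpers[fn.__name__] = fn
        setattr(cls, fn.__name__, fn)  # becomes a method on every instance
        return fn
```

Returning `fn` from `register_helper` is what lets it be used as a decorator without replacing the decorated function.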

6.2 Workflow.include

Deep-copies another Workflow into this one with a name prefix, preserving internal edges. Useful for shipping reusable patterns:

sub = screening_pattern()                                  # returns a Workflow object
wf.include(sub, name_prefix="onco",
           overrides={"search": {"query": "glioblastoma"}})

Source is left untouched; the same template can be included multiple times. overrides lets the caller patch node configs after the copy.
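
The copy-prefix-patch behavior can be sketched on plain dicts. This is an illustration under assumptions, not the SDK's `include`: workflows are modeled as `{"nodes": ..., "edges": ...}` dicts, and joining the prefix with "." is assumed from the scoped-name style ("step1.search") used elsewhere in this document.

```python
import copy
from typing import Optional


def include(target: dict, source: dict, name_prefix: str,
            overrides: Optional[dict] = None) -> None:
    """Copy `source` into `target` under a prefix, then patch node configs."""
    overrides = overrides or {}
    for name, config in source["nodes"].items():
        patched = copy.deepcopy(config)          # never alias the template
        patched.update(overrides.get(name, {}))  # override keys use unprefixed names
        target["nodes"][f"{name_prefix}.{name}"] = patched
    for src, dst in source["edges"]:
        # Internal edges are preserved, with both endpoints renamed.
        target["edges"].append((f"{name_prefix}.{src}", f"{name_prefix}.{dst}"))


template = {
    "nodes": {"search": {"query": "melanoma"}, "rank": {}},
    "edges": [("search", "rank")],
}
wf = {"nodes": {}, "edges": []}
include(wf, template, "onco", overrides={"search": {"query": "glioblastoma"}})
include(wf, template, "neuro")  # the same template can be reused
```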

6.3 Lifecycle hooks

Four events, registered at class level (fire for every workflow) or instance level:

| Event | Signature |
| --- | --- |
| before_run | (workflow) -> None |
| after_run | (workflow, result) -> None |
| on_node_complete | (workflow, node, output) -> None |
| on_error | (workflow, exc) -> None |

@Workflow.before_run
def authenticate(workflow):
    refresh_token()

wf.add_hook("after_run", lambda w, r: metrics.emit(w.name, len(r)))

A hook that raises an exception is logged and skipped; it does not break the workflow run.
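
The log-and-skip policy for failing hooks can be sketched as follows. `HookRegistry`, `fire`, and the logger name are hypothetical; the four event names come from the table above.

```python
import logging
from collections import defaultdict

logger = logging.getLogger("mini_hooks")  # hypothetical logger name


class HookRegistry:
    """Hypothetical hook dispatcher that isolates hook failures."""

    EVENTS = ("before_run", "after_run", "on_node_complete", "on_error")

    def __init__(self) -> None:
        self._hooks = defaultdict(list)

    def add_hook(self, event: str, fn) -> None:
        if event not in self.EVENTS:
            raise ValueError(f"unknown event: {event}")
        self._hooks[event].append(fn)

    def fire(self, event: str, *args) -> None:
        for fn in self._hooks[event]:
            try:
                fn(*args)
            except Exception:
                # Log with traceback, then continue with the remaining hooks.
                logger.exception("hook %r for %s failed; skipping", fn, event)


calls = []
registry = HookRegistry()
registry.add_hook("after_run", lambda w, r: 1 / 0)              # raises
registry.add_hook("after_run", lambda w, r: calls.append(len(r)))
registry.fire("after_run", "wf", {"a": 1})                      # second hook still runs
```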

7. Trust boundaries and execution model

  • User code is trusted. Workflows are authored by your engineers; the SDK does not sandbox them. The one exception is transform node code, which runs inside the engine's python_transform sandbox.
  • External services are not trusted. All I/O happens inside engine nodes, with explicit credentials passed through configuration (Postgres connection dict, S3 keys, agent URL).
  • No global state in the SDK. Workflow instances are independent. The exceptions — registered helpers and class-level hooks — live on the Workflow class object and are explicit on import (a wrapper package's __init__.py registers them at import time).
  • No async. Execution is in-process, synchronous, in topological order. The engine internally may parallelize independent branches (e.g. five agents fanning out from one upstream); the user does not see threads.
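
Synchronous execution in topological order can be illustrated with the standard library's `graphlib`. This is a sketch, not the engine's scheduler: `run_in_topo_order` and the handler signature (a callable receiving the list of upstream outputs) are assumptions; the `{node_name: output}` result shape follows the lifecycle diagram in section 3.

```python
from graphlib import TopologicalSorter
from typing import Any, Callable


def run_in_topo_order(
    handlers: dict[str, Callable[[list], Any]],
    edges: list[tuple[str, str]],
) -> dict[str, Any]:
    """Run each node after all of its upstreams; return {node_name: output}."""
    # TopologicalSorter expects a mapping of node -> set of predecessors.
    graph: dict[str, set[str]] = {name: set() for name in handlers}
    for src, dst in edges:
        graph[dst].add(src)

    results: dict[str, Any] = {}
    for name in TopologicalSorter(graph).static_order():
        upstream = [results[src] for src in sorted(graph[name])]
        results[name] = handlers[name](upstream)
    return results


handlers = {
    "load": lambda ups: [1, 2, 3],
    "double": lambda ups: [x * 2 for x in ups[0]],
    "total": lambda ups: sum(ups[0]),
}
result = run_in_topo_order(handlers, [("load", "double"), ("double", "total")])
```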

8. Versioning posture

0.1.0 — pre-1.0. The public API listed in nodes.py's __all__ and the helpers on Workflow are intended to remain stable; the engine wire format (_compat.py's output) is locked against the vendored engine version. Breaking changes go through a CHANGELOG entry and a minor-version bump until 1.0, then strict SemVer.

9. What lives outside this repo

  • Real production execution — the full athena-sdk + nexus-backend ship hosted execution, scheduling, registry, and a UI. This lite SDK is the authoring surface, not the deployment substrate.
  • Domain-specific node types — wrappers like pharma-workflows ship those.
  • The Athena agent itself — reached via agent_url= in ai_tagging; it lives on a separate service.