Agentic Spark — declarative, self-healing Apache Spark blueprints.
Aqueduct
Self-healing Spark pipelines. Declarative. Observable. Autonomous.
Wake up to a pending patch — not a wall of Spark errors.
Aqueduct is a control plane for Apache Spark. Define pipelines as YAML Blueprints; Aqueduct compiles, executes, and observes them on Spark — and when they break, an LLM agent autonomously diagnoses and patches them. Pluggable stores (DuckDB / Postgres), watermark-based incremental processing, zero-cost observability, Airflow integration. No DAG code, no on-call digging through logs.
How It Works
- You write a Blueprint (YAML)
- Aqueduct compiles it into a Manifest
- Executor runs it on Spark
- Surveyor observes everything
- On failure → LLM Agent creates a structured `PatchSpec`
- Patch goes through guardrails → sandbox → explain gates → applied (or staged for review)
Core Concepts
| Concept | Purpose |
|---|---|
| Blueprint | Your pipeline definition |
| Channel | Transformations (SQL or native ops) |
| Spillway | Routes bad rows to error sink |
| Probe | Non-blocking observability taps |
| Assert | Inline quality gates |
| Depot | Cross-run state & watermarks |
| Arcade | Reusable sub-pipelines |
Full details in the documentation.
The Healing Flow
When a pipeline fails, Aqueduct does not throw the stack trace at an LLM and hope for the best. Healing runs as a staged pipeline. Every stage is auditable, and the LLM works inside a constrained grammar — it cannot write code, mutate files, or run shell commands.
Modes
Approval mode decides who applies a generated patch. Deterministic guardrails — allowed paths, forbidden operations, minimum confidence — bound every patch regardless of mode.
| Mode | Who applies the patch | When it changes the Blueprint | Use when |
|---|---|---|---|
| `disabled` | LLM never fires | Never | Healing is intentionally off. |
| `human` | Engineer reviews and applies | Only after human accepts | Production. Default behind CI/CD. |
| `ci` | External CI receives patch and opens a PR | Only after merge | Production with code review. |
| `auto` | Aqueduct applies in-memory, re-validates, writes only if the re-run succeeds | Only on a successful re-run | Trusted environments — dev, scoped pipelines. |
Low-confidence patches and any guardrail violation auto-escalate to human review.
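The routing rules in the table, together with the escalation rule just above, can be summarised as a small decision function. This is a minimal sketch, not Aqueduct's code: the `Mode` enum, the `Patch` record, `route_patch`, its return strings, and the 0.7 confidence floor are all hypothetical illustrations of the documented behaviour.

```python
from __future__ import annotations

from dataclasses import dataclass, field
from enum import Enum


class Mode(Enum):
    DISABLED = "disabled"
    HUMAN = "human"
    CI = "ci"
    AUTO = "auto"


@dataclass
class Patch:
    # Hypothetical stand-in for a generated patch: a confidence score plus
    # any violations reported by the deterministic guardrails.
    confidence: float
    violations: list[str] = field(default_factory=list)


def route_patch(mode: Mode, patch: Patch | None, min_confidence: float = 0.7) -> str:
    """Decide what happens to a patch under a given approval mode (illustrative only)."""
    if mode is Mode.DISABLED:
        return "no_heal"  # the LLM never fires
    if patch is None:
        return "human_review"  # the agent deferred with a diagnosis instead of a patch
    # Guardrails bound every patch regardless of mode: violations and
    # low confidence always escalate to a human.
    if patch.violations or patch.confidence < min_confidence:
        return "human_review"
    if mode is Mode.HUMAN:
        return "human_review"
    if mode is Mode.CI:
        return "open_pr"
    # AUTO: apply in memory, re-run, and write only if the re-run succeeds.
    return "apply_and_rerun"
```

Keeping the guardrail check ahead of the mode branches mirrors the statement that guardrails apply no matter which mode is configured.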
The stages
- Capture. Transient errors retry first. Non-transient failures — schema drift, missing columns, bad paths, OOM — trigger the agent. The Surveyor assembles a self-contained failure package: compiled module config, a provenance map of where every config value came from, a sliced lineage neighbourhood, and a structured root-cause block with the offending column and Spark's own suggestions. The LLM sees an actionable diagnosis, not a JVM trace.
- Prune. A context pruner trims the package to the failure's blast radius. Pruning improves accuracy and latency; cost is not the bottleneck.
- Generate. The LLM responds only with a structured patch — a list of typed operations that map one-to-one to Blueprint edits. Anything else is rejected.
- Reprompt. Schema errors, guardrail violations, and gate rejections feed back into the same conversation as annotated, field-level corrections. The model sees what it wrote and why it was wrong, instead of restarting cold. Slow providers and stuck-signature loops are bounded by a multi-axis budget — wall-clock, tokens, reprompt count, and progress-stall windows — and the loop can honestly defer to a human when no fix is possible, returning a structured diagnosis instead of a hallucinated patch.
- Gate. Before a patch touches the Blueprint, four gates run in order: guardrails (path and operation policy), compile-check (the patched dict must still produce a valid Manifest), lineage gate (column-level diff catches broken references before Spark sees them), and sandbox gate (a sampled or full replay catches "parsed but produces nothing"). Sandbox rejections feed back into the same reprompt thread, not a separate retry.
- Confirm and write. Only after every gate passes does the patch run against the real pipeline. The on-disk Blueprint is rewritten only if the full re-run succeeds. Failed patches stage to `patches/pending/` for inspection; successful patches commit to `patches/applied/`, giving every autonomous change a Git-diffable audit trail.
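The Reprompt stage's multi-axis budget can be sketched as a bounded loop. This is an illustration under stated assumptions, not Aqueduct's implementation: `Budget` borrows its first three field names from the `agent.budget` keys in the example blueprint under Getting Started, but `max_tokens`, `heal`, `error_signature`, the digit-stripping normalisation, and the reading of `same_signature_overall` as "defer on the Nth repeat of one signature" are all hypothetical. `generate` stands in for one LLM turn and `check` for the full gate sequence.

```python
from __future__ import annotations

import hashlib
import re
import time
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass
class Budget:
    # First three names mirror agent.budget in the example blueprint;
    # max_tokens is a hypothetical extra axis.
    max_reprompts: int = 5
    max_seconds: float = 120.0
    same_signature_overall: int = 3
    max_tokens: int = 50_000


def error_signature(message: str) -> str:
    """Hash a normalised error so messages differing only in numbers collide."""
    normalised = re.sub(r"\d+", "#", " ".join(message.lower().split()))
    return hashlib.sha256(normalised.encode()).hexdigest()[:12]


def heal(
    generate: Callable[[list[str]], tuple[Optional[dict], int]],
    check: Callable[[dict], Optional[str]],
    budget: Budget,
    clock: Callable[[], float] = time.monotonic,
) -> tuple[str, Optional[dict]]:
    """Reprompt until a patch passes `check` or any budget axis is exhausted.

    generate(feedback) -> (patch, or None to defer; tokens used this turn)
    check(patch) -> None if every gate passes, else a rejection message
    """
    start = clock()
    feedback: list[str] = []  # one conversation: corrections accumulate
    seen: dict[str, int] = {}
    tokens = 0
    for _ in range(budget.max_reprompts + 1):  # first attempt plus reprompts
        if clock() - start > budget.max_seconds:
            return "deferred: wall-clock", None
        patch, used = generate(feedback)
        tokens += used
        if patch is None:  # the model chose to hand off to a human
            return "deferred: no fix proposed", None
        error = check(patch)
        if error is None:
            return "ok", patch
        sig = error_signature(error)
        seen[sig] = seen.get(sig, 0) + 1
        if seen[sig] >= budget.same_signature_overall:
            return "deferred: stuck signature", None
        if tokens > budget.max_tokens:
            return "deferred: token budget", None
        # Feed the rejection back into the same thread instead of restarting cold.
        feedback.append(f"Rejected: {error}")
    return "deferred: reprompt budget", None
```

Every exit path returns a reason string, which is the hook a real system would need in order to hand a structured diagnosis to a human instead of an invented patch.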
Why it is reliable
- No silent mutations. Every patch is a structured diff with a rationale and a confidence score. Low confidence escalates to human review.
- No production data corruption. The sandbox validates patches against representative data before they reach live writes.
- No runaway loops. Budgets bound wall-clock, tokens, and stuck-signature counts. A rolling rate limit caps healing attempts per hour per blueprint.
- No black-box decisions. Every LLM turn persists with the gate that rejected it, a stable error signature, and the prompt version. One outer run id joins every iteration of one heal call.
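One common way to implement a rolling hourly cap like the one in the list above is a per-key sliding window of timestamps. The sketch assumes that technique; `HealRateLimiter` and `try_acquire` are hypothetical names, and the injectable clock exists only to make the window easy to test.

```python
from __future__ import annotations

import time
from collections import defaultdict, deque
from typing import Callable


class HealRateLimiter:
    """Rolling-window cap on healing attempts, tracked separately per blueprint id."""

    def __init__(self, max_per_window: int, window_seconds: float = 3600.0,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.max_per_window = max_per_window
        self.window = window_seconds
        self.clock = clock
        self._attempts: dict[str, deque[float]] = defaultdict(deque)

    def try_acquire(self, blueprint_id: str) -> bool:
        """Record an attempt and return True, or return False if the cap is reached."""
        now = self.clock()
        attempts = self._attempts[blueprint_id]
        # Drop timestamps that have rolled out of the window.
        while attempts and now - attempts[0] >= self.window:
            attempts.popleft()
        if len(attempts) >= self.max_per_window:
            return False
        attempts.append(now)
        return True
```

A sliding window avoids the burst that a fixed "top of the hour" counter allows, where a blueprint could heal at the cap just before and again just after the reset.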
Why it is efficient
Healing terminates on the first successful patch. Each turn carries only the context the model needs. Structured error extraction replaces multi-kilobyte traces with a short root-cause block. The gate pyramid rejects bad patches in seconds via lineage and sandbox sampling — full-pipeline replay only ever runs against patches that have already passed cheaper checks.
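The gate pyramid amounts to running gates in a fixed order and stopping at the first rejection, so an expensive replay never sees a patch that a cheap check already refused. A minimal sketch, assuming patches are dicts of typed operations: `run_gates`, the four toy gates, the `ops` / `path` / `columns` fields, the allowed-path prefix, and the known-column set are hypothetical, and the sandbox gate is a stub where a real system would replay sampled data.

```python
from __future__ import annotations

from typing import Callable, Optional

# A gate returns None when the patch passes, or a rejection reason.
Gate = Callable[[dict], Optional[str]]

ALLOWED_PREFIXES = ("modules.",)        # hypothetical path policy
KNOWN_COLUMNS = {"order_id", "amount"}  # hypothetical upstream schema


def run_gates(patch: dict, gates: list[tuple[str, Gate]]) -> tuple[bool, list[str], Optional[str]]:
    """Run gates cheapest-first; stop at the first rejection."""
    ran: list[str] = []
    for name, gate in gates:
        ran.append(name)
        reason = gate(patch)
        if reason is not None:
            return False, ran, f"{name}: {reason}"
    return True, ran, None


def guardrails(patch: dict) -> Optional[str]:
    for op in patch["ops"]:
        if not op["path"].startswith(ALLOWED_PREFIXES):
            return f"path not allowed: {op['path']}"
    return None


def compile_check(patch: dict) -> Optional[str]:
    return None if patch["ops"] else "patch has no operations"


def lineage(patch: dict) -> Optional[str]:
    refs = {c for op in patch["ops"] for c in op.get("columns", [])}
    missing = sorted(refs - KNOWN_COLUMNS)
    return f"unknown columns {missing}" if missing else None


def sandbox(patch: dict) -> Optional[str]:
    return None  # stub: a real gate would replay sampled data here


GATES: list[tuple[str, Gate]] = [
    ("guardrails", guardrails),
    ("compile", compile_check),
    ("lineage", lineage),
    ("sandbox", sandbox),
]
```

The rejection string returned alongside the list of gates that ran is the kind of field-level correction the Reprompt stage feeds back into the same conversation.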
Getting Started
Installation
```bash
pip install "aqueduct-core[spark]"
```
Compose extras as needed, for example `pip install "aqueduct-core[spark,airflow,aws]"`:
| Extra | Adds | Install when |
|---|---|---|
| `spark` | PySpark 4 + Delta Lake | Running pipelines on this host. |
| `airflow` | Apache Airflow operator shim | Scheduler / worker host; the box submitting jobs to Spark. |
| `secrets` | AWS + GCP + Azure secret-manager SDKs (or pick `aws` / `gcp` / `azure` individually) | Resolving `@aq.secret('KEY')` against a cloud vault. |
| `stores` | Postgres + Redis backends (or pick `postgres` / `redis` individually) | Replacing single-writer DuckDB defaults for obs / lineage / depot. |
| `all` | Everything above | Single-laptop dev. |
A first blueprint
```yaml
aqueduct: "1.0"
id: hello.pipeline

macros:
  active: "status = 'active' AND deleted_at IS NULL"

modules:
  - id: load
    type: Ingress
    config: { format: csv, path: "data/in.csv", options: { header: true } }

  - id: clean
    type: Channel
    config:
      op: sql
      query: "SELECT order_id, amount FROM load WHERE {{ macros.active }}"

  - id: save
    type: Egress
    config: { format: parquet, path: "data/out/", mode: overwrite }

edges:
  - { from: load, to: clean }
  - { from: clean, to: save }

agent:
  approval_mode: human
  budget:
    max_reprompts: 5
    max_seconds: 120
    same_signature_overall: 3
```
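Compiling a Blueprint into a Manifest involves, at minimum, resolving macros and ordering modules along the declared edges. The sketch below shows one way to do both with the Python standard library, using the example above loaded as a plain dict; `compile_blueprint`, `expand_macros`, and the list-of-modules "Manifest" shape are hypothetical, and Aqueduct's actual compiler is not shown here.

```python
from __future__ import annotations

import re
from graphlib import CycleError, TopologicalSorter

MACRO = re.compile(r"\{\{\s*macros\.(\w+)\s*\}\}")


def expand_macros(text: str, macros: dict[str, str]) -> str:
    """Replace {{ macros.NAME }} with its body; an unknown NAME raises KeyError."""
    return MACRO.sub(lambda m: macros[m.group(1)], text)


def compile_blueprint(bp: dict) -> list[dict]:
    """Return modules in execution order with macros expanded (a toy 'Manifest')."""
    modules = {m["id"]: m for m in bp["modules"]}
    graph: dict[str, set[str]] = {mid: set() for mid in modules}
    for edge in bp.get("edges", []):
        src, dst = edge["from"], edge["to"]
        if src not in modules or dst not in modules:
            raise ValueError(f"edge references unknown module: {src} -> {dst}")
        graph[dst].add(src)  # dst runs after src
    try:
        order = list(TopologicalSorter(graph).static_order())
    except CycleError as exc:
        raise ValueError(f"cycle in edges: {exc.args[1]}") from exc
    macros = bp.get("macros", {})
    manifest = []
    for mid in order:
        module = dict(modules[mid])
        config = dict(module.get("config", {}))
        if "query" in config:
            config["query"] = expand_macros(config["query"], macros)
        module["config"] = config
        manifest.append(module)
    return manifest
```

Failing fast on unknown edge targets and cycles is the kind of structural error a preflight step can report before any Spark session starts.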
Engine-wide defaults live in a separate `aqueduct.yml` (LLM provider, store backends, danger settings). Inline module tests live in `*.aqtest.yml`. Repeatable healing benchmarks live in `*.aqscenario.yml`. The Gallery has runnable examples of each.
Five commands to know
- `aqueduct doctor blueprints/hello.yml` — preflight check. Validates YAML, resolves paths, verifies LLM reachability, opens stores.
- `aqueduct run blueprints/hello.yml` — execute the pipeline. On failure, the agent generates a patch under `patches/pending/`.
- `aqueduct patch apply patches/pending/<id>.json --blueprint blueprints/hello.yml` — review and accept a staged patch. Moves it to `patches/applied/`.
- `aqueduct test blueprints/hello.aqtest.yml` — run Channel / Junction / Funnel modules against inline data. No Ingress, no Egress, no external I/O.
- `aqueduct benchmark gallery/aqscenarios/ --model claude-sonnet-4-6 --model qwen2.5-coder:7b` — compare LLM models against simulated failures. No Spark required.
Full reference in CLI Reference.
References
- Blueprint & Engine Spec — Module types, configs, architecture, healing loop
- CLI Reference — All commands and flags
- Spark Guide — Warnings, performance, tuning
- Observability Guide — Schemas + diagnostic query cookbook
- Production Guide — Cluster deployment, security, Delta operations
- Compatibility Matrix — Supported Python × Spark versions, pinning recipe
- Roadmap — Deferred features and future plans
- Gallery — Real working examples
Contributing
Contributions are welcome! See CONTRIBUTING.md.
Aqueduct is Apache 2.0 licensed — free, open source, no telemetry, no lock-in.
Download files
File details
Details for the file aqueduct_core-1.1.2.tar.gz.
File metadata
- Download URL: aqueduct_core-1.1.2.tar.gz
- Upload date:
- Size: 304.9 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.14.5
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 93d95ce9ce7593b958bd1722fcc63c78b3e6b889dd95d3b4338eaa5d37503723 |
| MD5 | 78d0fb4722b03f802c3ed3e5d5dcad54 |
| BLAKE2b-256 | 42db5d115f28a47d0af42d38b5af6b5a046f8562f8e187eae13bea185535c660 |
File details
Details for the file aqueduct_core-1.1.2-py3-none-any.whl.
File metadata
- Download URL: aqueduct_core-1.1.2-py3-none-any.whl
- Upload date:
- Size: 325.3 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.14.5
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 59d0f3d2a74b7575b38ebaeac33adcd4aea7d8e1ff2cd0f07c508c0e4218f548 |
| MD5 | 340d6fb9bf3c915777782451a3f64340 |
| BLAKE2b-256 | 3cb517aff74a320afb83867926be528a165280245cde51c13eeed95fbc623cd3 |