
Declarative, self-healing Apache Spark pipelines.


Aqueduct

Intelligent, self-healing Spark pipelines. Declarative. Observable. Autonomous.


Aqueduct is a control plane for Apache Spark. You write pipelines as YAML Blueprints; Aqueduct validates, compiles, and executes them, monitoring every step. When something breaks, Aqueduct can autonomously patch the pipeline using an LLM agent that applies structured, auditable fixes.


What Makes Aqueduct Different?

  • Declarative YAML Blueprints — No DAG wiring in code. Version-control your entire pipeline.
  • Any Spark Connector — Pass any format Spark supports (JDBC, Kafka, Avro, ORC, Delta, Parquet, CSV…) directly in config. Aqueduct adds no format restrictions.
  • LLM-First Observability — Every failure ships with a complete FailureContext JSON, ready for an agent (or you) to diagnose without digging through logs.
  • Patch Grammar, Not Codegen — The LLM operates inside a structured PatchSpec schema. Patches are auditable, reversible, and never hallucinate invalid YAML.
  • Zero-Cost Observability — Probes capture schema snapshots, null rates, and sample rows using lazy Spark operations and sampling. No full scans.
  • Spillway Error Routing — Bad rows route to a separate error Egress with _aq_error_* metadata columns. Good rows flow uninterrupted.
  • Depot KV Store — Cross-run pipeline state (watermarks, counters) backed by DuckDB. Read at compile time via @aq.depot.get(), write at runtime via format: depot Egress.
  • Passive-by-Default Gates — Regulators (data quality gates) compile away entirely unless wired to a Probe signal. Zero overhead for unused features.
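The difference between a patch grammar and free-form code generation is that the model can only emit operations from a closed set, and every operation is validated before anything touches the Blueprint. The sketch below illustrates that idea with a hypothetical three-operation grammar (`set_config`, `add_edge`, `remove_edge`) over a Blueprint loaded as a dict. The real PatchSpec schema is not described in this README, so the operation names, fields, and `apply_patch` helper are assumptions.

```python
import copy
from typing import Any

# Hypothetical closed grammar: operation name -> required fields (not the real PatchSpec).
GRAMMAR = {
    "set_config": {"module", "key", "value"},
    "add_edge": {"from", "to"},
    "remove_edge": {"from", "to"},
}

def apply_patch(blueprint: dict[str, Any], patch: list[dict[str, Any]]) -> dict[str, Any]:
    """Validate every operation first, then apply all of them to a copy of the Blueprint."""
    module_ids = {m["id"] for m in blueprint["modules"]}
    for op in patch:
        required = GRAMMAR.get(op.get("op"))
        if required is None:
            raise ValueError(f"operation not in grammar: {op.get('op')!r}")
        missing = required - set(op)
        if missing:
            raise ValueError(f"{op['op']} is missing fields: {sorted(missing)}")
        for field in ("module", "from", "to"):
            if field in op and op[field] not in module_ids:
                raise ValueError(f"unknown module {op[field]!r}")
    # Work on a deep copy: the original Blueprint is untouched, so the change can be reverted.
    patched = copy.deepcopy(blueprint)
    for op in patch:
        if op["op"] == "set_config":
            module = next(m for m in patched["modules"] if m["id"] == op["module"])
            module.setdefault("config", {})[op["key"]] = op["value"]
        elif op["op"] == "add_edge":
            patched.setdefault("edges", []).append({"from": op["from"], "to": op["to"]})
        else:  # remove_edge
            patched["edges"] = [e for e in patched.get("edges", [])
                                if (e["from"], e["to"]) != (op["from"], op["to"])]
    return patched


bp = {"modules": [{"id": "raw_orders", "type": "Ingress",
                   "config": {"path": "s3a://data/order/"}}], "edges": []}
fixed = apply_patch(bp, [{"op": "set_config", "module": "raw_orders",
                          "key": "path", "value": "s3a://data/orders/"}])
print(fixed["modules"][0]["config"]["path"])  # → s3a://data/orders/
```

Anything outside the grammar (for example an `exec_python` operation, or an edge to a module that does not exist) is rejected before the copy is modified.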

Open-Core Model

Aqueduct Core is Apache 2.0 licensed and always will be.

You can run it locally, in CI, or on a production Spark cluster — free, forever.

A commercial frontend, Aqueduct Platform, adds:

  • Centralized dashboards for all your pipelines
  • Team collaboration and RBAC
  • Managed Depot (persistent KV store)
  • Audit logs of every LLM patch

The Core engine emits a documented webhook event stream, so you can integrate it with any frontend: ours, yours, or a third party's.

This repository contains the full engine. No telemetry. No proprietary code.


Installation

# Core CLI + parser + compiler + LLM self-healing (no Spark dependency)
pip install aqueduct-core

# With Spark execution (required for aqueduct run)
pip install "aqueduct-core[spark]"

# Everything
pip install "aqueduct-core[all]"

The base package installs the CLI, parser, compiler, and LLM self-healing. All LLM providers (Anthropic, OpenAI-compatible, Ollama) are called through httpx, which is a core dependency, so no extra install is needed. Spark execution requires the [spark] extra (pyspark, delta-spark).

Requires Python 3.11+ and Java 17 (for local Spark).


Quick Start

1. Write a Blueprint

# pipeline.yml
aqueduct: "1.0"
id: my.first.pipeline
name: "Order Processing"

context:
  today: "@aq.date.today()"

modules:
  - id: raw_orders
    type: Ingress
    label: "Load orders"
    config:
      format: parquet
      path: "s3a://data/orders/"

  - id: clean_orders
    type: Channel
    label: "Filter invalid rows"
    config:
      op: sql
      query: |
        SELECT *
        FROM raw_orders
        WHERE amount IS NOT NULL
          AND order_date <= CURRENT_TIMESTAMP()
      spillway_condition: "amount IS NULL OR order_date > CURRENT_TIMESTAMP()"

  - id: good_output
    type: Egress
    label: "Write clean orders"
    config:
      format: parquet
      path: "s3a://processed/orders/date=${ctx.today}/"
      mode: overwrite

  - id: error_output
    type: Egress
    label: "Write rejected rows"
    config:
      format: parquet
      path: "s3a://errors/orders/date=${ctx.today}/"
      mode: overwrite

edges:
  - from: raw_orders
    to: clean_orders
  - from: clean_orders
    to: good_output
  - from: clean_orders
    to: error_output
    port: spillway
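Before anything runs, a Blueprint's edges have to agree with its modules. The following is a hypothetical, simplified wiring check (not Aqueduct's actual parser) applied to the Blueprint above after loading it into a plain Python dict: every edge endpoint must name a declared module, and a `spillway` port may only leave a Channel that sets `spillway_condition`. The function name `check_edges` is illustrative.

```python
# Hypothetical sketch: structural checks on a Blueprint already loaded into a dict.
# It illustrates the kind of validation a parser performs; it is not Aqueduct's code.

def check_edges(blueprint: dict) -> list[str]:
    """Return human-readable wiring errors (an empty list means the edges are consistent)."""
    modules = {m["id"]: m for m in blueprint.get("modules", [])}
    errors = []
    for edge in blueprint.get("edges", []):
        src, dst = edge["from"], edge["to"]
        for end in (src, dst):
            if end not in modules:
                errors.append(f"edge {src} -> {dst}: unknown module '{end}'")
        if edge.get("port") == "spillway" and src in modules:
            source = modules[src]
            has_condition = "spillway_condition" in source.get("config", {})
            if source["type"] != "Channel" or not has_condition:
                errors.append(f"edge {src} -> {dst}: spillway port needs a Channel "
                              "with spillway_condition")
    return errors


blueprint = {
    "modules": [
        {"id": "raw_orders", "type": "Ingress", "config": {"format": "parquet"}},
        {"id": "clean_orders", "type": "Channel",
         "config": {"op": "sql", "spillway_condition": "amount IS NULL"}},
        {"id": "good_output", "type": "Egress", "config": {"format": "parquet"}},
        {"id": "error_output", "type": "Egress", "config": {"format": "parquet"}},
    ],
    "edges": [
        {"from": "raw_orders", "to": "clean_orders"},
        {"from": "clean_orders", "to": "good_output"},
        {"from": "clean_orders", "to": "error_output", "port": "spillway"},
    ],
}

print(check_edges(blueprint))  # → []
```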

2. Configure the engine

# aqueduct.yml
aqueduct_config: "1.0"

deployment:
  master_url: "spark://your-cluster:7077"

spark_config:
  spark.hadoop.fs.s3a.endpoint: "http://minio:9000"
  spark.hadoop.fs.s3a.access.key: "@aq.secret('S3_KEY')"
  spark.hadoop.fs.s3a.secret.key: "@aq.secret('S3_SECRET')"
  spark.jars.packages: "org.apache.hadoop:hadoop-aws:3.3.4"

stores:
  observability:
    path: ".aqueduct/signals"
  depot:
    path: ".aqueduct/depot.duckdb"

3. Run

aqueduct run pipeline.yml --config aqueduct.yml
▶ my.first.pipeline  (4 modules)  run=abc123  master=spark://your-cluster:7077
  ✓ raw_orders
  ✓ clean_orders
  ✓ good_output
  ✓ error_output

✓ pipeline complete  run_id=abc123

Core Concepts

Blueprint

A YAML file describing your pipeline. Contains modules (data sources, transforms, sinks) and edges (data flow).

Manifest

The compiled, fully-resolved form of a Blueprint. All @aq.* runtime tokens resolved, Arcades expanded, passive Regulators compiled away. This is what the Executor runs.
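Compiling away a passive Regulator can be pictured as a graph rewrite. In this hypothetical sketch, a Regulator counts as "wired" when some Probe module has an edge into it; an unwired Regulator is removed and each of its upstreams is reconnected directly to each of its downstreams. How Aqueduct actually represents Probe-to-Regulator wiring is an assumption here, as is the `drop_passive_regulators` name.

```python
# Hypothetical sketch of compiling away passive Regulators.
# Assumption: a Regulator is "wired" when a Probe module has an edge into it.

def drop_passive_regulators(modules: dict[str, str], edges: list[tuple[str, str]]):
    """modules maps id -> module type; edges are (from, to) pairs. Returns new (modules, edges)."""
    modules = dict(modules)  # work on copies; the caller's structures are not mutated
    edges = list(edges)
    for reg_id, mtype in list(modules.items()):
        if mtype != "Regulator":
            continue
        upstream = [src for src, dst in edges if dst == reg_id]
        if any(modules.get(src) == "Probe" for src in upstream):
            continue  # active gate: keep it in the Manifest
        downstream = [dst for src, dst in edges if src == reg_id]
        # Drop every edge touching the passive Regulator, then bypass it.
        edges = [(s, d) for s, d in edges if reg_id not in (s, d)]
        edges += [(u, d) for u in upstream for d in downstream]
        del modules[reg_id]
    return modules, edges


modules = {"raw_orders": "Ingress", "quality_gate": "Regulator", "good_output": "Egress"}
edges = [("raw_orders", "quality_gate"), ("quality_gate", "good_output")]
print(drop_passive_regulators(modules, edges))
# → ({'raw_orders': 'Ingress', 'good_output': 'Egress'}, [('raw_orders', 'good_output')])
```

Because the passive gate disappears from the graph entirely, nothing is evaluated for it at run time, which is what "zero overhead for unused features" amounts to.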

Module Types

Type        Role
----------  ------------------------------------------------------------------------------
Ingress     Load data from any Spark-supported source
Egress      Write data to any Spark-supported sink, or to the Depot KV store via format: depot
Channel     SQL transform; an optional spillway_condition routes bad rows
Junction    Split one DataFrame into named branches (conditional / broadcast / partition)
Funnel      Merge multiple DataFrames (union_all / union / coalesce / zip)
Probe       Capture observability signals (schema, null rates, sample rows); never halts the pipeline
Regulator   Data quality gate; evaluates Probe signals and blocks or skips downstream modules on failure
Arcade      Reusable sub-Blueprint; namespaced and inlined at compile time
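The Probe contract (capture signals from a bounded sample, never halt) can be sketched in plain Python. This assumes rows arrive as dicts and uses "read the first N rows" as a stand-in for a Spark `limit()`; the `capture_null_rates` name and the signal dict layout are hypothetical, not Aqueduct's signal schema.

```python
from itertools import islice
from typing import Any, Iterable

def capture_null_rates(rows: Iterable[dict[str, Any]],
                       sample_size: int = 1000) -> dict[str, Any]:
    """Build a probe signal from at most `sample_size` rows. Never raises."""
    try:
        # Read only the first sample_size rows (a stand-in for limit()), not the full input.
        sample = list(islice(rows, sample_size))
        columns = sorted({col for row in sample for col in row})
        # A column missing from a row counts as null for that row.
        null_rates = {
            col: sum(row.get(col) is None for row in sample) / len(sample)
            for col in columns
        }
        return {"status": "ok", "sampled": len(sample),
                "null_rates": null_rates, "sample_rows": sample[:5]}
    except Exception as exc:  # record the failure as a signal instead of halting the pipeline
        return {"status": "error", "error": repr(exc)}


rows = [{"id": 1, "amount": 10.0}, {"id": 2, "amount": None},
        {"id": 3, "amount": 4.5}, {"id": 4, "amount": None}]
print(capture_null_rates(rows)["null_rates"])  # → {'amount': 0.5, 'id': 0.0}
```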

Runtime Functions (@aq.*)

Resolved at compile time on the driver:

path: "s3a://data/date=@aq.date.today()/"
path: "s3a://data/from=@aq.depot.get('last_run_date', '2020-01-01')/"
run_id: "@aq.runtime.run_id()"
prev_run: "@aq.runtime.prev_run_id()"
key: "@aq.secret('MY_SECRET')"

Spillway

Route bad rows to a separate sink without stopping the pipeline:

modules:
  - id: clean
    type: Channel
    config:
      op: sql
      query: "SELECT * FROM __input__"
      spillway_condition: "amount IS NULL"  # rows matching this → spillway port

edges:
  - from: clean
    to: good_sink           # clean rows
  - from: clean
    to: error_sink
    port: spillway          # null-amount rows with _aq_error_* columns
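Conceptually, a Channel with a `spillway_condition` evaluates the predicate per row and sends matching rows to the `spillway` port with error metadata attached. This plain-Python sketch uses a callable in place of the SQL predicate; the column names `_aq_error_module` and `_aq_error_reason` are hypothetical examples of `_aq_error_*` columns, not Aqueduct's documented schema.

```python
from typing import Any, Callable

Row = dict[str, Any]

def route_spillway(rows: list[Row], condition: Callable[[Row], bool],
                   module_id: str, reason: str) -> tuple[list[Row], list[Row]]:
    """Split rows into (main, spillway); spillway rows gain _aq_error_* columns."""
    main, spillway = [], []
    for row in rows:
        if condition(row):
            # Build a new dict so the input row is not mutated, then attach error metadata.
            spillway.append({**row, "_aq_error_module": module_id, "_aq_error_reason": reason})
        else:
            main.append(row)
    return main, spillway


orders = [{"id": 1, "amount": 10.0}, {"id": 2, "amount": None}, {"id": 3, "amount": 4.5}]
good, bad = route_spillway(orders, lambda r: r["amount"] is None,
                           module_id="clean", reason="amount IS NULL")
print([r["id"] for r in good])  # → [1, 3]
print(bad)  # → [{'id': 2, 'amount': None, '_aq_error_module': 'clean', '_aq_error_reason': 'amount IS NULL'}]
```

One caveat if the split is done in Spark as a filter on the condition plus a filter on its negation: under SQL three-valued logic, a row for which the condition evaluates to NULL (for example `order_date > CURRENT_TIMESTAMP()` when `order_date` is NULL) satisfies neither filter and is silently dropped. Wrapping the condition in `coalesce(<condition>, true)` is one way to send such rows to the spillway instead.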

Depot KV Store

Persist state across pipeline runs:

# Read at compile time
path: "s3a://data/from=@aq.depot.get('last_processed_date', '2020-01-01')/"

# Write at runtime (Egress module)
- id: save_watermark
  type: Egress
  config:
    format: depot
    key: last_processed_date
    value_expr: "MAX(order_date)"   # single Spark aggregate
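The watermark cycle (read with a default at compile time, upsert after the run) can be sketched with a tiny key-value table. Aqueduct backs the Depot with DuckDB; this sketch swaps in Python's built-in `sqlite3` so it runs without extra packages, and mirrors the `depot_kv(key, value, updated_at)` columns queried in the Observability section. `open_depot`, `depot_get`, and `depot_put` are hypothetical helper names.

```python
import sqlite3
from datetime import datetime, timezone

def open_depot(path: str = ":memory:") -> sqlite3.Connection:
    """Open (or create) a KV table; sqlite3 stands in for DuckDB here."""
    conn = sqlite3.connect(path)
    conn.execute("CREATE TABLE IF NOT EXISTS depot_kv "
                 "(key TEXT PRIMARY KEY, value TEXT, updated_at TEXT)")
    return conn

def depot_get(conn: sqlite3.Connection, key: str, default: str) -> str:
    """Compile-time read: the stored value, or the default on the first run."""
    row = conn.execute("SELECT value FROM depot_kv WHERE key = ?", (key,)).fetchone()
    return row[0] if row else default

def depot_put(conn: sqlite3.Connection, key: str, value: str) -> None:
    """Runtime write: insert the key, or overwrite its value if it already exists."""
    conn.execute(
        "INSERT INTO depot_kv (key, value, updated_at) VALUES (?, ?, ?) "
        "ON CONFLICT(key) DO UPDATE SET value = excluded.value, "
        "updated_at = excluded.updated_at",
        (key, value, datetime.now(timezone.utc).isoformat()),
    )
    conn.commit()


conn = open_depot()
print(depot_get(conn, "last_processed_date", "2020-01-01"))  # → 2020-01-01
order_dates = ["2024-05-01", "2024-05-03", "2024-05-02"]
depot_put(conn, "last_processed_date", max(order_dates))  # stand-in for MAX(order_date)
print(depot_get(conn, "last_processed_date", "2020-01-01"))  # → 2024-05-03
```

ISO-8601 date strings sort lexicographically in date order, which is why `max()` over strings works as a stand-in for `MAX(order_date)` here.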

CLI Reference

aqueduct validate pipeline.yml              # Parse and validate only
aqueduct compile  pipeline.yml              # Output resolved Manifest JSON
aqueduct run      pipeline.yml              # Compile and execute
aqueduct run      pipeline.yml \
  --config aqueduct.yml \
  --store-dir .aqueduct/signals \
  --run-id my-run-001 \
  --ctx env=prod

aqueduct check-config                       # Validate aqueduct.yml schema; print resolved summary
aqueduct check-config --config path/to/aqueduct.yml

aqueduct doctor                             # Probe all resources end-to-end (config, stores, secrets, webhook, Spark, storage)
aqueduct doctor --skip-spark               # Skip JVM startup — fast CI health check
aqueduct doctor --config path/to/aqueduct.yml

aqueduct patch apply patch.json --blueprint pipeline.yml
aqueduct patch reject <patch-id> --reason "Incorrect column name"
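The Architecture section places `${ctx.*}` substitution in the Parser, ahead of `@aq.*` resolution in the Compiler. The sketch below assumes that `--ctx key=value` overrides an entry in the Blueprint's `context` block before substitution; the README does not spell out the precedence, and the `env` context key and both helper names are hypothetical.

```python
import re

CTX_REF = re.compile(r"\$\{ctx\.([A-Za-z_][A-Za-z0-9_]*)\}")

def parse_ctx_overrides(pairs: list[str]) -> dict[str, str]:
    """Turn ['env=prod', ...] into {'env': 'prod', ...}; split on the first '=' only."""
    overrides = {}
    for pair in pairs:
        key, sep, value = pair.partition("=")
        if not sep or not key:
            raise ValueError(f"expected key=value, got {pair!r}")
        overrides[key] = value
    return overrides

def substitute_ctx(text: str, context: dict[str, str]) -> str:
    """Replace ${ctx.name} references; an undefined name is an error."""
    def replace(match: re.Match) -> str:
        name = match.group(1)
        if name not in context:
            raise KeyError(f"undefined context value: ctx.{name}")
        return context[name]
    return CTX_REF.sub(replace, text)


context = {"env": "dev", "today": "@aq.date.today()"}  # from the Blueprint's context block
context |= parse_ctx_overrides(["env=prod"])           # from --ctx env=prod (assumed precedence)
print(substitute_ctx("s3a://processed/${ctx.env}/orders/date=${ctx.today}/", context))
# → s3a://processed/prod/orders/date=@aq.date.today()/
```

The `@aq.date.today()` token survives this step untouched; in the pipeline described above it is the Compiler that turns it into a literal date.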

Observability

Aqueduct writes all observability data to DuckDB files:

# Run records
duckdb .aqueduct/signals/runs.db
SELECT run_id, pipeline_id, status, started_at, finished_at FROM run_records;

# Probe signals
duckdb .aqueduct/signals/signals.db
SELECT probe_id, signal_type, payload FROM probe_signals ORDER BY captured_at DESC;

# Depot state
duckdb .aqueduct/depot.duckdb
SELECT key, value, updated_at FROM depot_kv;

Architecture

Blueprint (YAML)
    │
    ▼
┌─────────┐
│ Parser  │  Validate schema, resolve ${ctx.*}, detect cycles
└────┬────┘
     │ Blueprint AST
     ▼
┌──────────┐
│ Compiler │  Resolve @aq.* tokens, expand Arcades, compile-away passive Regulators
└────┬─────┘
     │ Manifest (JSON)
     ▼
┌──────────┐
│ Executor │  Topo-sort → frame_store → Spark actions
└────┬─────┘
     │ ExecutionResult
     ▼
┌──────────┐
│ Surveyor │  Write run record, FailureContext, fire webhooks
└──────────┘
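The Executor's topo-sort and the Parser's cycle detection both map onto Python's standard-library `graphlib`. This sketch orders the Quick Start modules from their edges and shows a cycle being rejected; it does not model `frame_store` or Spark execution, and `execution_order` is an illustrative name rather than Aqueduct's API.

```python
from graphlib import CycleError, TopologicalSorter

def execution_order(module_ids: list[str], edges: list[tuple[str, str]]) -> list[str]:
    """Return module ids so that every module comes after all of its upstreams."""
    graph: dict[str, set[str]] = {m: set() for m in module_ids}
    for src, dst in edges:
        graph[dst].add(src)  # TopologicalSorter expects node -> set of predecessors
    return list(TopologicalSorter(graph).static_order())


modules = ["raw_orders", "clean_orders", "good_output", "error_output"]
edges = [("raw_orders", "clean_orders"),
         ("clean_orders", "good_output"),
         ("clean_orders", "error_output")]
print(execution_order(modules, edges))  # raw_orders, clean_orders, then both outputs

try:
    execution_order(["a", "b"], [("a", "b"), ("b", "a")])
except CycleError as exc:
    print("cycle detected:", exc.args[1])  # args[1] holds the nodes forming the cycle
```

The relative order of `good_output` and `error_output` is not fixed, since neither depends on the other.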

Development

git clone https://github.com/your-org/aqueduct
cd aqueduct
pip install -e ".[spark,dev]"

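# Run tests (requires Java 17; use_java17 is a shell-profile helper, so substitute
# whatever selects a Java 17 JDK on your machine, e.g. setting JAVA_HOME)
source ~/.bashrc && use_java17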
pytest tests/

Tests include unit tests and blueprint integration tests that run real Spark (local[*]). Coverage target: 80%.


License

Apache 2.0. See LICENSE.



Download files


Source Distribution

aqueduct_core-1.0.0a0.tar.gz (92.9 kB)

Built Distribution

aqueduct_core-1.0.0a0-py3-none-any.whl (100.1 kB)

File details

Details for the file aqueduct_core-1.0.0a0.tar.gz.

File metadata

  • Download URL: aqueduct_core-1.0.0a0.tar.gz
  • Upload date:
  • Size: 92.9 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.14.4

File hashes

Hashes for aqueduct_core-1.0.0a0.tar.gz
Algorithm    Hash digest
SHA256       694e20c28be201dc616738718828ba9f613575c6f184b8a691903b00e81f635c
MD5          ca077507bb88c3791fb23f7648d77030
BLAKE2b-256  02a5124749f8732aad841d3fbdaff9520d2533a5230e0627b85fa1cf0e9b0477


File details

Details for the file aqueduct_core-1.0.0a0-py3-none-any.whl.

File metadata

File hashes

Hashes for aqueduct_core-1.0.0a0-py3-none-any.whl
Algorithm    Hash digest
SHA256       81c7967f68f22bff2da6cd994df86985f43660d17b78fd289dda27582217a2a7
MD5          99c06504d939ba947a15389336d00fb2
BLAKE2b-256  af6c23a066754d3b762de258b88bed112248dbc6e5437f71d27e89ce94a7ab97
