Orchestration layer for market data pipelines
Market Data Pipeline
Enterprise-grade streaming DAG engine for real-time and batch market data with adaptive autoscaling
Features • Quick Start • Architecture • Documentation • Examples
Table of Contents
- Overview
- Features
- Technology Stack
- Quick Start
- Architecture
- Installation
- Usage
- Configuration
- DAG Runtime
- KEDA Autoscaling
- Development
- Testing
- Production Deployment
- Monitoring
- Extending
- Contributing
- License
Overview
Market Data Pipeline is a production-grade orchestration system for real-time and batch market data processing. It provides a flexible, scalable platform for ingesting, transforming, and delivering market data from multiple sources (IBKR, replay, synthetic) to various sinks (TimescaleDB, Kafka, databases).
What It Does
Moves data from sources → through operators → into sinks with:
- Flow control & backpressure management
- Pacing compliance for API rate limits
- Event-time processing with watermarks
- Adaptive autoscaling via KEDA
- Comprehensive observability (Prometheus, Grafana)
- Production-ready deployment patterns
Use Cases
- Real-time market data ingestion from IBKR
- Historical data replay for backtesting
- High-frequency data aggregation (ticks → bars)
- Options chain processing with Greeks calculation
- Multi-provider data fusion and normalization
- Cost-optimized autoscaling deployments
Features
Core Capabilities
- Dual Runtime Modes
  - Classic Mode: Traditional pipeline orchestration
  - DAG Mode: Streaming directed acyclic graph engine
- Data Sources
  - IBKR: Live market data with reconnection handling
  - Replay: Historical data from Parquet files
  - Synthetic: Predictable data generation for testing
- Operators
  - Bar Aggregation: OHLCV bars with event-time correctness
  - Options Processing: Chain snapshots with Greeks
  - Windowing: Tumbling, sliding, and session windows
  - Partitioning: Hash-based data distribution
  - Custom: Extensible operator framework
- Sinks
  - Store: TimescaleDB via market_data_store
  - Kafka: High-throughput streaming
  - Database: Direct SQL writes with retries
  - Custom: Pluggable sink architecture
Advanced Features (Phase 5-6)
- Streaming DAG Engine
  - Compose operators into directed acyclic graphs
  - Event-time windowing with watermarks
  - Hash partitioning for parallelism
  - Channel-based backpressure
- Adaptive Autoscaling
  - Dynamic rate adjustment based on downstream pressure
  - KEDA-based horizontal pod autoscaling
  - Prometheus metrics integration
  - Zero-touch operations in Kubernetes
- Backpressure Feedback Loop
  - Closed-loop control from store to pipeline
  - Policy-based scaling (OK/SOFT/HARD)
  - Graceful degradation under load
Operational Excellence
- Flow Control: Bounded queues, drop policies, hybrid batching
- Pacing: Token-bucket rate limiting with provider-specific error codes (see the sketch after this list)
- Retry Logic: Exponential backoff for transient failures
- Observability: 50+ Prometheus metrics, health checks, structured logs
- Container-Native: Multi-stage Docker builds, Kubernetes manifests
- Comprehensive Testing: 176 tests (unit + integration + load)
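The pacing mentioned above follows the standard token-bucket idea; a minimal, illustrative sketch (not the pipeline's actual limiter class) looks like this:

import asyncio
import time

class TokenBucket:
    """Illustrative token bucket: refill at rate_per_sec, allow bursts up to burst."""

    def __init__(self, rate_per_sec: float, burst: int):
        self.rate = rate_per_sec
        self.capacity = burst
        self.tokens = float(burst)
        self.updated = time.monotonic()

    async def acquire(self) -> None:
        # Wait until one token is available, then consume it.
        while True:
            now = time.monotonic()
            self.tokens = min(self.capacity, self.tokens + (now - self.updated) * self.rate)
            self.updated = now
            if self.tokens >= 1.0:
                self.tokens -= 1.0
                return
            await asyncio.sleep((1.0 - self.tokens) / self.rate)

The real limiter layers provider-specific error handling (for example IBKR pacing violations) on top of this basic mechanism.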
Technology Stack
Core Technologies
| Category | Technologies |
|---|---|
| Language | Python 3.11+ |
| Async Runtime | asyncio, aiofiles, aiokafka |
| Web Framework | FastAPI, uvicorn |
| Data Processing | pandas, numpy, pyarrow |
| Configuration | Pydantic, pydantic-settings, YAML |
| Observability | Prometheus, OpenTelemetry, structlog |
| Message Queue | Kafka (aiokafka), MQTT |
| Database | TimescaleDB, PostgreSQL (via market_data_store) |
Development Tools
| Category | Technologies |
|---|---|
| Testing | pytest, pytest-asyncio, pytest-cov, freezegun |
| Code Quality | black, ruff, mypy |
| CI/CD | GitHub Actions, Docker, docker-compose |
| Container | Docker, Kubernetes, KEDA |
| Deployment | Helm (planned), docker-compose |
Optional Dependencies
- DAG Runtime: mmh3 for fast hash partitioning
- IBKR: market_data_ibkr for live data
- Store: market_data_store for persistence
- Core: market_data_core for data models
Quick Start
Prerequisites
- Python 3.11 or higher
- Docker (optional, for full stack)
- Kubernetes cluster (optional, for KEDA autoscaling)
Local Development Setup
# Clone the repository
git clone https://github.com/mjdevaccount/market_data_pipeline.git
cd market_data_pipeline
# Create virtual environment
python -m venv .venv
# Activate (Windows PowerShell)
.\.venv\Scripts\Activate.ps1
# Activate (Linux/macOS)
source .venv/bin/activate
# Install with development dependencies
pip install -e ".[dev,dag]"
# Run tests
pytest tests/ -v
# Run a pipeline from a YAML config
mdp run --config configs/dag/bars.yaml
Docker Quick Start
# Start complete stack (pipeline + TimescaleDB)
docker compose up --build -d
# Run smoke test
./scripts/smoke_test.sh # Linux/macOS
.\scripts\smoke_test.ps1 # Windows
# Check health
curl http://localhost:8083/health
# View metrics
curl http://localhost:8083/metrics
# Stop stack
docker compose down
Simple Python Example
from market_data_pipeline import create_pipeline
# Create a simple synthetic pipeline
pipeline = create_pipeline(
tenant_id="demo",
pipeline_id="quickstart",
source="synthetic",
symbols=["AAPL", "MSFT"],
operator="bars",
sink="store",
duration_sec=30.0
)
# Run the pipeline
await pipeline.run()
await pipeline.close()
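run() and close() are coroutines, so in a plain script (rather than a notebook or async shell) you would drive them with asyncio; a minimal wrapper using the same arguments as above:

import asyncio
from market_data_pipeline import create_pipeline

async def main() -> None:
    pipeline = create_pipeline(
        tenant_id="demo",
        pipeline_id="quickstart",
        source="synthetic",
        symbols=["AAPL", "MSFT"],
        operator="bars",
        sink="store",
        duration_sec=30.0,
    )
    try:
        await pipeline.run()
    finally:
        await pipeline.close()

if __name__ == "__main__":
    asyncio.run(main())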
Architecture
High-Level Architecture
Market Data Pipeline
        │
        ▼
Unified Runtime (v0.8.1)  [Classic Mode | DAG Mode]
        │
        ▼
Sources ──▶ Operators ──▶ Sinks
 • IBKR       • Bars        • Store
 • Replay     • Options     • Kafka
 • Synthetic  • Windows     • Database
              • Partitions
        │
        ▼
Observability Layer
 • Prometheus metrics
 • Health checks
 • Structured logs
        │
        ▼
Kubernetes (KEDA Autoscaling)
 Feedback Loop ──▶ Rate Coordinator ──▶ KEDA ScaledObject (horizontal autoscaling)
Data Flow
Sources           Operators                Batchers                    Sinks
IBKR API     ──▶  SecondBar Aggregator ──▶ Hybrid Batcher          ──▶ Store (TimescaleDB)
Replay File  ──▶  Options Chain        ──▶ (row/byte/time limits)  ──▶ Kafka
Synthetic    ──▶

Operators apply event time, watermarks, and windowing.
Phase 5: Streaming DAG Engine
DAG Runtime (v0.8.0)

Provider ──▶ Channel ──▶ Operator ──▶ Channel ──▶ Sink
 (IBKR)      (bounded)   (transform)  (bounded)   (persist)
                 ▲                        ▲
            backpressure             watermarks

Features:
 • Event-time windowing
 • Hash partitioning
 • Contrib operators (OHLC, dedupe, throttle, router)
 • Provider adapters (IBKR → DAG)
 • Unified CLI (mdp run --config)
Phase 6: Adaptive Autoscaling
Backpressure Feedback Loop (6.0A)

Store Queue (9000/10000) ──▶ FeedbackEvent (level=HARD) ──▶ FeedbackHandler (scale=0.0) ──▶ RateCoordinator (pause ingestion)

KEDA Autoscaling (6.0B)

Metrics ──▶ Prometheus query ──▶ KEDA ──▶ Kubernetes HPA
 • queue_depth           (threshold 5000)   (scale: 1 → 8 pods)
 • backpressure_state
 • rate_scale_factor

Result: zero-touch horizontal scaling
Key Design Principles
- Separation of Concerns: Clean abstractions between sources, operators, and sinks
- Composability: Any source → operator → sink combination works
- Resilience: Built-in retry, backpressure, and graceful degradation
- Observability: Metrics at every stage, health endpoints, clear bottleneck detection
- Performance: Async I/O, bounded queues, efficient batching
- Type Safety: Pydantic models, mypy validation
Installation
Standard Installation
# Basic installation
pip install -e .
# With development tools
pip install -e ".[dev]"
# With DAG runtime support
pip install -e ".[dag]"
# With all extras
pip install -e ".[dev,dag]"
Optional Dependencies
# IBKR live data (separate repository)
pip install market-data-ibkr
# TimescaleDB persistence (separate repository)
pip install market-data-store
# Core data models (separate repository)
pip install market-data-core
From Requirements
# Production dependencies (pinned versions)
pip install -r requirements.txt
# Development dependencies
pip install -r requirements.txt
pip install -e ".[dev,dag]"
Docker Installation
# Build image
docker build -t mdp-pipeline:latest .
# Or use docker-compose
docker compose up --build
Usage
1. CLI (mdp command)
Unified Runtime
# Run DAG mode with YAML config
mdp run --config configs/dag/bars.yaml
# Run classic mode
mdp run --config configs/classic/bars.yaml
# List available commands
mdp --help
New Job Execution System
# Validate configuration
mdp validate --config configs/sample.yaml
# Execute a live job
mdp job --config configs/sample.yaml --job synthetic_live
# Execute a backfill job
mdp job --config configs/sample.yaml --job synthetic_backfill
# Execute with profile override
mdp job --config configs/sample.yaml --job synthetic_live --profile prod
Stream Processing System (Phase 13.0)
# Start synthetic data producer
mdp stream produce --config configs/streaming.yaml --provider synthetic
# Start IBKR data producer
mdp stream produce --config configs/streaming.yaml --provider ibkr
# Start micro-batcher for windowing
mdp stream micro-batch --config configs/streaming.yaml --window 2s
# Start inference engine
mdp stream infer --config configs/streaming.yaml --adapter rules
# Tail stream events
mdp stream tail --topic mdp.events --limit 50
# Tail signals
mdp stream tail --topic mdp.signals --limit 20
# Replay historical data
mdp stream replay --config configs/streaming.yaml --dataset spy_1day_history --from "2025-10-01" --to "2025-10-02"
Legacy CLI
# Simple synthetic pipeline
mdp-legacy run --tenant T1 --symbols AAPL,MSFT --rate 100 --duration 30
# Using JSON spec
mdp-legacy runspec --spec examples/synthetic.json
# Start API server
mdp-legacy api --port 8000
2. Python API
Simple API
from market_data_pipeline import create_pipeline
# Quick start
pipeline = create_pipeline(
tenant_id="demo",
pipeline_id="simple",
source="synthetic",
symbols=["AAPL", "GOOGL"],
operator="bars",
sink="store",
duration_sec=60.0
)
await pipeline.run()
Advanced Configuration
from market_data_pipeline import PipelineBuilder, PipelineSpec, PipelineOverrides
builder = PipelineBuilder()
spec = PipelineSpec(
tenant_id="production",
pipeline_id="hft_bars",
source="ibkr",
symbols=["SPY", "QQQ", "IWM"],
operator="bars",
sink="store",
overrides=PipelineOverrides(
ticks_per_sec=1000,
pacing_max_per_sec=5000,
batch_size=2000,
sink_workers=8,
bar_window_sec=5,
bar_allowed_lateness_sec=2
)
)
pipeline = builder.build(spec)
await pipeline.run()
Unified Runtime API
from market_data_pipeline.runtime import UnifiedRuntime
from market_data_pipeline.settings import UnifiedRuntimeSettings
# DAG mode
settings = UnifiedRuntimeSettings(
mode="dag",
dag={
"graph": {
"nodes": [
{"id": "src", "type": "provider.ibkr.stream", "params": {...}},
{"id": "op", "type": "operator.buffer", "params": {...}},
{"id": "sink", "type": "operator.map", "params": {...}}
],
"edges": [["src", "op"], ["op", "sink"]]
}
},
feedback={"enable_feedback": True},
metrics={"enable": True}
)
async with UnifiedRuntime(settings) as rt:
    await rt.run("my-job")
3. FastAPI Server
# Start server
uvicorn market_data_pipeline.runners.api:app --port 8083
# Health check
curl http://localhost:8083/health
# Create pipeline
curl -X POST http://localhost:8083/pipelines \
-H "Content-Type: application/json" \
-d '{
"tenant_id": "T1",
"pipeline_id": "test1",
"source_type": "synthetic",
"symbols": ["AAPL"],
"rate": 100,
"duration": 30
}'
# Get metrics
curl http://localhost:8083/metrics
4. Docker Compose
# Start full stack
docker compose up -d
# View logs
docker compose logs -f pipeline
# Scale pipeline
docker compose up --scale pipeline=3
# Stop
docker compose down
Configuration
Configuration Methods
- Environment Variables (MDP_* prefix)
- YAML Files (config.yaml, custom configs)
- JSON Specs (examples/*.json)
- Python Code (PipelineOverrides, settings objects)
- New Job Configuration System (layered configs with profiles)
New Job Configuration System
The new job execution system uses a layered configuration approach with support for profiles, environment variable expansion, and provider-specific settings.
Example Configuration
# configs/sample.yaml
version: 2
profile: dev

# Providers
providers:
  synthetic_1:
    type: synthetic
    enabled: true
    seed: 42

# Storage
storage:
  primary:
    type: timescaledb
    uri: "postgresql://user:pass@localhost:5432/marketdata"
    schema: "public"
    table_bars: "bars_ohlcv"
  write:
    batch_size: 1000
    upsert_keys: ["provider", "symbol", "interval", "ts"]
    deduplicate: true

# Datasets
datasets:
  synthetic_smoke:
    provider: synthetic_1
    symbols: ["SPY", "AAPL", "MSFT"]
    interval: "1d"
    fields: ["open", "high", "low", "close", "volume"]
    adjustments:
      splits: true
      dividends: true

# Jobs
jobs:
  synthetic_live:
    dataset: synthetic_smoke
    mode: live
    execution:
      concurrency: 1
      retry:
        max_attempts: 3
        backoff_seconds: 5
  synthetic_backfill:
    dataset: synthetic_smoke
    mode: backfill
    backfill:
      from: "now-30d"
      to: "now"
      chunk: "7d"
      concurrency: 1
    execution:
      concurrency: 1
      retry:
        max_attempts: 3
        backoff_seconds: 5

# Features
features:
  dry_run: false
  write_enabled: true
  export_enabled: false

# Telemetry
telemetry:
  log_level: INFO
  metrics:
    enabled: true
    port: 9090
Key Features
- Layered Configuration: Base config + includes + profile overlays
- Environment Variable Expansion: ${VAR:-default} syntax (sketched below)
- Provider Abstraction: Pluggable providers (IBKR, synthetic, etc.)
- Job Modes: Live streaming and historical backfill
- Storage Routing: Primary DB + optional lake export
- Feature Flags: Runtime control of write/export operations
- Profile Support: dev/staging/prod environment switching
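The ${VAR:-default} expansion can be pictured with a small helper; this is an illustrative sketch of the documented syntax, not the loader's actual code:

import os
import re

_VAR = re.compile(r"\$\{(\w+)(?::-([^}]*))?\}")

def expand_env(value: str) -> str:
    # Replace ${VAR} / ${VAR:-default} with the environment value or the default.
    def repl(match: re.Match) -> str:
        name, default = match.group(1), match.group(2) or ""
        return os.environ.get(name, default)
    return _VAR.sub(repl, value)

# expand_env("postgresql://${DB_USER:-user}:${DB_PASS:-pass}@localhost:5432/marketdata")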
Environment Variables
# Pipeline settings
export MDP_MODE=dag
export MDP_FB_ENABLE_FEEDBACK=true
export MDP_FB_PROVIDER_NAME=ibkr
export MDP_METRICS_ENABLE=true
# Database
export DATABASE_URL="postgresql://user:pass@localhost:5432/market_data"
# Kafka
export KAFKA_BOOTSTRAP_SERVERS="localhost:9092"
export KAFKA_TOPIC="market_data"
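These MDP_* variables map onto pydantic-settings objects; a minimal sketch with an assumed env prefix and illustrative field names (the real settings classes live in market_data_pipeline.settings):

from pydantic_settings import BaseSettings, SettingsConfigDict

class FeedbackSettings(BaseSettings):
    # Illustrative fields, chosen to match the variables listed above.
    model_config = SettingsConfigDict(env_prefix="MDP_FB_")

    enable_feedback: bool = False
    provider_name: str = "synthetic"

settings = FeedbackSettings()  # picks up MDP_FB_ENABLE_FEEDBACK, MDP_FB_PROVIDER_NAME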
Example YAML Config
# configs/dag/bars.yaml
mode: dag
dag:
  nodes:
    - id: ibkr_source
      type: provider.ibkr.stream
      params:
        stream: "bars"
        symbols: ["AAPL", "MSFT"]
        resolution: "5s"
    - id: buffer
      type: operator.buffer
      params:
        max_items: 500
    - id: store_sink
      type: operator.map
      params:
        fn_name: "store_bars"
  edges:
    - [ibkr_source, buffer]
    - [buffer, store_sink]
feedback:
  enable_feedback: true
  provider_name: ibkr
metrics:
  enable: true
Configuration Overrides
PipelineOverrides(
# Source
ticks_per_sec=500,
pacing_max_per_sec=2000,
# Operator
bar_window_sec=5,
bar_allowed_lateness_sec=1,
# Batcher
batch_size=1000,
max_bytes=1024_000,
flush_ms=50,
drop_policy="oldest",
# Sink
sink_workers=4,
sink_queue_max=500
)
DAG Runtime (Phase 5)
The DAG runtime provides a powerful streaming engine for composing data pipelines.
Core Components
- Channels: Bounded queues with backpressure (high/low watermarks); see the sketch after this list
- Operators: Transform data (map, filter, buffer, window, partition)
- Registry: Component discovery and instantiation
- Builder: Graph construction from configuration
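The channel behavior can be approximated with an asyncio queue plus watermark thresholds; a minimal sketch (illustrative, not the library's Channel class):

import asyncio

class BoundedChannel:
    def __init__(self, capacity: int = 1000, high_pct: float = 0.8, low_pct: float = 0.5):
        self._queue: asyncio.Queue = asyncio.Queue(maxsize=capacity)
        self._high = int(capacity * high_pct)  # above this: signal backpressure
        self._low = int(capacity * low_pct)    # below this: pressure relieved

    async def put(self, item) -> None:
        await self._queue.put(item)            # blocks when full, which is the backpressure

    async def get(self):
        return await self._queue.get()

    @property
    def pressure(self) -> str:
        depth = self._queue.qsize()
        if depth >= self._high:
            return "HARD"
        if depth >= self._low:
            return "SOFT"
        return "OK"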
Example DAG
from market_data_pipeline.orchestration.dag import DagRuntime, Dag, Node, Edge
# Build DAG
dag = Dag()
dag.add_node(Node("source", meta={"type": "provider.ibkr.stream"}))
dag.add_node(Node("window", meta={"type": "operator.tumbling_window"}))
dag.add_node(Node("sink", meta={"type": "operator.map"}))
dag.add_edge(Edge("source", "window"))
dag.add_edge(Edge("window", "sink"))
# Run
runtime = DagRuntime(dag)
await runtime.run()
Windowing
from market_data_pipeline.orchestration.dag.windowing import (
tumbling_window_event_time,
TumblingWindowSpec
)
# 1-minute tumbling windows
async for frame in tumbling_window_event_time(
source=data_stream,
spec=TumblingWindowSpec(
window_sec=60,
watermark_lag_sec=5,
allowed_lateness_sec=2
)
):
print(f"Window {frame.window_start} - {frame.window_end}: {len(frame.items)} items")
Partitioning
import asyncio

from market_data_pipeline.orchestration.dag.partitioning import hash_partition
# Partition by symbol
channels = await hash_partition(
source=data_stream,
key_fn=lambda item: item.symbol,
num_partitions=4
)
# Process each partition
for i, channel in enumerate(channels):
    asyncio.create_task(process_partition(i, channel))
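process_partition above is left undefined; a hypothetical consumer, assuming the partition channels are async-iterable (adjust to the channel's actual receive API):

async def process_partition(index: int, channel) -> None:
    # Hypothetical per-partition consumer: drain the channel and hand items downstream.
    async for item in channel:
        print(f"partition {index}: {item.symbol}")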
Contrib Operators
from market_data_pipeline.orchestration.dag.contrib.operators_contrib import (
resample_ohlc,
deduplicate,
throttle,
router
)
# OHLC resampling
bars = resample_ohlc(ticks, interval="1m")
# Deduplication
unique = deduplicate(stream, key_fn=lambda x: x.symbol, window_sec=10)
# Rate limiting
throttled = throttle(stream, max_per_sec=100)
# Routing
output_channels = router(stream, routes={
"spy": lambda x: x.symbol == "SPY",
"tech": lambda x: x.symbol in ["AAPL", "MSFT", "GOOGL"]
})
Complete DAG Example
See examples/run_dag_to_store.py for a full example with IBKR → DAG operators → Store sink.
KEDA Autoscaling (Phase 6)
Phase 6.0A: Backpressure Feedback Loop
Dynamically adjusts pipeline ingestion rate based on downstream pressure.
# Configuration
feedback:
  enable_feedback: true
  provider_name: ibkr
  policy:
    ok: 1.0     # Full rate
    soft: 0.5   # Half rate
    hard: 0.0   # Paused
How It Works:
- Store queue fills → FeedbackEvent published
- FeedbackHandler receives event → adjusts RateCoordinator
- Pipeline slows ingestion → queue drains
- Feedback returns to OK → rate restored
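The policy table above boils down to a level-to-scale lookup; an illustrative sketch (names are hypothetical, not the library's classes):

POLICY = {"ok": 1.0, "soft": 0.5, "hard": 0.0}

def scale_for(level: str) -> float:
    # Map a backpressure level from a FeedbackEvent to a rate scale factor.
    return POLICY.get(level.lower(), 1.0)

# scale_for("HARD") -> 0.0 pauses ingestion; scale_for("OK") -> 1.0 restores the full rate.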
Phase 6.0B: KEDA Horizontal Autoscaling
Automatically scales Kubernetes pods based on Prometheus metrics.
Metrics
# Queue depth (primary scaling metric)
pipeline_feedback_queue_depth{source="store_coordinator"}
# Backpressure state (0=ok, 1=soft, 2=hard)
pipeline_backpressure_state{provider="ibkr"}
# Rate scale factor (0.0-1.0)
pipeline_rate_scale_factor{provider="ibkr"}
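For orientation, gauges like these can be exported with prometheus_client; an illustrative sketch only, since the pipeline defines its own metric registry:

from prometheus_client import Gauge

queue_depth = Gauge("pipeline_feedback_queue_depth", "Feedback queue depth", ["source"])
backpressure = Gauge("pipeline_backpressure_state", "0=ok, 1=soft, 2=hard", ["provider"])
rate_scale = Gauge("pipeline_rate_scale_factor", "Current rate scale (0.0-1.0)", ["provider"])

queue_depth.labels(source="store_coordinator").set(4200)
backpressure.labels(provider="ibkr").set(1)
rate_scale.labels(provider="ibkr").set(0.5)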
Deployment
# Deploy to Kubernetes
kubectl apply -n market-data -f deploy/keda/deployment-pipeline.yaml
kubectl apply -n market-data -f deploy/keda/scaledobject-pipeline.yaml
# Watch scaling
kubectl get hpa -w
kubectl get pods -w
Scaling Behavior
| Condition | Queue | Backpressure | Pods |
|---|---|---|---|
| Idle | < 2K | OK (0) | 1 |
| Moderate | 5-7K | SOFT (1) | 3-5 |
| High | 8-9K | SOFT (1) | 6-8 |
| Critical | 9K+ | HARD (2) | 10 (max) |
Documentation: See docs/PHASE_6.0B_KEDA_AUTOSCALING.md for the complete guide.
Development
Setup Development Environment
# Clone repository
git clone https://github.com/mjdevaccount/market_data_pipeline.git
cd market_data_pipeline
# Create virtual environment
python -m venv .venv
source .venv/bin/activate # Linux/macOS
.\.venv\Scripts\Activate.ps1 # Windows
# Install with dev dependencies
pip install -e ".[dev,dag]"
# Setup pre-commit (optional)
pre-commit install
Development Tools
# Code formatting
black src/ tests/
# or: ./scripts/dev.sh fmt
# Linting
ruff src/ tests/
# or: ./scripts/dev.sh lint
# Type checking
mypy src/
# Run tests
pytest tests/ -v
# or: ./scripts/dev.sh test
# Test coverage
pytest tests/ --cov=src --cov-report=html
# Clean caches
./scripts/dev.sh clean
Project Structure
market_data_pipeline/
├── src/market_data_pipeline/   # Source code
│   ├── source/                 # Data sources (IBKR, Synthetic, Replay)
│   ├── operator/               # Transformations (Bars, Options)
│   ├── batcher/                # Flow control (HybridBatcher)
│   ├── sink/                   # Outputs (Store, Kafka, Database)
│   ├── orchestration/          # DAG runtime, RateCoordinator
│   │   ├── dag/                # DAG engine (Phase 5)
│   │   ├── feedback/           # Backpressure feedback (Phase 6.0A)
│   │   └── coordinator.py      # Rate coordination
│   ├── runtime/                # Unified runtime (Phase 5.0.5)
│   ├── cli/                    # CLI commands
│   ├── runners/                # Service runners (API, CLI)
│   ├── settings/               # Configuration management
│   └── metrics.py              # Prometheus metrics
├── tests/                      # Test suite
│   ├── unit/                   # Unit tests
│   ├── integration/            # Integration tests
│   └── load/                   # Load tests
├── examples/                   # Usage examples
├── docs/                       # Documentation
├── deploy/                     # Deployment manifests
│   └── keda/                   # KEDA autoscaling (Phase 6.0B)
├── scripts/                    # Development scripts
└── configs/                    # Configuration files
Testing
Run Tests
# All tests
pytest tests/ -v
# Unit tests only
pytest tests/unit/ -v
# Integration tests (requires database)
export DATABASE_URL="postgresql://user:pass@localhost:5432/market_data"
pytest tests/integration/ -v
# Load tests
pytest tests/load/ -v
# With coverage
pytest tests/ --cov=src --cov-report=html
open htmlcov/index.html
# Contract tests (Core compatibility)
pytest tests/contracts/ -v
Test Categories
- Unit Tests (tests/unit/): 150+ tests covering individual components
- Integration Tests (tests/integration/): End-to-end workflows
- Load Tests (tests/load/): Performance benchmarks
- Contract Tests (tests/contracts/): Core v1.1.0 compatibility validation
Contract Tests
Contract tests verify compatibility with market-data-core protocol contracts:
# Run contract tests only
pytest tests/contracts/ -v
# Expected: 10 tests, ~4s duration
Purpose: These tests validate that Pipeline remains compatible with Core's data transfer objects (DTOs) and protocols. They are automatically triggered when Core publishes contract changes.
Categories:
- test_core_install.py: Core package imports and version compatibility
- test_feedback_flow.py: FeedbackEvent → RateAdjustment transformations
- test_protocol_conformance.py: Protocol implementations (RateController, FeedbackPublisher)
Note: Contract tests are a minimal, fast subset of the full integration test suite.
See tests/contracts/README.md for details.
Cross-Repo Testing: These tests are triggered automatically by market-data-core
via GitHub Actions when Core's contract schemas change. See .github/workflows/README.md
for workflow documentation.
Pulse Integration (Phase 10.1)
The Pipeline subscribes to telemetry.feedback events from the Store via the Pulse event bus.
Configuration:
# Environment variables
PULSE_ENABLED=true # Enable Pulse (default: true)
EVENT_BUS_BACKEND=inmem # inmem or redis (default: inmem)
REDIS_URL=redis://localhost:6379/0 # Redis connection (if backend=redis)
MD_NAMESPACE=mdp # Stream namespace
SCHEMA_TRACK=v1 # Schema track (v1|v2)
Running Pulse Tests:
# Unit tests (inmem backend)
pytest tests/pulse/test_pulse_consumer.py -v
# Integration tests (redis backend, requires Redis)
EVENT_BUS_BACKEND=redis REDIS_URL=redis://localhost:6379/0 \
pytest tests/pulse/test_redis_integration.py -v
Metrics:
- pulse_consume_total{stream,track,outcome}: Events consumed (success/error/duplicate)
- pulse_lag_ms{stream}: Consumer lag from the event timestamp
Key Test Files
tests/unit/
├── orchestration/
│   ├── test_coordinator_feedback.py   # Phase 6.0A (13 tests)
│   └── test_feedback_handler.py       # Phase 6.0A (12 tests)
├── metrics/
│   └── test_pipeline_metrics.py       # Phase 6.0B (8 tests)
├── dag/
│   ├── test_channel_backpressure.py   # Phase 5.0.1
│   ├── test_windowing_event_time.py   # Phase 5.0.2
│   └── test_contrib_operators.py      # Phase 5.0.3
└── ...

tests/integration/
├── test_feedback_integration.py       # Phase 6.0A (5 tests)
├── test_e2e_synthetic_store.py        # End-to-end
└── ...
Test Features
- Async Support: pytest-asyncio for async tests (see the sketch after this list)
- Time Control: freezegun for time-based testing
- Windows-Safe: Custom event loop fixtures
- Optional Deps: Graceful skipping when dependencies are unavailable
- Real Code Paths: No mocks; tests exercise actual components
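A minimal async test in the style of the suite might look like this; it is a hypothetical example built on the public create_pipeline API (and assumes a reachable store sink), not a file from the repository:

import pytest
from market_data_pipeline import create_pipeline

@pytest.mark.asyncio
async def test_synthetic_pipeline_smoke():
    # Drive a short synthetic run end to end and make sure it shuts down cleanly.
    pipeline = create_pipeline(
        tenant_id="test",
        pipeline_id="smoke",
        source="synthetic",
        symbols=["AAPL"],
        operator="bars",
        sink="store",
        duration_sec=1.0,
    )
    try:
        await pipeline.run()
    finally:
        await pipeline.close()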
Production Deployment
Docker Deployment
# Build image
export REG="your-registry.example.com"
export TAG="0.8.1"
docker build -t $REG/mdp-pipeline:$TAG .
docker push $REG/mdp-pipeline:$TAG
Kubernetes Deployment
# Create namespace
kubectl create namespace market-data
# Deploy pipeline
kubectl apply -n market-data -f deploy/keda/deployment-pipeline.yaml
# Deploy KEDA autoscaling
kubectl apply -n market-data -f deploy/keda/scaledobject-pipeline.yaml
# Deploy Prometheus ServiceMonitor
kubectl apply -n market-data -f deploy/keda/prometheus-servicemonitor.yaml
# Verify
kubectl -n market-data get pods
kubectl -n market-data get scaledobject
kubectl -n market-data get hpa
Configuration
# Update deployment image
kubectl -n market-data set image deployment/mdp-pipeline \
pipeline=$REG/mdp-pipeline:$TAG
# Update environment variables
kubectl -n market-data set env deployment/mdp-pipeline \
MDP_FB_ENABLE_FEEDBACK=true \
MDP_METRICS_ENABLE=true
Rollback
# Quick rollback
kubectl -n market-data rollout undo deployment/mdp-pipeline
# Rollback to specific version
kubectl -n market-data rollout undo deployment/mdp-pipeline --to-revision=2
Production Guides
- Deployment: See PHASE_6.0_PRODUCTION_ROLLOUT.md
- Quick Reference: See PHASE_6.0_SHIP_IT_CHECKLIST.md
- Verification: See PHASE_6.0_VERIFICATION_REPORT.md
Monitoring & Observability
Prometheus Metrics
50+ metrics exposed at /metrics:
# Source metrics
mdp_source_items_total{source,tenant,symbol}
mdp_source_status{source,tenant}
mdp_source_pacing_blocked_total{source,tenant}
# Operator metrics
operator_bars_generated_total{tenant,pipeline,operator}
# Batcher metrics
batcher_items_added_total{tenant,pipeline,batcher}
batcher_queue_depth{tenant,pipeline,batcher}
# Sink metrics
mdp_sink_batches_written_total{sink,tenant}
mdp_sink_commit_seconds{sink,tenant}
mdp_sink_queue_depth{sink,tenant}
# Phase 6.0B KEDA metrics
pipeline_rate_scale_factor{provider}
pipeline_backpressure_state{provider}
pipeline_feedback_queue_depth{source}
Grafana Dashboard
# Import dashboard
kubectl -n monitoring apply -f monitoring/grafana-dashboard.json
Panels:
- Queue Depth (time series)
- Rate Scale Factor (gauge)
- Backpressure State (stat)
- Pod Count (stat)
- Throughput (graph)
- Latency (heatmap)
Health Checks
# Application health
curl http://localhost:8083/health
# {"status":"healthy","service":"market_data_pipeline"}
# Kubernetes liveness
kubectl -n market-data get pods
# Shows READY 1/1
# Detailed status
curl http://localhost:8083/status
Alerts
Recommended Prometheus alerts:
# High queue depth
- alert: PipelineQueueHigh
  expr: pipeline_feedback_queue_depth > 8000
  for: 5m

# Hard backpressure
- alert: PipelineHardBackpressure
  expr: pipeline_backpressure_state == 2
  for: 2m

# Low rate scale
- alert: PipelineRateScaleLow
  expr: pipeline_rate_scale_factor < 0.3
  for: 10m
Extending the Pipeline
Add a Custom Operator
from market_data_pipeline.operator.base import StatefulOperator, EventTimePolicy
class VWAPOperator(StatefulOperator):
    def __init__(self, window_sec: int = 60):
        policy = EventTimePolicy(window_sec=window_sec)
        super().__init__(policy=policy)
        self._volume_prices = {}
        self._volumes = {}

    def _on_tick(self, tick: Quote) -> None:
        symbol = tick.symbol
        if symbol not in self._volume_prices:
            self._volume_prices[symbol] = 0.0
            self._volumes[symbol] = 0
        self._volume_prices[symbol] += tick.price * tick.size
        self._volumes[symbol] += tick.size

        # Check if window complete (via EventTimePolicy)
        if self._should_flush(tick.timestamp):
            vwap = self._volume_prices[symbol] / self._volumes[symbol]
            self._ready.append({"symbol": symbol, "vwap": vwap})
            self._volume_prices[symbol] = 0.0
            self._volumes[symbol] = 0
Add a Custom Sink
import io
from typing import List

import pandas as pd

from market_data_pipeline.sink.base import Sink
from market_data_pipeline.sink.capabilities import SinkCapabilities

# Bar is the pipeline's OHLCV bar model (import it from your data-model package).

class S3Sink(Sink):
    def __init__(self, bucket: str, prefix: str):
        self.bucket = bucket
        self.prefix = prefix
        self.s3_client = None

    @property
    def capabilities(self) -> SinkCapabilities:
        return SinkCapabilities.BATCH_WRITES

    async def start(self) -> None:
        import aioboto3
        self.session = aioboto3.Session()
        self.s3_client = await self.session.client('s3')

    async def write(self, batch: List[Bar]) -> None:
        key = f"{self.prefix}/{batch[0].timestamp.date()}.parquet"
        df = pd.DataFrame([b.model_dump() for b in batch])
        buffer = io.BytesIO()
        df.to_parquet(buffer, engine='pyarrow')
        buffer.seek(0)
        await self.s3_client.put_object(
            Bucket=self.bucket,
            Key=key,
            Body=buffer.getvalue()
        )

    async def close(self, drain: bool = True) -> None:
        if self.s3_client:
            await self.s3_client.close()
Register Components
from market_data_pipeline.orchestration.dag.registry import default_registry
registry = default_registry()
registry.register_operator("operator.vwap", VWAPOperator)
registry.register_sink("sink.s3", S3Sink)
Contributing
We welcome contributions! Please follow these guidelines:
Getting Started
1. Fork the repository
2. Create a feature branch (git checkout -b feature/amazing-feature)
3. Make your changes
4. Run tests (pytest tests/ -v)
5. Run linters (black ., ruff ., mypy src/)
6. Commit changes (git commit -m 'Add amazing feature')
7. Push to branch (git push origin feature/amazing-feature)
8. Open a Pull Request
Code Standards
- Style: Follow black formatting (88 char line length)
- Linting: Pass ruff checks
- Type Hints: Full type annotations (mypy strict mode)
- Tests: Add tests for new features (maintain >95% coverage)
- Documentation: Update README and docstrings
Commit Messages
Follow conventional commits:
- feat: New feature
- fix: Bug fix
- docs: Documentation changes
- test: Test changes
- refactor: Code refactoring
- perf: Performance improvements
Pull Request Process
1. Update README.md with details of changes if applicable
2. Update documentation in docs/ if needed
3. Add tests for new functionality
4. Ensure all tests pass
5. Request review from maintainers
Documentation
Core Documentation
- README.md - This file
- PIPELINE_BUILDER.md - Pipeline builder guide
- PRODUCTION.md - Production deployment guide
Phase Documentation
- Phase 5: Streaming DAG Engine
- Phase 6: Adaptive Autoscaling
Examples
- examples/ - Code examples
  - run_dag_to_store.py - Complete DAG example
  - run_unified_runtime_basic.py - Unified runtime example
  - synthetic.json - JSON spec example
  - And 15+ more examples
Roadmap
Completed
- v0.1-0.2: Core abstractions, pipeline builder, JSON-spec
- v0.7-0.8: DAG runtime, unified runtime, adaptive autoscaling
Current Version: v0.9.0
- Streaming DAG engine
- Event-time windowing
- Hash partitioning
- Backpressure feedback loop
- KEDA autoscaling
- 176 tests passing
Future (v0.9+)
- v0.9: Enhanced IBKR integration
  - Reconnection handling
  - Pacing improvements
  - Error recovery
- v1.0: Multi-provider composite sources
  - Cross-exchange arbitrage
  - Provider failover
  - Data fusion
- v1.1: Advanced analytics
  - Volatility surfaces
  - Greeks aggregation
  - ML feature engineering
- v1.2: Horizontal scaling
  - Distributed DAG execution
  - Coordinator sharding
  - Cross-region deployment
Related Projects
- market_data_core - Core data models and provider SDKs
- market_data_store - TimescaleDB persistence layer
- market_data_ibkr - IBKR provider implementation
License
MIT License - see LICENSE for details
Authors
- Primary Developer - @mjdevaccount
Acknowledgments
- FastAPI - Modern web framework
- Pydantic - Data validation
- Prometheus - Metrics collection
- KEDA - Kubernetes autoscaling
- AsyncIO - Asynchronous I/O
Support
- Issues: GitHub Issues
- Discussions: GitHub Discussions
Made with ❤️ for the trading community