Data store control-plane (schema, migrations, policies, ops) for market-data.

market-data-store
High-performance market data infrastructure with TimescaleDB, RLS, a production-ready client library, and async sinks.
Hybrid architecture providing both control-plane and data-plane capabilities:
- Migrations & policies (TimescaleDB)
- Admin endpoints: health, readiness, schema/version, migrate, retention/compression, backfills, aggregates
- Prometheus metrics + observability
- `mds_client` library: production-ready Python client for Market Data Core with sync/async APIs, RLS, and tenant isolation
- Async sinks (Phase 4.1): high-throughput ingestion with backpressure awareness
The `mds_client` library provides direct in-process access for Market Data Core. No HTTP latency: Core imports and uses the library directly, with connection pooling, RLS, and TimescaleDB integration.
NEW: Async Sinks Layer (Phase 4.1) - Stream-oriented ingestion with automatic Prometheus metrics, error handling, and flow-control readiness.
LATEST: Config-Driven Pipeline Support (Phase 11.3) - Provider-based OHLCV ingestion with the `bars_ohlcv` table, `StoreClient`, audit-grade job tracking, diff-aware upserts, and compression policies. Supports 10K+ bars/sec throughput for live and backfill operations.
Dual Ingestion Architecture
This store supports two parallel ingestion paths:
Path 1: Tenant-Based System (Existing)
- Tables: `bars`, `fundamentals`, `news`, `options_snap`
- Client: `mds_client` (MDS/AMDS) with RLS
- Use Case: Multi-tenant analytics platform
- Features: Row-level security, tenant isolation, comprehensive data types
Path 2: Provider-Based Pipeline (NEW - Phase 11.3)
- Tables: `bars_ohlcv`, `job_runs`
- Client: `datastore.StoreClient` / `AsyncStoreClient`
- Use Case: Config-driven market data pipeline (live + backfill)
- Features:
  - High throughput (10K+ bars/sec)
  - Diff-aware upserts (replay-safe)
  - Smart batching (COPY for 1000+ rows)
  - Job execution tracking with heartbeats
  - Automatic compression (90-day hot tier)
  - Prometheus metrics
Architecture Diagram:

+----------------------------------------------------------------------+
|                           market_data_store                          |
+----------------------------------------------------------------------+
|                                                                      |
|  +-------------------------------+  +------------------------------+ |
|  | Tenant-Based (Existing)       |  | Provider-Based (NEW)         | |
|  |                               |  |                              | |
|  | mds_client (AMDS)             |  | StoreClient                  | |
|  |                               |  |                              | |
|  | bars (tenant_id, RLS)         |  | bars_ohlcv (provider-based)  | |
|  | fundamentals                  |  | job_runs (audit trail)       | |
|  | news                          |  |                              | |
|  | options_snap                  |  | Features:                    | |
|  |                               |  |  - Diff-aware upserts        | |
|  | Features:                     |  |  - Smart batching            | |
|  |  - Multi-tenant isolation     |  |  - Compression (90d)         | |
|  |  - RLS enforcement            |  |  - Heartbeat tracking        | |
|  |  - Comprehensive data types   |  |  - Config fingerprinting     | |
|  +-------------------------------+  +------------------------------+ |
|                                                                      |
|               TimescaleDB (Hypertables + Compression)                |
+----------------------------------------------------------------------+
Project Layout & Description
This repository is structured as a control-plane with clear separation between infrastructure, schema management, the service layer, and automation rules.
Below is a snapshot of the repo's structure with logical groupings to help new contributors and automation tools (like Cursor) navigate effectively.
Infra & Ops
├── docker-compose.yml              # Docker services configuration
├── Dockerfile                      # Container build instructions
├── Makefile                        # Build and deployment automation
├── docker/                         # Docker-related files
│   └── initdb.d/                   # Initial SQL scripts for DB setup
│       └── 00_timescale.sql        # TimescaleDB initialization script
└── tools/                          # Auxiliary scripts, CLI utilities
    └── build_solution_manifest.py  # Solution manifest builder
Schema & Migrations
├── alembic.ini                             # Alembic configuration for migrations
├── migrations/                             # Alembic migration files
│   ├── env.py                              # Migration environment config
│   ├── script.py.mako                      # Migration template
│   └── versions/                           # Migration version files
├── src/datastore/aggregates.py             # Continuous aggregates definitions
└── src/datastore/timescale_policies.py     # TimescaleDB retention/compression policies
Service Layer
├── src/datastore/                  # Control-plane: migrations, policies, admin endpoints
│   ├── __init__.py                 # Package init
│   ├── cli.py                      # CLI for migrations, policies, seeds
│   ├── config.py                   # App configuration
│   ├── idempotency.py              # Conflict/idempotency helpers
│   ├── reads.py                    # Read ops (ops/tests support)
│   ├── writes.py                   # Write ops (batch/upserts)
│   └── service/                    # FastAPI service layer
│       └── app.py                  # FastAPI app with admin endpoints
└── src/mds_client/                 # Client library for Market Data Core
    ├── __init__.py                 # Library exports (MDS, AMDS, models, batch processors)
    ├── client.py                   # MDS (sync client facade) with psycopg 3 + ConnectionPool
    ├── aclient.py                  # AMDS (async client facade) with AsyncConnectionPool
    ├── models.py                   # Pydantic data models with validation
    ├── sql.py                      # Canonical SQL with named parameters and ON CONFLICT upserts
    ├── rls.py                      # Row Level Security helpers (DSN options + context managers)
    ├── errors.py                   # Structured exception hierarchy with psycopg error mapping
    ├── utils.py                    # NDJSON processing with gzip support and model coercion
    ├── batch.py                    # Production-safe batch processing (sync + async)
    └── cli.py                      # Comprehensive operational CLI with environment variables
Automation Rules
└── cursorrules/                    # Cursor rules (automation home base)
    ├── index.mdc                   # Main rules index
    ├── README.md                   # Rules documentation
    ├── solution_manifest.json      # Asset lookup configuration
    └── rules/                      # Task-specific rule definitions
How to Navigate
- DB Migrations → /migrations/versions/
- Admin Endpoints → /src/datastore/service/app.py

# Run FastAPI service (admin endpoints)
uvicorn datastore.service.app:app --host 0.0.0.0 --port 8000 --factory

- Policies & Aggregates → /src/datastore/timescale_policies.py, /src/datastore/aggregates.py
- Control-plane CLI → /src/datastore/cli.py
- Client Library → /src/mds_client/ (for Market Data Core integration)
- Client CLI → /src/mds_client/cli.py (operational commands via the `mds` command)
- Cursor Rules & Automation → /cursorrules/ (Cursor's self-bootstrap home)
- Infra & Deployment → /docker/, Dockerfile, docker-compose.yml
- Project Config → pyproject.toml
Releases
Current Release: v0.4.0
What's included:
- Core v1.1.0 contract adoption - FeedbackEvent extends the Core DTO
- Adapter pattern - preserves Store fields while maintaining Core compatibility
- Health DTOs - `/healthz` and `/readyz` return Core `HealthStatus`
- Complete `mds_client` library with sync/async APIs
- Production-ready batch processing and backup/restore
- Comprehensive CLI with all operational commands
- Full documentation and troubleshooting guides
- RLS security and tenant isolation
Previous Release: v0.3.0
- Write coordinator with backpressure feedback
- Async sinks layer
- HTTP feedback broadcaster
Installation from Release
# Install specific version
pip install git+https://github.com/mjdevaccount/market-data-store.git@v0.1.0#subdirectory=src
# Install latest version
pip install git+https://github.com/mjdevaccount/market-data-store.git#subdirectory=src
Quick Start
Installation
Option 1: Install from Git (Recommended)
# Install the mds_client library directly from this repository
pip install git+https://github.com/mjdevaccount/market-data-store.git@v0.1.0#subdirectory=src
# Or install the latest version
pip install git+https://github.com/mjdevaccount/market-data-store.git#subdirectory=src
Option 2: Development Setup
# Clone and setup for development
git clone https://github.com/mjdevaccount/market-data-store.git
cd market-data-store
# Create and activate virtual environment
python -m venv .venv
.\.venv\Scripts\Activate.ps1
# Install dependencies
pip install -r requirements.txt
# For development
pip install -r requirements-dev.txt
Prerequisites
- Python 3.11+
- PostgreSQL 13+ with TimescaleDB extension (required)
- Virtual environment
Using the Released Package
Once installed, you can use the mds_client library in your projects:
# Basic usage with cross-platform compatibility
from mds_client import MDS, Bar
from mds_client.runtime import boot_event_loop
from datetime import datetime, timezone

# Configure event loop for Windows/Docker compatibility
boot_event_loop()

# Configure client
mds = MDS({
    "dsn": "postgresql://user:pass@host:port/db",
    "tenant_id": "your-tenant-uuid",
})

# Write market data
bar = Bar(
    tenant_id="your-tenant-uuid",
    vendor="ibkr",
    symbol="AAPL",
    timeframe="1m",
    ts=datetime.now(timezone.utc),
    close_price=150.5,
    volume=1000,
)
mds.upsert_bars([bar])
# Use the CLI
export MDS_DSN="postgresql://user:pass@host:port/db"
export MDS_TENANT_ID="your-tenant-uuid"
# Test connection
mds ping
# Write data
mds write-bar --vendor ibkr --symbol AAPL --timeframe 1m --ts "2024-01-01T10:00:00Z" --close-price 150.5 --volume 1000
Async Sinks Layer (Phase 4.1 - NEW)
Status: Production Ready | Version: v0.2.0 | Released: October 2025
The async sinks layer provides high-throughput, observable ingestion with automatic Prometheus metrics and backpressure readiness.
Key Features
- Async-first: Non-blocking I/O with `asyncio` and `asyncpg`
- Auto-metrics: Prometheus counters and histograms for all writes
- Context managers: Clean resource management with `async with`
- Type-safe: Strong typing with Pydantic models
- Error handling: Graceful failures with metric recording
- Tested: 12/12 unit tests passing, integration-ready
Available Sinks

| Sink | Purpose | Model | Wraps |
|---|---|---|---|
| BarsSink | OHLCV market data | Bar | AMDS.upsert_bars() |
| OptionsSink | Options snapshots | OptionSnap | AMDS.upsert_options() |
| FundamentalsSink | Company financials | Fundamentals | AMDS.upsert_fundamentals() |
| NewsSink | News & sentiment | News | AMDS.upsert_news() |
Quick Start
import asyncio
from datetime import datetime, timezone

from mds_client import AMDS
from mds_client.models import Bar
from market_data_store.sinks import BarsSink

async def main():
    # Configure AMDS client
    config = {
        "dsn": "postgresql://user:pass@localhost:5432/marketdata",
        "tenant_id": "your-tenant-uuid",
        "pool_max": 10,
    }

    # Create sample data
    bars = [
        Bar(
            tenant_id=config["tenant_id"],
            vendor="ibkr",
            symbol="AAPL",
            timeframe="1m",
            ts=datetime.now(timezone.utc),
            close_price=150.5,
            volume=1000,
        )
    ]

    # Write via sink (auto-metrics + error handling)
    async with AMDS(config) as amds:
        async with BarsSink(amds) as sink:
            await sink.write(bars)
            print("Bars written successfully")

if __name__ == "__main__":
    asyncio.run(main())
Config-Driven Pipeline Usage (Phase 11.3)
StoreClient - Provider-Based Ingestion
For config-driven pipeline operations (live/backfill), use `StoreClient` instead of `mds_client`:
from datetime import datetime, timezone
from dataclasses import dataclass

from datastore import StoreClient, JobRunTracker, compute_config_fingerprint

# Your provider returns bars matching this protocol
@dataclass
class Bar:
    provider: str
    symbol: str
    interval: str  # "5min", "1d", etc.
    ts: datetime
    open: float
    high: float
    low: float
    close: float
    volume: float

# Example: Write bars from IBKR provider
def ingest_live_bars(config, bars):
    """Ingest bars with job tracking and config fingerprinting."""
    # Start tracking the job run
    tracker = JobRunTracker(config.database_url)
    fingerprint = compute_config_fingerprint(config.dict())
    run_id = tracker.start_run(
        job_name="live_us_equities_5min",
        dataset_name="us_equities_5min",
        provider="ibkr_primary",
        mode="live",
        config_fingerprint=fingerprint,
        pipeline_version="v1.2.0",
        metadata={"git_hash": "abc123", "container_id": "xyz"},
    )
    try:
        # Write bars with diff-aware upserts
        with StoreClient(config.database_url) as client:
            count = client.write_bars(bars, batch_size=1000)

        # Update progress with heartbeat
        symbols = list({b.symbol for b in bars})
        min_ts = min(b.ts for b in bars)
        max_ts = max(b.ts for b in bars)
        tracker.update_progress(
            run_id=run_id,
            rows_written=count,
            symbols=symbols,
            min_ts=min_ts,
            max_ts=max_ts,
            heartbeat=True,
        )

        # Mark as success
        tracker.complete_run(run_id, status="success")
        print(f"Wrote {count} bars (run_id={run_id})")
    except Exception as e:
        tracker.complete_run(run_id, status="failure", error_message=str(e))
        raise

# Example bars from IBKR
bars = [
    Bar(
        provider="ibkr_primary",
        symbol="SPY",
        interval="5min",
        ts=datetime(2025, 1, 1, 9, 30, tzinfo=timezone.utc),
        open=450.0,
        high=451.0,
        low=449.0,
        close=450.5,
        volume=1000000,
    ),
    # ... more bars
]
ingest_live_bars(config, bars)
AsyncStoreClient - High-Performance Async Ingestion

import asyncio

from datastore import AsyncStoreClient

async def ingest_bars_async(bars, db_uri):
    """Async ingestion with automatic batching."""
    async with AsyncStoreClient(db_uri) as client:
        count = await client.write_bars(bars, batch_size=1000)
        print(f"Wrote {count} bars asynchronously")

# Run async
asyncio.run(ingest_bars_async(bars, "postgresql://..."))
Key Features

| Feature | Description | Benefit |
|---|---|---|
| Diff-aware upserts | `IS DISTINCT FROM` in SQL | Only updates when values change - true idempotency |
| Smart batching | COPY for 1000+ rows, executemany otherwise | Optimal performance for any batch size |
| Protocol-based | Duck typing via `Bar` protocol | No hard dependency on specific classes |
| Job tracking | Full lifecycle with heartbeats | Audit trail + stuck-job detection |
| Compression | 90-day hot-tier policy | Automatic disk savings for historical data |
| Metrics | Prometheus counters/histograms | Observability out of the box |
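To make the diff-aware upsert semantics concrete, here is a pure in-memory sketch of the behavior (not the library's actual SQL): replaying an identical batch changes nothing, exactly like an `ON CONFLICT ... DO UPDATE ... WHERE col IS DISTINCT FROM EXCLUDED.col` guard in PostgreSQL.

```python
def diff_aware_upsert(table, rows):
    """In-memory analogy of a diff-aware upsert keyed by
    (provider, symbol, interval, ts). Returns the number of rows
    actually inserted or changed; identical replays are no-ops,
    mimicking the IS DISTINCT FROM guard in the real SQL."""
    key_fields = ("provider", "symbol", "interval", "ts")
    changed = 0
    for row in rows:
        key = tuple(row[k] for k in key_fields)
        values = {k: v for k, v in row.items() if k not in key_fields}
        if table.get(key) != values:  # update only when something differs
            table[key] = values
            changed += 1
    return changed

table = {}
bar = {"provider": "ibkr_primary", "symbol": "SPY", "interval": "5min",
       "ts": "2025-01-01T09:30:00Z", "open": 450.0, "close": 450.5}

first = diff_aware_upsert(table, [bar])   # new row: counts as a change
replay = diff_aware_upsert(table, [bar])  # identical replay: no change
```

This is why a crashed backfill can simply be rerun from the start: rows that already landed are skipped, and only genuinely new or corrected values count as writes.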
CLI - Job Management
# List recent job runs
datastore job-runs-list --limit 50
# Inspect specific run
datastore job-runs-inspect 123
# Find stuck jobs (no heartbeat for 15m)
datastore job-runs-stuck --timeout-minutes 15
# View 24h summary
datastore job-runs-summary
# Cleanup old runs
datastore job-runs-cleanup --older-than-days 90 --confirm
Tables Created by Migration 0002
bars_ohlcv - Provider-Based OHLCV Storage
CREATE TABLE bars_ohlcv (
    provider    TEXT NOT NULL,
    symbol      TEXT NOT NULL CHECK (symbol = UPPER(symbol)),
    interval    TEXT NOT NULL,
    ts          TIMESTAMPTZ NOT NULL,
    open        DOUBLE PRECISION NOT NULL,
    high        DOUBLE PRECISION NOT NULL,
    low         DOUBLE PRECISION NOT NULL,
    close       DOUBLE PRECISION NOT NULL,
    volume      DOUBLE PRECISION NOT NULL,
    created_at  TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    updated_at  TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    PRIMARY KEY (provider, symbol, interval, ts)
);
Features:
- TimescaleDB hypertable (7-day chunks)
- Compression after 90 days (segmentby: provider, symbol, interval)
- Uppercase symbol constraint
- No tenant isolation (system-wide)
job_runs - Audit-Grade Job Tracking
CREATE TABLE job_runs (
    id                  BIGSERIAL PRIMARY KEY,
    job_name            TEXT NOT NULL,
    provider            TEXT,
    status              TEXT NOT NULL CHECK (status IN ('running', 'success', 'failure', 'cancelled')),
    config_fingerprint  TEXT,
    pipeline_version    TEXT,
    rows_written        BIGINT DEFAULT 0,
    symbols             TEXT[],
    min_ts              TIMESTAMPTZ,
    max_ts              TIMESTAMPTZ,
    metadata            JSONB DEFAULT '{}'::jsonb,
    started_at          TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    completed_at        TIMESTAMPTZ,
    elapsed_ms          BIGINT GENERATED ALWAYS AS (...) STORED  -- auto-computed
);
Features:
- Heartbeat tracking via metadata->>'last_heartbeat'
- Config fingerprinting for reproducibility
- Derived `elapsed_ms` column
- GIN index on metadata for fast heartbeat queries
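The exact implementation of `compute_config_fingerprint` is not shown in this README; a plausible stdlib-only sketch is to hash a canonical JSON rendering of the config, so that the same settings always produce the same fingerprint regardless of dict key order:

```python
import hashlib
import json

def config_fingerprint(config: dict) -> str:
    """Hypothetical stand-in for compute_config_fingerprint:
    hash a canonical (sorted-key, compact) JSON rendering so that
    semantically identical configs map to one fingerprint."""
    canonical = json.dumps(config, sort_keys=True, separators=(",", ":"), default=str)
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()

# Key order does not affect the fingerprint
a = config_fingerprint({"dataset": "us_equities_5min", "provider": "ibkr_primary"})
b = config_fingerprint({"provider": "ibkr_primary", "dataset": "us_equities_5min"})
```

Stored in `job_runs.config_fingerprint`, this lets you prove which configuration produced a given run and detect config drift between runs of the same job.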
Metrics Exported
Sinks Metrics (Tenant-Based System)
Sinks automatically register metrics to the global Prometheus registry:
# Total write attempts (counter)
sink_writes_total{sink="bars|options|fundamentals|news", status="success|failure"}
# Write latency (histogram)
sink_write_latency_seconds{sink="bars|options|fundamentals|news"}
StoreClient Metrics (Provider-Based Pipeline)
# Total bars written (counter)
store_bars_written_total{method="COPY|UPSERT", status="success|failure"}
# Write latency (histogram)
store_bars_write_latency_seconds{method="COPY|UPSERT"}
Key Insight: method label shows whether COPY (1000+ rows) or UPSERT (< 1000 rows) was used, enabling performance tuning.
JobRunTracker Metrics (Pipeline Audit)
# Total job runs tracked (counter)
store_job_runs_total{job_name="...", provider="...", mode="live|backfill", status="started|success|failure|cancelled"}
# Job run duration (histogram)
store_job_runs_duration_seconds{job_name="...", provider="...", mode="live|backfill", status="success|failure|cancelled"}
Key Insight: Track job lifecycle from status="started" through final status (success, failure, cancelled). Duration histogram only recorded on completion.
Scrape at: http://localhost:8081/metrics (FastAPI control-plane)
Example: All Sinks
See examples/run_store_pipeline.py for a complete example using all four sinks.
# Set environment variables
$env:MDS_DSN="postgresql://user:pass@localhost:5432/marketdata"
$env:MDS_TENANT_ID="your-tenant-uuid"
# Run pipeline example
python examples/run_store_pipeline.py
Output:

market_data_store Sink Pipeline Example
Tenant: 6b6a6a8a...

BarsSink Example
  Wrote 2 bars
OptionsSink Example
  Wrote 1 options
FundamentalsSink Example
  Wrote 1 fundamentals
NewsSink Example
  Wrote 1 news item

All sinks completed successfully!
Benchmarks
Run performance benchmarks with examples/benchmark_sinks.py:
python examples/benchmark_sinks.py --batches 50 --batch-size 1000 --parallel 4
Example Results (Mock mode, Windows):
========================================================================
Benchmark Results (Phase 4.1)
========================================================================
BarsSink 13,674 rec/s avg latency 14.0 ms total 2,000
OptionsSink 12,899 rec/s avg latency 14.2 ms total 2,000
FundamentalsSink 12,886 rec/s avg latency 15.0 ms total 2,000
NewsSink 12,947 rec/s avg latency 14.9 ms total 2,000
========================================================================
Overall: 8,000 records in 0.61s (13,093 rec/s aggregate)
Migration from AsyncBatchProcessor
If you're currently using mds_client.batch.AsyncBatchProcessor:
Before:
from mds_client import AMDS, AsyncBatchProcessor, BatchConfig
from mds_client import AMDS, AsyncBatchProcessor, BatchConfig

async with AsyncBatchProcessor(amds, BatchConfig(max_rows=1000)) as processor:
    for bar in stream:
        await processor.add_bar(bar)
After (with sinks):
from market_data_store.sinks import BarsSink
async with BarsSink(amds) as sink:
    await sink.write(batch_of_bars)
Key Differences:
- Sinks provide automatic Prometheus metrics
- Sinks use standardized logging (loguru)
- Sinks expect pre-batched data (no auto-flushing)
- AsyncBatchProcessor provides incremental adds + auto-flush
When to Use:
- Sinks: Pre-batched data, need metrics/observability
- AsyncBatchProcessor: Streaming data, need auto-batching
Testing
# Unit tests (fast, no DB)
pytest -v tests/unit/sinks/
# Smoke test
python tests/smoke_test_sinks.py
# Integration tests (requires DB)
$env:MDS_DSN="postgresql://..."
$env:MDS_TENANT_ID="uuid"
pytest -v tests/integration/ -m integration
# All tests
pytest -v tests/
Test Coverage:
- 12/12 unit tests passing (0.51s)
- 6/6 smoke test checks passing
- 0 linter errors
- Integration tests ready (DB required)
Architecture
The sinks layer is part of the hybrid architecture:
+------------------------------------------+
|        market-data-store (Hybrid)        |
+------------------------------------------+
| Control-plane (datastore/)               |
|  - Migrations, policies, admin API       |
|  - Health, readiness, metrics endpoints  |
+------------------------------------------+
| Data-plane (market_data_store/sinks/)    |
|  - BarsSink, OptionsSink, etc.           |
|  - Prometheus metrics integration        |
|  - Backpressure readiness (Phase 4.2+)   |
+------------------------------------------+
| Client library (mds_client/)             |
|  - MDS (sync) + AMDS (async) facades     |
|  - Connection pooling, RLS, validation   |
+------------------------------------------+
Documentation
- Implementation Guide: PHASE_4_IMPLEMENTATION.md
- Cursor Rules: cursorrules/rules/sinks_layer.mdc
- Examples: examples/
Roadmap

| Phase | Status | Description |
|---|---|---|
| 4.1 | Complete | Async sinks with metrics |
| 4.2 | Deferred | Write coordinator + queue |
| 4.3 | Blocked | Backpressure integration |

Phase 4.2+ is deferred pending architecture decisions and external dependencies (market-data-pipeline v0.8.0).
Windows/Docker Compatibility
This project includes comprehensive cross-platform support for Windows development and Linux/Docker production, with zero resource leaks and automatic cleanup:
Event Loop Configuration
- Windows: Automatically uses `WindowsSelectorEventLoopPolicy` for psycopg compatibility
- Linux/macOS: Uses `uvloop` for enhanced performance when available
- Automatic: No manual configuration required - just call `boot_event_loop()` early in your application
Connection Pool Management
- Context managers: All clients support `with` and `async with` for automatic cleanup
- Zero pool warnings: Explicit pool lifecycle management eliminates cleanup warnings
- Resource management: Timeout-based cleanup prevents hanging threads
- Production ready: Guaranteed clean shutdown in all scenarios
Production Features
- Health monitoring: Comprehensive database health checks and Prometheus metrics
- Resource management: Centralized resource cleanup with context managers
- CLI tools: Cross-platform command-line interface with health and metrics commands
- Performance optimized: 200+ bars/second processing with clean resource management
Usage Examples
Sync Client with Context Manager:
from mds_client.runtime import boot_event_loop
from mds_client import MDS
boot_event_loop() # Configure event loop for your platform
with MDS({'dsn': 'postgresql://...', 'tenant_id': '...'}) as mds:
    result = mds.upsert_bars([bar])
    # Pool automatically closed on exit - NO warnings!
Async Client with Context Manager:
from mds_client.runtime import boot_event_loop
from mds_client import AMDS
boot_event_loop() # Configure event loop for your platform
async with AMDS({'dsn': 'postgresql://...', 'tenant_id': '...'}) as amds:
    result = await amds.upsert_bars([bar])
    # Pool automatically closed on exit - NO warnings!
Batch Processing with Context Manager:
from mds_client.batch import BatchProcessor, BatchConfig
with MDS({'dsn': 'postgresql://...', 'tenant_id': '...'}) as mds:
    with BatchProcessor(mds, BatchConfig(max_rows=100)) as processor:
        processor.add_bar(bar)
    # Both mds and processor automatically closed on exit - NO warnings!
Health Monitoring:
# CLI Health Check
mds health --dsn "postgresql://..." --tenant-id "..."
# CLI Metrics
mds metrics --format prometheus
Performance Benchmarks
- Sync processing: 84+ bars/second with clean shutdown
- Async processing: 77+ bars/second with clean shutdown
- Batch processing: 200+ bars/second with clean shutdown
- Zero resource leaks: All pools properly closed with context managers
Testing Quickstart
# Run NDJSON round-trip tests
pytest -q tests/test_ndjson_roundtrip.py
# Run all tests (cross-platform)
pytest tests/ -v
# Run Windows compatibility tests
pytest tests/test_windows_compatibility.py -v
Development Commands
# Format and lint code
make fmt
make lint
# Run tests
make test
# Database operations
make migrate
make seed
make policies
Database Setup
Option 1: Using Docker initdb (Recommended for fresh setup)
# The schema will be automatically applied when the database container starts
# if using docker-compose with the initdb.d scripts
Option 2: Manual setup
# Run migrations (for existing databases)
python -m datastore.cli migrate
# Apply seed data
python -m datastore.cli seed
# Apply TimescaleDB policies (optional)
python -m datastore.cli policies
Option 3: Fresh schema setup
# For a completely fresh database, you can use the production schema directly
# After a fresh initdb bootstrap, stamp Alembic to prevent migration conflicts:
python -m datastore.cli stamp-head
# See DATABASE_SETUP.md for detailed instructions
Client Library Usage
The `mds_client` library provides production-ready APIs for Market Data Core:
For Market Data Core (Async)
from mds_client import AMDS, Bar
# Configuration with tenant isolation
amds = AMDS({
    "dsn": "postgresql://user:pass@host:port/db?options=-c%20app.tenant_id%3D<uuid>",
    "pool_max": 10,
})

# Write market data
await amds.upsert_bars([Bar(
    tenant_id="uuid", vendor="ibkr", symbol="AAPL", timeframe="1m",
    ts=now, close_price=150.5, volume=1000,
)])

# Get latest prices for hot cache
prices = await amds.latest_prices(["AAPL", "MSFT"], vendor="ibkr")
For Operations (Sync CLI)
# Health check
mds ping --dsn "postgresql://..." --tenant-id "uuid"
# Write data
mds write-bar --dsn "..." --tenant-id "uuid" --vendor "ibkr" \
--symbol "AAPL" --timeframe "1m" --ts "2024-01-01T10:00:00" \
--close-price 150.5
# Get latest prices
mds latest-prices --dsn "..." --vendor "ibkr" --symbols "AAPL,MSFT"
# Environment variable support
export MDS_DSN="postgresql://user:pass@host:port/db"
export MDS_TENANT_ID="uuid-string"
mds ping # Uses env vars automatically
Market Data Store Integration
The market-data-store package is a core dependency for Market Data Core, providing:
Python Package Integration
# Import the market data store package
import market_data_store
# Access version information
print(f"Market Data Store version: {market_data_store.__version__}")
# The package provides access to all CLI operations and Python library
from mds_client import MDS, AMDS, Bar, Fundamentals, News, OptionSnap
Available Operations
The market-data-store package provides comprehensive data persistence capabilities:
Data Types Supported
- Bars/OHLCV: Time-series price data with multiple timeframes
- Fundamentals: Company financial data (assets, liabilities, earnings)
- News: Market news with sentiment analysis
- Options: Options market data with Greeks (delta, gamma, IV)
CLI Operations (via mds command)
# Health & Schema
mds ping # Database connectivity check
mds schema-version # Get current schema version
mds latest-prices # Get latest prices for symbols
# Individual Write Operations
mds write-bar # Write single OHLCV bar
mds write-fundamental # Write company fundamentals
mds write-news # Write news article
mds write-option # Write options data
# Bulk Operations
mds ingest-ndjson # Bulk ingest from NDJSON files
mds ingest-ndjson-async # Async bulk ingest
# Export/Import Operations
mds dump # Export to CSV
mds restore # Import from CSV
mds restore-async # Async CSV import
mds dump-ndjson # Export to NDJSON
mds dump-ndjson-async # Async NDJSON export
# Job Queue Operations
mds enqueue-job # Queue background jobs
Python Library Usage
# Synchronous operations
from mds_client import MDS
mds = MDS({"dsn": "postgresql://...", "tenant_id": "uuid"})
# Write market data
mds.upsert_bars([bar_data])
mds.upsert_fundamentals([fundamental_data])
mds.upsert_news([news_data])
mds.upsert_options([option_data])
# Read operations
latest_prices = mds.latest_prices(["AAPL", "MSFT"], vendor="ibkr")
# Async operations
from mds_client import AMDS, AsyncBatchProcessor
amds = AMDS({"dsn": "postgresql://...", "tenant_id": "uuid", "pool_max": 10})
Architecture Benefits
- Tenant Isolation: Row Level Security (RLS) ensures data separation
- TimescaleDB Integration: Optimized for time-series data
- Connection Pooling: High-performance async/sync connection management
- Batch Processing: Efficient bulk operations with configurable batching
- Idempotent Operations: Safe retry and upsert semantics
- Production Ready: Comprehensive error handling, logging, and monitoring
Quick Reference
For detailed operation documentation, see:
- CLI Operations: cursorrules/rules/market_data_store_operations.mdc
- Python Library: src/mds_client/
- Data Models: src/mds_client/models.py
Client Library Documentation
Architecture Overview
The `mds_client` library provides a production-ready Python client for Market Data Core with two main facades:
- `MDS` - synchronous client for operations and simple integrations
- `AMDS` - asynchronous client for high-performance Market Data Core
Both clients support:
- Row Level Security (RLS) with tenant isolation via DSN options or context managers
- Connection pooling with psycopg 3 + psycopg_pool (ConnectionPool/AsyncConnectionPool)
- TimescaleDB integration with time-first composite primary keys and idempotent upserts
- Statement timeouts with per-connection configuration
- Structured error handling with psycopg error mapping and retry logic
- Job outbox pattern with idempotency-key support
- Performance optimization with multiple write modes: `executemany` (default), `execute_values` (sync), and `COPY` (fastest)
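How an "auto" write mode might pick among these strategies can be sketched from the thresholds documented in the Configuration section below (`values_min_rows`, `copy_min_rows`). The selection logic here is an illustrative assumption, not the library's actual code:

```python
def choose_write_mode(n_rows, *, mode="auto", values_min_rows=500,
                      copy_min_rows=5000, is_async=False):
    """Sketch of an 'auto' write-mode dispatcher: COPY for the largest
    batches, execute_values for medium sync batches, executemany
    otherwise. An explicit mode short-circuits the heuristics."""
    if mode != "auto":
        return mode
    if n_rows >= copy_min_rows:
        return "copy"          # bulk COPY: fastest for very large batches
    if not is_async and n_rows >= values_min_rows:
        return "values"        # execute_values is listed as sync-only above
    return "executemany"       # default path for small batches
```

Under these assumed thresholds, a 100-row batch goes through `executemany`, a 1,000-row sync batch through `execute_values`, and a 10,000-row batch through `COPY`.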
Data Models
The library provides strict Pydantic models for all market data types:
Bar - OHLCV Market Data
class Bar(BaseModel):
    tenant_id: str              # UUID for tenant isolation (tenants.id)
    vendor: str                 # Data provider (e.g., "ibkr", "alpha_vantage")
    symbol: str                 # Trading symbol (auto-uppercased)
    timeframe: str              # Time aggregation ("1m", "5m", "1h", "1d")
    ts: datetime                # Timestamp (UTC)
    open_price: Optional[float] = None
    high_price: Optional[float] = None
    low_price: Optional[float] = None
    close_price: Optional[float] = None
    volume: Optional[int] = None
    id: Optional[str] = None    # UUID (not globally unique)
Fundamentals - Company Financials
class Fundamentals(BaseModel):
    tenant_id: str              # UUID for tenant isolation (tenants.id)
    vendor: str
    symbol: str
    asof: datetime              # As-of date for financial data
    total_assets: Optional[float] = None
    total_liabilities: Optional[float] = None
    net_income: Optional[float] = None
    eps: Optional[float] = None # Earnings per share
    id: Optional[str] = None
News - Market News & Sentiment
class News(BaseModel):
    tenant_id: str              # UUID for tenant isolation (tenants.id)
    vendor: str
    published_at: datetime      # Publication timestamp
    title: str                  # News headline
    id: Optional[str] = None
    symbol: Optional[str] = None             # Related symbol (if applicable)
    url: Optional[str] = None                # Source URL
    sentiment_score: Optional[float] = None  # -1.0 to 1.0 sentiment
OptionSnap - Options Market Data
class OptionSnap(BaseModel):
    tenant_id: str              # UUID for tenant isolation (tenants.id)
    vendor: str
    symbol: str
    expiry: date                # Option expiration date
    option_type: str            # "C" for call, "P" for put
    strike: float               # Strike price
    ts: datetime                # Snapshot timestamp
    iv: Optional[float] = None      # Implied volatility
    delta: Optional[float] = None   # Option delta
    gamma: Optional[float] = None   # Option gamma
    oi: Optional[int] = None        # Open interest
    volume: Optional[int] = None    # Trading volume
    spot: Optional[float] = None    # Underlying spot price
    id: Optional[str] = None
LatestPrice - Real-time Price Snapshots
class LatestPrice(BaseModel):
    tenant_id: str              # UUID for tenant isolation (tenants.id)
    vendor: str
    symbol: str
    price: float                # Latest price
    price_timestamp: datetime   # When price was recorded
Configuration
Client Configuration
# MDS (sync) configuration with performance tuning
mds = MDS({
    "dsn": "postgresql://user:pass@host:port/db?options=-c%20app.tenant_id%3D<uuid>",
    "tenant_id": "uuid-string",       # Optional: overrides DSN tenant_id
    "app_name": "mds_client",         # Application name for pg_stat_activity
    "connect_timeout": 10.0,          # Connection timeout in seconds
    "statement_timeout_ms": 30000,    # Query timeout in milliseconds
    "pool_min": 1,                    # Minimum connections in pool
    "pool_max": 10,                   # Maximum connections in pool
    # Performance optimization settings
    "write_mode": "auto",             # "auto" | "executemany" | "values" | "copy"
    "values_min_rows": 500,           # Use execute_values for >= N rows
    "values_page_size": 1000,         # Page size for execute_values
    "copy_min_rows": 5000,            # Use COPY for >= N rows
})

# AMDS (async) configuration
amds = AMDS({
    "dsn": "postgresql://user:pass@host:port/db",
    "tenant_id": "uuid-string",
    "app_name": "mds_client_async",
    "pool_max": 10,                   # Async pool typically larger
    "write_mode": "auto",             # "auto" | "executemany" | "copy"
    "copy_min_rows": 5000,            # Use COPY for >= N rows
})
API Reference
Synchronous Client (MDS)
Connection & Health:
- `health()` - check database connectivity
- `schema_version()` - get current schema version
- `close()` - close the connection pool
Write Operations (Idempotent Upserts):
- `upsert_bars(rows: Sequence[Bar])` - Insert/update OHLCV data with time-first PKs
- `upsert_fundamentals(rows: Sequence[Fundamentals])` - Insert/update financial data
- `upsert_news(rows: Sequence[News])` - Insert/update news data (auto-generates UUID if missing)
- `upsert_options(rows: Sequence[OptionSnap])` - Insert/update options data
Read Operations:
- `latest_prices(symbols: Sequence[str], vendor: str)` - Get latest prices for symbols
- `bars_window(symbol, timeframe, start, end, vendor)` - Get bars in time window
Job Operations:
- `enqueue_job(idempotency_key, job_type, payload, priority)` - Enqueue job with idempotency
Asynchronous Client (AMDS)
The async client provides identical methods with async/await syntax:
- `async health()` - Async health check
- `async schema_version()` - Async schema version
- `async aclose()` - Close async connection pool
- `async upsert_bars(rows)` - Async bar upserts
- `async upsert_fundamentals(rows)` - Async fundamentals upserts
- `async upsert_news(rows)` - Async news upserts
- `async upsert_options(rows)` - Async options upserts
- `async latest_prices(symbols, vendor)` - Async price queries
- `async bars_window(symbol, timeframe, start, end, vendor)` - Async bar queries
- `async enqueue_job(...)` - Async job enqueueing
๐ Row Level Security (RLS)
The library automatically handles tenant isolation through PostgreSQL's Row Level Security:
DSN Options (Recommended)
# Tenant ID embedded in connection string
dsn = "postgresql://user:pass@host:port/db?options=-c%20app.tenant_id%3D<uuid>"
mds = MDS({"dsn": dsn})
Context Manager (Fallback)
# Explicit tenant context for operations (if not using DSN options)
# Note: Current implementation uses SET app.tenant_id per connection
# Context managers would be implemented in rls.py if needed
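If a transaction-scoped context manager were added, it could be sketched as below. This is a hypothetical `tenant_context` helper, not part of the current library; it relies on PostgreSQL's `set_config(..., is_local := true)`, which reverts the setting at transaction end.

```python
from contextlib import contextmanager

@contextmanager
def tenant_context(conn, tenant_id: str):
    """Hypothetical helper: scope app.tenant_id to the current transaction."""
    with conn.cursor() as cur:
        # is_local=true means the setting reverts when the transaction ends
        cur.execute("SELECT set_config('app.tenant_id', %s, true)", (tenant_id,))
    yield conn
```

Note that plain `SET` cannot take bind parameters, which is why `set_config()` is used here.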
โ ๏ธ Error Handling
The library provides structured error handling with automatic retry logic:
- `MDSOperationalError` - Base operational error
- `RetryableError` - Temporary errors (network, deadlocks, serialization failures)
- `ConstraintViolation` - Database constraint violations (unique, foreign key, check)
- `RLSDenied` - Row Level Security policy violations
- `TimeoutExceeded` - Query or connection timeouts
All errors are automatically mapped from psycopg.errors exceptions for precise error handling.
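Because `RetryableError` marks transient failures, a thin retry wrapper is often all a caller needs. A minimal sketch (the `RetryableError` stub below stands in for the library's class, and the backoff values are illustrative):

```python
import time

class RetryableError(Exception):
    """Stand-in for mds_client's RetryableError, for illustration."""

def with_retries(fn, attempts=3, base_delay=0.1):
    """Call fn(), retrying only errors flagged as transient."""
    for attempt in range(1, attempts + 1):
        try:
            return fn()
        except RetryableError:
            if attempt == attempts:
                raise  # retries exhausted: surface the error to the caller
            time.sleep(base_delay * attempt)  # simple linear backoff

# Usage sketch: with_retries(lambda: mds.upsert_bars(rows))
```

Constraint violations and RLS denials are deliberately not retried; repeating those calls would fail the same way.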
๐ ๏ธ Operational CLI
The library includes a comprehensive CLI for operations and debugging:
Exit Codes: All CLI commands return non-zero exit codes on failure for CI/CD integration.
# Health and connectivity
mds ping --dsn "postgresql://..." --tenant-id "uuid"
# Schema information
mds schema-version --dsn "postgresql://..."
# Write operations
mds write-bar --dsn "..." --tenant-id "uuid" --vendor "ibkr" \
--symbol "AAPL" --timeframe "1m" --ts "2024-01-01T10:00:00" \
--close-price 150.5 --volume 1000
mds write-fundamental --dsn "..." --tenant-id "uuid" --vendor "alpha" \
--symbol "AAPL" --asof "2024-01-01" --eps 1.25
mds write-news --dsn "..." --tenant-id "uuid" --vendor "reuters" \
--title "AAPL Reports Strong Q4" --published-at "2024-01-01T10:00:00" \
--symbol "AAPL" --sentiment-score 0.8
mds write-option --dsn "..." --tenant-id "uuid" --vendor "ibkr" \
--symbol "AAPL" --expiry "2024-12-20" --option-type "C" --strike 200 \
--ts "2024-01-01T10:00:00" --iv 0.25 --delta 0.55
# Note: write-option targets the options_snap table (model: OptionSnap)
# Read operations
mds latest-prices --dsn "..." --vendor "ibkr" --symbols "AAPL,MSFT"
# Job queue operations
mds enqueue-job --dsn "..." --tenant-id "uuid" \
--idempotency-key "job-123" --job-type "backfill" \
--payload '{"symbol": "AAPL", "start": "2024-01-01"}' --priority "high"
# Sync NDJSON ingest (stdin or file, .gz supported)
mds ingest-ndjson bars ./bars.ndjson \
--dsn "postgresql://..." --tenant-id "uuid" \
--max-rows 2000 --max-ms 3000
# Or from stdin
cat bars.ndjson | mds ingest-ndjson bars - \
--dsn "postgresql://..." --tenant-id "uuid"
# Async NDJSON ingest (uses AMDS + AsyncBatchProcessor)
mds ingest-ndjson-async bars ./bars.ndjson \
--dsn "postgresql://..." --tenant-id "uuid" \
--max-rows 2000 --max-ms 3000
# Backup/Export operations (tenant-aware, RLS-enforced)
mds dump bars ./bars_export.csv.gz \
--dsn "postgresql://..." --tenant-id "uuid" \
--vendor "ibkr" --symbol "AAPL" --timeframe "1m" \
--start "2024-01-01T00:00:00Z" --end "2024-02-01T00:00:00Z"
# Restore/Import operations (idempotent upserts)
# Sync CSV restore
mds restore bars ./bars_export.csv.gz \
--dsn "postgresql://..." --tenant-id "uuid"
# Async CSV restore (for large files)
mds restore-async bars ./bars_export.csv.gz \
--dsn "postgresql://..." --tenant-id "uuid" \
--delimiter "," --header
# Async CSV restore from STDIN (shell pipelines)
zcat bars_export.csv.gz | mds restore-async-stdin bars \
--dsn "postgresql://..." --tenant-id "uuid"
# NDJSON dump operations (round-trip with ingest-ndjson)
mds dump-ndjson bars ./bars_export.ndjson.gz \
--dsn "postgresql://..." --tenant-id "uuid" \
--vendor "ibkr" --symbol "AAPL" --timeframe "1m" \
--start "2024-01-01T00:00:00Z" --end "2024-02-01T00:00:00Z"
# Async NDJSON dump for large exports
mds dump-ndjson-async bars ./bars_export.ndjson.gz \
--dsn "postgresql://..." --tenant-id "uuid" \
--vendor "ibkr" --symbol "AAPL"
โก Performance Optimization
The library provides multiple write modes for optimal performance across different batch sizes:
Write Mode Selection (Auto)
# Automatic mode selection based on batch size
mds = MDS({
"write_mode": "auto", # Default: intelligent selection
"values_min_rows": 500, # Use execute_values for >= 500 rows
"copy_min_rows": 5000, # Use COPY for >= 5000 rows
})
# Behavior:
# len(rows) >= 5000 โ COPY (fastest, sync + async)
# len(rows) >= 500 โ execute_values (fast, sync only)
# len(rows) < 500 โ executemany (safe default)
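The auto thresholds above can be expressed as a small selection function. This is a sketch of the documented behavior, not the library's internal code:

```python
def select_write_mode(n_rows: int, mode: str = "auto",
                      values_min_rows: int = 500,
                      copy_min_rows: int = 5000) -> str:
    """Mirror the documented 'auto' write-mode thresholds."""
    if mode != "auto":
        return mode            # explicit override wins
    if n_rows >= copy_min_rows:
        return "copy"          # COPY: fastest for large batches
    if n_rows >= values_min_rows:
        return "values"        # execute_values: mid-size, sync only
    return "executemany"       # safe default for small batches
```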
Manual Mode Selection
# Force specific write modes
mds = MDS({"write_mode": "executemany"}) # Always use executemany
mds = MDS({"write_mode": "values"}) # Force execute_values (sync only)
mds = MDS({"write_mode": "copy"}) # Force COPY path
Environment Variable Configuration
# Set via environment variables
export MDS_WRITE_MODE=auto
export MDS_VALUES_MIN_ROWS=500
export MDS_VALUES_PAGE_SIZE=1000
export MDS_COPY_MIN_ROWS=5000
Environment Variables Reference
| Var | Meaning |
|---|---|
| `MDS_DSN` | PostgreSQL DSN |
| `MDS_TENANT_ID` | Tenant UUID for RLS (must be `tenants.id`, not `tenants.tenant_id`) |
| `MDS_WRITE_MODE` | `auto` \| `executemany` \| `values` \| `copy` |
| `MDS_VALUES_MIN_ROWS` | Threshold for `execute_values` |
| `MDS_COPY_MIN_ROWS` | Threshold for COPY |
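When wiring these variables into client configuration, a small helper keeps the integer knobs and their defaults in one place. A sketch (the variable names mirror the table above; the helper itself is not part of the library):

```python
import os

def env_int(name: str, default: int) -> int:
    """Read an integer tuning knob, falling back to default when unset."""
    raw = os.environ.get(name)
    return int(raw) if raw else default

config = {
    "write_mode": os.environ.get("MDS_WRITE_MODE", "auto"),
    "values_min_rows": env_int("MDS_VALUES_MIN_ROWS", 500),
    "values_page_size": env_int("MDS_VALUES_PAGE_SIZE", 1000),
    "copy_min_rows": env_int("MDS_COPY_MIN_ROWS", 5000),
}
```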
Performance Characteristics
- `executemany`: Safe default, good for small batches (< 500 rows)
- `execute_values`: Fast for mid-size batches (500-5000 rows), sync only; requires extras: `pip install "psycopg[pool,extras]"`
- `COPY`: Fastest for large batches (5000+ rows), works with RLS and maintains idempotency
Troubleshooting
- Tenant ID errors: Use `tenants.id` (UUID), not `tenants.tenant_id` (VARCHAR)
- Windows async issues: Use the sync `MDS` client; async pools need `SelectorEventLoop`
- Foreign key violations: Ensure the tenant exists in the `tenants` table with the correct UUID
- RLS denied: Verify `app.tenant_id` is set correctly in the connection context
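When debugging an RLS denial, it can help to inspect the GUC directly on the suspect connection, for example:

```sql
-- Should return your tenant UUID; an empty result means app.tenant_id
-- was never set on this connection (the second argument suppresses
-- the "unrecognized configuration parameter" error when it is unset).
SELECT current_setting('app.tenant_id', true);
```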
๐พ Backup & Restore Operations
The library provides tenant-aware backup and restore operations using PostgreSQL's COPY command:
Export Operations (Tenant-Aware Dumps)
from mds_client import MDS
from psycopg import sql as psql
mds = MDS({"dsn": "...", "tenant_id": "uuid"})
# Export bars for specific vendor/symbol/timeframe
sel = psql.SQL("""
SELECT {cols}
FROM bars
WHERE vendor = {v} AND symbol = {s} AND timeframe = '1m'
AND ts >= {start} AND ts < {end}
ORDER BY ts
""").format(
cols=psql.SQL(", ").join(psql.Identifier(c) for c in mds.TABLE_PRESETS["bars"]["cols"]),
v=psql.Literal("ibkr"),
s=psql.Literal("AAPL"),
start=psql.Literal("2024-01-01T00:00:00Z"),
end=psql.Literal("2024-02-01T00:00:00Z"),
)
# Export to gzipped CSV
mds.copy_out_csv(select_sql=sel, out_path="bars_aapl_2024-01.csv.gz")
Import Operations (Idempotent Upserts)
# Restore from CSV with upsert semantics
preset = MDS.TABLE_PRESETS["bars"]
mds.copy_restore_csv(
target="bars",
cols=preset["cols"],
conflict_cols=preset["conflict"],
update_cols=preset["update"],
src_path="bars_aapl_2024-01.csv.gz",
)
CLI Operations
# Export with filters
mds dump bars ./bars_export.csv.gz \
--dsn "postgresql://..." --tenant-id "uuid" \
--vendor "ibkr" --symbol "AAPL" --timeframe "1m" \
--start "2024-01-01T00:00:00Z" --end "2024-02-01T00:00:00Z"
# Import with upsert (sync)
mds restore bars ./bars_export.csv.gz \
--dsn "postgresql://..." --tenant-id "uuid"
# Import with upsert (async - for large files)
mds restore-async bars ./bars_export.csv.gz \
--dsn "postgresql://..." --tenant-id "uuid" \
--delimiter "," --header
# Import from STDIN (shell pipelines)
zcat bars_export.csv.gz | mds restore-async-stdin bars \
--dsn "postgresql://..." --tenant-id "uuid"
Key Features
- RLS Enforcement: All operations respect Row Level Security via `SET app.tenant_id`
- Consistent Snapshots: Multiple `COPY` operations in a single transaction
- Idempotent Restores: `INSERT ... ON CONFLICT DO UPDATE` preserves existing data
- Gzip Support: Automatic compression for `.gz` files
- CSV with Headers: Self-describing format for easy inspection
- Streaming: Memory-efficient for large datasets
๐ NDJSON Export Operations
The library provides NDJSON export functionality that perfectly round-trips with the existing ingest commands:
Export Operations (JSON Streaming)
from mds_client import MDS
from psycopg import sql as psql
mds = MDS({"dsn": "...", "tenant_id": "uuid"})
# Export bars as NDJSON
sel = psql.SQL("""
SELECT {cols}
FROM bars
WHERE vendor = {v} AND symbol = {s} AND timeframe = '1m'
AND ts >= {start} AND ts < {end}
ORDER BY ts
""").format(
cols=psql.SQL(", ").join(psql.Identifier(c) for c in mds.TABLE_PRESETS["bars"]["cols"]),
v=psql.Literal("ibkr"),
s=psql.Literal("AAPL"),
start=psql.Literal("2024-01-01T00:00:00Z"),
end=psql.Literal("2024-02-01T00:00:00Z"),
)
# Export to gzipped NDJSON
mds.copy_out_ndjson(select_sql=sel, out_path="bars_aapl_2024-01.ndjson.gz")
CLI Operations
# Sync NDJSON export
mds dump-ndjson bars ./bars_export.ndjson.gz \
--dsn "postgresql://..." --tenant-id "uuid" \
--vendor "ibkr" --symbol "AAPL" --timeframe "1m" \
--start "2024-01-01T00:00:00Z" --end "2024-02-01T00:00:00Z"
# Async NDJSON export for large datasets
mds dump-ndjson-async bars ./bars_export.ndjson.gz \
--dsn "postgresql://..." --tenant-id "uuid" \
--vendor "ibkr" --symbol "AAPL"
# Round-trip: export then import
mds dump-ndjson bars ./bars.ndjson --dsn "..." --tenant-id "uuid"
mds ingest-ndjson bars ./bars.ndjson --dsn "..." --tenant-id "uuid"
# Multi-table exports with template naming
mds dump-ndjson-all \
--dsn "postgresql://..." --tenant-id "uuid" \
--vendor ibkr --symbol AAPL --timeframe 1m \
--start 2024-01-01T00:00:00Z --end 2024-02-01T00:00:00Z
# Custom naming template with directory structure
mds dump-ndjson-all "./exports/{table}/{vendor}-{symbol}-{start}-{end}.ndjson.gz" \
--dsn "postgresql://..." --tenant-id "uuid" --vendor ibkr
# Async multi-table export for large datasets
mds dump-ndjson-async-all \
--dsn "postgresql://..." --tenant-id "uuid" \
--start 2024-01-01 --end 2024-02-01
Key Features
- Round-trip compatibility: Exports re-ingest cleanly via the `ingest-ndjson` commands
- JSON streaming: Uses `to_jsonb()` for efficient PostgreSQL JSON serialization
- RLS enforcement: All operations respect tenant isolation
- Gzip support: Automatic compression for `.ndjson.gz` files
- Async support: High-performance async exports for large datasets
- ISO timestamps: Timestamps serialized in ISO-8601 format for clean parsing
- Multi-table exports: Export all tables at once with template-based naming
- Template system: Flexible file naming with variables `{table}`, `{vendor}`, `{symbol}`, `{timeframe}`, `{start}`, `{end}`
- Directory creation: Parent directories are created automatically for organized exports
Multi-Table Export Operations
# Export all tables with default naming
mds dump-ndjson-all \
--dsn "postgresql://..." --tenant-id "uuid" \
--vendor ibkr --symbol AAPL --timeframe 1m \
--start 2024-01-01T00:00:00Z --end 2024-02-01T00:00:00Z
# Creates: ./bars-AAPL-2024-01-01T00:00:00Z-2024-02-01T00:00:00Z.ndjson.gz
# ./fundamentals-AAPL-2024-01-01T00:00:00Z-2024-02-01T00:00:00Z.ndjson.gz
# ./news-AAPL-2024-01-01T00:00:00Z-2024-02-01T00:00:00Z.ndjson.gz
# ./options_snap-AAPL-2024-01-01T00:00:00Z-2024-02-01T00:00:00Z.ndjson.gz
# Note: {timeframe} is ignored for tables without that column (fundamentals/news/options)
# Custom template with directory structure
mds dump-ndjson-all "./exports/{table}/{vendor}-{symbol}-{start}-{end}.ndjson.gz" \
--dsn "postgresql://..." --tenant-id "uuid" --vendor ibkr
# Creates: ./exports/bars/ibkr-AAPL-2024-01-01-2024-02-01.ndjson.gz
# ./exports/fundamentals/ibkr-AAPL-2024-01-01-2024-02-01.ndjson.gz
# etc.
# Async version for large datasets
mds dump-ndjson-async-all \
--dsn "postgresql://..." --tenant-id "uuid" \
--start 2024-01-01 --end 2024-02-01
Template Variables
- `{table}`: Table name (bars, fundamentals, news, options_snap)
- `{vendor}`: Data vendor (e.g., ibkr, reuters), or "ALL" if not specified
- `{symbol}`: Symbol (e.g., AAPL), or "ALL" if not specified
- `{timeframe}`: Timeframe (e.g., 1m, 1h), or "ALL" if not specified
- `{start}`: Start timestamp, or "MIN" if not specified
- `{end}`: End timestamp, or "MAX" if not specified
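The fallback values ("ALL", "MIN", "MAX") make every template resolvable even with no filters. The substitution can be sketched with plain `str.format`; this mirrors the documented variables, not the CLI's actual implementation:

```python
def render_export_path(template: str, table: str, vendor: str = None,
                       symbol: str = None, timeframe: str = None,
                       start: str = None, end: str = None) -> str:
    """Fill a dump-ndjson-all naming template with the documented fallbacks."""
    return template.format(
        table=table,
        vendor=vendor or "ALL",
        symbol=symbol or "ALL",
        timeframe=timeframe or "ALL",
        start=start or "MIN",
        end=end or "MAX",
    )

# render_export_path("./exports/{table}/{vendor}-{symbol}.ndjson.gz",
#                    "bars", vendor="ibkr")
# -> "./exports/bars/ibkr-ALL.ndjson.gz"
```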
๐ Batch Processing
For high-throughput scenarios, the library supports both sync and async batch processing:
Sync Batch Processing
from mds_client import MDS, BatchProcessor, BatchConfig, Bar
mds = MDS({"dsn": "...", "tenant_id": "..."})
bp = BatchProcessor(mds, BatchConfig(max_rows=1000, max_ms=5000))
for bar in big_set:
bp.add_bar(bar)
bp.flush()
Async Batch Processing
from mds_client import AMDS, AsyncBatchProcessor, BatchConfig, Bar
amds = AMDS({"dsn": "...", "tenant_id": "...", "pool_max": 10})
async with AsyncBatchProcessor(amds, BatchConfig(max_rows=1000, max_ms=5000)) as bp:
for bar in big_set:
await bp.add_bar(bar)
# Auto-flush on context exit
Key Features
- Dual API: Sync (`MDS`) and async (`AMDS`) facades with identical interfaces
- RLS Integration: Automatic tenant isolation via DSN options or per-connection SET
- TimescaleDB Compatible: Time-first composite primary keys with idempotent upserts
- Connection Pooling: Production-ready with psycopg 3 + psycopg_pool
- Performance Optimization: Multiple write modes with automatic selection:
  - `executemany`: Safe default for small batches
  - `execute_values`: Fast mid-size batches (sync only, requires psycopg extras)
  - `COPY`: Fastest for large batches (sync + async)
- Batch Processing: High-throughput ingestion with byte-accurate sizing and auto-flush tickers
- Structured Errors: Comprehensive exception hierarchy with psycopg error mapping
- Environment Variables: CLI support for `MDS_DSN` and `MDS_TENANT_ID`
- NDJSON Support: Gzip compression, stdin input, and model coercion
- Job Outbox: Idempotent job enqueueing with conflict-free guarantees
- Backup/Restore: Tenant-aware CSV export/import with RLS enforcement and idempotent upserts
- NDJSON Export: Round-trip compatible JSON dumps with `to_jsonb()` streaming
Dependencies
- Production: `requirements.txt` - Core runtime dependencies
- Development: `requirements-dev.txt` - Includes dev tools (ruff, black, pre-commit)
- Project Config: `pyproject.toml` - Full project metadata and build configuration
License
MIT License - see LICENSE file for details.
Contributing
Contributions welcome! Please open an issue or submit a pull request.
Project details
Download files
Source Distribution
Built Distribution
File details
Details for the file market_data_store-0.6.28.tar.gz.
File metadata
- Download URL: market_data_store-0.6.28.tar.gz
- Upload date:
- Size: 110.1 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.11.13
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | `60a15c762f29e77cf91e8b2bbf1c0688f4ec4daf0db2e32da90e625b119358d2` |
| MD5 | `65bbeacc53ec15f51b6c53d73c41d847` |
| BLAKE2b-256 | `d2e5a52528f43f06be7eff808575cad73fcb60d23555643609523ccdf12d676d` |
File details
Details for the file market_data_store-0.6.28-py3-none-any.whl.
File metadata
- Download URL: market_data_store-0.6.28-py3-none-any.whl
- Upload date:
- Size: 92.8 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.11.13
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | `46f6d461be63e8aef1718dc7eb96f95ebc37092b09281edc736d75f75edb02b3` |
| MD5 | `d78d93b4562c604b5eedd757e444f9f0` |
| BLAKE2b-256 | `b0c3b7623ec8e3c800f3e2e62ad2ad0e905d66986678fc5b5777914efa511ef3` |