
Data store control-plane (schema, migrations, policies, ops) for market-data.


🚀 market-data-store

High-performance market data infrastructure with TimescaleDB, RLS, production-ready client library, and async sinks

Hybrid architecture providing both control-plane and data-plane capabilities:

  • 🗄️ Migrations & policies (TimescaleDB)
  • 🔧 Admin endpoints: health, readiness, schema/version, migrate, retention/compression, backfills, aggregates
  • 📊 Prometheus metrics + observability
  • 🐍 mds_client library: Production-ready Python client for Market Data Core with sync/async APIs, RLS, and tenant isolation
  • 🚰 Async sinks (Phase 4.1): High-throughput ingestion with backpressure awareness

💡 The mds_client library provides direct in-process access for Market Data Core. There is no HTTP latency: Core imports and uses the library directly, with connection pooling, RLS, and TimescaleDB integration.

⚡ NEW: Async Sinks Layer (Phase 4.1) - Stream-oriented ingestion with automatic Prometheus metrics, error handling, and flow-control readiness.

🔥 LATEST: Config-Driven Pipeline Support (Phase 11.3) - Provider-based OHLCV ingestion with the bars_ohlcv table, StoreClient, audit-grade job tracking, diff-aware upserts, and compression policies. Supports 10K+ bars/sec throughput for live and backfill operations.


🎯 Dual Ingestion Architecture

This store supports two parallel ingestion paths:

Path 1: Tenant-Based System (Existing)

  • Tables: bars, fundamentals, news, options_snap
  • Client: mds_client (MDS/AMDS) with RLS
  • Use Case: Multi-tenant analytics platform
  • Features: Row-level security, tenant isolation, comprehensive data types

Path 2: Provider-Based Pipeline (NEW - Phase 11.3)

  • Tables: bars_ohlcv, job_runs
  • Client: datastore.StoreClient / AsyncStoreClient
  • Use Case: Config-driven market data pipeline (live + backfill)
  • Features:
    • 🚀 High throughput (10K+ bars/sec)
    • 🔄 Diff-aware upserts (replay-safe)
    • 📦 Smart batching (COPY for 1000+ rows)
    • 📊 Job execution tracking with heartbeats
    • 🗜️ Automatic compression (90-day hot tier)
    • 🔍 Prometheus metrics

Architecture Diagram:

┌─────────────────────────────────────────────────────────────────────┐
│                          market_data_store                          │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│  ┌───────────────────────────────┐  ┌──────────────────────────────┐│
│  │ Tenant-Based (Existing)       │  │ Provider-Based (NEW)         ││
│  │ ────────────────────────      │  │ ─────────────────────        ││
│  │                               │  │                              ││
│  │ mds_client (AMDS)             │  │ StoreClient                  ││
│  │   ↓                           │  │   ↓                          ││
│  │ bars (tenant_id, RLS)         │  │ bars_ohlcv (provider-based)  ││
│  │ fundamentals                  │  │ job_runs (audit trail)       ││
│  │ news                          │  │                              ││
│  │ options_snap                  │  │ Features:                    ││
│  │                               │  │ • Diff-aware upserts         ││
│  │ Features:                     │  │ • Smart batching             ││
│  │ • Multi-tenant isolation      │  │ • Compression (90d)          ││
│  │ • RLS enforcement             │  │ • Heartbeat tracking         ││
│  │ • Comprehensive data types    │  │ • Config fingerprinting      ││
│  └───────────────────────────────┘  └──────────────────────────────┘│
│                                                                     │
│  TimescaleDB (Hypertables + Compression)                            │
└─────────────────────────────────────────────────────────────────────┘

📂 Project Layout & Description

This repository is structured as a control-plane with clear separation between infrastructure, schema management, service layer, and automation rules.

Below is a snapshot of the repo's structure with logical groupings to help new contributors and automation tools (like Cursor) navigate effectively.

๐Ÿ—๏ธ Infra & Ops

โ”œโ”€โ”€ docker-compose.yml             # Docker services configuration
โ”œโ”€โ”€ Dockerfile                     # Container build instructions
โ”œโ”€โ”€ Makefile                       # Build and deployment automation
โ”œโ”€โ”€ docker/                        # Docker-related files
โ”‚   โ””โ”€โ”€ initdb.d/                  # Initial SQL scripts for DB setup
โ”‚       โ””โ”€โ”€ 00_timescale.sql       # TimescaleDB initialization script
โ””โ”€โ”€ tools/                         # Auxiliary scripts, CLI utilities
    โ””โ”€โ”€ build_solution_manifest.py # Solution manifest builder

🗄️ Schema & Migrations

├── alembic.ini                         # Alembic configuration for migrations
├── migrations/                         # Alembic migration files
│   ├── env.py                          # Migration environment config
│   ├── script.py.mako                  # Migration template
│   └── versions/                       # Migration version files
├── src/datastore/aggregates.py         # Continuous aggregates definitions
└── src/datastore/timescale_policies.py # TimescaleDB retention/compression policies

🚀 Service Layer

├── src/datastore/                 # Control-plane: migrations, policies, admin endpoints
│   ├── __init__.py                # Package init
│   ├── cli.py                     # CLI for migrations, policies, seeds
│   ├── config.py                  # App configuration
│   ├── idempotency.py             # Conflict/idempotency helpers
│   ├── reads.py                   # Read ops (ops/tests support)
│   ├── writes.py                  # Write ops (batch/upserts)
│   └── service/                   # FastAPI service layer
│       └── app.py                 # FastAPI app with admin endpoints
└── src/mds_client/                # Client library for Market Data Core
    ├── __init__.py                # Library exports (MDS, AMDS, models, batch processors)
    ├── client.py                  # MDS (sync client facade) with psycopg 3 + ConnectionPool
    ├── aclient.py                 # AMDS (async client facade) with AsyncConnectionPool
    ├── models.py                  # Pydantic data models with validation
    ├── sql.py                     # Canonical SQL with named parameters and ON CONFLICT upserts
    ├── rls.py                     # Row Level Security helpers (DSN options + context managers)
    ├── errors.py                  # Structured exception hierarchy with psycopg error mapping
    ├── utils.py                   # NDJSON processing with gzip support and model coercion
    ├── batch.py                   # Production-safe batch processing (sync + async)
    └── cli.py                     # Comprehensive operational CLI with environment variables

🤖 Automation Rules

├── cursorrules/                   # Cursor rules (automation home base)
│   ├── index.mdc                  # Main rules index
│   ├── README.md                  # Rules documentation
│   ├── solution_manifest.json     # Asset lookup configuration
│   └── rules/                     # Task-specific rule definitions

🧭 How to Navigate

🗄️ DB Migrations → /migrations/versions/

🚀 Admin Endpoints → /src/datastore/service/app.py

# Run FastAPI service (admin endpoints)
uvicorn datastore.service.app:app --host 0.0.0.0 --port 8000 --factory

📊 Policies & Aggregates → /src/datastore/timescale_policies.py, /src/datastore/aggregates.py

🛠️ Control-plane CLI → /src/datastore/cli.py

📦 Client Library → /src/mds_client/ - For Market Data Core integration

🔧 Client CLI → /src/mds_client/cli.py - Operational commands (mds command)

🤖 Cursor Rules & Automation → /cursorrules/ (Cursor's self-bootstrap home)

🏗️ Infra & Deployment → /docker/, Dockerfile, docker-compose.yml

⚙️ Project Config → pyproject.toml

📋 Releases

🏷️ Current Release: [v0.4.0]

What's included:

  • ✅ Core v1.1.0 contract adoption - FeedbackEvent extends Core DTO
  • ✅ Adapter pattern - Preserves Store fields while maintaining Core compatibility
  • ✅ Health DTOs - /healthz and /readyz return Core HealthStatus
  • ✅ Complete mds_client library with sync/async APIs
  • ✅ Production-ready batch processing and backup/restore
  • ✅ Comprehensive CLI with all operational commands
  • ✅ Full documentation and troubleshooting guides
  • ✅ RLS security and tenant isolation

Previous Release: [v0.3.0]

  • ✅ Write coordinator with backpressure feedback
  • ✅ Async sinks layer
  • ✅ HTTP feedback broadcaster

📦 Installation from Release

# Install specific version
pip install git+https://github.com/mjdevaccount/market-data-store.git@v0.1.0#subdirectory=src

# Install latest version
pip install git+https://github.com/mjdevaccount/market-data-store.git#subdirectory=src

🚀 Quick Start

📦 Installation

Option 1: Install from Git (Recommended)

# Install the mds_client library directly from this repository
pip install git+https://github.com/mjdevaccount/market-data-store.git@v0.1.0#subdirectory=src

# Or install the latest version
pip install git+https://github.com/mjdevaccount/market-data-store.git#subdirectory=src

Option 2: Development Setup

# Clone and setup for development
git clone https://github.com/mjdevaccount/market-data-store.git
cd market-data-store

# Create and activate virtual environment
python -m venv .venv
.\.venv\Scripts\Activate.ps1

# Install dependencies
pip install -r requirements.txt

# For development
pip install -r requirements-dev.txt

Prerequisites

  • Python 3.11+
  • PostgreSQL 13+ with TimescaleDB extension (required)
  • Virtual environment

🎯 Using the Released Package

Once installed, you can use the mds_client library in your projects:

# Basic usage with cross-platform compatibility
from mds_client import MDS, Bar
from mds_client.runtime import boot_event_loop
from datetime import datetime, timezone

# Configure event loop for Windows/Docker compatibility
boot_event_loop()

# Configure client
mds = MDS({
    "dsn": "postgresql://user:pass@host:port/db",
    "tenant_id": "your-tenant-uuid"
})

# Write market data
bar = Bar(
    tenant_id="your-tenant-uuid",
    vendor="ibkr",
    symbol="AAPL",
    timeframe="1m",
    ts=datetime.now(timezone.utc),
    close_price=150.5,
    volume=1000
)

mds.upsert_bars([bar])

# Use the CLI
export MDS_DSN="postgresql://user:pass@host:port/db"
export MDS_TENANT_ID="your-tenant-uuid"

# Test connection
mds ping

# Write data
mds write-bar --vendor ibkr --symbol AAPL --timeframe 1m --ts "2024-01-01T10:00:00Z" --close-price 150.5 --volume 1000

🚰 Async Sinks Layer (Phase 4.1 - NEW)

Status: ✅ Production Ready | Version: v0.2.0 | Released: October 2025

The async sinks layer provides high-throughput, observable ingestion with automatic Prometheus metrics and backpressure readiness.

Key Features

  • ⚡ Async-first: Non-blocking I/O with asyncio and asyncpg
  • 📊 Auto-metrics: Prometheus counters and histograms for all writes
  • 🔄 Context managers: Clean resource management with async with
  • 🎯 Type-safe: Strong typing with Pydantic models
  • 🛡️ Error handling: Graceful failures with metric recording
  • 🧪 Tested: 12/12 unit tests passing, integration-ready

Available Sinks

Sink              Purpose             Model         Wraps
BarsSink          OHLCV market data   Bar           AMDS.upsert_bars()
OptionsSink       Options snapshots   OptionSnap    AMDS.upsert_options()
FundamentalsSink  Company financials  Fundamentals  AMDS.upsert_fundamentals()
NewsSink          News & sentiment    News          AMDS.upsert_news()

Quick Start

import asyncio
from datetime import datetime, timezone
from mds_client import AMDS
from mds_client.models import Bar
from market_data_store.sinks import BarsSink

async def main():
    # Configure AMDS client
    config = {
        "dsn": "postgresql://user:pass@localhost:5432/marketdata",
        "tenant_id": "your-tenant-uuid",
        "pool_max": 10
    }

    # Create sample data
    bars = [
        Bar(
            tenant_id=config["tenant_id"],
            vendor="ibkr",
            symbol="AAPL",
            timeframe="1m",
            ts=datetime.now(timezone.utc),
            close_price=150.5,
            volume=1000
        )
    ]

    # Write via sink (auto-metrics + error handling)
    async with AMDS(config) as amds:
        async with BarsSink(amds) as sink:
            await sink.write(bars)

    print("✅ Bars written successfully")

if __name__ == "__main__":
    asyncio.run(main())

🆕 Config-Driven Pipeline Usage (Phase 11.3)

StoreClient - Provider-Based Ingestion

For config-driven pipeline operations (live/backfill), use StoreClient instead of mds_client:

from datetime import datetime, timezone
from dataclasses import dataclass
from datastore import StoreClient, JobRunTracker, compute_config_fingerprint

# Your provider returns bars matching this protocol
@dataclass
class Bar:
    provider: str
    symbol: str
    interval: str  # "5min", "1d", etc.
    ts: datetime
    open: float
    high: float
    low: float
    close: float
    volume: float

# Example: Write bars from IBKR provider
def ingest_live_bars(config, bars):
    """Ingest bars with job tracking and config fingerprinting."""

    # Start tracking job run
    tracker = JobRunTracker(config.database_url)
    fingerprint = compute_config_fingerprint(config.dict())

    run_id = tracker.start_run(
        job_name="live_us_equities_5min",
        dataset_name="us_equities_5min",
        provider="ibkr_primary",
        mode="live",
        config_fingerprint=fingerprint,
        pipeline_version="v1.2.0",
        metadata={"git_hash": "abc123", "container_id": "xyz"}
    )

    try:
        # Write bars with diff-aware upserts
        with StoreClient(config.database_url) as client:
            count = client.write_bars(bars, batch_size=1000)

        # Update progress with heartbeat
        symbols = list(set(b.symbol for b in bars))
        min_ts = min(b.ts for b in bars)
        max_ts = max(b.ts for b in bars)

        tracker.update_progress(
            run_id=run_id,
            rows_written=count,
            symbols=symbols,
            min_ts=min_ts,
            max_ts=max_ts,
            heartbeat=True
        )

        # Mark as success
        tracker.complete_run(run_id, status="success")
        print(f"✅ Wrote {count} bars (run_id={run_id})")

    except Exception as e:
        tracker.complete_run(run_id, status="failure", error_message=str(e))
        raise

# Example bars from IBKR
bars = [
    Bar(
        provider="ibkr_primary",
        symbol="SPY",
        interval="5min",
        ts=datetime(2025, 1, 1, 9, 30, tzinfo=timezone.utc),
        open=450.0,
        high=451.0,
        low=449.0,
        close=450.5,
        volume=1000000
    ),
    # ... more bars
]

ingest_live_bars(config, bars)

AsyncStoreClient - High-Performance Async Ingestion

import asyncio
from datastore import AsyncStoreClient, JobRunTracker

async def ingest_bars_async(bars, db_uri):
    """Async ingestion with automatic batching."""

    async with AsyncStoreClient(db_uri) as client:
        count = await client.write_bars(bars, batch_size=1000)

    print(f"✅ Wrote {count} bars asynchronously")

# Run async
asyncio.run(ingest_bars_async(bars, "postgresql://..."))

Key Features

Feature             Description                              Benefit
Diff-aware upserts  IS DISTINCT FROM in SQL                  Only updates when values change → true idempotency
Smart batching      COPY for 1000+ rows, executemany below   Optimal performance for any batch size
Protocol-based      Duck typing via the Bar protocol         No hard dependency on specific classes
Job tracking        Full lifecycle with heartbeats           Audit trail + stuck-job detection
Compression         90-day hot-tier policy                   Automatic disk savings for historical data
Metrics             Prometheus counters/histograms           Observability out of the box
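The "diff-aware upsert" row is worth a concrete illustration. The essential property is that replaying the same bar is a no-op, while a changed value triggers an update. The following is a hypothetical in-memory sketch of those semantics (mirroring the SQL `DO UPDATE ... WHERE row IS DISTINCT FROM excluded` pattern), not the actual StoreClient implementation:

```python
from datetime import datetime, timezone

def upsert_bar(table: dict, key: tuple, values: dict) -> str:
    """Insert, skip, or update a row depending on whether values changed."""
    existing = table.get(key)
    if existing is None:
        table[key] = {**values, "updated_at": datetime.now(timezone.utc)}
        return "inserted"
    if {k: existing[k] for k in values} == values:
        return "unchanged"          # replay-safe: no write, no updated_at churn
    existing.update(values)
    existing["updated_at"] = datetime.now(timezone.utc)
    return "updated"

table = {}
key = ("ibkr_primary", "SPY", "5min", "2025-01-01T09:30Z")
ohlcv = {"open": 450.0, "high": 451.0, "low": 449.0, "close": 450.5, "volume": 1e6}
assert upsert_bar(table, key, ohlcv) == "inserted"
assert upsert_bar(table, key, ohlcv) == "unchanged"   # replaying the same bar is a no-op
assert upsert_bar(table, key, {**ohlcv, "close": 450.75}) == "updated"
```

This is why backfills can safely overlap live ingestion: identical rows never generate write traffic.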

CLI - Job Management

# List recent job runs
datastore job-runs-list --limit 50

# Inspect specific run
datastore job-runs-inspect 123

# Find stuck jobs (no heartbeat for 15m)
datastore job-runs-stuck --timeout-minutes 15

# View 24h summary
datastore job-runs-summary

# Cleanup old runs
datastore job-runs-cleanup --older-than-days 90 --confirm

Tables Created by Migration 0002

bars_ohlcv - Provider-Based OHLCV Storage

CREATE TABLE bars_ohlcv (
    provider   TEXT NOT NULL,
    symbol     TEXT NOT NULL CHECK (symbol = UPPER(symbol)),
    interval   TEXT NOT NULL,
    ts         TIMESTAMPTZ NOT NULL,
    open       DOUBLE PRECISION NOT NULL,
    high       DOUBLE PRECISION NOT NULL,
    low        DOUBLE PRECISION NOT NULL,
    close      DOUBLE PRECISION NOT NULL,
    volume     DOUBLE PRECISION NOT NULL,
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    PRIMARY KEY (provider, symbol, interval, ts)
);

Features:

  • TimescaleDB hypertable (7-day chunks)
  • Compression after 90 days (segmentby provider,symbol,interval)
  • Uppercase symbol constraint
  • No tenant isolation (system-wide)
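The chunking and compression settings above correspond to standard TimescaleDB calls. A migration would presumably apply something like the following (a sketch, not the literal contents of migration 0002):

```sql
SELECT create_hypertable('bars_ohlcv', 'ts', chunk_time_interval => INTERVAL '7 days');

ALTER TABLE bars_ohlcv SET (
    timescaledb.compress,
    timescaledb.compress_segmentby = 'provider, symbol, interval'
);

SELECT add_compression_policy('bars_ohlcv', INTERVAL '90 days');
```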

job_runs - Audit-Grade Job Tracking

CREATE TABLE job_runs (
    id                  BIGSERIAL PRIMARY KEY,
    job_name            TEXT NOT NULL,
    provider            TEXT,
    status              TEXT NOT NULL CHECK (status IN ('running', 'success', 'failure', 'cancelled')),
    config_fingerprint  TEXT,
    pipeline_version    TEXT,
    rows_written        BIGINT DEFAULT 0,
    symbols             TEXT[],
    min_ts              TIMESTAMPTZ,
    max_ts              TIMESTAMPTZ,
    metadata            JSONB DEFAULT '{}'::jsonb,
    started_at          TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    completed_at        TIMESTAMPTZ,
    elapsed_ms          BIGINT GENERATED ALWAYS AS (...) STORED  -- auto-computed
);

Features:

  • Heartbeat tracking via metadata->>'last_heartbeat'
  • Config fingerprinting for reproducibility
  • Derived elapsed_ms column
  • GIN index on metadata for fast heartbeat queries
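Config fingerprinting only needs to be deterministic: the same config must always hash to the same value regardless of key order. A hedged sketch of what a compute_config_fingerprint helper typically does (the actual datastore implementation may differ in hash choice or length):

```python
import hashlib
import json

def compute_config_fingerprint(config: dict) -> str:
    """Hash a canonical JSON rendering of the config (key order is irrelevant)."""
    canonical = json.dumps(config, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()[:16]

a = compute_config_fingerprint({"dataset": "us_equities_5min", "provider": "ibkr_primary"})
b = compute_config_fingerprint({"provider": "ibkr_primary", "dataset": "us_equities_5min"})
assert a == b                                                 # key order does not matter
assert a != compute_config_fingerprint({"dataset": "us_equities_1d",
                                        "provider": "ibkr_primary"})
```

Stored on every job_run, this makes it trivial to ask "which runs used this exact config?".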

Metrics Exported

Sinks Metrics (Tenant-Based System)

Sinks automatically register metrics to the global Prometheus registry:

# Total write attempts (counter)
sink_writes_total{sink="bars|options|fundamentals|news", status="success|failure"}

# Write latency (histogram)
sink_write_latency_seconds{sink="bars|options|fundamentals|news"}

StoreClient Metrics (Provider-Based Pipeline)

# Total bars written (counter)
store_bars_written_total{method="COPY|UPSERT", status="success|failure"}

# Write latency (histogram)
store_bars_write_latency_seconds{method="COPY|UPSERT"}

Key Insight: method label shows whether COPY (1000+ rows) or UPSERT (< 1000 rows) was used, enabling performance tuning.

JobRunTracker Metrics (Pipeline Audit)

# Total job runs tracked (counter)
store_job_runs_total{job_name="...", provider="...", mode="live|backfill", status="started|success|failure|cancelled"}

# Job run duration (histogram)
store_job_runs_duration_seconds{job_name="...", provider="...", mode="live|backfill", status="success|failure|cancelled"}

Key Insight: Track job lifecycle from status="started" through final status (success, failure, cancelled). Duration histogram only recorded on completion.

Scrape at: http://localhost:8081/metrics (FastAPI control-plane)

Example: All Sinks

See examples/run_store_pipeline.py for a complete example using all four sinks.

# Set environment variables
$env:MDS_DSN="postgresql://user:pass@localhost:5432/marketdata"
$env:MDS_TENANT_ID="your-tenant-uuid"

# Run pipeline example
python examples/run_store_pipeline.py

Output:

🚀 market_data_store Sink Pipeline Example
   Tenant: 6b6a6a8a...

📊 BarsSink Example
  ✅ Wrote 2 bars
📈 OptionsSink Example
  ✅ Wrote 1 options
📋 FundamentalsSink Example
  ✅ Wrote 1 fundamentals
📰 NewsSink Example
  ✅ Wrote 1 news items

✅ All sinks completed successfully!

Benchmarks

Run performance benchmarks with examples/benchmark_sinks.py:

python examples/benchmark_sinks.py --batches 50 --batch-size 1000 --parallel 4

Example Results (Mock mode, Windows):

========================================================================
Benchmark Results (Phase 4.1)
========================================================================
BarsSink                 13,674 rec/s   avg latency   14.0 ms   total    2,000
OptionsSink              12,899 rec/s   avg latency   14.2 ms   total    2,000
FundamentalsSink         12,886 rec/s   avg latency   15.0 ms   total    2,000
NewsSink                 12,947 rec/s   avg latency   14.9 ms   total    2,000
========================================================================

Overall: 8,000 records in 0.61s (13,093 rec/s aggregate)

Migration from AsyncBatchProcessor

If you're currently using mds_client.batch.AsyncBatchProcessor:

Before:

from mds_client import AMDS, AsyncBatchProcessor, BatchConfig

async with AsyncBatchProcessor(amds, BatchConfig(max_rows=1000)) as processor:
    for bar in stream:
        await processor.add_bar(bar)

After (with sinks):

from market_data_store.sinks import BarsSink

async with BarsSink(amds) as sink:
    await sink.write(batch_of_bars)

Key Differences:

  • ✅ Sinks provide automatic Prometheus metrics
  • ✅ Sinks use standardized logging (loguru)
  • ⚠️ Sinks expect pre-batched data (no auto-flushing)
  • ⚠️ AsyncBatchProcessor provides incremental adds + auto-flush

When to Use:

  • Sinks: Pre-batched data, need metrics/observability
  • AsyncBatchProcessor: Streaming data, need auto-batching
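The auto-flush behavior that separates the two approaches can be sketched in a few lines. This is a hypothetical illustration of the pattern, not the actual mds_client.batch implementation:

```python
class AutoFlushBatcher:
    """Accumulate rows and flush whenever max_rows is reached (batch-processor style)."""

    def __init__(self, write, max_rows: int = 1000):
        self.write = write          # callable invoked with each full batch
        self.max_rows = max_rows
        self.buffer = []

    def add(self, row):
        self.buffer.append(row)
        if len(self.buffer) >= self.max_rows:
            self.flush()

    def flush(self):
        if self.buffer:
            self.write(self.buffer)
            self.buffer = []

written = []
batcher = AutoFlushBatcher(written.append, max_rows=3)
for i in range(7):
    batcher.add(i)       # flushes automatically after rows 3 and 6
batcher.flush()          # final partial batch on shutdown
assert written == [[0, 1, 2], [3, 4, 5], [6]]
```

A sink, by contrast, is the `write` callable: you hand it a batch you already assembled.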

Testing

# Unit tests (fast, no DB)
pytest -v tests/unit/sinks/

# Smoke test
python tests/smoke_test_sinks.py

# Integration tests (requires DB)
$env:MDS_DSN="postgresql://..."
$env:MDS_TENANT_ID="uuid"
pytest -v tests/integration/ -m integration

# All tests
pytest -v tests/

Test Coverage:

  • ✅ 12/12 unit tests passing (0.51s)
  • ✅ 6/6 smoke test checks passing
  • ✅ 0 linter errors
  • ✅ Integration tests ready (DB required)

Architecture

The sinks layer is part of the hybrid architecture:

┌─────────────────────────────────────────┐
│ market-data-store (Hybrid)              │
├─────────────────────────────────────────┤
│ Control-plane (datastore/)              │
│  • Migrations, policies, admin API      │
│  • Health, readiness, metrics endpoints │
├─────────────────────────────────────────┤
│ Data-plane (market_data_store/sinks/)   │
│  • BarsSink, OptionsSink, etc.          │
│  • Prometheus metrics integration       │
│  • Backpressure readiness (Phase 4.2+)  │
├─────────────────────────────────────────┤
│ Client library (mds_client/)            │
│  • MDS (sync) + AMDS (async) facades    │
│  • Connection pooling, RLS, validation  │
└─────────────────────────────────────────┘

Documentation

Roadmap

Phase  Status       Description
4.1    ✅ Complete   Async sinks with metrics
4.2    ⏸️ Deferred   Write coordinator + queue
4.3    🚫 Blocked    Backpressure integration

Phase 4.2+ is deferred pending architecture decisions and an external dependency (market-data-pipeline v0.8.0).


🔧 Windows/Docker Compatibility

This project includes comprehensive cross-platform support for Windows development and Linux/Docker production environments, with zero resource leaks and automatic cleanup:

Event Loop Configuration

  • Windows: Automatically uses WindowsSelectorEventLoopPolicy for psycopg compatibility
  • Linux/macOS: Uses uvloop for enhanced performance when available
  • Automatic: No manual configuration required - just call boot_event_loop() early in your application
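Under the hood, a helper like boot_event_loop() typically amounts to the following. This is a hedged sketch of the pattern; the real mds_client.runtime implementation may differ:

```python
import asyncio
import sys

def boot_event_loop() -> None:
    """Pick an event loop policy appropriate for the current platform."""
    if sys.platform == "win32":
        # psycopg's async support requires the selector loop on Windows
        asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
    else:
        try:
            import uvloop  # optional speedup on Linux/macOS
            uvloop.install()
        except ImportError:
            pass  # stdlib event loop is fine

boot_event_loop()
```

Calling it once, before any pools or loops are created, is enough.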

Connection Pool Management

  • Context managers: All clients support with and async with for automatic cleanup
  • Zero pool warnings: Explicit pool lifecycle management eliminates cleanup warnings
  • Resource management: Proper timeout-based cleanup prevents hanging threads
  • Production ready: Guaranteed clean shutdown in all scenarios

Production Features

  • Health monitoring: Comprehensive database health checks and Prometheus metrics
  • Resource management: Centralized resource cleanup with context managers
  • CLI tools: Cross-platform command-line interface with health and metrics commands
  • Performance optimized: 200+ bars/second processing with clean resource management

Usage Examples

Sync Client with Context Manager:

from mds_client.runtime import boot_event_loop
from mds_client import MDS

boot_event_loop()  # Configure event loop for your platform

with MDS({'dsn': 'postgresql://...', 'tenant_id': '...'}) as mds:
    result = mds.upsert_bars([bar])
    # Pool automatically closed on exit - NO warnings!

Async Client with Context Manager:

from mds_client.runtime import boot_event_loop
from mds_client import AMDS

boot_event_loop()  # Configure event loop for your platform

async with AMDS({'dsn': 'postgresql://...', 'tenant_id': '...'}) as amds:
    result = await amds.upsert_bars([bar])
    # Pool automatically closed on exit - NO warnings!

Batch Processing with Context Manager:

from mds_client.batch import BatchProcessor, BatchConfig

with MDS({'dsn': 'postgresql://...', 'tenant_id': '...'}) as mds:
    with BatchProcessor(mds, BatchConfig(max_rows=100)) as processor:
        processor.add_bar(bar)
        # Both mds and processor automatically closed on exit - NO warnings!

Health Monitoring:

# CLI Health Check
mds health --dsn "postgresql://..." --tenant-id "..."

# CLI Metrics
mds metrics --format prometheus

Performance Benchmarks

  • Sync processing: 84+ bars/second with clean shutdown
  • Async processing: 77+ bars/second with clean shutdown
  • Batch processing: 200+ bars/second with clean shutdown
  • Zero resource leaks: All pools properly closed with context managers

Testing Quickstart

# Run NDJSON round-trip tests
pytest -q tests/test_ndjson_roundtrip.py

# Run all tests (cross-platform)
pytest tests/ -v

# Run Windows compatibility tests
pytest tests/test_windows_compatibility.py -v

Development Commands

# Format and lint code
make fmt
make lint

# Run tests
make test

# Database operations
make migrate
make seed
make policies

Database Setup

Option 1: Using Docker initdb (Recommended for fresh setup)

# The schema will be automatically applied when the database container starts
# if using docker-compose with the initdb.d scripts

Option 2: Manual setup

# Run migrations (for existing databases)
python -m datastore.cli migrate

# Apply seed data
python -m datastore.cli seed

# Apply TimescaleDB policies (optional)
python -m datastore.cli policies

Option 3: Fresh schema setup

# For a completely fresh database, you can use the production schema directly
# After a fresh initdb bootstrap, stamp Alembic to prevent migration conflicts:
python -m datastore.cli stamp-head

# See DATABASE_SETUP.md for detailed instructions

📦 Client Library Usage

The mds_client library provides production-ready APIs for Market Data Core:

For Market Data Core (Async)

from mds_client import AMDS, Bar

# Configuration with tenant isolation
amds = AMDS({
    "dsn": "postgresql://user:pass@host:port/db?options=-c%20app.tenant_id%3D<uuid>",
    "pool_max": 10
})

# Write market data
await amds.upsert_bars([Bar(
    tenant_id="uuid", vendor="ibkr", symbol="AAPL", timeframe="1m",
    ts=now, close_price=150.5, volume=1000
)])

# Get latest prices for hot cache
prices = await amds.latest_prices(["AAPL", "MSFT"], vendor="ibkr")

For Operations (Sync CLI)

# Health check
mds ping --dsn "postgresql://..." --tenant-id "uuid"

# Write data
mds write-bar --dsn "..." --tenant-id "uuid" --vendor "ibkr" \
  --symbol "AAPL" --timeframe "1m" --ts "2024-01-01T10:00:00" \
  --close-price 150.5

# Get latest prices
mds latest-prices --dsn "..." --vendor "ibkr" --symbols "AAPL,MSFT"

# Environment variable support
export MDS_DSN="postgresql://user:pass@host:port/db"
export MDS_TENANT_ID="uuid-string"
mds ping  # Uses env vars automatically

🔗 Market Data Store Integration

The market-data-store package is a core dependency for Market Data Core, providing:

📦 Python Package Integration

# Import the market data store package
import market_data_store

# Access version information
print(f"Market Data Store version: {market_data_store.__version__}")

# The package provides access to all CLI operations and Python library
from mds_client import MDS, AMDS, Bar, Fundamentals, News, OptionSnap

🛠️ Available Operations

The market-data-store package provides comprehensive data persistence capabilities:

Data Types Supported

  • Bars/OHLCV: Time-series price data with multiple timeframes
  • Fundamentals: Company financial data (assets, liabilities, earnings)
  • News: Market news with sentiment analysis
  • Options: Options market data with Greeks (delta, gamma, IV)

CLI Operations (via mds command)

# Health & Schema
mds ping                    # Database connectivity check
mds schema-version         # Get current schema version
mds latest-prices          # Get latest prices for symbols

# Individual Write Operations
mds write-bar              # Write single OHLCV bar
mds write-fundamental      # Write company fundamentals
mds write-news             # Write news article
mds write-option           # Write options data

# Bulk Operations
mds ingest-ndjson          # Bulk ingest from NDJSON files
mds ingest-ndjson-async    # Async bulk ingest

# Export/Import Operations
mds dump                    # Export to CSV
mds restore                 # Import from CSV
mds restore-async           # Async CSV import
mds dump-ndjson             # Export to NDJSON
mds dump-ndjson-async       # Async NDJSON export

# Job Queue Operations
mds enqueue-job             # Queue background jobs
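The NDJSON commands above stream one JSON object per line, optionally gzip-compressed. A minimal sketch of that framing using hypothetical helper names (not the actual mds_client.utils API):

```python
import gzip
import io
import json

def dump_ndjson_gz(records: list[dict]) -> bytes:
    """Serialize records as gzip-compressed NDJSON (one JSON object per line)."""
    buf = io.BytesIO()
    with gzip.open(buf, "wt", encoding="utf-8") as f:
        for rec in records:
            f.write(json.dumps(rec) + "\n")
    return buf.getvalue()

def load_ndjson_gz(data: bytes) -> list[dict]:
    """Parse gzip-compressed NDJSON back into a list of dicts."""
    with gzip.open(io.BytesIO(data), "rt", encoding="utf-8") as f:
        return [json.loads(line) for line in f if line.strip()]

bars = [{"symbol": "AAPL", "close_price": 150.5}, {"symbol": "MSFT", "close_price": 410.0}]
assert load_ndjson_gz(dump_ndjson_gz(bars)) == bars
```

Line-at-a-time framing is what lets bulk ingest stream arbitrarily large files without loading them into memory.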

Python Library Usage

# Synchronous operations
from mds_client import MDS
mds = MDS({"dsn": "postgresql://...", "tenant_id": "uuid"})

# Write market data
mds.upsert_bars([bar_data])
mds.upsert_fundamentals([fundamental_data])
mds.upsert_news([news_data])
mds.upsert_options([option_data])

# Read operations
latest_prices = mds.latest_prices(["AAPL", "MSFT"], vendor="ibkr")

# Async operations
from mds_client import AMDS, AsyncBatchProcessor
amds = AMDS({"dsn": "postgresql://...", "tenant_id": "uuid", "pool_max": 10})

๐Ÿ—๏ธ Architecture Benefits

  • Tenant Isolation: Row Level Security (RLS) ensures data separation
  • TimescaleDB Integration: Optimized for time-series data
  • Connection Pooling: High-performance async/sync connection management
  • Batch Processing: Efficient bulk operations with configurable batching
  • Idempotent Operations: Safe retry and upsert semantics
  • Production Ready: Comprehensive error handling, logging, and monitoring

๐Ÿ“‹ Quick Reference

For detailed operation documentation, see:

๐Ÿ“š Client Library Documentation

๐Ÿ—๏ธ Architecture Overview

The mds_client library provides a production-ready Python client for Market Data Core with two main facades:

  • MDS - Synchronous client for operations and simple integrations
  • AMDS - Asynchronous client for high-performance Market Data Core

Both clients support:

  • Row Level Security (RLS) with tenant isolation via DSN options or context managers
  • Connection pooling with psycopg 3 + psycopg_pool (ConnectionPool/AsyncConnectionPool)
  • TimescaleDB integration with time-first composite primary keys and idempotent upserts
  • Statement timeouts with per-connection configuration
  • Structured error handling with psycopg error mapping and retry logic
  • Job outbox pattern with idempotency key support
  • Performance optimization with multiple write modes: executemany (default), execute_values (sync), and COPY (fastest)

๐Ÿ“Š Data Models

The library provides strict Pydantic models for all market data types:

Bar - OHLCV Market Data

class Bar(BaseModel):
    tenant_id: str                    # UUID for tenant isolation (tenants.id)
    vendor: str                       # Data provider (e.g., "ibkr", "alpha_vantage")
    symbol: str                       # Trading symbol (auto-uppercased)
    timeframe: str                    # Time aggregation ("1m", "5m", "1h", "1d")
    ts: datetime                      # Timestamp (UTC)
    open_price: Optional[float] = None
    high_price: Optional[float] = None
    low_price: Optional[float] = None
    close_price: Optional[float] = None
    volume: Optional[int] = None
    id: Optional[str] = None          # UUID (not globally unique)

Fundamentals - Company Financials

class Fundamentals(BaseModel):
    tenant_id: str                    # UUID for tenant isolation (tenants.id)
    vendor: str
    symbol: str
    asof: datetime                    # As-of date for financial data
    total_assets: Optional[float] = None
    total_liabilities: Optional[float] = None
    net_income: Optional[float] = None
    eps: Optional[float] = None       # Earnings per share
    id: Optional[str] = None

News - Market News & Sentiment

class News(BaseModel):
    tenant_id: str                    # UUID for tenant isolation (tenants.id)
    vendor: str
    published_at: datetime            # Publication timestamp
    title: str                        # News headline
    id: Optional[str] = None
    symbol: Optional[str] = None      # Related symbol (if applicable)
    url: Optional[str] = None         # Source URL
    sentiment_score: Optional[float] = None  # -1.0 to 1.0 sentiment

OptionSnap - Options Market Data

class OptionSnap(BaseModel):
    tenant_id: str                    # UUID for tenant isolation (tenants.id)
    vendor: str
    symbol: str
    expiry: date                      # Option expiration date
    option_type: str                  # "C" for call, "P" for put
    strike: float                     # Strike price
    ts: datetime                      # Snapshot timestamp
    iv: Optional[float] = None        # Implied volatility
    delta: Optional[float] = None     # Option delta
    gamma: Optional[float] = None     # Option gamma
    oi: Optional[int] = None          # Open interest
    volume: Optional[int] = None      # Trading volume
    spot: Optional[float] = None      # Underlying spot price
    id: Optional[str] = None

LatestPrice - Real-time Price Snapshots

class LatestPrice(BaseModel):
    tenant_id: str                    # UUID for tenant isolation (tenants.id)
    vendor: str
    symbol: str
    price: float                      # Latest price
    price_timestamp: datetime         # When price was recorded

๐Ÿ”ง Configuration

Client Configuration

# MDS (sync) configuration with performance tuning
mds = MDS({
    "dsn": "postgresql://user:pass@host:port/db?options=-c%20app.tenant_id%3D<uuid>",
    "tenant_id": "uuid-string",        # Optional: overrides DSN tenant_id
    "app_name": "mds_client",          # Application name for pg_stat_activity
    "connect_timeout": 10.0,           # Connection timeout in seconds
    "statement_timeout_ms": 30000,     # Query timeout in milliseconds
    "pool_min": 1,                     # Minimum connections in pool
    "pool_max": 10,                    # Maximum connections in pool
    # Performance optimization settings
    "write_mode": "auto",              # "auto" | "executemany" | "values" | "copy"
    "values_min_rows": 500,           # Use execute_values for >= N rows
    "values_page_size": 1000,         # Page size for execute_values
    "copy_min_rows": 5000,            # Use COPY for >= N rows
})

# AMDS (async) configuration
amds = AMDS({
    "dsn": "postgresql://user:pass@host:port/db",
    "tenant_id": "uuid-string",
    "app_name": "mds_client_async",
    "pool_max": 10,                    # Async pool typically larger
    "write_mode": "auto",              # "auto" | "executemany" | "copy"
    "copy_min_rows": 5000,            # Use COPY for >= N rows
})

๐Ÿš€ API Reference

Synchronous Client (MDS)

Connection & Health:

Write Operations (Idempotent Upserts):

Read Operations:

Job Operations:

Asynchronous Client (AMDS)

The async client provides identical methods with async/await syntax:

๐Ÿ”’ Row Level Security (RLS)

The library automatically handles tenant isolation through PostgreSQL's Row Level Security:

DSN Options (Recommended)

# Tenant ID embedded in connection string
dsn = "postgresql://user:pass@host:port/db?options=-c%20app.tenant_id%3D<uuid>"
mds = MDS({"dsn": dsn})
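The `options` clause in that DSN is URL-encoded (`%20` for the space, `%3D` for `=`). A stdlib sketch of building it (the helper name is mine, not part of the library):

```python
from urllib.parse import quote

def dsn_with_tenant(base_dsn: str, tenant_id: str) -> str:
    """Append a URL-encoded '-c app.tenant_id=<uuid>' options clause to a DSN."""
    opts = quote(f"-c app.tenant_id={tenant_id}", safe="")
    sep = "&" if "?" in base_dsn else "?"
    return f"{base_dsn}{sep}options={opts}"

dsn = dsn_with_tenant(
    "postgresql://user:pass@host:5432/db",
    "123e4567-e89b-12d3-a456-426614174000",
)
```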

Context Manager (Fallback)

# Explicit tenant context for operations (if not using DSN options)
# Note: Current implementation uses SET app.tenant_id per connection
# Context managers would be implemented in rls.py if needed

โš ๏ธ Error Handling

The library provides structured error handling with automatic retry logic:

MDSOperationalError - Base operational error

RetryableError - Temporary errors (network, deadlocks, serialization failures)

ConstraintViolation - Database constraint violations (unique, foreign key, check)

RLSDenied - Row Level Security policy violations

TimeoutExceeded - Query or connection timeouts

All errors are automatically mapped from psycopg.errors exceptions for precise error handling.
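Such a mapping is typically keyed on PostgreSQL SQLSTATE codes: serialization/deadlock failures are retryable, class-23 codes are integrity-constraint violations, `42501` is insufficient privilege, and `57014` is a cancelled (timed-out) statement. A pure-Python sketch of that dispatch, using the exception names above (the exact code sets are my assumption, not the library's; the SQLSTATE values themselves are standard PostgreSQL):

```python
class MDSOperationalError(Exception): ...
class RetryableError(MDSOperationalError): ...
class ConstraintViolation(MDSOperationalError): ...
class RLSDenied(MDSOperationalError): ...
class TimeoutExceeded(MDSOperationalError): ...

# Standard PostgreSQL SQLSTATE codes; the grouping below is illustrative.
_RETRYABLE = {"40001", "40P01", "08006"}  # serialization, deadlock, connection failure
_TIMEOUT = {"57014"}                      # query_canceled (statement_timeout)
_RLS = {"42501"}                          # insufficient_privilege

def map_sqlstate(code: str) -> type:
    """Pick the exception class for a PostgreSQL SQLSTATE code."""
    if code in _RETRYABLE:
        return RetryableError
    if code in _TIMEOUT:
        return TimeoutExceeded
    if code in _RLS:
        return RLSDenied
    if code.startswith("23"):             # integrity-constraint class
        return ConstraintViolation
    return MDSOperationalError
```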

๐Ÿ› ๏ธ Operational CLI

The library includes a comprehensive CLI for operations and debugging:

Exit Codes: All CLI commands return non-zero exit codes on failure for CI/CD integration.

# Health and connectivity
mds ping --dsn "postgresql://..." --tenant-id "uuid"

# Schema information
mds schema-version --dsn "postgresql://..."

# Write operations
mds write-bar --dsn "..." --tenant-id "uuid" --vendor "ibkr" \
  --symbol "AAPL" --timeframe "1m" --ts "2024-01-01T10:00:00" \
  --close-price 150.5 --volume 1000

mds write-fundamental --dsn "..." --tenant-id "uuid" --vendor "alpha" \
  --symbol "AAPL" --asof "2024-01-01" --eps 1.25

mds write-news --dsn "..." --tenant-id "uuid" --vendor "reuters" \
  --title "AAPL Reports Strong Q4" --published-at "2024-01-01T10:00:00" \
  --symbol "AAPL" --sentiment-score 0.8

mds write-option --dsn "..." --tenant-id "uuid" --vendor "ibkr" \
  --symbol "AAPL" --expiry "2024-12-20" --option-type "C" --strike 200 \
  --ts "2024-01-01T10:00:00" --iv 0.25 --delta 0.55
# Note: write-option targets the options_snap table (model: OptionSnap)

# Read operations
mds latest-prices --dsn "..." --vendor "ibkr" --symbols "AAPL,MSFT"

# Job queue operations
mds enqueue-job --dsn "..." --tenant-id "uuid" \
  --idempotency-key "job-123" --job-type "backfill" \
  --payload '{"symbol": "AAPL", "start": "2024-01-01"}' --priority "high"

# Sync NDJSON ingest (stdin or file, .gz supported)
mds ingest-ndjson bars ./bars.ndjson \
  --dsn "postgresql://..." --tenant-id "uuid" \
  --max-rows 2000 --max-ms 3000

# Or from stdin
cat bars.ndjson | mds ingest-ndjson bars - \
  --dsn "postgresql://..." --tenant-id "uuid"

# Async NDJSON ingest (uses AMDS + AsyncBatchProcessor)
mds ingest-ndjson-async bars ./bars.ndjson \
  --dsn "postgresql://..." --tenant-id "uuid" \
  --max-rows 2000 --max-ms 3000

# Backup/Export operations (tenant-aware, RLS-enforced)
mds dump bars ./bars_export.csv.gz \
  --dsn "postgresql://..." --tenant-id "uuid" \
  --vendor "ibkr" --symbol "AAPL" --timeframe "1m" \
  --start "2024-01-01T00:00:00Z" --end "2024-02-01T00:00:00Z"

# Restore/Import operations (idempotent upserts)
# Sync CSV restore
mds restore bars ./bars_export.csv.gz \
  --dsn "postgresql://..." --tenant-id "uuid"

# Async CSV restore (for large files)
mds restore-async bars ./bars_export.csv.gz \
  --dsn "postgresql://..." --tenant-id "uuid" \
  --delimiter "," --header

# Async CSV restore from STDIN (shell pipelines)
zcat bars_export.csv.gz | mds restore-async-stdin bars \
  --dsn "postgresql://..." --tenant-id "uuid"

# NDJSON dump operations (round-trip with ingest-ndjson)
mds dump-ndjson bars ./bars_export.ndjson.gz \
  --dsn "postgresql://..." --tenant-id "uuid" \
  --vendor "ibkr" --symbol "AAPL" --timeframe "1m" \
  --start "2024-01-01T00:00:00Z" --end "2024-02-01T00:00:00Z"

# Async NDJSON dump for large exports
mds dump-ndjson-async bars ./bars_export.ndjson.gz \
  --dsn "postgresql://..." --tenant-id "uuid" \
  --vendor "ibkr" --symbol "AAPL"

โšก Performance Optimization

The library provides multiple write modes for optimal performance across different batch sizes:

Write Mode Selection (Auto)

# Automatic mode selection based on batch size
mds = MDS({
    "write_mode": "auto",              # Default: intelligent selection
    "values_min_rows": 500,           # Use execute_values for >= 500 rows
    "copy_min_rows": 5000,            # Use COPY for >= 5000 rows
})

# Behavior:
# len(rows) >= 5000 โ†’ COPY (fastest, sync + async)
# len(rows) >= 500  โ†’ execute_values (fast, sync only)
# len(rows) < 500   โ†’ executemany (safe default)
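The auto-selection above reduces to two threshold comparisons plus the sync-only restriction on `execute_values`; a sketch of that logic (the function name is mine):

```python
def choose_write_mode(n_rows: int, values_min_rows: int = 500,
                      copy_min_rows: int = 5000, is_async: bool = False) -> str:
    """Mirror the documented auto mode: COPY for the largest batches,
    execute_values for mid-size sync batches, executemany otherwise."""
    if n_rows >= copy_min_rows:
        return "copy"
    if n_rows >= values_min_rows and not is_async:
        return "values"  # execute_values is sync-only
    return "executemany"
```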

Manual Mode Selection

# Force specific write modes
mds = MDS({"write_mode": "executemany"})  # Always use executemany
mds = MDS({"write_mode": "values"})       # Force execute_values (sync only)
mds = MDS({"write_mode": "copy"})         # Force COPY path

Environment Variable Configuration

# Set via environment variables
export MDS_WRITE_MODE=auto
export MDS_VALUES_MIN_ROWS=500
export MDS_VALUES_PAGE_SIZE=1000
export MDS_COPY_MIN_ROWS=5000

Environment Variables Reference

  • MDS_DSN: PostgreSQL DSN
  • MDS_TENANT_ID: Tenant UUID for RLS (must be tenants.id, not tenants.tenant_id)
  • MDS_WRITE_MODE: auto | executemany | values | copy
  • MDS_VALUES_MIN_ROWS: Threshold for execute_values
  • MDS_COPY_MIN_ROWS: Threshold for COPY

Performance Characteristics

  • executemany: Safe default, good for small batches (< 500 rows)
  • execute_values: Fast for mid-size batches (500-5000 rows), sync only
    • Install extras: pip install "psycopg[pool,extras]"
  • COPY: Fastest for large batches (5000+ rows), works with RLS and maintains idempotency

Troubleshooting

  • Tenant ID errors: Use tenants.id (UUID), not tenants.tenant_id (VARCHAR)
  • Windows async issues: Use sync MDS client; async pools need SelectorEventLoop
  • Foreign key violations: Ensure tenant exists in tenants table with correct UUID
  • RLS denied: Verify app.tenant_id is set correctly in connection context

๐Ÿ’พ Backup & Restore Operations

The library provides tenant-aware backup and restore operations using PostgreSQL's COPY command:

Export Operations (Tenant-Aware Dumps)

from mds_client import MDS
from psycopg import sql as psql

mds = MDS({"dsn": "...", "tenant_id": "uuid"})

# Export bars for specific vendor/symbol/timeframe
sel = psql.SQL("""
    SELECT {cols}
    FROM bars
    WHERE vendor = {v} AND symbol = {s} AND timeframe = '1m'
      AND ts >= {start} AND ts < {end}
    ORDER BY ts
""").format(
    cols=psql.SQL(", ").join(psql.Identifier(c) for c in mds.TABLE_PRESETS["bars"]["cols"]),
    v=psql.Literal("ibkr"),
    s=psql.Literal("AAPL"),
    start=psql.Literal("2024-01-01T00:00:00Z"),
    end=psql.Literal("2024-02-01T00:00:00Z"),
)

# Export to gzipped CSV
mds.copy_out_csv(select_sql=sel, out_path="bars_aapl_2024-01.csv.gz")

Import Operations (Idempotent Upserts)

# Restore from CSV with upsert semantics
preset = MDS.TABLE_PRESETS["bars"]
mds.copy_restore_csv(
    target="bars",
    cols=preset["cols"],
    conflict_cols=preset["conflict"],
    update_cols=preset["update"],
    src_path="bars_aapl_2024-01.csv.gz",
)

CLI Operations

# Export with filters
mds dump bars ./bars_export.csv.gz \
  --dsn "postgresql://..." --tenant-id "uuid" \
  --vendor "ibkr" --symbol "AAPL" --timeframe "1m" \
  --start "2024-01-01T00:00:00Z" --end "2024-02-01T00:00:00Z"

# Import with upsert (sync)
mds restore bars ./bars_export.csv.gz \
  --dsn "postgresql://..." --tenant-id "uuid"

# Import with upsert (async - for large files)
mds restore-async bars ./bars_export.csv.gz \
  --dsn "postgresql://..." --tenant-id "uuid" \
  --delimiter "," --header

# Import from STDIN (shell pipelines)
zcat bars_export.csv.gz | mds restore-async-stdin bars \
  --dsn "postgresql://..." --tenant-id "uuid"

Key Features

  • RLS Enforcement: All operations respect Row Level Security via SET app.tenant_id
  • Consistent Snapshots: Multiple COPY operations in single transaction
  • Idempotent Restores: INSERT ... ON CONFLICT DO UPDATE preserves existing data
  • Gzip Support: Automatic compression for .gz files
  • CSV with Headers: Self-describing format for easy inspection
  • Streaming: Memory-efficient for large datasets
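The gzipped, header-first CSV format these commands exchange can be produced and read back with the stdlib alone; a sketch of the round-trip (the column set here is illustrative, not the exact table schema):

```python
import csv
import gzip
import io

cols = ["vendor", "symbol", "timeframe", "ts", "close_price", "volume"]
rows = [["ibkr", "AAPL", "1m", "2024-01-01T10:00:00+00:00", "150.5", "1000"]]

# Write a self-describing gzipped CSV: header row first, then data rows.
buf = io.BytesIO()
with gzip.open(buf, "wt", newline="") as fh:
    writer = csv.writer(fh)
    writer.writerow(cols)
    writer.writerows(rows)

# Read it back; the header row lets DictReader recover the column names.
buf.seek(0)
with gzip.open(buf, "rt", newline="") as fh:
    records = list(csv.DictReader(fh))
```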

๐Ÿ“„ NDJSON Export Operations

The library provides NDJSON export functionality that perfectly round-trips with the existing ingest commands:

Export Operations (JSON Streaming)

from mds_client import MDS
from psycopg import sql as psql

mds = MDS({"dsn": "...", "tenant_id": "uuid"})

# Export bars as NDJSON
sel = psql.SQL("""
    SELECT {cols}
    FROM bars
    WHERE vendor = {v} AND symbol = {s} AND timeframe = '1m'
      AND ts >= {start} AND ts < {end}
    ORDER BY ts
""").format(
    cols=psql.SQL(", ").join(psql.Identifier(c) for c in mds.TABLE_PRESETS["bars"]["cols"]),
    v=psql.Literal("ibkr"),
    s=psql.Literal("AAPL"),
    start=psql.Literal("2024-01-01T00:00:00Z"),
    end=psql.Literal("2024-02-01T00:00:00Z"),
)

# Export to gzipped NDJSON
mds.copy_out_ndjson(select_sql=sel, out_path="bars_aapl_2024-01.ndjson.gz")

CLI Operations

# Sync NDJSON export
mds dump-ndjson bars ./bars_export.ndjson.gz \
  --dsn "postgresql://..." --tenant-id "uuid" \
  --vendor "ibkr" --symbol "AAPL" --timeframe "1m" \
  --start "2024-01-01T00:00:00Z" --end "2024-02-01T00:00:00Z"

# Async NDJSON export for large datasets
mds dump-ndjson-async bars ./bars_export.ndjson.gz \
  --dsn "postgresql://..." --tenant-id "uuid" \
  --vendor "ibkr" --symbol "AAPL"

# Round-trip: export then import
mds dump-ndjson bars ./bars.ndjson --dsn "..." --tenant-id "uuid"
mds ingest-ndjson bars ./bars.ndjson --dsn "..." --tenant-id "uuid"

# Multi-table exports with template naming
mds dump-ndjson-all \
  --dsn "postgresql://..." --tenant-id "uuid" \
  --vendor ibkr --symbol AAPL --timeframe 1m \
  --start 2024-01-01T00:00:00Z --end 2024-02-01T00:00:00Z

# Custom naming template with directory structure
mds dump-ndjson-all "./exports/{table}/{vendor}-{symbol}-{start}-{end}.ndjson.gz" \
  --dsn "postgresql://..." --tenant-id "uuid" --vendor ibkr

# Async multi-table export for large datasets
mds dump-ndjson-async-all \
  --dsn "postgresql://..." --tenant-id "uuid" \
  --start 2024-01-01 --end 2024-02-01

Key Features

  • Round-trip compatibility: Exports re-import cleanly via the ingest-ndjson commands
  • JSON streaming: Uses to_jsonb() for efficient PostgreSQL JSON serialization
  • RLS enforcement: All operations respect tenant isolation
  • Gzip support: Automatic compression for .ndjson.gz files
  • Async support: High-performance async exports for large datasets
  • ISO timestamps: Timestamps serialized in ISO-8601 format for clean parsing
  • Multi-table exports: Export all tables at once with template-based naming
  • Template system: Flexible file naming with variables {table}, {vendor}, {symbol}, {timeframe}, {start}, {end}
  • Directory creation: Automatic parent directory creation for organized exports

Multi-Table Export Operations

# Export all tables with default naming
mds dump-ndjson-all \
  --dsn "postgresql://..." --tenant-id "uuid" \
  --vendor ibkr --symbol AAPL --timeframe 1m \
  --start 2024-01-01T00:00:00Z --end 2024-02-01T00:00:00Z
# Creates: ./bars-AAPL-2024-01-01T00:00:00Z-2024-02-01T00:00:00Z.ndjson.gz
#          ./fundamentals-AAPL-2024-01-01T00:00:00Z-2024-02-01T00:00:00Z.ndjson.gz
#          ./news-AAPL-2024-01-01T00:00:00Z-2024-02-01T00:00:00Z.ndjson.gz
#          ./options_snap-AAPL-2024-01-01T00:00:00Z-2024-02-01T00:00:00Z.ndjson.gz
# Note: {timeframe} is ignored for tables without that column (fundamentals/news/options)

# Custom template with directory structure
mds dump-ndjson-all "./exports/{table}/{vendor}-{symbol}-{start}-{end}.ndjson.gz" \
  --dsn "postgresql://..." --tenant-id "uuid" --vendor ibkr
# Creates: ./exports/bars/ibkr-AAPL-2024-01-01-2024-02-01.ndjson.gz
#          ./exports/fundamentals/ibkr-AAPL-2024-01-01-2024-02-01.ndjson.gz
#          etc.

# Async version for large datasets
mds dump-ndjson-async-all \
  --dsn "postgresql://..." --tenant-id "uuid" \
  --start 2024-01-01 --end 2024-02-01

Template Variables

  • {table}: Table name (bars, fundamentals, news, options_snap)
  • {vendor}: Data vendor (e.g., ibkr, reuters) or "ALL" if not specified
  • {symbol}: Symbol (e.g., AAPL) or "ALL" if not specified
  • {timeframe}: Timeframe (e.g., 1m, 1h) or "ALL" if not specified
  • {start}: Start timestamp or "MIN" if not specified
  • {end}: End timestamp or "MAX" if not specified
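Rendering such a template is a `str.format` call over those variables with the documented fallbacks; a sketch (the helper name is mine):

```python
from typing import Optional

def render_export_path(template: str, table: str,
                       vendor: Optional[str] = None, symbol: Optional[str] = None,
                       timeframe: Optional[str] = None, start: Optional[str] = None,
                       end: Optional[str] = None) -> str:
    """Fill the template variables, substituting the documented defaults."""
    return template.format(
        table=table,
        vendor=vendor or "ALL",
        symbol=symbol or "ALL",
        timeframe=timeframe or "ALL",
        start=start or "MIN",
        end=end or "MAX",
    )

path = render_export_path(
    "./exports/{table}/{vendor}-{symbol}-{start}-{end}.ndjson.gz",
    table="bars", vendor="ibkr", symbol="AAPL",
    start="2024-01-01", end="2024-02-01",
)
```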

๐Ÿ”„ Batch Processing

For high-throughput scenarios, the library supports both sync and async batch processing:

Sync Batch Processing

from mds_client import MDS, BatchProcessor, BatchConfig, Bar

mds = MDS({"dsn": "...", "tenant_id": "..."})
bp = BatchProcessor(mds, BatchConfig(max_rows=1000, max_ms=5000))
for bar in big_set:
    bp.add_bar(bar)
bp.flush()

Async Batch Processing

from mds_client import AMDS, AsyncBatchProcessor, BatchConfig, Bar

amds = AMDS({"dsn": "...", "tenant_id": "...", "pool_max": 10})
async with AsyncBatchProcessor(amds, BatchConfig(max_rows=1000, max_ms=5000)) as bp:
    for bar in big_set:
        await bp.add_bar(bar)
# Auto-flush on context exit
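The policy behind `BatchConfig(max_rows=..., max_ms=...)` is "flush when either the row count or the buffer age exceeds its threshold". A minimal pure-Python sketch of that policy (the real processors also track byte sizes and run async flush tickers; this class is illustrative, not the library's):

```python
import time
from typing import Any, Callable, List

class RowBuffer:
    """Accumulate rows and flush on a row-count or buffer-age threshold."""

    def __init__(self, flush: Callable[[List[Any]], None],
                 max_rows: int = 1000, max_ms: int = 5000) -> None:
        self._flush, self.max_rows, self.max_ms = flush, max_rows, max_ms
        self._rows: List[Any] = []
        self._oldest = time.monotonic()

    def add(self, row: Any) -> None:
        if not self._rows:
            self._oldest = time.monotonic()  # age starts at first buffered row
        self._rows.append(row)
        age_ms = (time.monotonic() - self._oldest) * 1000
        if len(self._rows) >= self.max_rows or age_ms >= self.max_ms:
            self.flush()

    def flush(self) -> None:
        if self._rows:
            self._flush(self._rows)
            self._rows = []
```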

Key Features

  • Dual API: Sync (MDS) and async (AMDS) facades with identical interfaces
  • RLS Integration: Automatic tenant isolation via DSN options or per-connection SET
  • TimescaleDB Compatible: Time-first composite primary keys with idempotent upserts
  • Connection Pooling: Production-ready with psycopg 3 + psycopg_pool
  • Performance Optimization: Multiple write modes with automatic selection:
    • executemany: Safe default for small batches
    • execute_values: Fast mid-size batches (sync only, requires psycopg extras)
    • COPY: Fastest for large batches (sync + async)
  • Batch Processing: High-throughput ingestion with byte-accurate sizing and auto-flush tickers
  • Structured Errors: Comprehensive exception hierarchy with psycopg error mapping
  • Environment Variables: CLI support for MDS_DSN and MDS_TENANT_ID
  • NDJSON Support: Gzip compression, stdin input, and model coercion
  • Job Outbox: Idempotent job enqueueing with conflict-free guarantees
  • Backup/Restore: Tenant-aware CSV export/import with RLS enforcement and idempotent upserts
  • NDJSON Export: Round-trip compatible JSON dumps with to_jsonb() streaming

Dependencies


License

MIT License - see LICENSE file for details.

Contributing

Contributions welcome! Please open an issue or submit a pull request.
