**Unified ingestion, caching, and audit layer for the Money Ex Machina ecosystem.**

mxm-dataio

Unified ingestion, caching, and audit layer for the Money Ex Machina (MXM) ecosystem. mxm-dataio records every interaction with an external system—who/what/when, the exact bytes returned, and optional transport metadata—so downstream packages are reproducible and auditable.

Purpose & scope

What it is

  • A lightweight, protocol-agnostic layer that models external interactions as Session → Request → Response, persisting metadata in SQLite and payloads as files.
  • A small registry + adapter interface so applications can plug in sources (web APIs, files, brokers).
  • A simple runtime API (DataIoSession) to run fetch/send operations with automatic persistence and optional caching.

What it’s not

  • Not a domain database for market data or reference data.
  • Not a parsing/ETL framework. Domain packages (e.g., mxm-marketdata, mxm-refdata) parse/normalize and store into their own schemas, while linking back to mxm-dataio for provenance.

Architecture at a glance

App (e.g. mxm-datakraken)
  │
  ├─ mxm-config → cfg (machine/env/profile-specific paths)
  │
  └─ mxm-dataio
      ├─ models.py        (Session, Request, Response)
      ├─ store.py         (SQLite metadata + filesystem payloads)
      ├─ adapters.py      (MXMDataIoAdapter + Fetcher/Sender/Streamer, AdapterResult)
      ├─ registry.py      (register/resolve adapters)
      └─ api.py           (DataIoSession: runtime orchestration)

Storage layout

${paths.data_root}/
  dataio.sqlite
  responses/
    <sha256>.bin         # raw bytes (exact payload)
    <sha256>.meta.json   # optional sidecar metadata (transport info)

Core concepts

  • Session — groups related Requests (e.g., a daily run). Fields include source, mode, as_of, started_at, ended_at.
  • Request — deterministic identity of an external call: kind, method, params, optional body, and a stable hash used for caching.
  • Response — what came back: status, checksum, size_bytes, sequence (for future streaming), path of payload.
  • Adapter — small class that knows how to talk to a system, via capabilities:
    • Fetcher.fetch(request) -> bytes | AdapterResult
    • Sender.send(request, payload: bytes) -> bytes | dict | AdapterResult
    • Streamer.stream(request) (future)
  • Registry — process-local map of source → adapter instance.
  • DataIoSession — context manager that opens a Session, creates Requests, resolves the adapter, executes I/O, persists Responses, and optionally returns cached results by Request hash.
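
The "stable hash" on a Request can be pictured as a digest over a canonical serialization of its identifying fields. The sketch below is an assumption about how such a hash could be built, not the scheme mxm-dataio actually uses; `request_hash` and the `"GET"` method string are hypothetical.

```python
import hashlib
import json
from typing import Any


def request_hash(kind: str, method: str, params: dict[str, Any], body: Any = None) -> str:
    """Hash the identifying fields of a request in a key-order-independent way."""
    canonical = json.dumps(
        {"kind": kind, "method": method, "params": params, "body": body},
        sort_keys=True,            # dict ordering must not change the hash
        separators=(",", ":"),     # no incidental whitespace
        ensure_ascii=False,
    )
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


# Same fields in a different key order hash identically.
h1 = request_hash("etf_profile", "GET", {"isin": "LU0274211480", "lang": "en"})
h2 = request_hash("etf_profile", "GET", {"lang": "en", "isin": "LU0274211480"})
# A different parameter value yields a different hash.
h3 = request_hash("etf_profile", "GET", {"isin": "IE00B4L5Y983", "lang": "en"})
```

Two requests that hash identically are treated as the same external call, which is what allows a cached Response to be reused.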

Design principles

  • Protocol-agnostic & dependency-light: stdlib only (sqlite3, pathlib, json, hashlib), apart from mxm-config for path resolution.
  • Deterministic & auditable: stable hashing, checksums, reproducible layout.
  • Queryable & fast: SQLite indexes on hot paths (requests.hash, requests.session_id, responses.request_id, responses.created_at, responses.checksum) for snappy cache lookups and listing.
  • Separation of concerns:
    • mxm-dataio archives raw bytes + provenance.
    • Domain packages parse/normalize into queryable schemas, while storing response_id/checksum for provenance.
  • Extensible: adapters are tiny; sidecar metadata via AdapterResult requires no DB migration.

Adapters & registry

# adapters.py (excerpt)
from dataclasses import dataclass
from typing import Any, Protocol, runtime_checkable
from mxm_dataio.models import Request

@runtime_checkable
class MXMDataIoAdapter(Protocol):
    source: str
    def describe(self) -> str: ...
    def close(self) -> None: ...

@runtime_checkable
class Fetcher(MXMDataIoAdapter, Protocol):
    # Adapters may also return an AdapterResult (bytes + metadata sidecar);
    # see "Core concepts".
    def fetch(self, request: Request) -> bytes: ...

@runtime_checkable
class Sender(MXMDataIoAdapter, Protocol):
    # DataIoSession.send accepts any of these return values from the adapter:
    # - bytes               (raw)
    # - dict[str, Any]      (stored as deterministic JSON)
    # - AdapterResult       (bytes + metadata sidecar)
    def send(self, request: Request, payload: bytes) -> Any: ...

@dataclass(slots=True)
class AdapterResult:
    data: bytes
    content_type: str | None = None
    encoding: str | None = None
    transport_status: int | None = None
    url: str | None = None
    elapsed_ms: int | None = None
    headers: dict[str, str] | None = None
    adapter_meta: dict[str, Any] | None = None
    def meta_dict(self) -> dict[str, Any]:
        return {
            k: v for k, v in {
                "content_type": self.content_type,
                "encoding": self.encoding,
                "transport_status": self.transport_status,
                "url": self.url,
                "elapsed_ms": self.elapsed_ms,
                "headers": self.headers,
                "adapter_meta": self.adapter_meta,
            }.items() if v is not None
        }

# registry.py (excerpt)
from mxm_dataio.adapters import MXMDataIoAdapter

def register(name: str, adapter: MXMDataIoAdapter) -> None: ...
def resolve_adapter(name: str) -> MXMDataIoAdapter: ...
def unregister(name: str) -> None: ...
def clear_registry() -> None: ...
def list_registered() -> list[str]: ...
def describe_registry() -> str: ...
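
To illustrate what a process-local registry amounts to, here is a minimal sketch built on a module-level dict. The function names mirror the excerpt above, but the bodies are assumptions: in particular, raising ValueError on a duplicate name and KeyError on an unknown one is a guess at reasonable behavior, not the library's documented contract.

```python
from typing import Protocol


class Adapter(Protocol):
    """Stand-in for MXMDataIoAdapter, reduced to what this sketch needs."""
    source: str

    def describe(self) -> str: ...
    def close(self) -> None: ...


_REGISTRY: dict[str, Adapter] = {}  # lives for the lifetime of the process


def register(name: str, adapter: Adapter) -> None:
    if name in _REGISTRY:
        raise ValueError(f"adapter already registered for source {name!r}")
    _REGISTRY[name] = adapter


def resolve_adapter(name: str) -> Adapter:
    try:
        return _REGISTRY[name]
    except KeyError:
        raise KeyError(f"no adapter registered for source {name!r}") from None


def unregister(name: str) -> None:
    _REGISTRY.pop(name, None)


def list_registered() -> list[str]:
    return sorted(_REGISTRY)


class DemoAdapter:
    source = "demo"

    def describe(self) -> str:
        return "demo adapter"

    def close(self) -> None:
        pass


register("demo", DemoAdapter())
names = list_registered()
resolved = resolve_adapter("demo")
```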

Runtime API: DataIoSession

from mxm_dataio.api import DataIoSession

# cfg is loaded via mxm-config (see "Configuration (via mxm-config)").
with DataIoSession(source="justetf", cfg=cfg) as io:
    req = io.request(kind="etf_profile", params={"isin": "LU0274211480"})
    resp = io.fetch(req)  # Adapter.fetch → bytes or AdapterResult
    print(resp.status, resp.path)  # 'ok', .../responses/<checksum>.bin

  • Capability checks: .fetch() requires a Fetcher adapter and .send() requires a Sender; otherwise a TypeError is raised.
  • Caching: enabled by default (use_cache=True). If a previous Request with the same hash exists, the most recent Response is returned without repeating the I/O.
  • Sidecar metadata: if an adapter returns an AdapterResult, its meta_dict() is written to responses/<checksum>.meta.json (deterministic JSON, readable Unicode).
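
The capability check can be sketched with runtime-checkable protocols, which is how the adapter interfaces above are declared. The guard functions `guarded_fetch` and `guarded_send` are hypothetical stand-ins for what DataIoSession might do internally, and requests are plain dicts here to keep the example self-contained.

```python
from typing import Protocol, runtime_checkable


@runtime_checkable
class Fetcher(Protocol):
    def fetch(self, request: dict) -> bytes: ...


@runtime_checkable
class Sender(Protocol):
    def send(self, request: dict, payload: bytes) -> bytes: ...


class FetchOnly:
    """An adapter that can fetch but has no send() method."""

    def fetch(self, request: dict) -> bytes:
        return b"payload"


def guarded_fetch(adapter: object, request: dict) -> bytes:
    # isinstance() against a runtime_checkable Protocol checks only that the
    # required method exists; it does not validate the signature.
    if not isinstance(adapter, Fetcher):
        raise TypeError(f"{type(adapter).__name__} does not support fetch()")
    return adapter.fetch(request)


def guarded_send(adapter: object, request: dict, payload: bytes) -> bytes:
    if not isinstance(adapter, Sender):
        raise TypeError(f"{type(adapter).__name__} does not support send()")
    return adapter.send(request, payload)


adapter = FetchOnly()
fetched = guarded_fetch(adapter, {"kind": "demo"})
try:
    guarded_send(adapter, {"kind": "demo"}, b"x")
    send_error = None
except TypeError as exc:
    send_error = str(exc)
```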

Store helpers (used internally by DataIoSession):

  • Store.mark_session_ended(session_id, ended_at) — finalize session end time.
  • Store.get_cached_response_by_request_hash(request_hash) — return the latest cached Response for an identical Request.
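
A lookup like get_cached_response_by_request_hash can be pictured as a single indexed query. The schema below is a hypothetical reduction (the real tables are not shown in this README); only the column names hash, request_id, checksum, and created_at are taken from the text, and `latest_response_for_hash` is an illustrative name.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE requests (
        id TEXT PRIMARY KEY,
        session_id TEXT NOT NULL,
        hash TEXT NOT NULL
    );
    CREATE TABLE responses (
        id TEXT PRIMARY KEY,
        request_id TEXT NOT NULL REFERENCES requests(id),
        checksum TEXT NOT NULL,
        created_at TEXT NOT NULL  -- ISO 8601 UTC, so text order is time order
    );
    CREATE INDEX idx_requests_hash ON requests(hash);
    CREATE INDEX idx_responses_request_id ON responses(request_id);
    """
)


def latest_response_for_hash(db: sqlite3.Connection, request_hash: str):
    """Return (response_id, checksum) of the newest response, or None on a miss."""
    return db.execute(
        """
        SELECT r.id, r.checksum
        FROM responses AS r
        JOIN requests AS q ON q.id = r.request_id
        WHERE q.hash = ?
        ORDER BY r.created_at DESC
        LIMIT 1
        """,
        (request_hash,),
    ).fetchone()


# Two sessions issued the same request (same hash) on different days.
conn.executemany(
    "INSERT INTO requests VALUES (?, ?, ?)",
    [("q1", "s1", "abc"), ("q2", "s2", "abc")],
)
conn.executemany(
    "INSERT INTO responses VALUES (?, ?, ?, ?)",
    [
        ("r1", "q1", "c1", "2025-01-01T00:00:00Z"),
        ("r2", "q2", "c2", "2025-01-02T00:00:00Z"),
    ],
)
hit = latest_response_for_hash(conn, "abc")
miss = latest_response_for_hash(conn, "zzz")
```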

Configuration (via mxm-config)

Each application owns where its data lives.

from mxm_config import load_config
cfg = load_config("mxm-datakraken", env="dev", profile="default")

# cfg["paths"] should include:
#   data_root      -> base folder for this app/env/profile
#   db_path        -> e.g. ${data_root}/dataio.sqlite
#   responses_dir  -> e.g. ${data_root}/responses

Recommended defaults inside the app’s config/default.yaml:

paths:
  data_root: ${paths.data_root_base}/${mxm_env}/datakraken/${mxm_profile}
  db_path: ${paths.data_root}/dataio.sqlite
  responses_dir: ${paths.data_root}/responses

Quick start examples

1) Register an adapter and fetch

from mxm_config import load_config
from mxm_dataio.registry import register
from mxm_dataio.api import DataIoSession
from mxm_dataio.adapters import AdapterResult
from mxm_dataio.models import Request

class JustETFFetcher:
    source = "justetf"
    def fetch(self, request: Request) -> AdapterResult:
        data = b'{"name":"Example ETF"}'
        return AdapterResult(
            data=data,
            content_type="application/json",
            transport_status=200,
            url="https://api.justetf.example/etf?isin=LU0274211480",
            headers={"x-ratelimit-remaining": "99"},
        )
    def describe(self) -> str: return "JustETF demo fetcher"
    def close(self) -> None: pass

cfg = load_config("mxm-datakraken", env="dev", profile="default")
register("justetf", JustETFFetcher())

with DataIoSession(source="justetf", cfg=cfg) as io:
    req = io.request(kind="etf_profile", params={"isin": "LU0274211480"})
    resp = io.fetch(req)

print(resp.status, resp.path)
# Sidecar metadata at .../responses/<checksum>.meta.json

2) Send with metadata

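from mxm_dataio.adapters import AdapterResult
from mxm_dataio.api import DataIoSession
from mxm_dataio.models import Request, RequestMethod
from mxm_dataio.registry import register

# cfg: loaded as in example 1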

class BrokerSender:
    source = "ibkr"
    def send(self, request: Request, payload: bytes) -> AdapterResult:
        # pretend we placed an order
        return AdapterResult(
            data=b'{"order_id":12345,"status":"accepted"}',
            content_type="application/json",
            transport_status=202,
            adapter_meta={"env": "paper"},
        )
    def describe(self) -> str: return "Broker demo sender"
    def close(self) -> None: pass

register("ibkr", BrokerSender())
with DataIoSession(source="ibkr", cfg=cfg) as io:
    req = io.request(kind="place_order", method=RequestMethod.POST, body={"symbol":"CLZ5"})
    resp = io.send(req, payload=b'{"qty":1,"side":"BUY"}')

3) Read payload + sidecar metadata

from pathlib import Path
from mxm_dataio.store import Store

store = Store.get_instance(cfg)
raw = Path(resp.path).read_bytes()
meta = store.read_metadata(resp.checksum)  # raises if none was written
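
Since payload files are named after their checksum, an archived payload can be re-verified at read time. The sketch below assumes the checksum is the SHA-256 hex digest of the raw bytes, as the storage layout suggests; `verify_payload` is a hypothetical helper, not part of the Store API.

```python
import hashlib
import tempfile
from pathlib import Path


def verify_payload(path: Path) -> bool:
    """True when the file's SHA-256 digest matches the name it is stored under."""
    return hashlib.sha256(path.read_bytes()).hexdigest() == path.stem


with tempfile.TemporaryDirectory() as tmp:
    data = b'{"order_id":12345,"status":"accepted"}'
    payload_path = Path(tmp) / f"{hashlib.sha256(data).hexdigest()}.bin"
    payload_path.write_bytes(data)
    intact = verify_payload(payload_path)

    # Simulate corruption: the bytes change but the file name does not.
    payload_path.write_bytes(data + b" ")
    tampered = verify_payload(payload_path)
```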

4) Caching behavior

with DataIoSession(source="justetf", cfg=cfg, use_cache=True) as io:
    r1 = io.request(kind="k", params={"x": 1})
    resp1 = io.fetch(r1)
    r2 = io.request(kind="k", params={"x": 1})
    resp2 = io.fetch(r2)
assert resp2.id == resp1.id  # reused cached Response

Testing & quality

  • Test suites (fast, isolated, tmp paths):
    • tests/test_store.py + tests/test_store_extended.py — schema, atomicity, payload I/O, singleton, robustness.
    • tests/test_registry.py — register/resolve/duplicate/unregister/describe.
    • tests/test_api.py — lifecycle, fetch/send persistence, cache, capability guards.
    • tests/test_store_metadata.py + tests/test_api_adapterresult.py — sidecar metadata roundtrip, AdapterResult paths.
  • Style & Type: Black, Ruff, Pyright --strict.
  • No network in tests; adapters are dummies.

Design decisions (why this way?)

  • No SQLAlchemy in mxm-dataio: the metadata model is tiny, stable, and well-served by sqlite3. Domain packages may use ORMs for their richer schemas.
  • Sidecar metadata (AdapterResult): captures transport facts (status, headers, URL, elapsed) without DB migrations; JSON is deterministic and Unicode-friendly.
  • One adapter per DataIoSession: keeps audit trail clear and deterministic.
  • Caching by Request hash: avoids duplicate I/O for identical requests; still returns the raw bytes exactly as previously archived.

Roadmap

  • Replay helpers in Store (e.g., list/iterate responses by request; as-of replay).
  • CLI tools: list sessions/requests, inspect responses, dump payloads.
  • Migrations: schema versioning via a small meta table.
  • Compression / TTL: optional payload compression, retention policies.
  • Streaming: finalize Streamer as AsyncIterator[bytes], persist sequenced frames.
  • Reference adapters: LocalFileFetcher, minimal HttpFetcher (stdlib).

Repository layout

mxm_dataio/
  __init__.py
  api.py
  adapters.py
  models.py
  registry.py
  store.py
tests/
  test_models.py
  test_store.py
  test_store_extended.py
  test_registry.py
  test_api.py
  test_store_metadata.py
  test_api_adapterresult.py
config/
  default.yaml    # (in consumer apps; shown here for reference)

Versioning & compatibility

  • Python 3.11+ recommended (tested with latest stable).
  • No external runtime dependencies besides mxm-config for path resolution.
  • Semantic versioning once published; internal changes guarded by tests.

Using with domain packages

  • mxm-datakraken (reference data): adapters fetch raw web/regulator data; package parses into normalized entities; every entity stores source_response_id.
  • mxm-marketdata (prices/volumes/CA): adapters fetch OHLCV; package writes columnar/DB; rows carry source_response_id for audit/replay.
  • mxm-refdata: reconciliation logic; may replay past Responses by as_of.

License

MIT License. See LICENSE.
