
Common utils for trading platform (Kafka/Postgres/outbox/inbox)


Trading Common

Async utilities shared across the trading platform for working with PostgreSQL inbox/outbox tables, Kafka producers/consumers, and schema validation via trading-contracts.

Components

  • trading_common.db.DB — connection pool with inbox/outbox bootstrap, idempotent helpers, and convenience wrappers for writing to the outbox.
  • trading_common.kafka.Producer — resilient aiokafka producer prepared for SASL/SSL clusters and controlled retry behaviour.
  • trading_common.consumer_app.ConsumerApp — batching Kafka consumer that wraps message handling in PostgreSQL transactions with inbox-based deduplication.
  • trading_common.outbox.OutboxProcessor — helpers for reading, acknowledging, and vacuuming the outbox table.
  • trading_common.schema.ensure — thin wrapper around trading-contracts JSON schema validation.
  • trading_common.settings_kafka — baseline Kafka client settings hydrated from environment variables.

Installation

# Inside a virtualenv; install runtime dependencies only
pip install -e .

# Install with development tooling (pytest, black, mypy, ...)
pip install -e ".[dev]"

Configuration

PostgreSQL

Provide a DSN string understood by asyncpg. The DB.start() method creates (or verifies) the core.inbox and core.outbox tables so that services can boot against an empty database.

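import asyncio

from trading_common.db import DB

async def main() -> None:
    db = DB("postgresql://postgres:postgres@127.0.0.1:55432/trading")
    await db.start()  # creates or verifies core.inbox and core.outbox

asyncio.run(main())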

Kafka

Baseline Kafka options come from trading_common.settings_kafka. They honour the following environment variables (defaults in parentheses):

Variable (default)                                                Purpose
KAFKA_BOOTSTRAP_SERVERS (localhost:9092)                          Broker connection string
KAFKA_SECURITY_PROTOCOL (SASL_SSL)                                Security protocol: PLAINTEXT, SASL_SSL, ...
KAFKA_SASL_MECHANISM (PLAIN)                                      SASL authentication mechanism
KAFKA_SASL_USERNAME / KAFKA_SASL_PASSWORD (empty)                 SASL credentials
KAFKA_CLIENT_ID (market-data-service)                             Default client id
KAFKA_ENABLE_IDEMPOTENCE (true)                                   Producer idempotence flag
KAFKA_LINGER_MS, KAFKA_BATCH_SIZE, KAFKA_COMPRESSION_TYPE, ...    Producer tuning knobs
KAFKA_ENABLE_AUTO_COMMIT (false)                                  Consumer offset commit mode
KAFKA_AUTO_OFFSET_RESET, KAFKA_MAX_POLL_INTERVAL_MS, ...          Consumer runtime tuning
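
As a rough illustration of what "hydrated from environment variables" means, the sketch below reads a few of the variables from the table with the defaults listed there. The helper name load_kafka_common and the keys of the returned dictionary are assumptions made for this example; the real contents of trading_common.settings_kafka may differ.

```python
import os
from typing import Mapping


def _as_bool(raw: str) -> bool:
    """Interpret common truthy spellings of an environment value."""
    return raw.strip().lower() in {"1", "true", "yes", "on"}


def load_kafka_common(env: Mapping[str, str] = os.environ) -> dict:
    """Hypothetical helper: build baseline settings from environment variables.

    Defaults mirror the table above; the dictionary keys are illustrative only.
    """
    return {
        "bootstrap_servers": env.get("KAFKA_BOOTSTRAP_SERVERS", "localhost:9092"),
        "security_protocol": env.get("KAFKA_SECURITY_PROTOCOL", "SASL_SSL"),
        "sasl_mechanism": env.get("KAFKA_SASL_MECHANISM", "PLAIN"),
        "sasl_username": env.get("KAFKA_SASL_USERNAME", ""),
        "sasl_password": env.get("KAFKA_SASL_PASSWORD", ""),
        "client_id": env.get("KAFKA_CLIENT_ID", "market-data-service"),
        "enable_idempotence": _as_bool(env.get("KAFKA_ENABLE_IDEMPOTENCE", "true")),
    }
```

Passing an explicit mapping instead of os.environ, for example load_kafka_common({"KAFKA_SECURITY_PROTOCOL": "PLAINTEXT"}), keeps such a helper easy to exercise in tests without touching the process environment.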

Copy the defaults (for example with dict(...) or {**...} unpacking) and tweak the copy before passing it into the Kafka helpers, so that you do not mutate the shared module-level dictionaries:

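from trading_common import settings_kafka
from trading_common.kafka import Producer

producer = Producer(
    # {**...} builds a new dict, so KAFKA_COMMON itself is not modified
    base_cfg={**settings_kafka.KAFKA_COMMON, "client_id": "strategy-service"},
    tuning=dict(settings_kafka.PRODUCER_TUNING),  # pass a copy, not the shared dict
)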
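
To see why the copy matters, here is a small self-contained sketch. SHARED_DEFAULTS is a hypothetical stand-in for a module-level dictionary such as settings_kafka.KAFKA_COMMON, and with_overrides is an illustrative helper, not part of this package.

```python
# Stand-in for a module-level defaults dictionary (hypothetical values).
SHARED_DEFAULTS = {
    "bootstrap_servers": "localhost:9092",
    "client_id": "market-data-service",
}


def with_overrides(defaults: dict, **overrides) -> dict:
    """Return a new dict; the shared defaults are left untouched."""
    return {**defaults, **overrides}


strategy_cfg = with_overrides(SHARED_DEFAULTS, client_id="strategy-service")
same_result = dict(SHARED_DEFAULTS, client_id="strategy-service")  # dict(...) form

print(strategy_cfg["client_id"])     # the copy carries the override
print(SHARED_DEFAULTS["client_id"])  # the shared dict keeps its original value
```

Both forms make a shallow copy, which is enough for flat settings; nested mutable values would still be shared between the copy and the original.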

Usage Examples

Producer

import asyncio
from trading_common import settings_kafka
from trading_common.kafka import Producer

async def publish() -> None:
    producer = Producer(
        base_cfg={**settings_kafka.KAFKA_COMMON, "client_id": "md-publisher"},
        tuning=settings_kafka.PRODUCER_TUNING,
    )
    await producer.start()
    try:
        await producer.send(
            topic="md.candles",
            key="BTCUSDT",
            payload={"event_id": "...", "open": 52000, "close": 52120},
        )
    finally:
        await producer.stop()

asyncio.run(publish())

Consumer with Inbox/Outbox

import asyncio
from trading_common import settings_kafka
from trading_common.consumer_app import ConsumerApp
from trading_common.db import DB
from trading_common.schema import ensure

async def handle(con, topic, key, payload):
    ensure("md.candle.closed@v1", payload)
    # Business logic ...

async def main() -> None:
    db = DB("postgresql://postgres:postgres@127.0.0.1:55432/trading")
    consumer = ConsumerApp(
        name="market-data-service",
        db=db,
        base_cfg={**settings_kafka.KAFKA_COMMON, "client_id": "market-data-service"},
        tuning=settings_kafka.CONSUMER_TUNING,
        topics=["md.candles"],
        group_id="md-service",
        handler=handle,
    )
    await consumer.start()
    try:
        await consumer.run()
    finally:
        await consumer.stop()

asyncio.run(main())
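
The idea behind inbox-based deduplication can be sketched without Kafka or PostgreSQL. The example below uses the standard-library sqlite3 module as a stand-in; the table layout and the names process_once and store_candle are assumptions for illustration, not ConsumerApp's actual implementation. The event id is recorded and the handler runs inside one transaction, so a redelivered event is skipped and a failing handler leaves no inbox row behind.

```python
import sqlite3


def process_once(con: sqlite3.Connection, event_id: str, payload: dict, handler) -> bool:
    """Run handler at most once per event_id; return True if it ran.

    The inbox insert and the handler's writes share one transaction, so a
    failing handler rolls back the inbox row and the event can be retried.
    """
    with con:  # commits on success, rolls back on exception
        cur = con.execute(
            "INSERT OR IGNORE INTO inbox (event_id) VALUES (?)", (event_id,)
        )
        if cur.rowcount == 0:  # row already present: duplicate delivery
            return False
        handler(con, payload)
        return True


con = sqlite3.connect(":memory:")
con.execute("CREATE TABLE inbox (event_id TEXT PRIMARY KEY)")
con.execute("CREATE TABLE candles (symbol TEXT, close REAL)")
con.commit()


def store_candle(con: sqlite3.Connection, payload: dict) -> None:
    """Hypothetical business logic: persist one candle."""
    con.execute(
        "INSERT INTO candles VALUES (?, ?)", (payload["symbol"], payload["close"])
    )


event = {"symbol": "BTCUSDT", "close": 52120}
first = process_once(con, "evt-1", event, store_candle)
second = process_once(con, "evt-1", event, store_candle)  # simulated redelivery
```

In PostgreSQL the same check is commonly written as INSERT ... ON CONFLICT DO NOTHING against a unique key on the event id.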

Outbox Processing

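from trading_common.outbox import OutboxProcessor

async def publish_pending(db, producer) -> None:
    """Relay pending outbox events; db and producer must already be started."""
    processor = OutboxProcessor(db)
    async with db.pool.acquire() as con:  # pool is available after db.start()
        events = await processor.get_pending_events(con, limit=100)
        for event_id, topic, key, payload in events:
            await producer.send(topic=topic, key=key, payload=payload)
            # Mark only after a successful send so that failures are retried.
            await processor.mark_published(con, event_id)
        await processor.cleanup_old_events(con, days_old=7)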
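
For readers new to the outbox pattern, the following self-contained sketch shows the two halves of the flow with sqlite3 standing in for PostgreSQL: the business row and its event are written in one transaction, and a separate relay step publishes pending events and marks them. The schema, the published column, and the names place_order and relay are assumptions for illustration; they are not the layout of core.outbox or the OutboxProcessor API.

```python
import json
import sqlite3

con = sqlite3.connect(":memory:")
con.execute("CREATE TABLE orders (id INTEGER PRIMARY KEY, symbol TEXT)")
con.execute(
    "CREATE TABLE outbox (id INTEGER PRIMARY KEY AUTOINCREMENT, topic TEXT, "
    "key TEXT, payload TEXT, published INTEGER NOT NULL DEFAULT 0)"
)
con.commit()


def place_order(con: sqlite3.Connection, symbol: str) -> None:
    """Write the business row and its event in one transaction."""
    with con:
        con.execute("INSERT INTO orders (symbol) VALUES (?)", (symbol,))
        con.execute(
            "INSERT INTO outbox (topic, key, payload) VALUES (?, ?, ?)",
            ("orders.placed", symbol, json.dumps({"symbol": symbol})),
        )


def relay(con: sqlite3.Connection, publish, limit: int = 100) -> int:
    """Publish pending events oldest-first and mark each one as published."""
    rows = con.execute(
        "SELECT id, topic, key, payload FROM outbox WHERE published = 0 "
        "ORDER BY id LIMIT ?",
        (limit,),
    ).fetchall()
    for event_id, topic, key, payload in rows:
        publish(topic, key, json.loads(payload))  # may raise; row stays pending
        with con:
            con.execute("UPDATE outbox SET published = 1 WHERE id = ?", (event_id,))
    return len(rows)


sent = []
place_order(con, "BTCUSDT")
relay(con, lambda topic, key, payload: sent.append((topic, key, payload)))
relay(con, lambda topic, key, payload: sent.append((topic, key, payload)))  # nothing left
```

Because an event is marked only after it has been published, a crash between the two steps results in the event being sent again on the next run. Delivery is therefore at-least-once, which is why the consuming side deduplicates through the inbox.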

Development Workflow

pytest -m "not slow"       # Run fast tests
black src tests             # Format
isort src tests             # Import sorting
mypy src                    # Static typing
pytest --cov=trading_common --cov-report=term-missing

Testing

  • pytest — run the whole suite (asyncio tests use strict mode).
  • pytest -m "not slow" — focus on the fast checks used in CI.
  • pytest --cov=trading_common --cov-report=term-missing — optional coverage report.

These commands are mirrored in .github/workflows/test.yml, so a green run locally is a good indication that the CI job will pass.

Related Projects

  • trading-contracts — JSON schemas for every event type; install alongside this package to validate messages with schema.ensure.
  • infra/ in the monorepo provides Kafka/PostgreSQL containers for local development (docker compose -f infra/docker-compose.yml up -d).

License

MIT License. See LICENSE for details.

