Common utils for trading platform (Kafka/Postgres/outbox/inbox)

Trading Common

Async utilities shared across the trading platform for working with PostgreSQL inbox/outbox tables, Kafka producers/consumers, and schema validation via trading-contracts.

Components

  • trading_common.db.DB — connection pool with inbox/outbox bootstrap, idempotent helpers, and convenience wrappers for writing to the outbox.
  • trading_common.kafka.Producer — resilient aiokafka producer prepared for SASL/SSL clusters and controlled retry behaviour.
  • trading_common.consumer_app.ConsumerApp — batching Kafka consumer that wraps message handling in PostgreSQL transactions with inbox-based deduplication.
  • trading_common.outbox.OutboxProcessor — helpers for reading, acknowledging, and vacuuming the outbox table.
  • trading_common.schema.ensure — thin wrapper around trading-contracts JSON schema validation.
  • trading_common.settings_kafka — baseline Kafka client settings hydrated from environment variables.
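The inbox-based deduplication mentioned for ConsumerApp can be pictured with a small, self-contained sketch. This is not the library's implementation: it uses the standard-library sqlite3 module as a stand-in for PostgreSQL, and the table layout and the name process_once are assumptions made purely for illustration. The idea is that recording the event id and running the handler happen in one transaction, so a redelivered message is skipped and a failed handler leaves no inbox row behind.

```python
import sqlite3
from typing import Callable


def process_once(
    con: sqlite3.Connection,
    consumer: str,
    event_id: str,
    handler: Callable[[sqlite3.Connection], None],
) -> bool:
    """Run handler at most once per (consumer, event_id); return True if it ran."""
    with con:  # one transaction: commit on success, roll back on exception
        cur = con.execute(
            "INSERT OR IGNORE INTO inbox (consumer, event_id) VALUES (?, ?)",
            (consumer, event_id),
        )
        if cur.rowcount == 0:
            return False  # row already present: duplicate delivery, skip
        handler(con)
        return True


# Hypothetical schema, for the sketch only
con = sqlite3.connect(":memory:")
con.execute(
    "CREATE TABLE inbox (consumer TEXT, event_id TEXT, "
    "PRIMARY KEY (consumer, event_id))"
)
con.execute("CREATE TABLE candles (symbol TEXT, close REAL)")
con.commit()


def store_candle(c: sqlite3.Connection) -> None:
    c.execute("INSERT INTO candles VALUES ('BTCUSDT', 52120)")


first = process_once(con, "md-service", "evt-1", store_candle)
second = process_once(con, "md-service", "evt-1", store_candle)  # redelivery
rows = con.execute("SELECT COUNT(*) FROM candles").fetchone()[0]
```

In this sketch the second call does nothing because the primary key on (consumer, event_id) rejects the duplicate insert, so the candle is stored only once.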

Installation

# Inside a virtualenv; install runtime dependencies only
pip install -e .

# Install with development tooling (pytest, black, mypy, ...)
pip install -e ".[dev]"

Configuration

PostgreSQL

Provide a DSN string understood by asyncpg. The DB.start() method creates (or verifies) the core.inbox and core.outbox tables so that services can boot against an empty database.

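import asyncio
from trading_common.db import DB

async def main() -> None:
    db = DB("postgresql://postgres:postgres@127.0.0.1:55432/trading")
    await db.start()  # creates or verifies core.inbox and core.outbox

asyncio.run(main())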

Kafka

Baseline Kafka options come from trading_common.settings_kafka. They honour the following environment variables (defaults in parentheses):

| Variable (default) | Purpose |
| --- | --- |
| KAFKA_BOOTSTRAP_SERVERS (localhost:9092) | Broker connection string |
| KAFKA_SECURITY_PROTOCOL (SASL_SSL) | PLAINTEXT, SASL_SSL, ... |
| KAFKA_SASL_MECHANISM (PLAIN) | Auth mechanism |
| KAFKA_SASL_USERNAME / KAFKA_SASL_PASSWORD (empty) | SASL credentials |
| KAFKA_CLIENT_ID (market-data-service) | Default client id |
| KAFKA_ENABLE_IDEMPOTENCE (true) | Producer idempotence flag |
| KAFKA_LINGER_MS, KAFKA_BATCH_SIZE, KAFKA_COMPRESSION_TYPE, ... | Producer tuning knobs |
| KAFKA_ENABLE_AUTO_COMMIT (false) | Consumer offset mode |
| KAFKA_AUTO_OFFSET_RESET, KAFKA_MAX_POLL_INTERVAL_MS, ... | Consumer runtime tuning |
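As a rough illustration of how such environment-driven defaults can be built, here is a minimal sketch. The helper names load_kafka_common, load_flags, and _as_bool are hypothetical, and the dictionary keys follow aiokafka's parameter names; whether settings_kafka uses exactly these keys is an assumption. Only the variable names and defaults come from the table above.

```python
import os
from typing import Mapping


def _as_bool(value: str) -> bool:
    """Interpret common truthy spellings; anything else is False."""
    return value.strip().lower() in {"1", "true", "yes", "on"}


def load_kafka_common(env: Mapping[str, str] = os.environ) -> dict:
    """Build baseline client options from environment variables (hypothetical helper)."""
    return {
        "bootstrap_servers": env.get("KAFKA_BOOTSTRAP_SERVERS", "localhost:9092"),
        "security_protocol": env.get("KAFKA_SECURITY_PROTOCOL", "SASL_SSL"),
        "sasl_mechanism": env.get("KAFKA_SASL_MECHANISM", "PLAIN"),
        "sasl_plain_username": env.get("KAFKA_SASL_USERNAME", ""),
        "sasl_plain_password": env.get("KAFKA_SASL_PASSWORD", ""),
        "client_id": env.get("KAFKA_CLIENT_ID", "market-data-service"),
    }


def load_flags(env: Mapping[str, str] = os.environ) -> dict:
    """Parse the boolean switches listed in the table (hypothetical helper)."""
    return {
        "enable_idempotence": _as_bool(env.get("KAFKA_ENABLE_IDEMPOTENCE", "true")),
        "enable_auto_commit": _as_bool(env.get("KAFKA_ENABLE_AUTO_COMMIT", "false")),
    }
```

Passing a plain dict as env (instead of relying on os.environ) makes the parsing easy to exercise in tests without touching the process environment.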

Copy the defaults before tweaking them, for example with dict(...) or {**...} unpacking, so that you do not mutate the shared module-level dictionaries when passing them into the Kafka helpers:

from trading_common import settings_kafka
from trading_common.kafka import Producer

producer = Producer(
    base_cfg={**settings_kafka.KAFKA_COMMON, "client_id": "strategy-service"},
    tuning=settings_kafka.PRODUCER_TUNING,
)
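To make the copy-before-tweak advice concrete, the following sketch uses a stand-in dictionary rather than the real settings_kafka.KAFKA_COMMON (its contents here are illustrative, and config_for is a hypothetical helper). It shows that building a new dict leaves the shared defaults untouched.

```python
# Stand-in for a shared module-level dictionary such as settings_kafka.KAFKA_COMMON;
# the keys and values are illustrative, not the real defaults.
SHARED_DEFAULTS = {
    "bootstrap_servers": "localhost:9092",
    "client_id": "market-data-service",
}


def config_for(client_id: str) -> dict:
    """Return a per-service copy of the defaults with client_id overridden."""
    return {**SHARED_DEFAULTS, "client_id": client_id}


strategy_cfg = config_for("strategy-service")
```

Note that both dict(...) and {**...} make shallow copies. If a defaults dictionary ever holds nested mutable values, copy.deepcopy is the safer choice.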

Usage Examples

Producer

import asyncio
from trading_common import settings_kafka
from trading_common.kafka import Producer

async def publish() -> None:
    producer = Producer(
        base_cfg={**settings_kafka.KAFKA_COMMON, "client_id": "md-publisher"},
        tuning=settings_kafka.PRODUCER_TUNING,
    )
    await producer.start()
    try:
        await producer.send(
            topic="md.candles",
            key="BTCUSDT",
            payload={"event_id": "...", "open": 52000, "close": 52120},
        )
    finally:
        await producer.stop()

asyncio.run(publish())
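The Producer is described above as having controlled retry behaviour. How the library implements this is not shown in this README, so the following is only a generic sketch of bounded retries with exponential backoff around any async send callable. The names send_with_retry, attempts, and base_delay are hypothetical, and treating ConnectionError as the retryable error is an assumption.

```python
import asyncio
from typing import Awaitable, Callable


async def send_with_retry(
    send: Callable[[], Awaitable[None]],
    attempts: int = 3,
    base_delay: float = 0.1,
) -> int:
    """Call send() until it succeeds; return the number of attempts used."""
    for attempt in range(1, attempts + 1):
        try:
            await send()
            return attempt
        except ConnectionError:
            if attempt == attempts:
                raise  # retries exhausted: surface the last error
            # Back off 1x, 2x, 4x, ... base_delay between attempts
            await asyncio.sleep(base_delay * 2 ** (attempt - 1))
    raise ValueError("attempts must be >= 1")


state = {"calls": 0}


async def flaky_send() -> None:
    """Fake send that fails twice before succeeding."""
    state["calls"] += 1
    if state["calls"] < 3:
        raise ConnectionError("broker unavailable")


used = asyncio.run(send_with_retry(flaky_send, attempts=5, base_delay=0.001))
```

In real code the send argument could be a small lambda or functools.partial wrapping producer.send(...). Capping the number of attempts keeps a broker outage from blocking the caller indefinitely.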

Consumer with Inbox/Outbox

import asyncio
from trading_common import settings_kafka
from trading_common.consumer_app import ConsumerApp
from trading_common.db import DB
from trading_common.schema import ensure

async def handle(con, topic, key, payload):
    ensure("md.candle.closed@v1", payload)
    # Business logic ...

async def main() -> None:
    db = DB("postgresql://postgres:postgres@127.0.0.1:55432/trading")
    consumer = ConsumerApp(
        name="market-data-service",
        db=db,
        base_cfg={**settings_kafka.KAFKA_COMMON, "client_id": "market-data-service"},
        tuning=settings_kafka.CONSUMER_TUNING,
        topics=["md.candles"],
        group_id="md-service",
        handler=handle,
    )
    await consumer.start()
    try:
        await consumer.run()
    finally:
        await consumer.stop()

asyncio.run(main())
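Because the handler receives a connection (con) that belongs to the surrounding transaction, business writes and any follow-up outbox event can be committed together. The sketch below illustrates that pattern with the standard-library sqlite3 module standing in for PostgreSQL. The table layouts, the topic name strategy.signals, and the function handle_candle are all assumptions for illustration and do not reflect the library's actual schema or helpers.

```python
import json
import sqlite3
import uuid


def handle_candle(con: sqlite3.Connection, payload: dict) -> None:
    """Store the candle and queue a follow-up event on the same connection."""
    con.execute(
        "INSERT INTO candles (symbol, close) VALUES (?, ?)",
        (payload["symbol"], payload["close"]),
    )
    con.execute(
        "INSERT INTO outbox (event_id, topic, msg_key, payload) VALUES (?, ?, ?, ?)",
        (str(uuid.uuid4()), "strategy.signals", payload["symbol"], json.dumps(payload)),
    )


# Hypothetical schema, for the sketch only
con = sqlite3.connect(":memory:")
con.execute("CREATE TABLE candles (symbol TEXT, close REAL)")
con.execute(
    "CREATE TABLE outbox (event_id TEXT PRIMARY KEY, topic TEXT, "
    "msg_key TEXT, payload TEXT, published_at TEXT)"
)
con.commit()

with con:  # both rows are committed together, or neither on error
    handle_candle(con, {"symbol": "BTCUSDT", "close": 52120})
```

Writing the outgoing event to a table instead of sending it to Kafka directly means a crash between the database commit and the send cannot lose the event; a separate relay publishes it later.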

Outbox Processing

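from trading_common.db import DB
from trading_common.kafka import Producer
from trading_common.outbox import OutboxProcessor

async def relay_outbox(db: DB, producer: Producer) -> None:
    # Expects db and producer to be started; db.pool is available after db.start()
    processor = OutboxProcessor(db)
    async with db.pool.acquire() as con:
        events = await processor.get_pending_events(con, limit=100)
        for event_id, topic, key, payload in events:
            await producer.send(topic, key, payload)
            # Mark as published only after the send has succeeded
            await processor.mark_published(con, event_id)
        await processor.cleanup_old_events(con, days_old=7)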
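A single pass like the one above is usually run repeatedly by a background task. The following sketch shows one way to structure such a polling loop. It is an assumption about usage, not part of the package: run_relay and idle_delay are hypothetical names, and drain_once stands for any coroutine that performs one outbox pass and returns the number of events it published.

```python
import asyncio
from typing import Awaitable, Callable


async def run_relay(
    drain_once: Callable[[], Awaitable[int]],
    stop: asyncio.Event,
    idle_delay: float = 1.0,
) -> int:
    """Call drain_once() until stop is set; pause only when the outbox was empty."""
    total = 0
    while not stop.is_set():
        published = await drain_once()
        total += published
        if published == 0:
            try:
                # Wake early if shutdown is requested during the idle wait
                await asyncio.wait_for(stop.wait(), timeout=idle_delay)
            except asyncio.TimeoutError:
                pass
    return total


async def demo() -> int:
    stop = asyncio.Event()
    batches = [2, 1, 0]  # fake results of three consecutive passes

    async def fake_drain() -> int:
        n = batches.pop(0)
        if not batches:
            stop.set()  # request shutdown after the last fake batch
        return n

    return await run_relay(fake_drain, stop, idle_delay=0.01)


total = asyncio.run(demo())
```

Since an event is marked as published only after the send succeeds, a crash between the two steps leads to a resend on the next pass. Downstream consumers should therefore tolerate duplicates, which is what the inbox deduplication is for.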

Development Workflow

pytest -m "not slow"       # Run fast tests
black src tests             # Format
isort src tests             # Import sorting
mypy src                    # Static typing
pytest --cov=trading_common --cov-report=term-missing

Testing

  • pytest — run the whole suite (asyncio tests use strict mode).
  • pytest -m "not slow" — focus on the fast checks used in CI.
  • pytest --cov=trading_common --cov-report=term-missing — optional coverage report.

These commands are mirrored in .github/workflows/test.yml, so a green local run is a strong indication that the CI job will pass as well.

Related Projects

  • trading-contracts — JSON schemas for every event type; install alongside this package to validate messages with schema.ensure.
  • infra/ in the monorepo provides Kafka/PostgreSQL containers for local development (docker compose -f infra/docker-compose.yml up -d).

License

MIT License. See LICENSE for details.
