Common utils for trading platform (Kafka/Postgres/outbox/inbox)

Trading Common

Async utilities shared across the trading platform for working with PostgreSQL inbox/outbox tables, Kafka producers/consumers, and schema validation via trading-contracts.

Components

  • trading_common.db.DB — connection pool with inbox/outbox bootstrap, idempotent helpers, and convenience wrappers for writing to the outbox.
  • trading_common.kafka.Producer — resilient aiokafka producer prepared for SASL/SSL clusters and controlled retry behaviour.
  • trading_common.consumer_app.ConsumerApp — batching Kafka consumer that wraps message handling in PostgreSQL transactions with inbox-based deduplication.
  • trading_common.outbox.OutboxProcessor — helpers for reading, acknowledging, and vacuuming the outbox table.
  • trading_common.schema.ensure — thin wrapper around trading-contracts JSON schema validation.
  • trading_common.settings_kafka — baseline Kafka client settings hydrated from environment variables.

Installation

# Inside a virtualenv; install runtime dependencies only
pip install -e .

# Install with development tooling (pytest, black, mypy, ...)
pip install -e ".[dev]"

Configuration

PostgreSQL

Provide a DSN string understood by asyncpg. The DB.start() method creates (or verifies) the core.inbox and core.outbox tables so that services can boot against an empty database.

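import asyncio
from trading_common.db import DB

async def main() -> None:
    db = DB("postgresql://postgres:postgres@127.0.0.1:55432/trading")
    await db.start()  # creates (or verifies) core.inbox and core.outbox

asyncio.run(main())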

Kafka

Baseline Kafka options come from trading_common.settings_kafka. They honour the following environment variables (defaults in parentheses):

| Variable | Purpose |
| --- | --- |
| KAFKA_BOOTSTRAP_SERVERS (localhost:9092) | Broker connection string |
| KAFKA_SECURITY_PROTOCOL (SASL_SSL) | PLAINTEXT, SASL_SSL, ... |
| KAFKA_SASL_MECHANISM (PLAIN) | Auth mechanism |
| KAFKA_SASL_USERNAME / KAFKA_SASL_PASSWORD (empty) | SASL credentials |
| KAFKA_CLIENT_ID (market-data-service) | Default client id |
| KAFKA_ENABLE_IDEMPOTENCE (true) | Producer idempotence flag |
| KAFKA_LINGER_MS, KAFKA_BATCH_SIZE, KAFKA_COMPRESSION_TYPE, ... | Producer tuning knobs |
| KAFKA_ENABLE_AUTO_COMMIT (false) | Consumer offset mode |
| KAFKA_AUTO_OFFSET_RESET, KAFKA_MAX_POLL_INTERVAL_MS, ... | Consumer runtime tuning |
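
To illustrate how defaults of this kind are typically hydrated from the environment, here is a minimal sketch. It is not the actual trading_common.settings_kafka source: the helpers env_str, env_bool and load_kafka_common, as well as the dictionary key names, are hypothetical. Only the variable names and defaults are taken from the table above.

```python
import os


def env_str(name: str, default: str) -> str:
    """Return the environment variable, or the default when it is unset."""
    return os.environ.get(name, default)


def env_bool(name: str, default: bool) -> bool:
    """Parse common truthy spellings; fall back to the default when unset."""
    raw = os.environ.get(name)
    if raw is None:
        return default
    return raw.strip().lower() in {"1", "true", "yes", "on"}


def load_kafka_common() -> dict:
    # Hypothetical loader: key names are illustrative, not the library's.
    return {
        "bootstrap_servers": env_str("KAFKA_BOOTSTRAP_SERVERS", "localhost:9092"),
        "security_protocol": env_str("KAFKA_SECURITY_PROTOCOL", "SASL_SSL"),
        "sasl_mechanism": env_str("KAFKA_SASL_MECHANISM", "PLAIN"),
        "client_id": env_str("KAFKA_CLIENT_ID", "market-data-service"),
        "enable_idempotence": env_bool("KAFKA_ENABLE_IDEMPOTENCE", True),
    }
```

Reading the environment inside a function (rather than at import time) makes the values easy to override in tests; whether the library does this at import time or lazily is not stated here.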

Copy the defaults before tweaking them (for example with dict(...) or {**...} unpacking) and pass the copy into the Kafka helpers, so that you do not mutate the shared module-level dictionaries:

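from trading_common import settings_kafka
from trading_common.kafka import Producer

# Build a new dict with an overridden client_id; KAFKA_COMMON itself is untouched.
producer = Producer(
    base_cfg={**settings_kafka.KAFKA_COMMON, "client_id": "strategy-service"},
    tuning=settings_kafka.PRODUCER_TUNING,
)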
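
The following standalone sketch shows why copying matters. SHARED_DEFAULTS is a stand-in for a module-level dictionary such as settings_kafka.KAFKA_COMMON; its contents are illustrative, not the library's actual values.

```python
# Stand-in for a shared module-level dict; contents are illustrative only.
SHARED_DEFAULTS = {
    "bootstrap_servers": "localhost:9092",
    "client_id": "market-data-service",
}

# Both forms build a new dict, so the override stays local to the caller.
via_unpacking = {**SHARED_DEFAULTS, "client_id": "strategy-service"}
via_dict = dict(SHARED_DEFAULTS, client_id="risk-service")

# The shared defaults still carry the original client id.
print(SHARED_DEFAULTS["client_id"])
```

Both forms make a shallow copy: top-level keys are independent, but any nested mutable value would still be shared with the original.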

Usage Examples

Producer

import asyncio
from trading_common import settings_kafka
from trading_common.kafka import Producer

async def publish() -> None:
    producer = Producer(
        base_cfg={**settings_kafka.KAFKA_COMMON, "client_id": "md-publisher"},
        tuning=settings_kafka.PRODUCER_TUNING,
    )
    await producer.start()
    try:
        await producer.send(
            topic="md.candles",
            key="BTCUSDT",
            payload={"event_id": "...", "open": 52000, "close": 52120},
        )
    finally:
        await producer.stop()

asyncio.run(publish())

Consumer with Inbox/Outbox

import asyncio
from trading_common import settings_kafka
from trading_common.consumer_app import ConsumerApp
from trading_common.db import DB
from trading_common.schema import ensure

async def handle(con, topic, key, payload):
    ensure("md.candle.closed@v1", payload)
    # Business logic ...

async def main() -> None:
    db = DB("postgresql://postgres:postgres@127.0.0.1:55432/trading")
    consumer = ConsumerApp(
        name="market-data-service",
        db=db,
        base_cfg={**settings_kafka.KAFKA_COMMON, "client_id": "market-data-service"},
        tuning=settings_kafka.CONSUMER_TUNING,
        topics=["md.candles"],
        group_id="md-service",
        handler=handle,
    )
    await consumer.start()
    try:
        await consumer.run()
    finally:
        await consumer.stop()

asyncio.run(main())
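
The idea behind inbox-based deduplication can be sketched without Kafka or PostgreSQL. The example below uses the standard-library sqlite3 module so that it runs anywhere; the table layout and the handle_once helper are hypothetical and do not reflect the real core.inbox schema or ConsumerApp internals. In PostgreSQL the equivalent insert would usually be written with ON CONFLICT DO NOTHING.

```python
import sqlite3

con = sqlite3.connect(":memory:")
# Hypothetical inbox table: one row per (consumer, event_id) already handled.
con.execute(
    "CREATE TABLE inbox (consumer TEXT, event_id TEXT, "
    "PRIMARY KEY (consumer, event_id))"
)
con.execute("CREATE TABLE candles (event_id TEXT, close REAL)")


def handle_once(consumer: str, event_id: str, close: float) -> bool:
    """Process the event at most once; return False for a duplicate."""
    with con:  # one transaction: inbox row and business write commit together
        cur = con.execute(
            "INSERT OR IGNORE INTO inbox (consumer, event_id) VALUES (?, ?)",
            (consumer, event_id),
        )
        if cur.rowcount == 0:
            return False  # already recorded, skip the business logic
        con.execute(
            "INSERT INTO candles (event_id, close) VALUES (?, ?)",
            (event_id, close),
        )
        return True
```

Because the inbox row and the business write share a transaction, a redelivered message either finds the inbox row and is skipped, or finds neither and is processed in full.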

Outbox Processing

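from trading_common.outbox import OutboxProcessor

async def drain_outbox(db, producer) -> None:
    # Expects a started DB (db.pool is available after db.start())
    # and a started Producer.
    processor = OutboxProcessor(db)
    async with db.pool.acquire() as con:
        events = await processor.get_pending_events(con, limit=100)
        for event_id, topic, key, payload in events:
            # Publish first, then mark the row as published.
            await producer.send(topic=topic, key=key, payload=payload)
            await processor.mark_published(con, event_id)
        # Remove events older than seven days.
        await processor.cleanup_old_events(con, days_old=7)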
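
For readers new to the pattern, here is a self-contained sketch of an outbox relay loop. It uses sqlite3 and an in-memory list in place of PostgreSQL and Kafka; the table columns and the enqueue and relay helpers are assumptions made for illustration and are not the OutboxProcessor implementation.

```python
import json
import sqlite3

con = sqlite3.connect(":memory:")
# Hypothetical outbox layout; the real core.outbox columns may differ.
con.execute(
    "CREATE TABLE outbox (id INTEGER PRIMARY KEY, topic TEXT, key TEXT, "
    "payload TEXT, published INTEGER DEFAULT 0)"
)

sent = []  # stand-in for a Kafka producer


def enqueue(topic: str, key: str, payload: dict) -> None:
    """Store an event that still has to be published."""
    with con:
        con.execute(
            "INSERT INTO outbox (topic, key, payload) VALUES (?, ?, ?)",
            (topic, key, json.dumps(payload)),
        )


def relay(limit: int = 100) -> int:
    """Publish pending rows in insertion order, marking each one afterwards."""
    rows = con.execute(
        "SELECT id, topic, key, payload FROM outbox "
        "WHERE published = 0 ORDER BY id LIMIT ?",
        (limit,),
    ).fetchall()
    for row_id, topic, key, payload in rows:
        sent.append((topic, key, json.loads(payload)))  # "publish" first
        with con:
            con.execute("UPDATE outbox SET published = 1 WHERE id = ?", (row_id,))
    return len(rows)
```

Publishing before marking means that a crash between the two steps re-sends the event on the next run. That gives at-least-once delivery, which is why consumers pair it with inbox deduplication.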

Development Workflow

pytest -m "not slow"       # Run fast tests
black src tests             # Format
isort src tests             # Import sorting
mypy src                    # Static typing
pytest --cov=trading_common --cov-report=term-missing

Testing

  • pytest — run the whole suite (asyncio tests use strict mode).
  • pytest -m "not slow" — focus on the fast checks used in CI.
  • pytest --cov=trading_common --cov-report=term-missing — optional coverage report.

These commands are mirrored in .github/workflows/test.yml, so a green local run is a strong indication that the CI job will pass.

Related Projects

  • trading-contracts — JSON schemas for every event type; install alongside this package to validate messages with schema.ensure.
  • infra/ in the monorepo provides Kafka/PostgreSQL containers for local development (docker compose -f infra/docker-compose.yml up -d).

License

MIT License. See LICENSE for details.
