Skip to main content

Common utils for trading platform (Kafka/Postgres/outbox/inbox)

Project description

Trading Common

Async utilities shared across the trading platform for working with PostgreSQL inbox/outbox tables, Kafka producers/consumers, and schema validation via trading-contracts.

Components

  • trading_common.db.DB — connection pool with inbox/outbox bootstrap, idempotent helpers, and convenience wrappers for writing to the outbox.
  • trading_common.kafka.Producer — resilient aiokafka producer prepared for SASL/SSL clusters and controlled retry behaviour.
  • trading_common.consumer_app.ConsumerApp — batching Kafka consumer that wraps message handling in PostgreSQL transactions with inbox-based deduplication.
  • trading_common.outbox.OutboxProcessor — helpers for reading, acknowledging, and vacuuming the outbox table.
  • trading_common.schema.ensure — thin wrapper around trading-contracts JSON schema validation.
  • trading_common.settings_kafka — baseline Kafka client settings hydrated from environment variables.

Installation

# Inside a virtualenv; install runtime dependencies only
pip install -e .

# Install with development tooling (pytest, black, mypy, ...)
pip install -e ".[dev]"

Configuration

PostgreSQL

Provide a DSN string understood by asyncpg. The DB.start() method creates (or verifies) the core.inbox and core.outbox tables so that services can boot against an empty database.

from trading_common.db import DB

db = DB("postgresql://postgres:postgres@127.0.0.1:55432/trading")
await db.start()

Kafka

Baseline Kafka options come from trading_common.settings_kafka. They honour the following environment variables (defaults in parentheses):

Variable Purpose
KAFKA_BOOTSTRAP_SERVERS (localhost:9092) Broker connection string
KAFKA_SECURITY_PROTOCOL (SASL_SSL) PLAINTEXT, SASL_SSL, ...
KAFKA_SASL_MECHANISM (PLAIN) Auth mechanism
KAFKA_SASL_USERNAME / KAFKA_SASL_PASSWORD (empty) SASL credentials
KAFKA_CLIENT_ID (market-data-service) Default client id
KAFKA_ENABLE_IDEMPOTENCE (true) Producer idempotence flag
KAFKA_LINGER_MS, KAFKA_BATCH_SIZE, KAFKA_COMPRESSION_TYPE, ... Producer tuning knobs
KAFKA_ENABLE_AUTO_COMMIT (false) Consumer offset mode
KAFKA_AUTO_OFFSET_RESET, KAFKA_MAX_POLL_INTERVAL_MS, ... Consumer runtime tuning

Use dict(...) to clone and tweak the defaults before passing them into the Kafka helpers so that you do not mutate the shared module-level dictionaries:

from trading_common import settings_kafka

producer = Producer(
    base_cfg={**settings_kafka.KAFKA_COMMON, "client_id": "strategy-service"},
    tuning=settings_kafka.PRODUCER_TUNING,
)

Usage Examples

Producer

import asyncio
from trading_common import settings_kafka
from trading_common.kafka import Producer

async def publish() -> None:
    producer = Producer(
        base_cfg={**settings_kafka.KAFKA_COMMON, "client_id": "md-publisher"},
        tuning=settings_kafka.PRODUCER_TUNING,
    )
    await producer.start()
    try:
        await producer.send(
            topic="md.candles",
            key="BTCUSDT",
            payload={"event_id": "...", "open": 52000, "close": 52120},
        )
    finally:
        await producer.stop()

asyncio.run(publish())

Consumer with Inbox/Outbox

import asyncio
from trading_common import settings_kafka
from trading_common.consumer_app import ConsumerApp
from trading_common.db import DB
from trading_common.schema import ensure

async def handle(con, topic, key, payload):
    ensure("md.candle.closed@v1", payload)
    # Business logic ...

async def main() -> None:
    db = DB("postgresql://postgres:postgres@127.0.0.1:55432/trading")
    consumer = ConsumerApp(
        name="market-data-service",
        db=db,
        base_cfg={**settings_kafka.KAFKA_COMMON, "client_id": "market-data-service"},
        tuning=settings_kafka.CONSUMER_TUNING,
        topics=["md.candles"],
        group_id="md-service",
        handler=handle,
    )
    await consumer.start()
    try:
        await consumer.run()
    finally:
        await consumer.stop()

asyncio.run(main())

Outbox Processing

from trading_common.outbox import OutboxProcessor

processor = OutboxProcessor(db)
async with db.pool.acquire() as con:  # pool is available after db.start()
    events = await processor.get_pending_events(con, limit=100)
    for event_id, topic, key, payload in events:
        await producer.send(topic, key, payload)
        await processor.mark_published(con, event_id)
    await processor.cleanup_old_events(con, days_old=7)

Development Workflow

pytest -m "not slow"       # Run fast tests
black src tests             # Format
isort src tests             # Import sorting
mypy src                    # Static typing
pytest --cov=trading_common --cov-report=term-missing

Testing

  • pytest — run the whole suite (asyncio tests use strict mode).
  • pytest -m "not slow" — focus on the fast checks used in CI.
  • pytest --cov=trading_common --cov-report=term-missing — optional coverage report.

These commands are mirrored in .github/workflows/test.yml, so keeping them green locally guarantees the CI job passes.

Related Projects

  • trading-contracts — JSON schemas for every event type; install alongside this package to validate messages with schema.ensure.
  • infra/ in the monorepo provides Kafka/PostgreSQL containers for local development (docker compose -f infra/docker-compose.yml up -d).

License

MIT License. See LICENSE for details.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

trading_common-0.2.5.tar.gz (24.0 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

trading_common-0.2.5-py3-none-any.whl (12.1 kB view details)

Uploaded Python 3

File details

Details for the file trading_common-0.2.5.tar.gz.

File metadata

  • Download URL: trading_common-0.2.5.tar.gz
  • Upload date:
  • Size: 24.0 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.11.13

File hashes

Hashes for trading_common-0.2.5.tar.gz
Algorithm Hash digest
SHA256 a8494cfaa6abd91c8b6e967bfeb64db5cfdabfb12d085628e35a7e0fffad6225
MD5 3d44ebb697203246f1ae21b63497b0d0
BLAKE2b-256 469f85ead204fe6aa5e5b53a491c2afa848ba1c22433291046f0e50ace6194f5

See more details on using hashes here.

File details

Details for the file trading_common-0.2.5-py3-none-any.whl.

File metadata

  • Download URL: trading_common-0.2.5-py3-none-any.whl
  • Upload date:
  • Size: 12.1 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.11.13

File hashes

Hashes for trading_common-0.2.5-py3-none-any.whl
Algorithm Hash digest
SHA256 600dfd9709f40626b449625d4367c41fca4072bd8eddf84dd3740426c9b5fc81
MD5 4dade0ccc091b0a445b656250af6eb12
BLAKE2b-256 ebb0aa2e4fb1286af1e6312897ef0f4f4f9d324153fe476036fd9db92dca9a31

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page