Common utils for trading platform (Kafka/Postgres/outbox/inbox)
Trading Common
Async utilities shared across the trading platform for working with PostgreSQL inbox/outbox tables, Kafka producers/consumers, and schema validation via trading-contracts.
Components
- trading_common.db.DB: connection pool with inbox/outbox bootstrap, idempotent helpers, and convenience wrappers for writing to the outbox.
- trading_common.kafka.Producer: resilient aiokafka producer prepared for SASL/SSL clusters and controlled retry behaviour.
- trading_common.consumer_app.ConsumerApp: batching Kafka consumer that wraps message handling in PostgreSQL transactions with inbox-based deduplication.
- trading_common.outbox.OutboxProcessor: helpers for reading, acknowledging, and vacuuming the outbox table.
- trading_common.schema.ensure: thin wrapper around trading-contracts JSON schema validation.
- trading_common.settings_kafka: baseline Kafka client settings hydrated from environment variables.
Installation
# Inside a virtualenv; install runtime dependencies only
pip install -e .
# Install with development tooling (pytest, black, mypy, ...)
pip install -e ".[dev]"
Configuration
PostgreSQL
Provide a DSN string understood by asyncpg. The DB.start() method creates (or verifies) the core.inbox and core.outbox tables so that services can boot against an empty database.
from trading_common.db import DB
db = DB("postgresql://postgres:postgres@127.0.0.1:55432/trading")
await db.start()
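To illustrate why a bootstrap of this kind is safe to run on every service start, here is a minimal sketch of an idempotent table setup. It uses the standard-library sqlite3 module as a stand-in for PostgreSQL, and the table and column names are assumptions made for the example, not the actual core.inbox and core.outbox schema.

```python
import sqlite3

# Hypothetical minimal columns; the real core.inbox / core.outbox schema may differ.
BOOTSTRAP_SQL = """
CREATE TABLE IF NOT EXISTS inbox (
    event_id    TEXT PRIMARY KEY,
    received_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE IF NOT EXISTS outbox (
    event_id     TEXT PRIMARY KEY,
    topic        TEXT NOT NULL,
    msg_key      TEXT,
    payload      TEXT NOT NULL,
    published_at TEXT
);
"""


def bootstrap(con: sqlite3.Connection) -> None:
    """Create the tables if they are missing; repeated calls change nothing."""
    con.executescript(BOOTSTRAP_SQL)


def table_names(con: sqlite3.Connection) -> list:
    """Return the names of all tables, sorted alphabetically."""
    rows = con.execute(
        "SELECT name FROM sqlite_master WHERE type = 'table' ORDER BY name"
    ).fetchall()
    return [row[0] for row in rows]


con = sqlite3.connect(":memory:")
bootstrap(con)
bootstrap(con)  # a second call neither fails nor duplicates tables
```

Because every statement uses IF NOT EXISTS, the same code path can serve both an empty database and one that is already initialised.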
Kafka
Baseline Kafka options come from trading_common.settings_kafka. They honour the following environment variables (defaults in parentheses):
| Variable (default) | Purpose |
|---|---|
| KAFKA_BOOTSTRAP_SERVERS (localhost:9092) | Broker connection string |
| KAFKA_SECURITY_PROTOCOL (SASL_SSL) | PLAINTEXT, SASL_SSL, ... |
| KAFKA_SASL_MECHANISM (PLAIN) | Auth mechanism |
| KAFKA_SASL_USERNAME / KAFKA_SASL_PASSWORD (empty) | SASL credentials |
| KAFKA_CLIENT_ID (market-data-service) | Default client id |
| KAFKA_ENABLE_IDEMPOTENCE (true) | Producer idempotence flag |
| KAFKA_LINGER_MS, KAFKA_BATCH_SIZE, KAFKA_COMPRESSION_TYPE, ... | Producer tuning knobs |
| KAFKA_ENABLE_AUTO_COMMIT (false) | Consumer offset mode |
| KAFKA_AUTO_OFFSET_RESET, KAFKA_MAX_POLL_INTERVAL_MS, ... | Consumer runtime tuning |
Copy the defaults (with dict(...) or a {**...} spread) and tweak the copy before passing it into the Kafka helpers, so that you do not mutate the shared module-level dictionaries:
from trading_common import settings_kafka
from trading_common.kafka import Producer

producer = Producer(
    base_cfg={**settings_kafka.KAFKA_COMMON, "client_id": "strategy-service"},
    tuning=dict(settings_kafka.PRODUCER_TUNING),
)
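As a rough illustration of how environment-driven defaults and copy-before-tweak fit together, the sketch below hydrates a small settings dictionary from a mapping and then overrides one key on a copy. The function name load_common, the SHARED dictionary, and the lowercase keys are hypothetical; only the variable names and defaults are taken from the table above, and the real settings_kafka module may be organised differently.

```python
import os
from typing import Mapping

# Defaults copied from the table above.
_DEFAULTS = {
    "KAFKA_BOOTSTRAP_SERVERS": "localhost:9092",
    "KAFKA_SECURITY_PROTOCOL": "SASL_SSL",
    "KAFKA_SASL_MECHANISM": "PLAIN",
    "KAFKA_CLIENT_ID": "market-data-service",
}


def load_common(env: Mapping[str, str] = os.environ) -> dict:
    """Build baseline settings, falling back to the documented defaults."""

    def get(name: str) -> str:
        return env.get(name, _DEFAULTS[name])

    # The lowercase keys are assumed names chosen for this example.
    return {
        "bootstrap_servers": get("KAFKA_BOOTSTRAP_SERVERS"),
        "security_protocol": get("KAFKA_SECURITY_PROTOCOL"),
        "sasl_mechanism": get("KAFKA_SASL_MECHANISM"),
        "client_id": get("KAFKA_CLIENT_ID"),
    }


# Stand-in for a shared module-level dictionary (empty env, so all defaults).
SHARED = load_common({})

# Override on a copy; SHARED itself stays untouched.
mine = {**SHARED, "client_id": "strategy-service"}
```

Both dict(...) and the spread make a shallow copy, which is enough here because the values are plain strings.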
Usage Examples
Producer
import asyncio

from trading_common import settings_kafka
from trading_common.kafka import Producer


async def publish() -> None:
    producer = Producer(
        base_cfg={**settings_kafka.KAFKA_COMMON, "client_id": "md-publisher"},
        tuning=settings_kafka.PRODUCER_TUNING,
    )
    await producer.start()
    try:
        await producer.send(
            topic="md.candles",
            key="BTCUSDT",
            payload={"event_id": "...", "open": 52000, "close": 52120},
        )
    finally:
        await producer.stop()


asyncio.run(publish())
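The component list describes the producer as having controlled retry behaviour. How Producer implements this is not shown here, so the following is only a generic sketch of bounded retries with exponential backoff around an async send. The helper send_with_retry, its parameters, and the flaky_send stub are hypothetical and not part of trading_common.

```python
import asyncio
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


async def send_with_retry(
    send: Callable[[], Awaitable[T]],
    *,
    attempts: int = 3,
    base_delay: float = 0.01,
) -> T:
    """Call send() up to `attempts` times, doubling the pause after each failure."""
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    for attempt in range(1, attempts + 1):
        try:
            return await send()
        except ConnectionError:
            if attempt == attempts:
                raise  # retries exhausted: surface the last error
            await asyncio.sleep(base_delay * 2 ** (attempt - 1))
    raise AssertionError("unreachable")


calls = []


async def flaky_send() -> str:
    """Stub that fails twice, then succeeds, to exercise the retry loop."""
    calls.append(1)
    if len(calls) < 3:
        raise ConnectionError("broker unavailable")
    return "ack"


result = asyncio.run(send_with_retry(flaky_send))
```

Retrying only on a narrow exception type and capping the number of attempts keeps failures visible instead of looping forever.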
Consumer with Inbox/Outbox
import asyncio

from trading_common import settings_kafka
from trading_common.consumer_app import ConsumerApp
from trading_common.db import DB
from trading_common.schema import ensure


async def handle(con, topic, key, payload):
    ensure("md.candle.closed@v1", payload)
    # Business logic ...


async def main() -> None:
    db = DB("postgresql://postgres:postgres@127.0.0.1:55432/trading")
    consumer = ConsumerApp(
        name="market-data-service",
        db=db,
        base_cfg={**settings_kafka.KAFKA_COMMON, "client_id": "market-data-service"},
        tuning=settings_kafka.CONSUMER_TUNING,
        topics=["md.candles"],
        group_id="md-service",
        handler=handle,
    )
    await consumer.start()
    try:
        await consumer.run()
    finally:
        await consumer.stop()


asyncio.run(main())
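Inbox-based deduplication generally means recording each event id in the same transaction as the handler's writes, so that a redelivered message is skipped and a failed handler leaves no trace. The sketch below shows that idea with the standard-library sqlite3 module standing in for PostgreSQL. The process_once helper and the table layouts are assumptions for illustration, not the ConsumerApp internals.

```python
import sqlite3
from typing import Callable


def process_once(
    con: sqlite3.Connection,
    event_id: str,
    handler: Callable[[sqlite3.Connection], None],
) -> bool:
    """Run handler at most once per event_id; return True if it ran."""
    with con:  # one transaction: commit on success, roll back on error
        cur = con.execute(
            "INSERT OR IGNORE INTO inbox (event_id) VALUES (?)", (event_id,)
        )
        if cur.rowcount == 0:
            return False  # already seen: skip the handler
        handler(con)
        return True


con = sqlite3.connect(":memory:")
con.execute("CREATE TABLE inbox (event_id TEXT PRIMARY KEY)")
con.execute("CREATE TABLE candles (event_id TEXT, close REAL)")


def store(c: sqlite3.Connection) -> None:
    """Example business write performed inside the inbox transaction."""
    c.execute("INSERT INTO candles VALUES (?, ?)", ("evt-1", 52120))


first = process_once(con, "evt-1", store)   # handler runs
second = process_once(con, "evt-1", store)  # duplicate delivery is skipped
```

If the handler raises, the inbox row is rolled back together with its writes, so the next delivery of the same event is processed again rather than silently dropped.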
Outbox Processing
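from trading_common.outbox import OutboxProcessor

# `db` and `producer` are the started DB and Producer instances from the examples above.
processor = OutboxProcessor(db)

async with db.pool.acquire() as con:  # pool is available after db.start()
    events = await processor.get_pending_events(con, limit=100)
    for event_id, topic, key, payload in events:
        # Publish first, then acknowledge, so an event is never marked without being sent.
        await producer.send(topic, key, payload)
        await processor.mark_published(con, event_id)
    # Remove events older than a week.
    await processor.cleanup_old_events(con, days_old=7)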
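The read, publish, acknowledge cycle above can be sketched end to end without any external services. The example below uses sqlite3 as a stand-in database and a plain callback as the publisher. The relay_pending function and the outbox columns are hypothetical and only mirror the shape of the calls shown above, not the OutboxProcessor implementation.

```python
import json
import sqlite3
from typing import Callable, Optional


def relay_pending(
    con: sqlite3.Connection,
    publish: Callable[[str, Optional[str], dict], None],
    limit: int = 100,
) -> int:
    """Publish up to `limit` pending rows and mark each one after it is sent."""
    rows = con.execute(
        "SELECT event_id, topic, msg_key, payload FROM outbox "
        "WHERE published_at IS NULL ORDER BY rowid LIMIT ?",
        (limit,),
    ).fetchall()
    for event_id, topic, key, payload in rows:
        publish(topic, key, json.loads(payload))  # if this raises, the row stays pending
        with con:  # acknowledge in its own small transaction
            con.execute(
                "UPDATE outbox SET published_at = CURRENT_TIMESTAMP WHERE event_id = ?",
                (event_id,),
            )
    return len(rows)


con = sqlite3.connect(":memory:")
con.execute(
    "CREATE TABLE outbox (event_id TEXT PRIMARY KEY, topic TEXT, "
    "msg_key TEXT, payload TEXT, published_at TEXT)"
)
with con:
    con.executemany(
        "INSERT INTO outbox (event_id, topic, msg_key, payload) VALUES (?, ?, ?, ?)",
        [
            ("evt-1", "md.candles", "BTCUSDT", json.dumps({"close": 52120})),
            ("evt-2", "md.candles", "ETHUSDT", json.dumps({"close": 2900})),
        ],
    )

sent = []
relayed = relay_pending(con, lambda topic, key, payload: sent.append((topic, key, payload)))
relayed_again = relay_pending(con, lambda topic, key, payload: sent.append((topic, key, payload)))
```

Marking a row only after the publish call returns gives at-least-once delivery: a crash between the two steps leads to a resend, which is the case inbox deduplication on the consumer side is meant to absorb.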
Development Workflow
pytest -m "not slow" # Run fast tests
black src tests # Format
isort src tests # Import sorting
mypy src # Static typing
pytest --cov=trading_common --cov-report=term-missing
Testing
- pytest: run the whole suite (asyncio tests use strict mode).
- pytest -m "not slow": focus on the fast checks used in CI.
- pytest --cov=trading_common --cov-report=term-missing: optional coverage report.
These commands are mirrored in .github/workflows/test.yml, so a green local run is a good indication that the CI job will pass.
Related Projects
- trading-contracts: JSON schemas for every event type; install alongside this package to validate messages with schema.ensure.
- infra/ in the monorepo provides Kafka/PostgreSQL containers for local development (docker compose -f infra/docker-compose.yml up -d).
License
MIT License. See LICENSE for details.