Common utils for trading platform (Kafka/Postgres/outbox/inbox)
Project description
Trading Common
Common utilities for trading platform including Kafka producer/consumer wrappers, Postgres inbox/outbox patterns, and event validation through trading-contracts.
Features
- Database: AsyncPG wrapper with inbox/outbox tables and idempotency support
- Kafka: AIOKafka producer/consumer wrappers with transaction handling
- Schema Validation: Event validation using trading-contracts schemas
- Outbox Pattern: Reliable message publishing with database persistence
Installation
# Install in development mode
pip install -e ".[dev]"
# Install production dependencies only
pip install -e .
Quick Start
Database Setup
from trading_common.db import DB
db = DB("postgresql://user:pass@localhost:5432/dbname")
await db.start()
# Use in transactions
async with db.pool.acquire() as con:
tx = con.transaction()
await tx.start()
try:
# Your business logic here
await db.outbox_put(con, "topic", "key", {"data": "value"})
await tx.commit()
except Exception:
await tx.rollback()
raise
await db.stop()
Kafka Consumer
from trading_common.kafka import ConsumerApp
from trading_common.schema import ensure
async def handler(con, topic, key, payload):
# Validate event schema
ensure("md.candle.closed@v1", payload)
# Process the event
# ... your business logic ...
# Optionally publish to outbox
await db.outbox_put(con, "strategy.signal@v1", key, {"signal": "buy"})
# Create and run consumer
consumer = ConsumerApp(
name="strategy-service",
db=db,
bootstrap="localhost:9092",
topics=["market-data"],
group_id="strategy-group",
handler=handler
)
await consumer.start()
await consumer.run() # Runs indefinitely
Kafka Producer
from trading_common.kafka import Producer
producer = Producer("localhost:9092")
await producer.start()
# Send messages
await producer.send("topic", "key", {"data": "value"})
await producer.stop()
Outbox Processing
from trading_common.outbox import OutboxProcessor
outbox = OutboxProcessor(db)
async with db.pool.acquire() as con:
# Get pending events
events = await outbox.get_pending_events(con, limit=100)
for event_id, topic, key, payload in events:
# Publish to Kafka
await producer.send(topic, key, payload)
# Mark as published
await outbox.mark_published(con, event_id)
# Clean up old events
await outbox.cleanup_old_events(con, days_old=7)
Development
# Install development dependencies
pip install -e ".[dev]"
# Run tests
pytest
# Format code
black src tests
isort src tests
# Type checking
mypy src
# Run with coverage
pytest --cov=trading_common --cov-report=term-missing
Architecture
- Inbox Pattern: Ensures idempotent message processing
- Outbox Pattern: Reliable message publishing with database persistence
- Transaction Safety: All operations wrapped in database transactions
- Schema Validation: Events validated against JSON schemas before processing
Dependencies
- Python 3.10+
- asyncpg: Async PostgreSQL driver
- aiokafka: Async Kafka client
- ujson: Fast JSON serializer
- trading-contracts: Schema validation
- jsonschema: JSON schema validation
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
trading_common-0.1.0.tar.gz
(11.6 kB
view details)
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file trading_common-0.1.0.tar.gz.
File metadata
- Download URL: trading_common-0.1.0.tar.gz
- Upload date:
- Size: 11.6 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.1.0 CPython/3.11.13
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
095e69c78d8bfd017a2160c2bbe24f7f300d1b7c5a34c36dec15f4f4b7f83ba6
|
|
| MD5 |
98456a08e88817d629ca71b8712f837d
|
|
| BLAKE2b-256 |
8bbeb08f51b5a18a2fa446dc4988670c122a714190e4fc6ad4299d0760e02c49
|
File details
Details for the file trading_common-0.1.0-py3-none-any.whl.
File metadata
- Download URL: trading_common-0.1.0-py3-none-any.whl
- Upload date:
- Size: 7.1 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.1.0 CPython/3.11.13
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
a47a8334e99b7f71f34ca13e8501fe5d9d026cc2280f98667379ec96f7d2836f
|
|
| MD5 |
a48fc754eb291adf814c3eb2b3d7df5f
|
|
| BLAKE2b-256 |
46454dc95e766cbb872fec2c899f477e2eebb664d09cce18308f4d52aff85558
|