Common utils for trading platform (Kafka/Postgres/outbox/inbox)

Trading Common

Common utilities for a trading platform, including Kafka producer/consumer wrappers, Postgres inbox/outbox patterns, and event validation through trading-contracts.

Features

  • Database: AsyncPG wrapper with inbox/outbox tables and idempotency support
  • Kafka: AIOKafka producer/consumer wrappers with transaction handling
  • Schema Validation: Event validation using trading-contracts schemas
  • Outbox Pattern: Reliable message publishing with database persistence

Installation

# Install in development mode
pip install -e ".[dev]"

# Install production dependencies only
pip install -e .

Quick Start

The snippets below use top-level await, so run them inside an async function (for example via asyncio.run) or in the asyncio REPL started with python -m asyncio.

Database Setup

from trading_common.db import DB

db = DB("postgresql://user:pass@localhost:5432/dbname")
await db.start()

# Use in transactions
async with db.pool.acquire() as con:
    # asyncpg commits on a clean exit and rolls back if the block raises
    async with con.transaction():
        # Your business logic here
        await db.outbox_put(con, "topic", "key", {"data": "value"})

await db.stop()

Kafka Consumer

from trading_common.kafka import ConsumerApp
from trading_common.schema import ensure

async def handler(con, topic, key, payload):
    # Validate event schema
    ensure("md.candle.closed@v1", payload)

    # Process the event
    # ... your business logic ...

    # Optionally publish to outbox
    # (`db` is the started DB instance from Database Setup)
    await db.outbox_put(con, "strategy.signal@v1", key, {"signal": "buy"})

# Create and run consumer
consumer = ConsumerApp(
    name="strategy-service",
    db=db,
    bootstrap="localhost:9092",
    topics=["market-data"],
    group_id="strategy-group",
    handler=handler
)

await consumer.start()
await consumer.run()  # Runs indefinitely

Kafka Producer

from trading_common.kafka import Producer

producer = Producer("localhost:9092")
await producer.start()

# Send messages
await producer.send("topic", "key", {"data": "value"})

await producer.stop()
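
Both DB and Producer follow the same start()/stop() lifecycle, and a stop() call is easy to miss when the code in between raises. The sketch below shows one way to guarantee cleanup with an async context manager. The running helper and the FakeComponent class are hypothetical names used for illustration only; they are not part of trading_common.

```python
import asyncio
from contextlib import asynccontextmanager


@asynccontextmanager
async def running(component):
    """Start a component and always stop it, even if the body raises."""
    await component.start()
    try:
        yield component
    finally:
        await component.stop()


class FakeComponent:
    """Hypothetical stand-in with the same start()/stop() shape as DB and Producer."""

    def __init__(self):
        self.calls = []

    async def start(self):
        self.calls.append("start")

    async def stop(self):
        self.calls.append("stop")


async def demo():
    fake = FakeComponent()
    try:
        async with running(fake):
            raise RuntimeError("handler failed")
    except RuntimeError:
        pass
    return fake.calls


calls = asyncio.run(demo())
```

Assuming DB and Producer expose only the start() and stop() coroutines shown above, the same helper could wrap either one, for example async with running(Producer("localhost:9092")) as producer.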

Outbox Processing

from trading_common.outbox import OutboxProcessor

outbox = OutboxProcessor(db)

async with db.pool.acquire() as con:
    # Get pending events
    events = await outbox.get_pending_events(con, limit=100)

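    for event_id, topic, key, payload in events:
        # Publish to Kafka (`producer` is the started Producer from Kafka Producer)
        await producer.send(topic, key, payload)

        # Mark as published only after the send succeeds; a crash between
        # the two steps re-sends the event, so delivery is at-least-once
        await outbox.mark_published(con, event_id)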

    # Clean up old events
    await outbox.cleanup_old_events(con, days_old=7)
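
To make the mechanics of the loop above concrete, here is a minimal, self-contained sketch of the outbox pattern using the standard-library sqlite3 module in place of Postgres. The table layout, the place_order and relay functions, and the orders.placed topic are all assumptions made for illustration; the actual trading_common schema and API may differ.

```python
import json
import sqlite3

# Hypothetical schema for illustration; the real trading_common tables may differ.
SCHEMA = """
CREATE TABLE orders (id INTEGER PRIMARY KEY, symbol TEXT NOT NULL);
CREATE TABLE outbox (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    topic TEXT NOT NULL,
    key TEXT NOT NULL,
    payload TEXT NOT NULL,
    published INTEGER NOT NULL DEFAULT 0
);
"""


def place_order(con, order_id, symbol):
    """Queue the event and write the business row in one transaction."""
    with con:  # commits on success, rolls back on exception
        con.execute(
            "INSERT INTO outbox (topic, key, payload) VALUES (?, ?, ?)",
            ("orders.placed", str(order_id), json.dumps({"symbol": symbol})),
        )
        con.execute(
            "INSERT INTO orders (id, symbol) VALUES (?, ?)", (order_id, symbol)
        )


def relay(con, send, limit=100):
    """Publish pending events, marking each one only after send() returns."""
    rows = con.execute(
        "SELECT id, topic, key, payload FROM outbox "
        "WHERE published = 0 ORDER BY id LIMIT ?",
        (limit,),
    ).fetchall()
    for event_id, topic, key, payload in rows:
        send(topic, key, json.loads(payload))
        with con:
            con.execute("UPDATE outbox SET published = 1 WHERE id = ?", (event_id,))
    return len(rows)


con = sqlite3.connect(":memory:")
con.executescript(SCHEMA)
place_order(con, 1, "BTCUSDT")

# The duplicate order id fails after the outbox insert, so the rollback
# also discards the queued event: no event without its business row.
try:
    place_order(con, 1, "ETHUSDT")
except sqlite3.IntegrityError:
    pass

sent = []
first = relay(con, lambda topic, key, payload: sent.append((topic, key, payload)))
second = relay(con, lambda topic, key, payload: sent.append((topic, key, payload)))
```

The first relay pass publishes the single committed event and the second finds nothing pending. Because the row is marked only after send() returns, a crash between the two steps would cause a re-send on the next pass, which is why consumers need idempotent handling.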

Development

# Install development dependencies
pip install -e ".[dev]"

# Run tests
pytest

# Format code
black src tests
isort src tests

# Type checking
mypy src

# Run with coverage
pytest --cov=trading_common --cov-report=term-missing

Architecture

  • Inbox Pattern: Ensures idempotent message processing
  • Outbox Pattern: Reliable message publishing with database persistence
  • Transaction Safety: All operations wrapped in database transactions
  • Schema Validation: Events validated against JSON schemas before processing
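
As an illustration of the inbox idea in the list above, the following sketch deduplicates redelivered messages with a uniqueness constraint. It uses the standard-library sqlite3 module so it runs anywhere; the inbox table layout and the handle_once function are hypothetical and not taken from trading_common. In Postgres the equivalent statement would be INSERT ... ON CONFLICT DO NOTHING.

```python
import sqlite3

# Hypothetical inbox table; the real trading_common schema may differ.
con = sqlite3.connect(":memory:")
con.execute(
    "CREATE TABLE inbox ("
    " consumer TEXT NOT NULL,"
    " message_id TEXT NOT NULL,"
    " PRIMARY KEY (consumer, message_id))"
)

processed = []


def handle_once(con, consumer, message_id, payload):
    """Run the business logic at most once per (consumer, message_id)."""
    with con:  # the dedup record commits or rolls back with the handler's writes
        cur = con.execute(
            "INSERT OR IGNORE INTO inbox (consumer, message_id) VALUES (?, ?)",
            (consumer, message_id),
        )
        if cur.rowcount == 0:
            return False  # already recorded: skip the redelivery
        processed.append(payload)  # stand-in for real business logic
        return True


results = [
    handle_once(con, "strategy-service", "md-1", {"close": 101.5}),
    handle_once(con, "strategy-service", "md-1", {"close": 101.5}),  # redelivery
    handle_once(con, "strategy-service", "md-2", {"close": 102.0}),
]
```

Recording the message id in the same transaction as the handler's own database writes is what makes the check safe: if the handler fails, the inbox row is rolled back too and the message can be retried.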

Dependencies

  • Python 3.10+
  • asyncpg: Async PostgreSQL driver
  • aiokafka: Async Kafka client
  • ujson: Fast JSON serializer
  • trading-contracts: Schema validation
  • jsonschema: JSON schema validation
