outbox-streaming

Reliably send messages to message/task brokers, like Kafka or Celery

Roadmap

Done

  • ✅ Kafka + SQLAlchemy
  • ✅ Kafka + SQLAlchemy + asyncio

In progress

  • ⏹ Celery + SQLAlchemy
  • ⏹ Celery + SQLAlchemy + asyncio

Planned

  • 🆕 Kafka + Django ORM
  • 🆕 Celery + Django ORM
  • 🆕 Dramatiq + SQLAlchemy
  • 🆕 Dramatiq + SQLAlchemy + asyncio
  • 🆕 Dramatiq + Django
  • 🆕 RabbitMQ + SQLAlchemy
  • 🆕 RabbitMQ + SQLAlchemy + asyncio
  • 🆕 RabbitMQ + Django

Example FastAPI + Kafka + SQLAlchemy

from fastapi import FastAPI
from outbox_streaming.kafka.sqlalchemy import SQLAlchemyKafkaOutbox
from sqlalchemy import orm

from app import models, db
from app.config import config
from app.schemas import TodoCreate

app = FastAPI()

# create instance of SQLAlchemyKafkaOutbox
outbox = SQLAlchemyKafkaOutbox(
    engine=db.engine,
    kafka_servers=config.KAFKA_SERVERS,
)

# Run a separate thread that monitors the outbox table and publishes messages to Kafka.
# This is not recommended for production, but it is handy for development.
outbox.publisher.run_daemon()

Session = orm.sessionmaker(bind=db.engine)

# create the outbox tables
outbox.storage.create_tables(engine=db.engine)


@app.post('/todos')
def create_todo(create: TodoCreate) -> str:

    with Session() as session:

        # create new object
        todo = models.Todo(text=create.text)

        session.add(todo)

        # create kafka event in outbox table
        outbox.save(
            session=session,
            topic='todo_created',
            value=todo.to_dict(),
        )

        # commit changes in database
        session.commit()
        
    # the publisher will pick up the Kafka message from the outbox table and send it to the Kafka topic

    return "OK"
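
The example above relies on the outbox pattern: the business row and the event row are written in the same database transaction, and a separate publisher later forwards unpublished events to the broker. The following is a minimal, library-independent sketch of that idea using only the standard library's sqlite3 module. It is not how outbox-streaming is implemented; the table layout, the `published` column, and the function names `create_schema`, `create_todo` and `publish_pending` are assumptions made for illustration, and `send` stands in for a real Kafka producer.

```python
import json
import sqlite3


def create_schema(conn: sqlite3.Connection) -> None:
    # Hypothetical tables: one for business data, one acting as the outbox.
    conn.execute("CREATE TABLE todos (id INTEGER PRIMARY KEY, text TEXT NOT NULL)")
    conn.execute(
        "CREATE TABLE outbox ("
        "id INTEGER PRIMARY KEY, topic TEXT NOT NULL, "
        "payload TEXT NOT NULL, published INTEGER NOT NULL DEFAULT 0)"
    )


def create_todo(conn: sqlite3.Connection, text: str) -> int:
    # One transaction: the todo and its event are committed together,
    # or both are rolled back if anything raises inside the block.
    with conn:
        cur = conn.execute("INSERT INTO todos (text) VALUES (?)", (text,))
        todo_id = cur.lastrowid
        conn.execute(
            "INSERT INTO outbox (topic, payload) VALUES (?, ?)",
            ("todo_created", json.dumps({"id": todo_id, "text": text})),
        )
    return todo_id


def publish_pending(conn: sqlite3.Connection, send) -> int:
    # Read unpublished events in insertion order and hand each one to `send`.
    rows = conn.execute(
        "SELECT id, topic, payload FROM outbox WHERE published = 0 ORDER BY id"
    ).fetchall()
    for row_id, topic, payload in rows:
        send(topic, json.loads(payload))
        # Mark the event only after `send` returned without raising.
        with conn:
            conn.execute("UPDATE outbox SET published = 1 WHERE id = ?", (row_id,))
    return len(rows)


if __name__ == "__main__":
    conn = sqlite3.connect(":memory:")
    create_schema(conn)
    create_todo(conn, "write docs")
    sent = []  # stand-in for a broker
    publish_pending(conn, lambda topic, value: sent.append((topic, value)))
    print(sent)
```

Because an event is marked as published only after `send` returns, a crash between the two steps causes the event to be sent again on the next run. In this sketch delivery is therefore at-least-once, and consumers should tolerate duplicates.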
