Skip to main content

A library for building event-based systems on top of PostgreSQL

Project description

Depeche DB

A library for building event-based systems on top of PostgreSQL

Tests pypi versions Docs license


Documentation: https://depeche-py.github.io/depeche-db/

Source code: https://github.com/depeche-py/depeche-db


Depeche DB is modern Python library for building event-based systems

Key features:

  • Message store with optimistic concurrency control & strong ordering guarantees
  • Subscriptions with "at least once" semantics
  • Parallel processing of (partitioned) subscriptions
  • No database polling

Requirements

Python 3.9+ SQLAlchemy 1.4 or 2+ PostgreSQL 12+ Psycopg (Version 2 >= 2.9.3 or Version 3 >= 3.1)

Installation

pip install depeche-db
# OR
poetry add depeche-db
# OR
uv add depeche-db

Example

import pydantic, sqlalchemy, uuid, datetime as dt

from depeche_db import (
    MessageStore,
    StoredMessage,
    MessageHandler,
    SubscriptionMessage,
)
from depeche_db.tools import PydanticMessageSerializer

DB_DSN = "postgresql://depeche:depeche@localhost:4888/depeche_demo"
db_engine = sqlalchemy.create_engine(DB_DSN)


class MyMessage(pydantic.BaseModel):
    content: int
    message_id: uuid.UUID = pydantic.Field(default_factory=uuid.uuid4)
    sent_at: dt.datetime = pydantic.Field(default_factory=dt.datetime.utcnow)

    def get_message_id(self) -> uuid.UUID:
        return self.message_id

    def get_message_time(self) -> dt.datetime:
        return self.sent_at


message_store = MessageStore[MyMessage](
    name="example_store",
    engine=db_engine,
    serializer=PydanticMessageSerializer(MyMessage),
)
message_store.write(stream="aggregate-me-1", message=MyMessage(content=2))
print(list(message_store.read(stream="aggregate-me-1")))
# [StoredMessage(message_id=UUID('...'), stream='aggregate-me-1', version=1, message=MyMessage(content=2, message_id=UUID('...'), sent_at=datetime.datetime(...)), global_position=1)]


class ContentMessagePartitioner:
    def get_partition(self, message: StoredMessage[MyMessage]) -> int:
        return message.message.content % 10


class MyHandlers(MessageHandler[MyMessage]):
    @MessageHandler.register
    def handle_message(self, message: SubscriptionMessage[MyMessage]):
        print(message)


aggregated_stream = message_store.aggregated_stream(
    name="aggregated",
    partitioner=ContentMessagePartitioner(),
    stream_wildcards=["aggregate-me-%"],
)
subscription = aggregated_stream.subscription(
    name="example_subscription",
    handlers=MyHandlers(),
)

aggregated_stream.projector.run()
subscription.runner.run()
# MyHandlers.handle_message prints:
# SubscriptionMessage(partition=2, position=0, stored_message=StoredMessage(...))

Contribute

Contributions in the form of issues, questions, feedback and pull requests are welcome. Before investing a lot of time, let me know what you are up to so we can see if your contribution fits the vision of the project.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

depeche_db-0.14.0rc1.tar.gz (346.5 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

depeche_db-0.14.0rc1-py3-none-any.whl (49.1 kB view details)

Uploaded Python 3

File details

Details for the file depeche_db-0.14.0rc1.tar.gz.

File metadata

  • Download URL: depeche_db-0.14.0rc1.tar.gz
  • Upload date:
  • Size: 346.5 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.9.25

File hashes

Hashes for depeche_db-0.14.0rc1.tar.gz
Algorithm Hash digest
SHA256 9edbb5db92743f61e426120a19525262555d72e5be8e72a2e922ff99fc0d9d14
MD5 7a8f0152eb0336a0702d1b0b1f7bac9e
BLAKE2b-256 fa90033c1cfc79cc081bb57eb4f3ce4baa879b0572921aba1c251bfd635257fb

See more details on using hashes here.

File details

Details for the file depeche_db-0.14.0rc1-py3-none-any.whl.

File metadata

  • Download URL: depeche_db-0.14.0rc1-py3-none-any.whl
  • Upload date:
  • Size: 49.1 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.9.25

File hashes

Hashes for depeche_db-0.14.0rc1-py3-none-any.whl
Algorithm Hash digest
SHA256 3f8b9b739c2bbf20d68ddef134b742ddab445607d117c66ad928a43b44cd6d22
MD5 09b5499bf63acaff5fda6a13ed4855c6
BLAKE2b-256 f50f763a4babed6d4d257aaf5d4423aa0e84eaccfee7d3e9def208ad01c4e9ca

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page