A library for building event-based systems on top of PostgreSQL

Depeche DB

Documentation: https://depeche-py.github.io/depeche-db/

Source code: https://github.com/depeche-py/depeche-db


Depeche DB is a modern Python library for building event-based systems.

Key features:

  • Message store with optimistic concurrency control & strong ordering guarantees
  • Subscriptions with "at least once" semantics
  • Parallel processing of (partitioned) subscriptions
  • No database polling
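
To make the first feature concrete, here is a minimal in-memory sketch of optimistic concurrency control. It is not Depeche DB's implementation: `InMemoryStore`, `VersionConflict`, and the `expected_version` parameter are hypothetical names chosen for illustration. The idea is that a writer states which stream version it last saw, and the write is rejected if the stream has moved on in the meantime.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional


class VersionConflict(Exception):
    """Raised when the stream is not at the version the writer expected."""


@dataclass
class InMemoryStore:
    # Hypothetical stand-in for a message store; not part of depeche_db.
    streams: Dict[str, List[str]] = field(default_factory=dict)

    def write(
        self, stream: str, message: str, expected_version: Optional[int] = None
    ) -> int:
        messages = self.streams.setdefault(stream, [])
        current_version = len(messages)
        # Reject the write if another writer appended since we last read.
        if expected_version is not None and expected_version != current_version:
            raise VersionConflict(
                f"{stream}: expected {expected_version}, found {current_version}"
            )
        messages.append(message)
        return len(messages)  # new version of the stream


store = InMemoryStore()
v1 = store.write("account-1", "opened", expected_version=0)
v2 = store.write("account-1", "deposited", expected_version=1)

# A second writer that still believes the stream is at version 1 is rejected.
try:
    store.write("account-1", "withdrawn", expected_version=1)
    conflict = False
except VersionConflict:
    conflict = True
```

The rejected writer would typically re-read the stream, re-apply its decision logic, and retry with the new version.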

Requirements

Python 3.9+

SQLAlchemy 1.4 or 2+

PostgreSQL 12+

Installation

pip install depeche-db
# OR
poetry add depeche-db

Example

import datetime as dt
import uuid

import pydantic
import sqlalchemy

from depeche_db import (
    MessageStore,
    StoredMessage,
    MessageHandler,
    SubscriptionMessage,
)
from depeche_db.tools import PydanticMessageSerializer

DB_DSN = "postgresql://depeche:depeche@localhost:4888/depeche_demo"
db_engine = sqlalchemy.create_engine(DB_DSN)


class MyMessage(pydantic.BaseModel):
    content: int
    message_id: uuid.UUID = pydantic.Field(default_factory=uuid.uuid4)
    sent_at: dt.datetime = pydantic.Field(default_factory=dt.datetime.utcnow)

    def get_message_id(self) -> uuid.UUID:
        return self.message_id

    def get_message_time(self) -> dt.datetime:
        return self.sent_at


message_store = MessageStore[MyMessage](
    name="example_store",
    engine=db_engine,
    serializer=PydanticMessageSerializer(MyMessage),
)
message_store.write(stream="aggregate-me-1", message=MyMessage(content=2))
print(list(message_store.read(stream="aggregate-me-1")))
# [StoredMessage(message_id=UUID('...'), stream='aggregate-me-1', version=1, message=MyMessage(content=2, message_id=UUID('...'), sent_at=datetime.datetime(...)), global_position=1)]


class ContentMessagePartitioner:
    def get_partition(self, message: StoredMessage[MyMessage]) -> int:
        return message.message.content % 10


class MyHandlers(MessageHandler[MyMessage]):
    @MessageHandler.register
    def handle_message(self, message: SubscriptionMessage[MyMessage]):
        print(message)


aggregated_stream = message_store.aggregated_stream(
    name="aggregated",
    partitioner=ContentMessagePartitioner(),
    stream_wildcards=["aggregate-me-%"],
)
subscription = aggregated_stream.subscription(
    name="example_subscription",
    handlers=MyHandlers(),
)

aggregated_stream.projector.run()
subscription.runner.run()
# MyHandlers.handle_message prints:
# SubscriptionMessage(partition=2, position=0, stored_message=StoredMessage(...))
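
Because subscriptions have "at least once" semantics, a handler may see the same message more than once, so handlers are usually written to be idempotent. The following is a rough, self-contained sketch of that idea using the same `content % 10` partitioning rule as the example above. `Msg` and `IdempotentHandler` are hypothetical stand-ins (plain dataclasses, no database), not Depeche DB classes, and the in-memory `seen` set is an assumption made for brevity.

```python
import uuid
from collections import defaultdict
from dataclasses import dataclass, field
from typing import Dict, List, Set


@dataclass(frozen=True)
class Msg:
    # Hypothetical stand-in for MyMessage; a plain dataclass, no pydantic needed.
    content: int
    message_id: uuid.UUID = field(default_factory=uuid.uuid4)


def get_partition(message: Msg) -> int:
    # Same rule as ContentMessagePartitioner in the example above.
    return message.content % 10


class IdempotentHandler:
    """Ignores messages whose ID it has already processed."""

    def __init__(self) -> None:
        self.seen: Set[uuid.UUID] = set()
        self.handled: List[int] = []

    def handle(self, message: Msg) -> None:
        if message.message_id in self.seen:
            return  # redelivery: already processed, do nothing
        self.seen.add(message.message_id)
        self.handled.append(message.content)


messages = [Msg(content=2), Msg(content=12), Msg(content=7)]

# Group by partition; order within a partition follows arrival order.
partitions: Dict[int, List[Msg]] = defaultdict(list)
for m in messages:
    partitions[get_partition(m)].append(m)

# Simulate one redelivery of the first message.
handler = IdempotentHandler()
for m in messages + [messages[0]]:
    handler.handle(m)
```

Messages with content 2 and 12 land in the same partition and keep their relative order, while the redelivered message is ignored. In a real system the set of processed IDs would need to be stored durably, ideally in the same transaction as the handler's side effects.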

Contribute

Contributions in the form of issues, questions, feedback and pull requests are welcome. Before investing a lot of time, let me know what you are up to so we can see if your contribution fits the vision of the project.


