A library for building event-based systems on top of PostgreSQL
Project description
Depeche DB
A library for building event-based systems on top of PostgreSQL
Documentation: https://depeche-py.github.io/depeche-db/
Source code: https://github.com/depeche-py/depeche-db
Depeche DB is modern Python library for building event-based systems
Key features:
- Message store with optimistic concurrency control & strong ordering guarantees
- Subscriptions with "at least once" semantics
- Parallel processing of (partitioned) subscriptions
- No database polling
Requirements
Python 3.9+
SQLAlchemy 1.4 or 2+
PostgreSQL 12+
Installation
pip install depeche-db
# OR
poetry add depeche-db
Example
import pydantic, sqlalchemy, uuid, datetime as dt
from depeche_db import (
MessageStore,
StoredMessage,
MessageHandler,
SubscriptionMessage,
)
from depeche_db.tools import PydanticMessageSerializer
DB_DSN = "postgresql://depeche:depeche@localhost:4888/depeche_demo"
db_engine = sqlalchemy.create_engine(DB_DSN)
class MyMessage(pydantic.BaseModel):
content: int
message_id: uuid.UUID = pydantic.Field(default_factory=uuid.uuid4)
sent_at: dt.datetime = pydantic.Field(default_factory=dt.datetime.utcnow)
def get_message_id(self) -> uuid.UUID:
return self.message_id
def get_message_time(self) -> dt.datetime:
return self.sent_at
message_store = MessageStore[MyMessage](
name="example_store",
engine=db_engine,
serializer=PydanticMessageSerializer(MyMessage),
)
message_store.write(stream="aggregate-me-1", message=MyMessage(content=2))
print(list(message_store.read(stream="aggregate-me-1")))
# [StoredMessage(message_id=UUID('...'), stream='aggregate-me-1', version=1, message=MyMessage(content=2, message_id=UUID('...'), sent_at=datetime.datetime(...)), global_position=1)]
class ContentMessagePartitioner:
def get_partition(self, message: StoredMessage[MyMessage]) -> int:
return message.message.content % 10
class MyHandlers(MessageHandler[MyMessage]):
@MessageHandler.register
def handle_message(self, message: SubscriptionMessage[MyMessage]):
print(message)
aggregated_stream = message_store.aggregated_stream(
name="aggregated",
partitioner=ContentMessagePartitioner(),
stream_wildcards=["aggregate-me-%"],
)
subscription = aggregated_stream.subscription(
name="example_subscription",
handlers=MyHandlers(),
)
aggregated_stream.projector.run()
subscription.runner.run()
# MyHandlers.handle_message prints:
# SubscriptionMessage(partition=2, position=0, stored_message=StoredMessage(...))
Contribute
Contributions in the form of issues, questions, feedback and pull requests are welcome. Before investing a lot of time, let me know what you are up to so we can see if your contribution fits the vision of the project.
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
depeche_db-0.7.0.tar.gz
(24.9 kB
view hashes)
Built Distribution
depeche_db-0.7.0-py3-none-any.whl
(31.0 kB
view hashes)
Close
Hashes for depeche_db-0.7.0-py3-none-any.whl
Algorithm | Hash digest | |
---|---|---|
SHA256 | be466c14ea54416f11824a45c6b47c9380a27e840e0f67f21b184cc89569aa14 |
|
MD5 | 5db9c8655302fb235c8e344a2a798a87 |
|
BLAKE2b-256 | 56d070c3c70d781518fc0fd8fea5cfaeab4524e9be1d2737b695eb2e171458c2 |