A library for building event-based systems on top of PostgreSQL
Project description
Depeche DB
A library for building event-based systems on top of PostgreSQL
Documentation: https://depeche-py.github.io/depeche-db/
Source code: https://github.com/depeche-py/depeche-db
Depeche DB is modern Python library for building event-based systems
Key features:
- Message store with optimistic concurrency control & strong ordering guarantees
- Subscriptions with "at least once" semantics
- Parallel processing of (partitioned) subscriptions
- No database polling
Requirements
Python 3.9+
SQLAlchemy 1.4 or 2+
PostgreSQL 12+
Installation
pip install depeche-db
# OR
poetry add depeche-db
Example
import pydantic, sqlalchemy, uuid, datetime as dt
from depeche_db import (
MessageStore,
StoredMessage,
MessageHandler,
SubscriptionMessage,
)
from depeche_db.tools import PydanticMessageSerializer
DB_DSN = "postgresql://depeche:depeche@localhost:4888/depeche_demo"
db_engine = sqlalchemy.create_engine(DB_DSN)
class MyMessage(pydantic.BaseModel):
content: int
message_id: uuid.UUID = pydantic.Field(default_factory=uuid.uuid4)
sent_at: dt.datetime = pydantic.Field(default_factory=dt.datetime.utcnow)
def get_message_id(self) -> uuid.UUID:
return self.message_id
def get_message_time(self) -> dt.datetime:
return self.sent_at
message_store = MessageStore[MyMessage](
name="example_store",
engine=db_engine,
serializer=PydanticMessageSerializer(MyMessage),
)
message_store.write(stream="aggregate-me-1", message=MyMessage(content=2))
print(list(message_store.read(stream="aggregate-me-1")))
# [StoredMessage(message_id=UUID('...'), stream='aggregate-me-1', version=1, message=MyMessage(content=2, message_id=UUID('...'), sent_at=datetime.datetime(...)), global_position=1)]
class ContentMessagePartitioner:
def get_partition(self, message: StoredMessage[MyMessage]) -> int:
return message.message.content % 10
class MyHandlers(MessageHandler[MyMessage]):
@MessageHandler.register
def handle_message(self, message: SubscriptionMessage[MyMessage]):
print(message)
aggregated_stream = message_store.aggregated_stream(
name="aggregated",
partitioner=ContentMessagePartitioner(),
stream_wildcards=["aggregate-me-%"],
)
subscription = aggregated_stream.subscription(
name="example_subscription",
handlers=MyHandlers(),
)
aggregated_stream.projector.run()
subscription.runner.run()
# MyHandlers.handle_message prints:
# SubscriptionMessage(partition=2, position=0, stored_message=StoredMessage(...))
Contribute
Contributions in the form of issues, questions, feedback and pull requests are welcome. Before investing a lot of time, let me know what you are up to so we can see if your contribution fits the vision of the project.
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
File details
Details for the file depeche_db-0.8.0.tar.gz
.
File metadata
- Download URL: depeche_db-0.8.0.tar.gz
- Upload date:
- Size: 26.6 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/4.0.2 CPython/3.9.18
File hashes
Algorithm | Hash digest | |
---|---|---|
SHA256 | be1ff87a860546aaa0b9fac120ca81357ecc1048de7d44927404fe868e201c90 |
|
MD5 | 6c9621db14622ede5cf0311d040565d9 |
|
BLAKE2b-256 | 5971fc822397a51d52bdb0aa8a67fa47062fd1629bf2b06ed8449c0df7431725 |
File details
Details for the file depeche_db-0.8.0-py3-none-any.whl
.
File metadata
- Download URL: depeche_db-0.8.0-py3-none-any.whl
- Upload date:
- Size: 32.7 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/4.0.2 CPython/3.9.18
File hashes
Algorithm | Hash digest | |
---|---|---|
SHA256 | 92f054b0d992abf8066c20f58a1a74b9d7f1b40793cabae7a5f2dc70b14fd0d7 |
|
MD5 | d757d05fbcea6db62c6dc5eb442a3c75 |
|
BLAKE2b-256 | 6f26ff2484ed52e4817eb6eb063eeaa6e705144ebffd445b646ed0a398687ffb |