Event-driven architecture toolkit for Python
natricine
Python port of watermill: an async-first event-driven architecture toolkit. It was "vibe ported" with Opus 4.5, so there are no compatibility guarantees at this stage.
Installation
pip install natricine
# Backends
pip install natricine-redisstream # Redis Streams
pip install natricine-aws # AWS SNS/SQS
pip install natricine-sql # PostgreSQL/SQLite
pip install natricine-http # HTTP webhooks
pip install natricine_otel # OpenTelemetry tracing/metrics
Quick Start
import asyncio

from natricine.pubsub import InMemoryPubSub, Message
from natricine.cqrs import CommandBus, EventBus, PydanticMarshaler
from pydantic import BaseModel

# Define commands and events
class CreateUser(BaseModel):
    user_id: str
    name: str

class UserCreated(BaseModel):
    user_id: str
    name: str

# Set up buses
pubsub = InMemoryPubSub()
marshaler = PydanticMarshaler()
command_bus = CommandBus(pubsub, pubsub, marshaler)
event_bus = EventBus(pubsub, pubsub, marshaler)

# Register handlers
@command_bus.handler
async def handle_create_user(cmd: CreateUser) -> None:
    print(f"Creating user: {cmd.name}")
    await event_bus.publish(UserCreated(user_id=cmd.user_id, name=cmd.name))

@event_bus.handler
async def on_user_created(event: UserCreated) -> None:
    print(f"User created: {event.name}")

# Run
async def main():
    async with pubsub:
        async with asyncio.TaskGroup() as tg:
            tg.create_task(command_bus.run())
            tg.create_task(event_bus.run())

            await command_bus.send(CreateUser(user_id="1", name="Alice"))
            await asyncio.sleep(0.1)

            await command_bus.close()
            await event_bus.close()

asyncio.run(main())
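In the Quick Start, each handler declares the message it accepts through the type annotation on its first parameter. How natricine resolves that internally is not documented here. The block below is only a minimal, stdlib-only sketch of one way such type-based dispatch could work. `TinyBus` and its methods are hypothetical names, not natricine's API, and a dataclass stands in for the Pydantic model.

```python
import asyncio
import inspect
import typing
from dataclasses import dataclass


class TinyBus:
    """Hypothetical bus: routes each message to the handler registered for its type."""

    def __init__(self) -> None:
        self._handlers: dict = {}  # message type -> async handler

    def handler(self, func):
        # Use the annotation of the first parameter as the routing key.
        first_param = next(iter(inspect.signature(func).parameters))
        message_type = typing.get_type_hints(func)[first_param]
        self._handlers[message_type] = func
        return func

    async def send(self, message) -> None:
        # Look up the handler by the concrete type of the message and await it.
        await self._handlers[type(message)](message)


@dataclass
class CreateUser:
    user_id: str
    name: str


bus = TinyBus()
seen: list = []


@bus.handler
async def handle_create_user(cmd: CreateUser) -> None:
    seen.append(cmd.name)


asyncio.run(bus.send(CreateUser(user_id="1", name="Alice")))
```

A real bus would also serialize the message and move it through a publisher and subscriber. This sketch calls the handler directly to keep the routing idea visible.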
Package Structure
natricine # Core package
├── pubsub # Message, Publisher, Subscriber, InMemoryPubSub
├── router # Router, Middleware
└── cqrs # CommandBus, EventBus
natricine_redis # pip install natricine-redisstream
natricine_aws # pip install natricine-aws
natricine_sql # pip install natricine-sql
natricine_http # pip install natricine-http
natricine_otel # pip install natricine_otel
Backends
In-Memory (built-in)
from natricine.pubsub import InMemoryPubSub
pubsub = InMemoryPubSub()
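To illustrate the idea behind an in-memory backend, here is a conceptual sketch built on `asyncio.Queue`, where every subscriber to a topic gets its own queue. `TinyPubSub`, `subscribe`, and `publish` are hypothetical names for this example only. The actual `InMemoryPubSub` may be structured quite differently.

```python
import asyncio
from collections import defaultdict


class TinyPubSub:
    """Hypothetical in-memory broker: one asyncio.Queue per subscriber, per topic."""

    def __init__(self) -> None:
        self._queues = defaultdict(list)  # topic -> list of subscriber queues

    def subscribe(self, topic: str) -> asyncio.Queue:
        # Each subscriber gets its own queue, so every subscriber sees every message.
        queue: asyncio.Queue = asyncio.Queue()
        self._queues[topic].append(queue)
        return queue

    async def publish(self, topic: str, payload: bytes) -> None:
        # Fan the payload out to all queues registered for the topic.
        for queue in self._queues[topic]:
            await queue.put(payload)


async def demo() -> list:
    pubsub = TinyPubSub()
    first = pubsub.subscribe("users")
    second = pubsub.subscribe("users")
    await pubsub.publish("users", b"hello")
    return [await first.get(), await second.get()]
```

Running `asyncio.run(demo())` delivers the same payload to both subscribers. Messages published before anyone subscribes are dropped in this sketch, which is a common property of in-memory brokers but is an assumption here.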
Redis Streams
from redis.asyncio import Redis
from natricine_redis import RedisStreamPublisher, RedisStreamSubscriber
redis = Redis.from_url("redis://localhost:6379")
publisher = RedisStreamPublisher(redis)
subscriber = RedisStreamSubscriber(redis, group_name="my-app", consumer_name="worker-1")
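The `group_name` and `consumer_name` arguments correspond to Redis Streams consumer groups: each entry in a stream is delivered to one consumer within a group, while separate groups each receive every entry. The toy model below sketches that delivery rule in plain Python. `TinyStream` is hypothetical, it talks to neither Redis nor natricine, and the round-robin assignment is a simplification (in Redis, an entry goes to whichever consumer in the group reads it first).

```python
from collections import defaultdict


class TinyStream:
    """Hypothetical model of consumer-group delivery; not Redis and not natricine."""

    def __init__(self) -> None:
        self._consumers = defaultdict(list)  # group -> consumer names
        self._next = defaultdict(int)        # group -> count of entries delivered
        self.delivered = defaultdict(list)   # (group, consumer) -> entries

    def join(self, group_name: str, consumer_name: str) -> None:
        self._consumers[group_name].append(consumer_name)

    def add(self, entry: str) -> None:
        # Every group sees the entry, but only one consumer inside each group gets it.
        for group_name, consumers in self._consumers.items():
            index = self._next[group_name] % len(consumers)
            self.delivered[(group_name, consumers[index])].append(entry)
            self._next[group_name] += 1


stream = TinyStream()
stream.join("my-app", "worker-1")
stream.join("my-app", "worker-2")
stream.join("audit", "worker-1")
for entry in ["a", "b", "c", "d"]:
    stream.add(entry)
```

Here the two `my-app` workers split the four entries between them, while the single `audit` consumer receives all four. Running several workers with the same `group_name` and distinct `consumer_name` values is the usual way to scale out processing.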
AWS SQS/SNS
import aioboto3
from natricine_aws import SQSPublisher, SQSSubscriber, SNSPublisher, SNSSubscriber, SNSConfig
session = aioboto3.Session()
# Direct SQS
publisher = SQSPublisher(session)
subscriber = SQSSubscriber(session)
# SNS fan-out to SQS
sns_publisher = SNSPublisher(session)
sns_subscriber = SNSSubscriber(session, config=SNSConfig(consumer_group="my-group"))
Middleware
from natricine.router import Router, retry, timeout
router = Router()
router.add_middleware(retry(max_retries=3, delay=1.0))
router.add_middleware(timeout(seconds=30))
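Middleware of this kind is typically a function that wraps a handler and returns a new handler. The sketch below shows how retry and timeout behaviour could be written with only `asyncio`. It is not natricine's implementation: the `Handler` and `Middleware` type aliases, the `chain` helper, and the exact retry semantics (`max_retries` extra attempts after the first failure) are assumptions made for illustration.

```python
import asyncio
from typing import Awaitable, Callable

Handler = Callable[[bytes], Awaitable[None]]
Middleware = Callable[[Handler], Handler]


def retry(max_retries: int, delay: float) -> Middleware:
    """Hypothetical retry middleware: re-run the handler after a failure."""
    def middleware(handler: Handler) -> Handler:
        async def wrapped(payload: bytes) -> None:
            for attempt in range(max_retries + 1):
                try:
                    await handler(payload)
                    return
                except Exception:
                    # Out of retries: let the last error propagate.
                    if attempt == max_retries:
                        raise
                    await asyncio.sleep(delay)
        return wrapped
    return middleware


def timeout(seconds: float) -> Middleware:
    """Hypothetical timeout middleware: cancel handlers that run too long."""
    def middleware(handler: Handler) -> Handler:
        async def wrapped(payload: bytes) -> None:
            # wait_for cancels the handler and raises asyncio.TimeoutError on expiry.
            await asyncio.wait_for(handler(payload), timeout=seconds)
        return wrapped
    return middleware


def chain(handler: Handler, *middlewares: Middleware) -> Handler:
    # Apply in reverse so the first middleware listed is the outermost wrapper.
    for middleware in reversed(middlewares):
        handler = middleware(handler)
    return handler


attempts: list = []


async def flaky(payload: bytes) -> None:
    # Fails on the first two calls, then succeeds.
    attempts.append(payload)
    if len(attempts) < 3:
        raise RuntimeError("transient failure")


handler = chain(flaky, retry(max_retries=3, delay=0.0), timeout(seconds=1.0))
asyncio.run(handler(b"payload"))
```

With `retry` listed first, the timeout applies to each attempt separately rather than to the whole retry loop. Swapping the order would bound the total time instead.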
References
- watermill - Go event-driven library (inspiration)
- watermill-aws - AWS backend reference
File details
Details for the file natricine-0.2.0.tar.gz.
File metadata
- Download URL: natricine-0.2.0.tar.gz
- Upload date:
- Size: 12.8 kB
- Tags: Source
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.7
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 34d4612604432b82e0069dc6134b26cc02a14bedd50a4f96c37330b67a033493 |
| MD5 | 76f28b81d4661f48a03219b87f120f38 |
| BLAKE2b-256 | 870bd623bbba64210981c1cf9a45a3d60e320b4eeb287e5cc3860dd84a6fa04a |
Provenance
The following attestation bundles were made for natricine-0.2.0.tar.gz:
Publisher: publish.yml on nm523/natricine
Statement:
- Statement type: https://in-toto.io/Statement/v1
- Predicate type: https://docs.pypi.org/attestations/publish/v1
- Subject name: natricine-0.2.0.tar.gz
- Subject digest: 34d4612604432b82e0069dc6134b26cc02a14bedd50a4f96c37330b67a033493
- Sigstore transparency entry: 779507849
- Sigstore integration time:
- Permalink: nm523/natricine@696810b7d13acdfc687f930ea3d23f95334ff795
- Branch / Tag: refs/tags/natricine-v0.2.0
- Owner: https://github.com/nm523
- Access: public
- Token Issuer: https://token.actions.githubusercontent.com
- Runner Environment: github-hosted
- Publication workflow: publish.yml@696810b7d13acdfc687f930ea3d23f95334ff795
- Trigger Event: push
File details
Details for the file natricine-0.2.0-py3-none-any.whl.
File metadata
- Download URL: natricine-0.2.0-py3-none-any.whl
- Upload date:
- Size: 22.5 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.7
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 90ef61d4ac83a6a8eb0726f6e8f6aab3099372d61426b71acb73a215c1bb3ff2 |
| MD5 | eece1e1a59be8c3599667fbbe3f46e3a |
| BLAKE2b-256 | 3fb9ae54d3479bdf3d21acdfc58303a2c5bd467adcbe96711102793810737b67 |
Provenance
The following attestation bundles were made for natricine-0.2.0-py3-none-any.whl:
Publisher: publish.yml on nm523/natricine
Statement:
- Statement type: https://in-toto.io/Statement/v1
- Predicate type: https://docs.pypi.org/attestations/publish/v1
- Subject name: natricine-0.2.0-py3-none-any.whl
- Subject digest: 90ef61d4ac83a6a8eb0726f6e8f6aab3099372d61426b71acb73a215c1bb3ff2
- Sigstore transparency entry: 779507850
- Sigstore integration time:
- Permalink: nm523/natricine@696810b7d13acdfc687f930ea3d23f95334ff795
- Branch / Tag: refs/tags/natricine-v0.2.0
- Owner: https://github.com/nm523
- Access: public
- Token Issuer: https://token.actions.githubusercontent.com
- Runner Environment: github-hosted
- Publication workflow: publish.yml@696810b7d13acdfc687f930ea3d23f95334ff795
- Trigger Event: push