Skip to main content

Event-driven architecture toolkit for Python

Project description

natricine

Python port of watermill. Async-first event-driven architecture toolkit. "Vibe ported" with Opus 4.5, as such there are no guarantees at this stage of compatibility.

Installation

pip install natricine

# Backends
pip install natricine-redisstream    # Redis Streams
pip install natricine-aws            # AWS SNS/SQS
pip install natricine-sql            # PostgreSQL/SQLite
pip install natricine-http           # HTTP webhooks
pip install natricine_otel           # OpenTelemetry tracing/metrics

Quick Start

import asyncio
from natricine.pubsub import InMemoryPubSub, Message
from natricine.cqrs import CommandBus, EventBus, PydanticMarshaler
from pydantic import BaseModel

# Define commands and events
class CreateUser(BaseModel):
    user_id: str
    name: str

class UserCreated(BaseModel):
    user_id: str
    name: str

# Set up buses
pubsub = InMemoryPubSub()
marshaler = PydanticMarshaler()
command_bus = CommandBus(pubsub, pubsub, marshaler)
event_bus = EventBus(pubsub, pubsub, marshaler)

# Register handlers
@command_bus.handler
async def handle_create_user(cmd: CreateUser) -> None:
    print(f"Creating user: {cmd.name}")
    await event_bus.publish(UserCreated(user_id=cmd.user_id, name=cmd.name))

@event_bus.handler
async def on_user_created(event: UserCreated) -> None:
    print(f"User created: {event.name}")

# Run
async def main():
    async with pubsub:
        async with asyncio.TaskGroup() as tg:
            tg.create_task(command_bus.run())
            tg.create_task(event_bus.run())

            await command_bus.send(CreateUser(user_id="1", name="Alice"))
            await asyncio.sleep(0.1)

            await command_bus.close()
            await event_bus.close()

asyncio.run(main())

Package Structure

natricine                     # Core package
├── pubsub                    # Message, Publisher, Subscriber, InMemoryPubSub
├── router                    # Router, Middleware
└── cqrs                      # CommandBus, EventBus

natricine_redis               # pip install natricine-redisstream
natricine_aws                 # pip install natricine-aws
natricine_sql                 # pip install natricine-sql
natricine_http                # pip install natricine-http
natricine_otel                # pip install natricine_otel

Backends

In-Memory (built-in)

from natricine.pubsub import InMemoryPubSub

pubsub = InMemoryPubSub()

Redis Streams

from redis.asyncio import Redis
from natricine_redis import RedisStreamPublisher, RedisStreamSubscriber

redis = Redis.from_url("redis://localhost:6379")
publisher = RedisStreamPublisher(redis)
subscriber = RedisStreamSubscriber(redis, group_name="my-app", consumer_name="worker-1")

AWS SQS/SNS

import aioboto3
from natricine_aws import SQSPublisher, SQSSubscriber, SNSPublisher, SNSSubscriber, SNSConfig

session = aioboto3.Session()

# Direct SQS
publisher = SQSPublisher(session)
subscriber = SQSSubscriber(session)

# SNS fan-out to SQS
sns_publisher = SNSPublisher(session)
sns_subscriber = SNSSubscriber(session, config=SNSConfig(consumer_group="my-group"))

Middleware

from natricine.router import Router, retry, timeout

router = Router()
router.add_middleware(retry(max_retries=3, delay=1.0))
router.add_middleware(timeout(seconds=30))

References

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

natricine-0.2.0.tar.gz (12.8 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

natricine-0.2.0-py3-none-any.whl (22.5 kB view details)

Uploaded Python 3

File details

Details for the file natricine-0.2.0.tar.gz.

File metadata

  • Download URL: natricine-0.2.0.tar.gz
  • Upload date:
  • Size: 12.8 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for natricine-0.2.0.tar.gz
Algorithm Hash digest
SHA256 34d4612604432b82e0069dc6134b26cc02a14bedd50a4f96c37330b67a033493
MD5 76f28b81d4661f48a03219b87f120f38
BLAKE2b-256 870bd623bbba64210981c1cf9a45a3d60e320b4eeb287e5cc3860dd84a6fa04a

See more details on using hashes here.

Provenance

The following attestation bundles were made for natricine-0.2.0.tar.gz:

Publisher: publish.yml on nm523/natricine

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file natricine-0.2.0-py3-none-any.whl.

File metadata

  • Download URL: natricine-0.2.0-py3-none-any.whl
  • Upload date:
  • Size: 22.5 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for natricine-0.2.0-py3-none-any.whl
Algorithm Hash digest
SHA256 90ef61d4ac83a6a8eb0726f6e8f6aab3099372d61426b71acb73a215c1bb3ff2
MD5 eece1e1a59be8c3599667fbbe3f46e3a
BLAKE2b-256 3fb9ae54d3479bdf3d21acdfc58303a2c5bd467adcbe96711102793810737b67

See more details on using hashes here.

Provenance

The following attestation bundles were made for natricine-0.2.0-py3-none-any.whl:

Publisher: publish.yml on nm523/natricine

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page