Skip to main content

Event-driven architecture toolkit for Python

Project description

natricine

Python port of watermill. Async-first event-driven architecture toolkit. "Vibe ported" with Opus 4.5, as such there are no guarantees at this stage of compatibility.

Installation

pip install natricine

# Backends
pip install natricine-redisstream    # Redis Streams
pip install natricine-aws            # AWS SNS/SQS
pip install natricine-sql            # PostgreSQL/SQLite
pip install natricine-http           # HTTP webhooks
pip install natricine_otel           # OpenTelemetry tracing/metrics

Quick Start

import asyncio
from natricine.pubsub import InMemoryPubSub, Message
from natricine.cqrs import CommandBus, EventBus, PydanticMarshaler
from pydantic import BaseModel

# Define commands and events
class CreateUser(BaseModel):
    user_id: str
    name: str

class UserCreated(BaseModel):
    user_id: str
    name: str

# Set up buses
pubsub = InMemoryPubSub()
marshaler = PydanticMarshaler()
command_bus = CommandBus(pubsub, pubsub, marshaler)
event_bus = EventBus(pubsub, pubsub, marshaler)

# Register handlers
@command_bus.handler
async def handle_create_user(cmd: CreateUser) -> None:
    print(f"Creating user: {cmd.name}")
    await event_bus.publish(UserCreated(user_id=cmd.user_id, name=cmd.name))

@event_bus.handler
async def on_user_created(event: UserCreated) -> None:
    print(f"User created: {event.name}")

# Run
async def main():
    async with pubsub:
        async with asyncio.TaskGroup() as tg:
            tg.create_task(command_bus.run())
            tg.create_task(event_bus.run())

            await command_bus.send(CreateUser(user_id="1", name="Alice"))
            await asyncio.sleep(0.1)

            await command_bus.close()
            await event_bus.close()

asyncio.run(main())

Package Structure

natricine                     # Core package
├── pubsub                    # Message, Publisher, Subscriber, InMemoryPubSub
├── router                    # Router, Middleware
└── cqrs                      # CommandBus, EventBus

natricine_redis               # pip install natricine-redisstream
natricine_aws                 # pip install natricine-aws
natricine_sql                 # pip install natricine-sql
natricine_http                # pip install natricine-http
natricine_otel                # pip install natricine_otel

Backends

In-Memory (built-in)

from natricine.pubsub import InMemoryPubSub

pubsub = InMemoryPubSub()

Redis Streams

from redis.asyncio import Redis
from natricine_redis import RedisStreamPublisher, RedisStreamSubscriber

redis = Redis.from_url("redis://localhost:6379")
publisher = RedisStreamPublisher(redis)
subscriber = RedisStreamSubscriber(redis, group_name="my-app", consumer_name="worker-1")

AWS SQS/SNS

import aioboto3
from natricine_aws import SQSPublisher, SQSSubscriber, SNSPublisher, SNSSubscriber, SNSConfig

session = aioboto3.Session()

# Direct SQS
publisher = SQSPublisher(session)
subscriber = SQSSubscriber(session)

# SNS fan-out to SQS
sns_publisher = SNSPublisher(session)
sns_subscriber = SNSSubscriber(session, config=SNSConfig(consumer_group="my-group"))

Middleware

from natricine.router import Router, retry, timeout

router = Router()
router.add_middleware(retry(max_retries=3, delay=1.0))
router.add_middleware(timeout(seconds=30))

References

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

natricine-0.1.1.tar.gz (11.9 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

natricine-0.1.1-py3-none-any.whl (20.8 kB view details)

Uploaded Python 3

File details

Details for the file natricine-0.1.1.tar.gz.

File metadata

  • Download URL: natricine-0.1.1.tar.gz
  • Upload date:
  • Size: 11.9 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for natricine-0.1.1.tar.gz
Algorithm Hash digest
SHA256 d695635c37484126d8a89a57fe15529c6bf727834b1bbd70568e42130aea9b8b
MD5 02cc70572a6a9b85936d8330034661d9
BLAKE2b-256 e36026819118b80899d850a2680bca2eb774f32960bbe33904a3052895046b1d

See more details on using hashes here.

Provenance

The following attestation bundles were made for natricine-0.1.1.tar.gz:

Publisher: publish.yml on nm523/natricine

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file natricine-0.1.1-py3-none-any.whl.

File metadata

  • Download URL: natricine-0.1.1-py3-none-any.whl
  • Upload date:
  • Size: 20.8 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for natricine-0.1.1-py3-none-any.whl
Algorithm Hash digest
SHA256 b615db61cd47f8b94fc8cf3db2d4347bc92470fc823023ddc813d21997a32cb2
MD5 d76b98af6075bc256f59f010e32406c5
BLAKE2b-256 b392323303b6c38e73e58fe7c58f53fb4ddc9d8a26b90662bec31dfa3cc999d8

See more details on using hashes here.

Provenance

The following attestation bundles were made for natricine-0.1.1-py3-none-any.whl:

Publisher: publish.yml on nm523/natricine

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page