Skip to main content

Message Store is an event sourcing implementation on top of NATS JetStream

Project description

Test Coverage

Zencastr Message Store for Python

Message store implementation in Python

This package implements an event sourcing model of storing application data. It is similar to Eventide.

Usage

# python3 -m asyncio
client = await nats.connect("nats://localhost:4222")
message_store = MessageStore(client, "env", should_create_missing_streams=True)

# Create stream
await message_store.ensure_stream("stream-name")

# Prepare subscription with handlers
subscription = message_store.create_subscription(
    "stream-name.>",
    "durable-consumer-name",
    handlers={
        "Command": lambda msg: print(msg.seq, msg.subject, msg.data, sep="\t"),
        "FailingCommand": lambda msg: 1 / 0,
    },
)
subscription.start()

# Publish messages and await processing

await message_store.publish_message("stream-name.unique-id1", Message("Command", {"key": "value"}))
# `1   stream-name.unique-id   {'key': 'value'}`
await message_store.publish_message("stream-name.unique-id2", Message("FailingCommand", {"key": "badvalue"}))
# NOTE: Default behavior is to retry 3 times, on the 4th attempt it will TERM the message
# Failed to handle message with subject env.stream-name.unique-id2, seq: 2, data: b'{"type": "FailingCommand", "data": {"key": "badvalue"}}', exception: ZeroDivisionError division by zero
# Failed to handle message with subject env.stream-name.unique-id2, seq: 2, data: b'{"type": "FailingCommand", "data": {"key": "badvalue"}}', exception: ZeroDivisionError division by zero
# Failed to handle message with subject env.stream-name.unique-id2, seq: 2, data: b'{"type": "FailingCommand", "data": {"key": "badvalue"}}', exception: ZeroDivisionError division by zero
# Giving up on processing message #2, subject env.stream-name.unique-id2 from stream env-stream-name. This attempt (#4) exceeds max
await asyncio.sleep(1)  # Leave time for the messages to be processed

Authors

  • Rui Figueiredo (@ruidfigueiredo)
  • Alex Cannan (@alexcannan)

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

message_store-0.3.0.tar.gz (12.3 kB view details)

Uploaded Source

Built Distribution

message_store-0.3.0-py3-none-any.whl (13.1 kB view details)

Uploaded Python 3

File details

Details for the file message_store-0.3.0.tar.gz.

File metadata

  • Download URL: message_store-0.3.0.tar.gz
  • Upload date:
  • Size: 12.3 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/5.1.1 CPython/3.12.7

File hashes

Hashes for message_store-0.3.0.tar.gz
Algorithm Hash digest
SHA256 e2a96418017174b31662b9f95034120056b44a56ff7f9012ac155986056a19d6
MD5 9e933c9d8d0c13d834fc9ea7f596a1eb
BLAKE2b-256 f08c06ab13d892cb4e11b98d0f03cd416a50e34b972e0f102ecb1baf2a6a601e

See more details on using hashes here.

File details

Details for the file message_store-0.3.0-py3-none-any.whl.

File metadata

File hashes

Hashes for message_store-0.3.0-py3-none-any.whl
Algorithm Hash digest
SHA256 883e5583e7458d52451712a3b4a57d6221d2b3a971573567e3147a6598968d03
MD5 447a5f9626e4be61f7c02c2a32627164
BLAKE2b-256 0227c6b346acb81f3c80a979b9f45ef5c195b0335872abf558806da30504786f

See more details on using hashes here.

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page