Python client for UmaDB event store

UmaDB Python Client

The Python package umadb provides a synchronous client for reading and appending events to UmaDB over its gRPC API, using Rust-powered bindings via PyO3.

It is adapted into the Python eventsourcing library via the eventsourcing-umadb package.

Installation

From PyPI

To install with pip, create and activate a virtual environment, then:

pip install umadb

Connecting to UmaDB

Use the Client class as the main entry point for connecting to an UmaDB server.

class Client(
    url: str,
    ca_path: str | None = None,
    api_key: str | None = None,
    batch_size: int | None = None,
):
    ...

Parameters

Name Type Description
url str Required connection string. If the argument starts with https or grpcs, a secure TLS channel is created; otherwise an insecure channel is used.
ca_path str|None Optional path to a PEM-encoded root/CA certificate for TLS connections (useful for self-signed servers).
api_key str|None Optional API key used for authenticating with the server.
batch_size int|None Optional hint for how many events to buffer per batch when reading. The server may cap this; a sensible default is used if unset.

Connection Examples

from umadb import Client

# Insecure (no TLS)
client = Client("http://localhost:50051")

# Secure with TLS (system CAs)
client = Client("https://example.com:50051")

# Secure with TLS using self-signed CA
client = Client(
    url="https://localhost:50051",
    ca_path="server.pem",
)

# TLS + API key
client = Client(
    url="https://example.com:50051",
    ca_path="server.pem",
    api_key="umadb:example-api-key-4f7c2b1d9e5f4a038c1a",
)

# TLS + API key + batch size hint
client = Client(
    url="https://example.com:50051",
    ca_path="server.pem",
    api_key="umadb:example-api-key-4f7c2b1d9e5f4a038c1a",
    batch_size=1000,
)
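Connection settings are often supplied by the deployment environment rather than hard-coded. The sketch below builds keyword arguments for Client from environment variables. The UMADB_* variable names and the client_kwargs_from_env helper are hypothetical and not part of the umadb package; the TLS check simply mirrors the https/grpcs rule from the parameter table.

```python
import os
from collections.abc import Mapping
from typing import Optional


def client_kwargs_from_env(env: Optional[Mapping[str, str]] = None) -> dict:
    """Build keyword arguments for umadb.Client from environment variables.

    The UMADB_* names are illustrative assumptions, not defined by umadb.
    """
    env = os.environ if env is None else env
    url = env.get("UMADB_URL", "http://localhost:50051")
    kwargs: dict = {"url": url}

    # Per the parameter table, URLs starting with https or grpcs use TLS.
    uses_tls = url.startswith(("https", "grpcs"))
    if uses_tls and env.get("UMADB_CA_PATH"):
        kwargs["ca_path"] = env["UMADB_CA_PATH"]
    if env.get("UMADB_API_KEY"):
        kwargs["api_key"] = env["UMADB_API_KEY"]
    if env.get("UMADB_BATCH_SIZE"):
        kwargs["batch_size"] = int(env["UMADB_BATCH_SIZE"])
    return kwargs
```

With umadb installed, Client(**client_kwargs_from_env()) would then construct the client from whatever the environment provides.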

Appending Events

The Client.append() method writes new events to an UmaDB server.

def append(
    events: list[Event],
    condition: AppendCondition | None = None,
    tracking_info: TrackingInfo | None = None,
) -> int:
    ...

The Client.append() method can be used to append new Event instances to UmaDB atomically, with an optional append condition, and optional tracking information. Events are written in order.

Conditional appends with event UUIDs are idempotent. The server does not enforce uniqueness of event IDs.

Parameters

Name Type Description
events list[Event] The list of events to append. Each includes an event type, tags, and data payload.
condition AppendCondition|None Optional append condition to ensure no conflicting writes occur.
tracking_info TrackingInfo|None Optional tracking information – for event-processing components only.

Return Value

Returns the sequence number (int) of the last successfully appended event from this operation. This value can be used to wait for downstream event-processing components in a CQRS system to become up-to-date.
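One way to use the returned position is to poll a downstream component until it reports having processed at least that position. The following is a minimal sketch: wait_for_position is a hypothetical helper, and get_processed stands for whatever callable exposes the downstream component's last processed position (how that is obtained depends on your read model and is an assumption here).

```python
import time
from typing import Callable, Optional


def wait_for_position(
    get_processed: Callable[[], Optional[int]],
    target: int,
    timeout: float = 5.0,
    interval: float = 0.05,
) -> bool:
    """Poll until the downstream position reaches `target` or the timeout expires."""
    deadline = time.monotonic() + timeout
    while True:
        processed = get_processed()
        # None means the downstream component has not processed anything yet.
        if processed is not None and processed >= target:
            return True
        if time.monotonic() >= deadline:
            return False
        time.sleep(interval)
```

A command handler could call this with the value returned by append() before serving a query that must reflect the new event.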

Example

import uuid

from umadb import AppendCondition, Client, Event, IntegrityError, Query, QueryItem

client = Client("http://localhost:50051")

# Define a consistency boundary (same query you use while reading)
cb = Query(items=[QueryItem(types=["example"], tags=["tag1", "tag2"])])

# Read to build decision model
read_resp = client.read(query=cb)
for r in read_resp:
    print(f"Existing event at {r.position}: {r.event}")

last_known = read_resp.head()
print("Last known position:", last_known)

# Produce a new event with a UUID (for idempotent retries) and some metadata
ev = Event(
    event_type="example",
    tags=["tag1", "tag2"],
    data=b"Hello, world!",
    uuid=str(uuid.uuid4()),
    metadata={"source": "readme", "correlation_id": str(uuid.uuid4())},
)

# Append with an optimistic condition: fail if conflicting events exist after last_known
cond = AppendCondition(fail_if_events_match=cb, after=last_known)
position1 = client.append([ev], condition=cond)
print("Appended at:", position1)

# Conflicting append should raise an IntegrityError
try:
    client.append(
        [
            Event(
                event_type="example",
                tags=["tag1", "tag2"],
                data=b"Hello, world!",
                uuid=str(uuid.uuid4()),
            )
        ],
        condition=cond,
    )
except IntegrityError as e:
    print("Conflicting event was rejected:", e)

# Idempotent retry with same event UUID and condition should return same commit position
position2 = client.append([ev], condition=cond)
assert position1 == position2
print("Idempotent retry returned position:", position2)

Reading Events

The Client.read() method returns recorded events from an UmaDB server.

def read(
    query: Query | None = None,
    start: int | None = None,
    backwards: bool = False,
    limit: int | None = None,
) -> ReadResponse:
    ...

The Client.read() method can be used both for constructing decision models in a domain layer, and for projecting events into materialized views in CQRS. An optional Query can be provided to select by tags and types.

Parameters

Name Type Description
query Query|None Optional structured query to filter events (by tags, event types, etc).
start int|None Read events from this sequence number. Only events with positions greater than or equal to it will be returned (or less than or equal to it if backwards is True).
backwards bool If True, events will be read backwards, either from the given position or from the last recorded event.
limit int|None Optional cap on the number of events to retrieve.

Return Value

Returns an iterable "read response" instance from which SequencedEvent instances, and the most relevant "last known" sequence number, can be obtained.

Example

from umadb import Client, Query, QueryItem

client = Client("http://localhost:50051")

# Filter by type(s) and tag(s)
q = Query(items=[QueryItem(types=["example"], tags=["tag1", "tag2"])])

resp = client.read(query=q, start=None, backwards=False, limit=None)
for item in resp:
    print(f"Got event at position {item.position}: {item.event}")

last_known = resp.head()
print("Last known position:", last_known)
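The start and limit parameters can be combined to read a long sequence in pages. This is a sketch under the assumption that start is inclusive, as the parameter table states; iter_pages is a hypothetical helper, and client is any object that offers the read() method documented above.

```python
def iter_pages(client, query=None, page_size=100):
    """Yield lists of sequenced events, at most `page_size` per list."""
    start = None
    while True:
        page = list(client.read(query=query, start=start, limit=page_size))
        if not page:
            return
        yield page
        if len(page) < page_size:
            # A short page means there was nothing more to read at this time.
            return
        # start is inclusive, so continue from the position after the last one seen.
        start = page[-1].position + 1
```

Each page is a separate read request, so events appended between requests may appear in later pages.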

Subscriptions

The Client.subscribe() method returns recorded events from an UmaDB server, keeping the stream open to deliver future events as they arrive.

def subscribe(
    query: Query | None = None,
    after: int | None = None,
) -> Subscription:
    ...

The Client.subscribe() method can be used for projecting events into materialized views in CQRS. An optional query can be provided to select by tags and types.

Parameters

Name Type Description
query Query|None Optional structured query to filter events (by tags, event types, etc).
after int|None Read events after this sequence number. Only events with a larger sequence number will be returned.

Return Value

Returns an iterable "subscription" instance from which SequencedEvent instances can be obtained.

Example

from umadb import Client, Query, QueryItem

client = Client("http://localhost:50051")

# Filter by type(s) and tag(s)
q = Query(items=[QueryItem(types=["example"], tags=["tag1", "tag2"])])

# Subscribe to all events (pass query=q to receive only matching events)
for se in client.subscribe():
    print("New event:", se.position, se.event)
    # Break for demo purposes
    break
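A projection usually remembers the position of the last event it applied so that it can resume with the after parameter. The sketch below shows that loop; run_projection and its apply and max_events arguments are hypothetical, and client is any object with the subscribe() method documented above. Where the returned position is persisted is left to the caller.

```python
def run_projection(client, apply, query=None, after=None, max_events=None):
    """Apply subscribed events in order and return the last processed position."""
    processed = 0
    last_position = after
    for se in client.subscribe(query=query, after=last_position):
        apply(se.event)
        last_position = se.position
        processed += 1
        # max_events (a positive int) is only here so a demo can stop;
        # a real subscription stays open and keeps delivering events.
        if max_events is not None and processed >= max_events:
            break
    return last_position
```

Passing the returned value back in as after on the next run continues from where the previous run stopped.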

Getting Head Position

The Client.head() method returns the position of the last event recorded in an UmaDB server.

def head(self) -> int | None: ...

The Client.head() method can be used for counting the number of recorded events in the database, or for determining the position of the last recorded event when subscribing only to new events.

Return Value

Returns the position (int) of the last recorded event in the event store, or None if no events have been recorded yet.
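Subscribing only to new events can be sketched by passing the current head position as the after argument of subscribe(). The helper name subscribe_to_new_events is hypothetical; client is any object with the head() and subscribe() methods documented here.

```python
def subscribe_to_new_events(client, query=None):
    """Subscribe to events recorded after the current head position."""
    # head() returns None for an empty store. Passing after=None then starts
    # from the beginning, which is equivalent because nothing is recorded yet.
    return client.subscribe(query=query, after=client.head())
```

An event recorded between the head() call and the subscribe() call has a position greater than the head that was read, so it is still delivered.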

Getting Tracking Info

The Client.get_tracking_info() method returns the last recorded position in an upstream sequence of events.

def get_tracking_info(self, source: str) -> int | None: ...

The Client.get_tracking_info() method can be used when starting or resuming an event-processing component. The event-processing component will start by requesting new events from the upstream sequence after this position. The position of an upstream event that has been processed successfully can be recorded atomically when appending new events generated by processing that event.

Parameters

Name Type Description
source str Upstream source name.

Return Value

Returns the last recorded upstream position (int), or None if the sequence name is not found.

Event

An Event represents a single event either to be appended or already stored in the event log.

Field Type Description
event_type str The event’s logical type (e.g. "UserRegistered").
tags list[str] Tags assigned to the event (used for filtering and indexing).
data bytes Binary payload associated with the event.
uuid str|None Unique event ID.
metadata dict[str, str] Optional string key/value pairs stored alongside the event.

Idempotency support for append operations is enabled by setting a UUID on appended events.

event_type and each tag in tags may be up to 65535 bytes long. Appending an event with a longer type or tag fails with a validation error (ValueError).

metadata is for information about the event (provenance, correlation IDs, schema version, etc.) rather than the event payload itself. It is stored and returned unchanged, and is not indexed or matched by queries. It defaults to an empty dict when omitted. Each metadata key and value may be up to 65535 bytes long; appending an event with a longer key or value fails with a validation error.
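The length limits above can be checked before an event is constructed, so that oversized values are caught close to where they originate. This is a sketch only: check_event_fields is a hypothetical helper, and it assumes the limits are measured on the UTF-8 encoding of each string, which the text above does not state explicitly.

```python
MAX_FIELD_BYTES = 65535  # limit quoted above for types, tags, metadata keys and values


def check_event_fields(event_type, tags, metadata=None):
    """Raise ValueError if any field exceeds MAX_FIELD_BYTES (assumed UTF-8 bytes)."""

    def check(label, value):
        size = len(value.encode("utf-8"))
        if size > MAX_FIELD_BYTES:
            raise ValueError(f"{label} is {size} bytes; limit is {MAX_FIELD_BYTES}")

    check("event_type", event_type)
    for tag in tags:
        check(f"tag {tag[:20]!r}", tag)
    for key, value in (metadata or {}).items():
        check(f"metadata key {key[:20]!r}", key)
        check(f"metadata value for {key[:20]!r}", value)
```

The client performs its own validation when appending; a pre-check like this merely gives an earlier and more specific error message.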

Include in:

  • Append requests when writing new events to the store.

Included in:

  • SequencedEvent objects when the server responds to read requests.

Matched by:

  • QueryItem during read() and append() operations.

Append Condition

An AppendCondition causes an append request to fail if events match its Query, optionally after a sequence number.

Field Type Description
fail_if_events_match Query Query for conflicting events.
after int|None Sequence number.

Include in:

  • Append requests to define optimistic concurrency control.

To implement a consistency boundary, command handlers can use the same Query used when reading events as the value of fail_if_events_match, and the "head" sequence number received from the read response as the value of after.
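When an append is rejected because the boundary has changed, a command handler can read again, decide again, and retry. The sketch below shows that loop. The names handle_command and decide are hypothetical; make_condition and conflict_error are passed in so the sketch runs without a server, and with umadb they would be AppendCondition and IntegrityError respectively.

```python
def handle_command(client, query, decide, make_condition, conflict_error, max_attempts=3):
    """Read, decide, and append within a consistency boundary, retrying on conflict.

    `decide` takes the list of existing events and returns the new events to append.
    """
    for _ in range(max_attempts):
        response = client.read(query=query)
        history = [se.event for se in response]
        head = response.head()  # "last known" position for this boundary

        new_events = decide(history)
        if not new_events:
            return None  # nothing to record

        condition = make_condition(fail_if_events_match=query, after=head)
        try:
            return client.append(new_events, condition=condition)
        except conflict_error:
            # Someone else wrote into the boundary; rebuild the decision model.
            continue
    raise RuntimeError(f"gave up after {max_attempts} conflicting attempts")
```

Re-reading on each attempt matters: the decision is always made against the events that the condition will be checked against.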

Tracking Info

A TrackingInfo instance indicates the source and position of an upstream event.

Field Type Description
source str Upstream sequence name.
position int Upstream sequence number.

Include in:

  • Append requests when recording the results of processing an upstream event.

To implement exactly-once semantics in event-processing components, pull events from an upstream source after the last recorded position, then record the upstream positions of upstream events along with new state that results from processing those events. By processing events sequentially in this way, each event will be processed at least once. And by recording tracking information along with new state, the new state will be recorded at most once. The combination of "at least once" processing and "at most once" recording gives "exactly once" semantics from the point of view of consumers of the recorded state.
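The processing loop described above can be sketched as follows. The names process_upstream, handle, and max_events are hypothetical; make_tracking_info is passed in so the sketch runs without a server, and with umadb it would be the TrackingInfo class. The sketch assumes handle returns at least one new event for every upstream event, since appending an empty list is not covered here.

```python
def process_upstream(upstream, downstream, source, handle, make_tracking_info,
                     max_events=None):
    """Process upstream events in order, recording tracking info with each append."""
    # Resume after the last upstream position recorded downstream (None on first run).
    last_position = downstream.get_tracking_info(source)
    processed = 0
    for se in upstream.subscribe(after=last_position):
        new_events = handle(se.event)
        tracking = make_tracking_info(source, se.position)
        # New events and the upstream position are recorded in one atomic append.
        downstream.append(new_events, tracking_info=tracking)
        last_position = se.position
        processed += 1
        if max_events is not None and processed >= max_events:
            break  # demo only; a real component keeps the subscription open
    return last_position
```

If the component crashes between handling an event and appending, the event is handled again after restart, but its results are recorded only once because the tracking position advances only with a successful append.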

Query

A Query defines criteria for selecting events in the event store.

Field Type Description
items list[QueryItem] A list of selection criteria (logical OR).

An Event is selected if any QueryItem matches or the items field is empty.

Include in:

  • Read requests to select events returned by the server.
  • An AppendCondition to select conflicting events.

Query Item

A QueryItem defines a criterion for matching events.

Field Type Description
types list[str] List of event types (logical OR).
tags list[str] List of tags (logical AND).

A QueryItem will match an Event if:

  • one of its types matches the Event.event_type or its types field is empty; AND
  • all of its tags match one of the Event.tags or its tags field is empty.
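The matching rules for Query and QueryItem can be restated as plain Python. This is only an illustration of the rules as written here, not the server's implementation; item_matches and query_matches are hypothetical names, and each query item is represented as a (types, tags) pair of lists.

```python
def item_matches(item_types, item_tags, event_type, event_tags):
    """True if a query item's types (OR) and tags (AND) match the event."""
    # An empty types list matches any event type.
    type_ok = not item_types or event_type in item_types
    # Every item tag must be present on the event; an empty tags list matches.
    tags_ok = all(tag in event_tags for tag in item_tags)
    return type_ok and tags_ok


def query_matches(items, event_type, event_tags):
    """True if any (types, tags) item matches, or if there are no items."""
    if not items:
        return True
    return any(
        item_matches(types, tags, event_type, event_tags) for types, tags in items
    )
```

For example, an item with types ["example"] and tags ["tag1", "tag2"] matches an "example" event tagged ["tag1", "tag2", "tag3"], but not one tagged only ["tag1"].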

Sequenced Event

A SequencedEvent represents a recorded Event along with its assigned sequence number.

Field Type Description
position int The sequence number.
event Event The recorded event.

Included in:

  • Read responses when the server responds to read requests.

Error Handling

The Python client raises Python exceptions on error:

  • Integrity/condition failure: IntegrityError
  • Transport/connection errors: TransportError
  • Authentication failures: AuthenticationError
  • Invalid argument errors: ValueError
  • Other internal errors: RuntimeError or OSError

Your application should catch these as appropriate.
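Transport errors are often transient, so one option is to retry them with a short backoff while letting every other exception propagate. The sketch below is generic: retry_transient is a hypothetical helper, and the tuple of exception classes is supplied by the caller. With umadb this would presumably be (TransportError,), assuming that class is importable from the package in the same way as IntegrityError.

```python
import time


def retry_transient(operation, transient_errors, attempts=3, base_delay=0.1):
    """Call `operation()`, retrying with exponential backoff on transient errors."""
    for attempt in range(attempts):
        try:
            return operation()
        except transient_errors:
            if attempt == attempts - 1:
                raise  # out of attempts: surface the last error
            time.sleep(base_delay * 2 ** attempt)
```

Retrying an append blindly is only safe when the request is idempotent, which according to this document means the events carry UUIDs and the append is conditional.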

Complete Example

import uuid

from umadb import AppendCondition, Client, Event, IntegrityError, Query, QueryItem

# Connect to the gRPC server (insecure, no TLS)
client = Client(
    url="http://localhost:50051",
)

# Define a consistency boundary
cb = Query(items=[QueryItem(types=["example"], tags=["tag1", "tag2"])])

# Read events for a decision model
read_response = client.read(query=cb)
for result in read_response:
    print(f"Got event at position {result.position}: {result.event}")

# Remember the last-known position
last_known_position = read_response.head()
print("Last known position is:", last_known_position)

# Create an event with a UUID to enable idempotent append, and some metadata
event = Event(
    event_type="example",
    tags=["tag1", "tag2"],
    data=b"Hello, world!",
    uuid=str(uuid.uuid4()),
    metadata={"source": "readme", "correlation_id": str(uuid.uuid4())},
)

# Append event within the consistency boundary
condition = AppendCondition(fail_if_events_match=cb, after=last_known_position)
commit_position1 = client.append([event], condition=condition)
print("Appended event at position:", commit_position1)

# Append conflicting event — expect an error
try:
    conflicting_event = Event(
        event_type="example",
        tags=["tag1", "tag2"],
        data=b"Hello, world!",
        uuid=str(uuid.uuid4()),  # different UUID
    )
    client.append([conflicting_event], condition=condition)
except IntegrityError as e:
    print("Conflicting event was rejected:", e)

# Idempotent retry — same event ID and condition
print("Retrying to append event at position:", last_known_position)
commit_position2 = client.append([event], condition=condition)
assert commit_position1 == commit_position2
print("Append returned same commit position:", commit_position2)

# Subscribe to all events for a projection
for ev in client.subscribe():
    print(f"Processing event at {ev.position}: {ev.event}")
    if ev.position == commit_position2:
        print("Projection has processed new event!")
        break

Example with Tracking

import uuid

from umadb import (
    AppendCondition,
    Client,
    Event,
    IntegrityError,
    Query,
    QueryItem,
    TrackingInfo,
)

# Connect to the gRPC server (insecure, no TLS)
client = Client(
    url="http://localhost:50051",
)

# Get last processed upstream event position
last_processed_position = client.get_tracking_info("upstream")

# Pull next unprocessed upstream event...
next_upstream_event_position = 1 + (last_processed_position or 0)

# Construct tracking information from next unprocessed event
tracking_info = TrackingInfo("upstream", next_upstream_event_position)

# Define a consistency boundary
cb = Query(items=[QueryItem(types=["example"], tags=["tag1", "tag2"])])

# Read events for a decision model
read_response = client.read(query=cb)
for result in read_response:
    print(f"Got event at position {result.position}: {result.event}")

# Remember the last-known position
last_known_position = read_response.head()
print("Last known position is:", last_known_position)

# Create an event with a UUID to enable idempotent append
event = Event(
    event_type="example",
    tags=["tag1", "tag2"],
    data=b"Hello, world!",
    uuid=str(uuid.uuid4()),
)

# Append event within the consistency boundary
condition = AppendCondition(fail_if_events_match=cb, after=last_known_position)
commit_position1 = client.append(
    [event], condition=condition, tracking_info=tracking_info
)
print("Appended event at position:", commit_position1)

# Idempotent retry — same event ID and condition
print("Retrying to append event at position:", last_known_position)
commit_position2 = client.append(
    [event], condition=condition, tracking_info=tracking_info
)
assert commit_position1 == commit_position2
print("Append returned same commit position:", commit_position2)

# Check tracking information
assert tracking_info.position == client.get_tracking_info("upstream")

# Unconditional append with conflicting tracking information — expect an error
try:
    conflicting_event = Event(
        event_type="example",
        tags=["tag1", "tag2"],
        data=b"Hello, world!",
        uuid=str(uuid.uuid4()),  # different UUID
    )
    client.append([conflicting_event], condition=None, tracking_info=tracking_info)
except IntegrityError as e:
    print("Conflicting event was rejected:", e)

Notes

  • The Python client is synchronous and blocking; if you need async integration, run client calls in a thread pool or use an async worker that offloads to threads.
  • Event data is binary (bytes). Use a consistent serialization (e.g., JSON serialized to UTF-8 bytes, protobuf, msgpack) for your domain.
  • API keys must match the server configuration.
  • For TLS with self-signed certs, pass ca_path with your root/CA certificate.
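The first two notes can be sketched with the standard library alone. The helpers encode_payload, decode_payload, and call_blocking are hypothetical names; JSON is just one of the serialization choices mentioned above, and asyncio.to_thread is one way to offload a blocking call to a worker thread.

```python
import asyncio
import json


def encode_payload(obj) -> bytes:
    """Serialize a JSON-compatible object to UTF-8 bytes for Event.data."""
    # sort_keys and compact separators give a stable byte representation.
    return json.dumps(obj, sort_keys=True, separators=(",", ":")).encode("utf-8")


def decode_payload(data: bytes):
    """Inverse of encode_payload."""
    return json.loads(data.decode("utf-8"))


async def call_blocking(func, *args, **kwargs):
    """Run a blocking client call in a worker thread without blocking the event loop."""
    return await asyncio.to_thread(func, *args, **kwargs)
```

Inside a coroutine, a call such as await call_blocking(client.append, [event], condition=cond) would then keep the event loop responsive while the append is in flight.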

Testing

The Python bindings can be tested by running an UmaDB server and executing Python code against it.

License

Licensed under either of:

at your option.
