Skip to main content

Python client for UmaDB event store

Project description

UmaDB Python Client

The Python package umadb provides a synchronous client for reading and appending events to UmaDB over its gPRC API using Rust-powered bindings via PyO3.

It is adapted into the Python eventsourcing library via the eventsourcing-umadb package.

Installation

From PyPI

To install with pip, create and activate a virtual environment, then:

pip install umadb

Connecting to UmaDB

Use the Client class as the main entry point for connecting to an UmaDB server.

class Client(
    url: str,
    ca_path: str | None = None,
    api_key: str | None = None,
    batch_size: int | None = None,
):
    ...

Parameters

Name Type Description
url str Required connection string. If the argument starts with https or grpcs, a secure TLS channel is created; otherwise an insecure channel is used.
ca_path str|None Optional path to a PEM-encoded root/CA certificate for TLS connections (useful for self-signed servers).
api_key str|None Optional API key used for authenticating with the server.
batch_size int|None Optional hint for how many events to buffer per batch when reading. The server may cap this; a sensible default is used if unset..

Connection Examples

from umadb import Client

# Insecure (no TLS)
client = Client("http://localhost:50051")

# Secure with TLS (system CAs)
client = Client("https://example.com:50051")

# Secure with TLS using self-signed CA
client = Client(
    url="https://localhost:50051",
    ca_path="server.pem",
)

# TLS + API key
client = Client(
    url="https://example.com:50051",
    ca_path="server.pem",
    api_key="umadb:example-api-key-4f7c2b1d9e5f4a038c1a",
)

# TLS + API key + batch size hint
client = Client(
    url="https://example.com:50051",
    ca_path="server.pem",
    api_key="umadb:example-api-key-4f7c2b1d9e5f4a038c1a",
    batch_size=1000,
)

Appending Events

The Client.append() method writes new events to an UmaDB server.

def append(
    events: list[Event],
    condition: AppendCondition | None = None,
    tracking_info: TrackingInfo | None = None,
) -> int:
    ...

The Client.append() method can be used to append new Event instances to UmaDB atomically, with an optional append condition, and optional tracking information. Events are written in order.

Conditional appends with event UUIDs are idempotent. The server does not enforce uniqueness of events IDs.

Parameters

Name Type Description
events list[Event] The list of events to append. Each includes an event type, tags, and data payload.
condition AppendCondition|None Optional append condition to ensure no conflicting writes occur.
tracking_info TrackingInfo|None Optional tracking information – for event-processing components only.

Return Value

Returns the sequence number (int) of the last successfully appended event from this operation. This value can be used to wait for downstream event-processing components in a CQRS system to become up-to-date.

Example

import uuid

from umadb import AppendCondition, Client, Event, IntegrityError, Query, QueryItem

client = Client("http://localhost:50051")

# Define a consistency boundary (same query you use while reading)
cb = Query(items=[QueryItem(types=["example"], tags=["tag1", "tag2"])])

# Read to build decision model
read_resp = client.read(query=cb)
for r in read_resp:
    print(f"Existing event at {r.position}: {r.event}")

last_known = read_resp.head()
print("Last known position:", last_known)

# Produce a new event with a UUID (for idempotent retries)
ev = Event(
    event_type="example",
    tags=["tag1", "tag2"],
    data=b"Hello, world!",
    uuid=str(uuid.uuid4()),
)

# Append with an optimistic condition: fail if conflicting events exist after last_known
cond = AppendCondition(fail_if_events_match=cb, after=last_known)
position1 = client.append([ev], condition=cond)
print("Appended at:", position1)

# Conflicting append should raise an error (e.g. ValueError)
try:
    client.append(
        [
            Event(
                event_type="example",
                tags=["tag1", "tag2"],
                data=b"Hello, world!",
                uuid=str(uuid.uuid4()),
            )
        ],
        condition=cond,
    )
except IntegrityError as e:
    print("Conflicting event was rejected:", e)

# Idempotent retry with same event UUID and condition should return same commit position
position2 = client.append([ev], condition=cond)
assert position1 == position2
print("Idempotent retry returned position:", position2)

Reading Events

The Client.read() method returns recorded events from an UmaDB server.

def read(
    query: Query | None = None,
    start: int | None = None,
    backwards: bool = False,
    limit: int | None = None,
    subscribe: bool = False,
) -> ReadResponse:
    ...

The Client.read() method can be used both for constructing decision models in a domain layer, and for projecting events into materialized views in CQRS. An optional Query can be provided to select by tags and types.

Parameters

Name Type Description
query Query|None Optional structured query to filter events (by tags, event types, etc).
start int|None Read events from this sequence number. Only events with positions greater than or equal will be returned (or less than or equal if backwards is True.
backwards bool If True events will be read backwards, either from the given position or from the last recorded event.
limit int|None Optional cap on the number of events to retrieve.
subscribe bool If True, keeps the stream open to deliver future events as they arrive.

Return Value

Returns an iterable "read response" instance from which SequencedEvent instances, and the most relevant "last known" sequence number, can be obtained.

Example

from umadb import Client, Query, QueryItem

client = Client("http://localhost:50051")

# Filter by type(s) and tag(s)
q = Query(items=[QueryItem(types=["example"], tags=["tag1", "tag2"])])

resp = client.read(
    query=q, start=None, backwards=False, limit=None, subscribe=False
)
for item in resp:
    print(f"Got event at position {item.position}: {item.event}")

last_known = resp.head()
print("Last known position:", last_known)

# Subscribe to new events
subscription = client.read(subscribe=True)
for se in subscription:
    print("New event:", se.position, se.event)
    # Break for demo purposes
    break

Getting Head Position

The Client.head() method returns the position of the last event recorded in an UmaDB server.

def head(self) -> int | None: ...

The Client.head() method can be used both for constructing decision models in a domain layer, and for projecting events into materialized views in CQRS. An optional DCBQuery can be provided to select by tags and types.

Return Value

Returns the position (u64) of the last recorded event in the event store, or None if no events have been recorded yet.

Getting Tracking Info

The Client.get_tracking_info() method returns the last recorded position in an upstream sequence of events.

def get_tracking_info(self, source: str) -> int | None: ...

The Client.get_tracking_info() method can be used when starting or resuming an event-processing component. The event-processing component will start by requesting new events from the upstream sequence after this position. The position of an upstream event that has been processed successfully can be recorded atomically when appending new events generated by processing that event.

Parameters

Name Type Description
source str Upstream source name.

Returns the last recorded upstream position (int), or None if the sequence name is not found.

Event

An Event represents a single event either to be appended or already stored in the event log.

Field Type Description
event_type str The event’s logical type (e.g. "UserRegistered").
tags list<str> Tags assigned to the event (used for filtering and indexing).
data bytes Binary payload associated with the event.
uuid str|None Unique event ID.

Idempotent support for append operations is activated by setting a UUID on appended events.

Include in:

  • Append requests when writing new events to the store.

Included in:

  • SequencedEvent objects when the server responds to read requests.

Matched by:

  • QueryItem during read() and append() operations.

Append Condition

An AppendCondition causes an append request to fail if events match its Query, optionally after a sequence number.

Field Type Description
fail_if_events_match Query Query for conflicting events.
after int|None Sequence number.

Include in:

  • Append requests to define optimistic concurrent control.

To implement a consistency boundary, command handlers can use the same Query used when reading events as the value of fail_if_events_match, and the "head" sequence number received from the read response as the value of after.

Tracking Info

A TrackingInfo instance indicates the source and position of an upstream event.

Field Type Description
source str Upstream sequence name.
position int Upstream sequence number.

Include in:

  • Append requests when recording the results of processing an upstream event.

To implement exactly-once semantics in event-processing components, pull events from an upstream source after the last recorded position, then record the upstream positions of upstream events along with new state that results from processing those events. By processing event sequentially in this way, each event will be processed at least once. And by recording tracking information along with new state, the new state will be recorded at most once. The combination of "at least once" processing and "at most once" recording gives "exactly once" semantics from the point of view of consumers of the recorded state.

Query

A Query defines criteria for selecting events in the event store.

Field Type Description
items list[QueryItem] A list of selection criteria (logical OR).

An Event is selected if any QueryItem matches or the items field is empty.

Include in:

  • Read requests to select events returned by the server.
  • An AppendCondition to select conflicting events.

Query Item

A QueryItem defines a criterion for matching events.

Field Type Description
types list[str] List of event types (logical OR).
tags list[str] List of tags (logical AND).

A QueryItem will match an `Event if:

  • one of its types matches the Event.event_type or its types field is empty; AND
  • all of its tags match one of the Event.tags or its tags field is empty.

Sequenced Event

A SequencedEvent represents a recorded Event along with its assigned sequence number.

Field Type Description
position int The sequence number.
event Event The recorded event.

Included in:

  • Read responses when the server responds to read requests.

Error Handling

The Python client raises Python exceptions on error:

  • Integrity/condition failure: IntegrityError
  • Transport/connection errors: TransportError
  • Authentication failures: AuthenticationError
  • Invalid argument errors: ValueError
  • Other internal errors: RuntimeError or OSError

Your application should catch these as appropriate.

Complete Example

import uuid

from umadb import AppendCondition, Client, Event, IntegrityError, Query, QueryItem

# Connect to the gRPC server (TLS + API key)
client = Client(
    url="http://localhost:50051",
)

# Define a consistency boundary
cb = Query(items=[QueryItem(types=["example"], tags=["tag1", "tag2"])])

# Read events for a decision model
read_response = client.read(query=cb)
for result in read_response:
    print(f"Got event at position {result.position}: {result.event}")

# Remember the last-known position
last_known_position = read_response.head()
print("Last known position is:", last_known_position)

# Create an event with a UUID to enable idempotent append
event = Event(
    event_type="example",
    tags=["tag1", "tag2"],
    data=b"Hello, world!",
    uuid=str(uuid.uuid4()),
)

# Append event within the consistency boundary
condition = AppendCondition(fail_if_events_match=cb, after=last_known_position)
commit_position1 = client.append([event], condition=condition)
print("Appended event at position:", commit_position1)

# Append conflicting event — expect an error
try:
    conflicting_event = Event(
        event_type="example",
        tags=["tag1", "tag2"],
        data=b"Hello, world!",
        uuid=str(uuid.uuid4()),  # different UUID
    )
    client.append([conflicting_event], condition=condition)
except IntegrityError as e:
    print("Conflicting event was rejected:", e)

# Idempotent retry — same event ID and condition
print("Retrying to append event at position:", last_known_position)
commit_position2 = client.append([event], condition=condition)
assert commit_position1 == commit_position2
print("Append returned same commit position:", commit_position2)

# Subscribe to all events for a projection
subscription = client.read(subscribe=True)
for ev in subscription:
    print(f"Processing event at {ev.position}: {ev.event}")
    if ev.position == commit_position2:
        print("Projection has processed new event!")
        break

Example with Tracking

import uuid

from umadb import (
    AppendCondition,
    Client,
    Event,
    IntegrityError,
    Query,
    QueryItem,
    TrackingInfo,
)

# Connect to the gRPC server (TLS + API key)
client = Client(
    url="http://localhost:50051",
)

# Get last processed upstream event position
last_processed_position = client.get_tracking_info("upstream")

# Pull next unprocessed upstream event...
next_upstream_event_position = 1 + (last_processed_position or 0)

# Construct tracking information from next unprocessed event
tracking_info = TrackingInfo("upstream", next_upstream_event_position)

# Define a consistency boundary
cb = Query(items=[QueryItem(types=["example"], tags=["tag1", "tag2"])])

# Read events for a decision model
read_response = client.read(query=cb)
for result in read_response:
    print(f"Got event at position {result.position}: {result.event}")

# Remember the last-known position
last_known_position = read_response.head()
print("Last known position is:", last_known_position)

# Create an event with a UUID to enable idempotent append
event = Event(
    event_type="example",
    tags=["tag1", "tag2"],
    data=b"Hello, world!",
    uuid=str(uuid.uuid4()),
)

# Append event within the consistency boundary
condition = AppendCondition(fail_if_events_match=cb, after=last_known_position)
commit_position1 = client.append(
    [event], condition=condition, tracking_info=tracking_info
)
print("Appended event at position:", commit_position1)

# Idempotent retry — same event ID and condition
print("Retrying to append event at position:", last_known_position)
commit_position2 = client.append(
    [event], condition=condition, tracking_info=tracking_info
)
assert commit_position1 == commit_position2
print("Append returned same commit position:", commit_position2)

# Check tracking information
assert tracking_info.position == client.get_tracking_info("upstream")

# Unconditional append with conflicting tracking information — expect an error
try:
    conflicting_event = Event(
        event_type="example",
        tags=["tag1", "tag2"],
        data=b"Hello, world!",
        uuid=str(uuid.uuid4()),  # different UUID
    )
    client.append([conflicting_event], condition=None, tracking_info=tracking_info)
except IntegrityError as e:
    print("Conflicting event was rejected:", e)

Notes

  • Python client is synchronous and blocking; if you need async integration, run client calls in a thread pool or use an async worker that offloads to threads.
  • Event data is binary (bytes). Use a consistent serialization (e.g., JSON serialized to UTF-8 bytes, protobuf, msgpack) for your domain.
  • API keys must match the server configuration.
  • For TLS with self-signed certs, pass ca_path with your root/CA certificate.

Testing

The Python bindings can be tested by running a UmaDB server and executing Python code against it.

License

Licensed under either of:

at your option.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

umadb-0.3.1.tar.gz (60.9 kB view details)

Uploaded Source

Built Distributions

If you're not sure about the file name format, learn more about wheel file names.

umadb-0.3.1-cp310-abi3-win_amd64.whl (1.9 MB view details)

Uploaded CPython 3.10+Windows x86-64

umadb-0.3.1-cp310-abi3-win32.whl (1.6 MB view details)

Uploaded CPython 3.10+Windows x86

umadb-0.3.1-cp310-abi3-musllinux_1_2_x86_64.whl (2.4 MB view details)

Uploaded CPython 3.10+musllinux: musl 1.2+ x86-64

umadb-0.3.1-cp310-abi3-musllinux_1_2_aarch64.whl (2.2 MB view details)

Uploaded CPython 3.10+musllinux: musl 1.2+ ARM64

umadb-0.3.1-cp310-abi3-manylinux_2_28_x86_64.whl (2.3 MB view details)

Uploaded CPython 3.10+manylinux: glibc 2.28+ x86-64

umadb-0.3.1-cp310-abi3-manylinux_2_28_aarch64.whl (2.0 MB view details)

Uploaded CPython 3.10+manylinux: glibc 2.28+ ARM64

umadb-0.3.1-cp310-abi3-macosx_11_0_arm64.whl (2.0 MB view details)

Uploaded CPython 3.10+macOS 11.0+ ARM64

umadb-0.3.1-cp310-abi3-macosx_10_12_x86_64.whl (2.2 MB view details)

Uploaded CPython 3.10+macOS 10.12+ x86-64

File details

Details for the file umadb-0.3.1.tar.gz.

File metadata

  • Download URL: umadb-0.3.1.tar.gz
  • Upload date:
  • Size: 60.9 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: maturin/1.11.5

File hashes

Hashes for umadb-0.3.1.tar.gz
Algorithm Hash digest
SHA256 a232689aa5f34f2c62525a3ec6fc346e1fcda8ee61c398258c0834b34550659a
MD5 56bf71bc2a58d55e775de7c911d10ca8
BLAKE2b-256 a41fe43552a12aed52f9aa2f4763acfdfb45ad9f0c611279dacf4a6c774f6a27

See more details on using hashes here.

File details

Details for the file umadb-0.3.1-cp310-abi3-win_amd64.whl.

File metadata

  • Download URL: umadb-0.3.1-cp310-abi3-win_amd64.whl
  • Upload date:
  • Size: 1.9 MB
  • Tags: CPython 3.10+, Windows x86-64
  • Uploaded using Trusted Publishing? No
  • Uploaded via: maturin/1.11.5

File hashes

Hashes for umadb-0.3.1-cp310-abi3-win_amd64.whl
Algorithm Hash digest
SHA256 8d49951ecd2b0b041f79903d37d32fc5928a3719293a9af2450b115a205f7514
MD5 f2390c25e2942d71f495ceb7b21b25d5
BLAKE2b-256 2ba4dad5a4e70a5508d21f60d0b22b35867d9f1da771c87842d8c94ca9aca517

See more details on using hashes here.

File details

Details for the file umadb-0.3.1-cp310-abi3-win32.whl.

File metadata

  • Download URL: umadb-0.3.1-cp310-abi3-win32.whl
  • Upload date:
  • Size: 1.6 MB
  • Tags: CPython 3.10+, Windows x86
  • Uploaded using Trusted Publishing? No
  • Uploaded via: maturin/1.11.5

File hashes

Hashes for umadb-0.3.1-cp310-abi3-win32.whl
Algorithm Hash digest
SHA256 e3ee9ea0e6715d807dd3fb1ff1b3608942a3842ce8a5dd5cd4b2c122279b1602
MD5 6b28dea3ddbbb708b93a94468c7133a7
BLAKE2b-256 39dc9335bce0d7250a4633a9327528561ffc032ef8d58895eb2593473836b586

See more details on using hashes here.

File details

Details for the file umadb-0.3.1-cp310-abi3-musllinux_1_2_x86_64.whl.

File metadata

File hashes

Hashes for umadb-0.3.1-cp310-abi3-musllinux_1_2_x86_64.whl
Algorithm Hash digest
SHA256 a0e4f1f1f6a31b6711e395cc5f5c25912e1111baf26db9b6f758fef9f856ade8
MD5 95ea1c153e4dfbe1ff09eaf73bcdc38c
BLAKE2b-256 2afbcff0a4981b33cfb47fe728de7012ce06edfba6dd2e2cf4e86c9ee0a0bb39

See more details on using hashes here.

File details

Details for the file umadb-0.3.1-cp310-abi3-musllinux_1_2_aarch64.whl.

File metadata

File hashes

Hashes for umadb-0.3.1-cp310-abi3-musllinux_1_2_aarch64.whl
Algorithm Hash digest
SHA256 b23d46a2a19de0d10bfc535feb5560cee4a3ef075cd6e6444e9732e11d1952e6
MD5 f41001ac1a1506e8e7bdeb35cc01d8c2
BLAKE2b-256 96fcc3729a33d08b13be3575d00511408e98475aca0726cf0d87a61f4f8be8cf

See more details on using hashes here.

File details

Details for the file umadb-0.3.1-cp310-abi3-manylinux_2_28_x86_64.whl.

File metadata

File hashes

Hashes for umadb-0.3.1-cp310-abi3-manylinux_2_28_x86_64.whl
Algorithm Hash digest
SHA256 d563c459386019bb18159b3e3edb0f5b550f7e56d64bee49c8f5640747b661fc
MD5 07f46c8a042d9465a6575a7938715e84
BLAKE2b-256 bafea891bbd049af485701287ff2cfc78e9f694abdcd16f9430ef3b03af2bf45

See more details on using hashes here.

File details

Details for the file umadb-0.3.1-cp310-abi3-manylinux_2_28_aarch64.whl.

File metadata

File hashes

Hashes for umadb-0.3.1-cp310-abi3-manylinux_2_28_aarch64.whl
Algorithm Hash digest
SHA256 b22aa914af65d4872e000581eab6f82545905f84c01108a227af19a483e0db92
MD5 7463a9701c3ccaf43916cab3afe945ef
BLAKE2b-256 ae369a052c8fd98a3755f0bdf22918f95f103fb39c00fe94f81ec4c4ec00e724

See more details on using hashes here.

File details

Details for the file umadb-0.3.1-cp310-abi3-macosx_11_0_arm64.whl.

File metadata

File hashes

Hashes for umadb-0.3.1-cp310-abi3-macosx_11_0_arm64.whl
Algorithm Hash digest
SHA256 94c8d1bdfe53c03ef3c09838d1bb7414c96840e02277f268906cac8b36e92eca
MD5 799ab59231b79e0d87e030fdf3eb1095
BLAKE2b-256 43a8a87d57a6719a832dcb383010d829c7f35dd0e4a31304d982f37859558126

See more details on using hashes here.

File details

Details for the file umadb-0.3.1-cp310-abi3-macosx_10_12_x86_64.whl.

File metadata

File hashes

Hashes for umadb-0.3.1-cp310-abi3-macosx_10_12_x86_64.whl
Algorithm Hash digest
SHA256 f7e14201a464d7e91a8a4661df9431b8cae4fdc02bf02a41783455bdd38283d1
MD5 843e412e2d8abe0ce66e2bbdaccd10ee
BLAKE2b-256 5823fcc12e6e0180981c6c8d8c17072e9dc8eefc4e0ab4d29cf9768ffd1dd3c0

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page