gRPC client for EventStore DB
Project description
esdb-py
EventStoreDB Python gRPC client
NOTE: This project is still work in progress
Completed features
- secure connection
- basic auth
- connection string parsing
- streams
- append
- batch append (v21.10+)
- delete
- read stream
- read all with stream/event type filters (v21.10+)
- catch-up subscriptions
- tombstone
- filtering
- persistent subscriptions
- create
- read stream
- read all with filter (v21.10+)
- update
- delete
- list
- info
- reply parked events
- CRUD for projections
- users
Installation
Using pip:
pip install esdb
Using poetry:
poetry add esdb
Development
-
Install poetry
-
Create virtualenv (i.e. using pyenv):
pyenv install 3.12.0 pyenv virtualenv 3.12.0 esdb-py pyenv local esdb-py
-
Install deps with
poetry install -
Start eventstore in docker:
make run-esdb -
Run the tests:
pytest tests
Usage
Have a look at tests for more examples.
Connection string examples
DNS discovery with credentials, discovery configuration, node preference and ca file path
esdb+discover://admin:changeit@localhost:2111?discoveryInterval=0&maxDiscoverAttempts=3&tlsCafile=certs/ca/ca.crt&nodePreference=follower
Single-node insecure connection
esdb://localhost:2111?tls=false
Supported parameters:
discoveryIntervalgossipTimeoutmaxDiscoverAttemptsnodePreferencekeepAliveIntervalkeepAliveTimeouttlstlsCafiletlsVerifyCertdefaultDeadline
Connection string can be generated here.
Discovery and node preferences
from esdb import ESClient
client = ESClient("esdb+discover://admin:changeit@localhost:2111?nodePreference=follower")
Connection configuration
from esdb import ESClient
# Connect without TLS
client = ESClient("esdb://localhost:2111?tls=false")
# Secure connection with basic auth and keepalive
client = ESClient("esdb://admin:changeit@localhost:2111?tlsCafile=certs/ca/ca.crt&keepAliveInterval=5&keepAliveTimeout=5")
Append, Read, Catch-up subscriptions
import asyncio
import datetime
import uuid
from esdb import ESClient
client = ESClient("esdb+discover://admin:changeit@localhost:2111")
stream = f"test-{str(uuid.uuid4())}"
async def streams():
async with client.connect() as conn:
# Appending to stream
for i in range(10):
append_result = await conn.streams.append(
stream=stream,
event_type="test_event",
data={"i": i, "ts": datetime.datetime.utcnow().isoformat()},
)
# Read up to 10 events
async for result in conn.streams.read(stream=stream, count=10):
print(result.data)
# Read up to 10 events, backwards
async for result in conn.streams.read(stream=stream, count=10, backwards=True):
print(result.data)
# Read up to 10 events, starting from 5th event
async for result in conn.streams.read(stream=stream, count=10, revision=5):
print(result.data)
# Read up to 10 events backwards, starting from 5th event
async for result in conn.streams.read(stream=stream, count=10, backwards=True, revision=5):
print(result.data)
# Create a catch-up subscription to a stream
async for result in conn.streams.read(stream=stream, subscribe=True):
print(result.data)
asyncio.run(streams())
Batch append
import asyncio
import uuid
from esdb import ESClient
from esdb.streams import Message
async def batch_append():
# Append multiple events in as a single batch
# Batch append is not supported on EventStore < v21.10
stream = str(uuid.uuid4())
messages: list[Message] = [
Message(event_type="one", data={"item": 1}),
Message(event_type="one", data={"item": 2}),
Message(event_type="one", data={"item": 3}),
Message(event_type="two", data={"item": 1}),
Message(event_type="two", data={"item": 2}),
Message(event_type="two", data={"item": 3}),
]
async with ESClient("esdb+discover://admin:changeit@localhost:2111").connect() as conn:
response = await conn.streams.batch_append(stream=stream, messages=messages)
assert response.current_revision == 5
events = [e async for e in conn.streams.read(stream=stream, count=50)]
assert len(events) == 6
asyncio.run(batch_append())
Catch-up subscription to all events with filtering
import uuid
import asyncio
from esdb import ESClient
from esdb.shared import Filter
async def filters():
async with ESClient("esdb+discover://admin:changeit@localhost:2111").connect() as conn:
# Append 10 events with the same prefix to random streams
for i in range(10):
await conn.streams.append(stream=str(uuid.uuid4()), event_type=f"prefix-{i}", data=b"")
# subscribe to events from all streams, filtering by event type
async for event in conn.streams.read_all(
subscribe=True, # subscribe will wait for events, use count=<n> to read <n> events and stop
filter_by=Filter(
kind=Filter.Kind.EVENT_TYPE,
regex="^prefix-",
# Checkpoint only required when subscribe=True, it's not needed when using count=<int>
checkpoint_interval_multiplier=1000,
),
):
print(event)
asyncio.run(filters())
Persistent subscriptions
import asyncio
from esdb import ESClient
from esdb.shared import Filter
from esdb.subscriptions import SubscriptionSettings, NackAction
client = ESClient("esdb+discover://admin:changeit@localhost:2111")
stream = "stream-foo"
group = "group-bar"
async def persistent():
async with client.connect() as conn:
# emit some events to the same stream
for i in range(50):
await conn.streams.append(stream, "foobar", {"i": i})
# create a stream subscription
await conn.subscriptions.create_stream_subscription(
stream=stream,
group_name=group,
settings=SubscriptionSettings(
max_subscriber_count=50,
read_batch_size=5,
live_buffer_size=10,
history_buffer_size=10,
consumer_strategy=SubscriptionSettings.ConsumerStrategy.ROUND_ROBIN,
checkpoint_ms=10000,
),
)
# create subscription to all events with filtering
# Only supported on EventStore v21.10+
await conn.subscriptions.create_all_subscription(
group_name="subscription_group",
filter_by=Filter(kind=Filter.Kind.EVENT_TYPE, regex="^some_type$", checkpoint_interval_multiplier=200),
settings=SubscriptionSettings(
read_batch_size=50,
live_buffer_size=100,
history_buffer_size=100,
max_retry_count=2,
checkpoint_ms=20000,
),
)
# read from a subscription
async with client.connect() as conn:
sub = conn.subscriptions.subscribe(stream=stream, group_name=group, buffer_size=5)
async for event in sub:
try:
# do work with event
print(event)
await sub.ack([event])
except Exception as err:
await sub.nack([event], NackAction.RETRY, reason=str(err))
# get subscription info
info = await conn.subscriptions.get_info(group, stream)
assert info.group_name == group
# delete subscription
await conn.subscriptions.delete(group, stream)
# list subscriptions
subs = await conn.subscriptions.list()
for sub in subs:
print(sub.total_items)
asyncio.run(persistent())
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file esdb-0.3.5.tar.gz.
File metadata
- Download URL: esdb-0.3.5.tar.gz
- Upload date:
- Size: 63.9 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: poetry/1.6.1 CPython/3.11.4 Darwin/23.0.0
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
0311dabd56ce0b42b5c55e13b7085e74276270327e08a6164929cad739366ce6
|
|
| MD5 |
1f163120a7d42f6d719d20e04cf8dd95
|
|
| BLAKE2b-256 |
a83628dc6153021725623c2232aca6785ed31c2bea8a7188ac6746009b1ccc63
|
File details
Details for the file esdb-0.3.5-py3-none-any.whl.
File metadata
- Download URL: esdb-0.3.5-py3-none-any.whl
- Upload date:
- Size: 80.4 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: poetry/1.6.1 CPython/3.11.4 Darwin/23.0.0
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
866fc108d933becc23ea7e20f74e0c1f01337c17fd4705c4381ab2e2d7af0e42
|
|
| MD5 |
18ea152edd5a0ee18801343108b4e3f2
|
|
| BLAKE2b-256 |
9c3ff72fae47146573ed2006410f9a06ac2da8c7d4ad6b7aa8376302bbc68369
|