Python async client for the Danube messaging platform
Danube-py client
The Python async client library for interacting with the Danube messaging broker platform.
Danube is an open-source distributed messaging platform written in Rust. Consult the documentation for supported concepts and the platform architecture.
Features
📤 Producer Capabilities
- Basic Messaging - Send messages with byte payloads
- Partitioned Topics - Distribute messages across multiple partitions for horizontal scaling
- Reliable Dispatch - Guaranteed message delivery with persistence (WAL + cloud storage)
- Schema Integration - Type-safe messaging with automatic validation (Bytes, String, Number, Avro, JSON Schema, Protobuf)
📥 Consumer Capabilities
- Flexible Subscriptions - Three subscription types for different use cases:
  - Exclusive - Single active consumer, guaranteed ordering
  - Shared - Load balancing across multiple consumers, parallel processing
  - Failover - High availability with automatic standby promotion
- Message Acknowledgment - Reliable message processing with at-least-once delivery
- Partitioned Consumption - Automatic handling of messages from all partitions
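At-least-once delivery implies that a message can occasionally arrive more than once, for example when an acknowledgment is lost before the broker records it. A minimal sketch of an idempotent handler follows. `Message`, `msg_id`, and `process_once` are hypothetical names used only for illustration; they are not part of the Danube client API.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Message:
    """Hypothetical stand-in for a received message (not the Danube type)."""
    msg_id: str
    payload: bytes


def process_once(messages, seen=None):
    """Return payloads in arrival order, skipping ids that were already handled."""
    seen = set() if seen is None else seen
    handled = []
    for message in messages:
        if message.msg_id in seen:
            # Redelivery of a message that was already processed: skip it.
            continue
        seen.add(message.msg_id)
        handled.append(message.payload)
    return handled


deliveries = [
    Message("1", b"a"),
    Message("2", b"b"),
    Message("1", b"a"),  # simulated redelivery
]
print(process_once(deliveries))
```

In a long-running consumer the `seen` set would need to be bounded or persisted; that choice is left open here.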
🔐 Schema Registry
- Schema Management - Register, version, and retrieve schemas (JSON Schema, Avro, Protobuf)
- Compatibility Checking - Validate schema evolution (Backward, Forward, Full, None modes)
- Type Safety - Automatic validation against registered schemas
- Schema Evolution - Safe schema updates with compatibility enforcement
🏗️ Client Features
- Async/Await - Built on asyncio and grpc.aio for efficient async I/O
- Connection Pooling - Shared connection management across producers/consumers
- Automatic Reconnection - Resilient connection handling with retry logic
- Topic Namespaces - Organize topics with namespace structure (/namespace/topic-name)
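To make the `/namespace/topic-name` shape concrete, here is a small sketch that splits a topic string into its two parts. `split_topic` is a hypothetical helper, not a function of the client library, and it checks only the basic shape; the broker may enforce additional naming rules.

```python
def split_topic(topic: str) -> tuple[str, str]:
    """Split '/namespace/topic-name' into (namespace, topic_name).

    Hypothetical helper: the exact validation rules enforced by the broker
    are not specified here, so only the basic shape is checked.
    """
    parts = topic.split("/")
    # A well-formed name yields ['', namespace, topic_name].
    if len(parts) != 3 or parts[0] != "" or not parts[1] or not parts[2]:
        raise ValueError(f"expected '/namespace/topic-name', got {topic!r}")
    return parts[1], parts[2]


print(split_topic("/default/test_topic"))
```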
Installation
pip install danube-client
Or install from source:
git clone https://github.com/danube-messaging/danube-py.git
cd danube-py
pip install -e .
Example Usage
Check out the example files.
Start the Danube server
Use the instructions from the documentation to run the Danube broker/cluster.
Create Producer
import asyncio

from danube import DanubeClientBuilder


async def main():
    client = await (
        DanubeClientBuilder()
        .service_url("http://127.0.0.1:6650")
        .build()
    )

    topic = "/default/test_topic"
    producer_name = "test_producer"

    producer = (
        client.new_producer()
        .with_topic(topic)
        .with_name(producer_name)
        .build()
    )

    await producer.create()
    print(f"The Producer {producer_name} was created")

    payload = b"Hello Danube"
    message_id = await producer.send(payload)
    print(f"The Message with id {message_id} was sent")

    await producer.close()


asyncio.run(main())
Reliable Dispatch (optional)
Reliable dispatch can be enabled when creating the producer. With it enabled, the broker streams messages to the consumer from the WAL and cloud storage.
from danube import DispatchStrategy

producer = (
    client.new_producer()
    .with_topic(topic)
    .with_name(producer_name)
    .with_dispatch_strategy(DispatchStrategy.RELIABLE)
    .build()
)
Create Consumer
import asyncio

from danube import DanubeClientBuilder, SubType


async def main():
    client = await (
        DanubeClientBuilder()
        .service_url("http://127.0.0.1:6650")
        .build()
    )

    topic = "/default/test_topic"
    consumer_name = "test_consumer"
    subscription_name = "test_subscription"

    consumer = (
        client.new_consumer()
        .with_topic(topic)
        .with_consumer_name(consumer_name)
        .with_subscription(subscription_name)
        .with_subscription_type(SubType.EXCLUSIVE)
        .build()
    )

    # Subscribe to the topic
    await consumer.subscribe()
    print(f"The Consumer {consumer_name} was created")

    # Start receiving messages
    queue = await consumer.receive()

    while True:
        message = await queue.get()
        payload = message.payload.decode()
        print(f"Received message: {payload!r}")

        # Acknowledge the message
        await consumer.ack(message)


asyncio.run(main())
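The receive loop above runs until the process is stopped. For scripts and tests it can be useful to stop once no message has arrived for a while. The sketch below assumes that the object returned by `consumer.receive()` behaves like an `asyncio.Queue` (the example only shows `await queue.get()`), and it uses a plain `asyncio.Queue` as a stand-in so it runs without a broker. `drain` is a hypothetical helper name.

```python
import asyncio


async def drain(queue, idle_timeout=0.1, limit=None):
    """Collect items from `queue` until it stays empty for `idle_timeout` seconds."""
    items = []
    while limit is None or len(items) < limit:
        try:
            item = await asyncio.wait_for(queue.get(), timeout=idle_timeout)
        except asyncio.TimeoutError:
            # No message arrived within the idle window: stop consuming.
            break
        items.append(item)
    return items


async def demo():
    queue = asyncio.Queue()  # stand-in for the queue returned by consumer.receive()
    for payload in (b"one", b"two", b"three"):
        queue.put_nowait(payload)
    return await drain(queue)


print(asyncio.run(demo()))
```

With a real consumer, each message would still be acknowledged with `await consumer.ack(message)` inside the loop, as in the example above.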
Schema Registry
import json

from danube import SchemaType

# Assumes `client` was built as in the examples above and that this code
# runs inside an async function (it uses `await`).
schema_client = client.schema()

# Register a JSON schema
json_schema = json.dumps({
    "type": "object",
    "properties": {
        "field1": {"type": "string"},
        "field2": {"type": "integer"},
    },
})

schema_id = await (
    schema_client.register_schema("my-app-events")
    .with_type(SchemaType.JSON_SCHEMA)
    .with_schema_data(json_schema.encode())
    .execute()
)

# Create producer with schema reference
producer = (
    client.new_producer()
    .with_topic("/default/test_topic")
    .with_name("schema_producer")
    .with_schema_subject("my-app-events")
    .build()
)
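The schema registered above describes an object with a string `field1` and an integer `field2`. The following sketch builds a matching payload as bytes, the form `producer.send` takes in the producer example. `matches_example_schema` is a hand-written, simplified check for illustration only; it is not a JSON Schema validator and does not replace the validation done against the registry. That the producer expects JSON-encoded bytes for this subject is an assumption.

```python
import json


def matches_example_schema(event: dict) -> bool:
    """Simplified check mirroring the example schema (field1: string, field2: integer)."""
    field1_ok = "field1" not in event or isinstance(event["field1"], str)
    # bool is a subclass of int in Python, but JSON Schema does not treat
    # true/false as integers, so exclude it explicitly.
    field2 = event.get("field2")
    field2_ok = "field2" not in event or (
        isinstance(field2, int) and not isinstance(field2, bool)
    )
    return field1_ok and field2_ok


event = {"field1": "order-created", "field2": 42}
assert matches_example_schema(event)

# Assumption: the producer accepts JSON-encoded bytes for this subject.
payload = json.dumps(event).encode()
print(payload)
```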
Browse the examples directory for complete working code.
Contribution
The client is under active development, with improvements and new features in progress. Please feel free to contribute or report any issues you encounter.
Running Integration Tests
Before submitting a PR, start the test cluster and run the integration tests:
# 1. Start the cluster
cd docker/
docker compose up -d
# 2. Wait for the broker to be healthy
docker compose ps
# 3. Run the integration tests from the repository root
cd ..
pytest integration_tests/ -v --timeout=120
# 4. Stop the cluster when done
cd docker/
docker compose down -v
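Step 2 can also be scripted rather than checked by eye. The sketch below polls a TCP port until it accepts connections. `wait_for_port` is a hypothetical helper, and an open port is only a rough proxy for a healthy broker; the container health status reported by `docker compose ps` remains the authoritative check.

```python
import socket
import time


def wait_for_port(host, port, timeout=60.0, interval=1.0):
    """Return True once a TCP connection to (host, port) succeeds, False on timeout."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            with socket.create_connection((host, port), timeout=interval):
                return True
        except OSError:
            # Nothing is accepting connections yet: wait and retry.
            time.sleep(interval)
    return False
```

For example, `wait_for_port("127.0.0.1", 6650)` would wait up to a minute, assuming the compose file exposes the broker on the same address used by the client examples.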
Regenerating gRPC stubs
Make sure the proto files are the latest from the Danube project.
Install the required tools:
pip install grpcio-tools
Generate the Python gRPC code from the proto files:
python -m grpc_tools.protoc \
    --proto_path=danube/proto \
    --python_out=danube/proto \
    --grpc_python_out=danube/proto \
    danube/proto/DanubeApi.proto

python -m grpc_tools.protoc \
    --proto_path=danube/proto \
    --python_out=danube/proto \
    --grpc_python_out=danube/proto \
    danube/proto/SchemaRegistry.proto
Then fix the imports in the generated *_grpc.py files to use package-relative imports:
sed -i 's/^import DanubeApi_pb2/from danube.proto import DanubeApi_pb2/' danube/proto/DanubeApi_pb2_grpc.py
sed -i 's/^import SchemaRegistry_pb2/from danube.proto import SchemaRegistry_pb2/' danube/proto/SchemaRegistry_pb2_grpc.py
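The `sed -i` form above is GNU syntax; BSD and macOS `sed` require an explicit backup suffix argument (for example `sed -i ''`). As a portable alternative, the same rewrite can be sketched in Python. `relativize_import` is a hypothetical helper, and the sample input line is modeled on typical `grpc_tools` output rather than copied from the generated files.

```python
import re


def relativize_import(source: str, module: str, package: str = "danube.proto") -> str:
    """Rewrite top-level 'import <module>' lines to 'from <package> import <module>'."""
    pattern = re.compile(rf"^import {re.escape(module)}\b", flags=re.MULTILINE)
    return pattern.sub(f"from {package} import {module}", source)


generated = "import grpc\nimport DanubeApi_pb2 as DanubeApi__pb2\n"
print(relativize_import(generated, "DanubeApi_pb2"))
```

To apply it to a file, read the text with `pathlib.Path.read_text()`, pass it through the function, and write the result back with `write_text()`.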