High-performance Python client for the Rivven distributed streaming platform

rivven-python

High-performance Python bindings for the Rivven event streaming platform.

Features

  • Native Performance: Zero-copy message handling through Rust bindings (PyO3)
  • Async-First: Full async/await support with Python's asyncio
  • Type-Safe: Complete type annotations for IDE support
  • Transaction Support: Exactly-once semantics with transactional producers
  • Authentication: Multiple auth methods (simple, SCRAM-SHA-256)
  • Admin Operations: Full topic and partition management

Installation

pip install rivven

Usage

Basic Connection

import asyncio
import rivven

async def main():
    # Connect to Rivven cluster
    client = await rivven.connect("localhost:9092")
    
    # Create a topic
    await client.create_topic("my-topic", partitions=3)
    
    # List topics
    topics = await client.list_topics()
    print(f"Topics: {topics}")

asyncio.run(main())
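Calling create_topic for a topic that already exists may fail, so a setup script can check first. The helper below is a minimal sketch, not part of rivven: ensure_topic is a hypothetical name, and it assumes list_topics() returns topic names as strings, which the example above does not confirm.

```python
async def ensure_topic(client, name: str, partitions: int = 3) -> bool:
    """Create `name` only if it is not already listed; return True if created.

    Assumption: `client.list_topics()` returns an iterable of topic-name strings.
    """
    existing = await client.list_topics()
    if name in existing:
        return False
    # Same call as in the example above.
    await client.create_topic(name, partitions=partitions)
    return True
```

The check and the create are two separate requests, so two processes running this at the same time can still race; treat it as a convenience for single-process setup code.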

Producer

import asyncio
import rivven

async def produce():
    client = await rivven.connect("localhost:9092")
    producer = await client.producer("my-topic", batch_size=16384, linger_ms=5)
    
    # Send a message
    offset = await producer.send(b'{"event": "login"}', key=b"user-123")
    print(f"Sent at offset: {offset}")
    
    # Send to specific partition
    await producer.send_to_partition(b"value", partition=0, key=b"key")
    
    # Batch send — all messages are enqueued concurrently and
    # batched automatically on the wire for maximum throughput
    offsets = await producer.send_batch([b"msg1", b"msg2", b"msg3"])
    
    # Flush pending records
    await producer.flush()
    
    # Close when done
    await producer.close()

asyncio.run(produce())
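The producer takes raw bytes for values and keys, so structured events need to be encoded first. The sketch below shows one way to do that with JSON and to make sure the producer is closed even if a send fails. encode_event, publish_events and the key_field parameter are illustrative names, not rivven APIs; only send, flush and close come from the example above.

```python
import json
from typing import Iterable, List, Mapping


def encode_event(event: Mapping) -> bytes:
    """Serialize an event to compact UTF-8 JSON bytes."""
    return json.dumps(event, separators=(",", ":")).encode("utf-8")


async def publish_events(producer, events: Iterable[Mapping], key_field: str = "user") -> List:
    """Send each event keyed by event[key_field], then flush; always close."""
    offsets = []
    try:
        for event in events:
            key = str(event[key_field]).encode("utf-8")
            offsets.append(await producer.send(encode_event(event), key=key))
        await producer.flush()
    finally:
        # Runs on success and on error, so the producer is never left open.
        await producer.close()
    return offsets
```

Keying by a stable field such as a user ID is a common way to keep related events together; whether Rivven routes equal keys to the same partition is not stated here and should be checked against the platform documentation.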

Consumer

import asyncio
import rivven

async def consume():
    client = await rivven.connect("localhost:9092")
    consumer = client.consumer("my-topic", group="my-group")
    
    # Fetch messages
    messages = await consumer.fetch(max_messages=100)
    for msg in messages:
        print(f"Offset {msg.offset}: {msg.value_str()}")
    
    # Commit offsets
    await consumer.commit()
    
    # Alternatively, use the async iterator. It retries on empty fetches and
    # never stops on its own, so this loop runs until you break out of it.
    # With auto_commit=True (the default), offsets are committed per batch;
    # the explicit commit below is only needed with auto_commit=False.
    async for msg in consumer:
        print(f"Received: {msg.value_str()}")
        await consumer.commit()

asyncio.run(consume())
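When offsets are committed manually, committing only after a batch has been handled gives at-least-once processing: a crash mid-batch means the batch is fetched again rather than lost. The loop below is a sketch under two assumptions that the example above does not confirm: auto-commit is disabled on the consumer, and fetch returns an empty list when nothing is available. drain and handler are hypothetical names.

```python
async def drain(consumer, handler, max_batches: int = 10, batch_size: int = 100) -> int:
    """Process up to `max_batches` batches, committing after each one succeeds."""
    processed = 0
    for _ in range(max_batches):
        messages = await consumer.fetch(max_messages=batch_size)
        if not messages:
            break  # assumption: an empty result means nothing is available right now
        for msg in messages:
            handler(msg)  # if this raises, the batch is left uncommitted
        await consumer.commit()
        processed += len(messages)
    return processed
```

Unlike the async iterator, this loop terminates, which makes it easier to use in scripts and tests.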

Admin Operations

import asyncio
import rivven

async def admin():
    client = await rivven.connect("localhost:9092")
    
    # Create topic
    await client.create_topic("new-topic", partitions=3, replication_factor=1)
    
    # List topics
    topics = await client.list_topics()
    print(f"Topics: {topics}")
    
    # Get topic configuration
    configs = await client.describe_topic_configs("new-topic")
    print(f"Configs: {configs}")
    
    # Modify topic configuration
    await client.alter_topic_config("new-topic", [("retention.ms", "86400000")])
    
    # Add partitions
    await client.create_partitions("new-topic", new_total=6)
    
    # Get offset for timestamp
    offset = await client.get_offset_for_timestamp("new-topic", 0, 1699900000000)
    
    # Delete records before offset
    await client.delete_records("new-topic", 0, before_offset=100)
    
    # Delete topic
    await client.delete_topic("old-topic")

asyncio.run(admin())
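The literal values in the admin example look like milliseconds: "86400000" is 24 hours, and 1699900000000 corresponds to 2023-11-13 18:26:40 UTC if read as milliseconds since the Unix epoch. Assuming that unit is what the API expects (the example does not say so explicitly), these standard-library helpers avoid hand-computed constants. to_epoch_ms and retention_ms are illustrative names, not rivven functions.

```python
from datetime import datetime, timedelta, timezone

_EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_epoch_ms(moment: datetime) -> int:
    """Convert a timezone-aware datetime to whole milliseconds since the Unix epoch."""
    if moment.tzinfo is None:
        raise ValueError("expected a timezone-aware datetime")
    return (moment - _EPOCH) // timedelta(milliseconds=1)


def retention_ms(duration: timedelta) -> str:
    """Render a duration as the string form used for the retention.ms config value."""
    return str(duration // timedelta(milliseconds=1))
```

For example, retention_ms(timedelta(days=1)) gives "86400000", matching the value passed to alter_topic_config above.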

Authentication

import asyncio
import rivven

async def authenticated():
    client = await rivven.connect("localhost:9092")
    
    # Simple authentication
    await client.authenticate("username", "password")
    
    # Or SCRAM-SHA-256 authentication
    await client.authenticate_scram("username", "password")
    
    # Use client as normal
    topics = await client.list_topics()

asyncio.run(authenticated())
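The literal "username" and "password" strings above are placeholders. One common way to keep real credentials out of source code is to read them from the environment. The sketch below is not part of rivven, and the variable names RIVVEN_USERNAME and RIVVEN_PASSWORD are hypothetical; the library does not define them.

```python
import os
from typing import Mapping, Tuple


def credentials_from_env(env: Mapping[str, str] = os.environ) -> Tuple[str, str]:
    """Return (username, password) from the environment, or fail with a clear error.

    RIVVEN_USERNAME and RIVVEN_PASSWORD are illustrative names chosen for this sketch.
    """
    try:
        return env["RIVVEN_USERNAME"], env["RIVVEN_PASSWORD"]
    except KeyError as missing:
        raise RuntimeError(f"missing environment variable {missing.args[0]}") from None
```

The result can be unpacked straight into either call from the example, for instance await client.authenticate_scram(*credentials_from_env()).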

TLS Connection

import asyncio
import rivven

async def secure():
    # Connect with TLS
    client = await rivven.connect_tls(
        "localhost:9093",
        ca_cert_path="/path/to/ca.pem",
        server_name="broker.example.com",
        client_cert_path="/path/to/client.pem",  # Optional: mTLS
        client_key_path="/path/to/client.key",   # Optional: mTLS
    )
    
    topics = await client.list_topics()

asyncio.run(secure())
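The client certificate and key only make sense as a pair, so it can help to validate the settings before connecting. The helper below is a sketch, not a rivven API: tls_kwargs is a hypothetical name, and it assumes connect_tls accepts the two mTLS arguments being omitted entirely, as the "Optional" comments above suggest.

```python
from typing import Dict, Optional


def tls_kwargs(
    ca_cert_path: str,
    server_name: str,
    client_cert_path: Optional[str] = None,
    client_key_path: Optional[str] = None,
) -> Dict[str, str]:
    """Build keyword arguments for connect_tls, rejecting a half-configured mTLS pair."""
    if (client_cert_path is None) != (client_key_path is None):
        raise ValueError("client_cert_path and client_key_path must be given together")
    kwargs = {"ca_cert_path": ca_cert_path, "server_name": server_name}
    if client_cert_path is not None and client_key_path is not None:
        kwargs["client_cert_path"] = client_cert_path
        kwargs["client_key_path"] = client_key_path
    return kwargs
```

Usage would then look like await rivven.connect_tls("localhost:9093", **tls_kwargs("/path/to/ca.pem", "broker.example.com")).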

Transactions (Exactly-Once Semantics)

import asyncio
import rivven

async def transactional():
    client = await rivven.connect("localhost:9092")
    
    # Initialize transactional producer - returns a ProducerState object
    producer_state = await client.init_producer_id()
    print(f"Producer ID: {producer_state.producer_id}, Epoch: {producer_state.producer_epoch}")
    
    try:
        # Begin transaction (thread-safe: per-transactional-id lock prevents races;
        # lock entries are cleaned up after commit/abort to prevent unbounded growth)
        await client.begin_transaction("my-txn-id", producer_state)
        
        # Add partitions to transaction
        await client.add_partitions_to_txn("my-txn-id", producer_state, [
            ("my-topic", 0),
            ("my-topic", 1),
        ])
        
        # Publish with idempotent semantics - sequence is auto-incremented
        await client.publish_idempotent(
            topic="my-topic",
            value=b"message-1",
            producer_state=producer_state,
            key=b"key-1"
        )
        
        await client.publish_idempotent(
            topic="my-topic",
            value=b"message-2",
            producer_state=producer_state,
            key=b"key-2"
        )
        
        # Commit transaction
        await client.commit_transaction("my-txn-id", producer_state)
        
    except Exception:
        # Abort on any error, then re-raise so the caller sees the failure
        await client.abort_transaction("my-txn-id", producer_state)
        raise

asyncio.run(transactional())
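The begin/commit/abort pattern above can be wrapped in an async context manager so that each transaction reads as a single block. This is a sketch built only on the three client calls shown in the example; the transaction helper itself is hypothetical and not provided by rivven.

```python
from contextlib import asynccontextmanager


@asynccontextmanager
async def transaction(client, txn_id: str, producer_state):
    """Begin a transaction, commit when the block succeeds, abort when it raises."""
    await client.begin_transaction(txn_id, producer_state)
    try:
        yield
        # Reached only if the body of the `async with` block did not raise.
        await client.commit_transaction(txn_id, producer_state)
    except Exception:
        # Mirrors the example above: abort, then let the error propagate.
        await client.abort_transaction(txn_id, producer_state)
        raise
```

With this in place the body becomes async with transaction(client, "my-txn-id", producer_state): followed by the add_partitions_to_txn and publish_idempotent calls. As in the original example, a failed commit also triggers an abort.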

Exception Handling

Rivven provides a hierarchy of exception types for granular error handling:

import asyncio
import rivven
from rivven import (
    RivvenException,        # Base exception for all Rivven errors
    ConnectionException,    # Connection-related errors
    ServerException,        # Server-side errors
    TimeoutException,       # Request timeouts
    SerializationException, # Serialization/deserialization errors
    ConfigException,        # Configuration errors
)

async def main():
    # Catch the specific exceptions first and the base class last
    try:
        client = await rivven.connect("localhost:9092")
    except ConnectionException as e:
        print(f"Failed to connect: {e}")
    except TimeoutException as e:
        print(f"Connection timed out: {e}")
    except RivvenException as e:
        print(f"General Rivven error: {e}")

asyncio.run(main())
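Connection failures and timeouts are often transient, so callers may want to retry them instead of only printing the error. The helper below is a generic sketch using only the standard library. retry_async is a hypothetical name, not a rivven API, and the backoff values are arbitrary defaults.

```python
import asyncio
from typing import Awaitable, Callable, Tuple, Type, TypeVar

T = TypeVar("T")


async def retry_async(
    operation: Callable[[], Awaitable[T]],
    retry_on: Tuple[Type[BaseException], ...],
    attempts: int = 5,
    base_delay: float = 0.2,
) -> T:
    """Await operation(), retrying on the given exceptions with exponential backoff."""
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    for attempt in range(attempts):
        try:
            return await operation()
        except retry_on:
            if attempt == attempts - 1:
                raise  # out of attempts: surface the last error
            # Delay doubles each time: base_delay, 2x, 4x, ...
            await asyncio.sleep(base_delay * 2 ** attempt)
    raise AssertionError("unreachable")
```

With the exception types listed above, a call might look like client = await retry_async(lambda: rivven.connect("localhost:9092"), (ConnectionException, TimeoutException)). Errors outside retry_on, such as ConfigException, propagate immediately.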

Testing

Running Tests

# Install test dependencies
pip install -r requirements-test.txt

# Run API tests (no broker required)
pytest tests/test_api.py -v

# Run integration tests (requires running broker)
export RIVVEN_BROKER="localhost:9092"
pytest tests/test_integration.py -v -m integration

Type Checking

The package includes type stubs (rivven.pyi) for IDE support and static type checking:

pip install mypy
mypy your_code.py

Building from Source

# Install maturin
pip install maturin

# Build wheel
cd crates/rivven-python
maturin build --release

# Install locally
pip install target/wheels/rivven-*.whl

# Development install (editable)
maturin develop --release

License

Apache-2.0. See LICENSE.
