
Official Python SDK for the streamq message broker

Project description

streamq-python

Official Python SDK for streamq — a lightweight message broker with WebSocket and SSE delivery.

Installation

pip install streamq-python

Requires Python 3.9+.


Quick start

Async (FastAPI, aiohttp, async Django)

import asyncio
from streamq import AsyncClient

async def main():
    async with AsyncClient("http://localhost:8080") as client:
        # Create a topic
        await client.create_topic("payments")

        # Publish a message
        result = await client.publish("payments", b'{"amount": 100, "currency": "GHS"}')
        print(f"published at offset {result.offset}")

        # Subscribe and receive messages
        async with client.subscribe("payments", group="billing", from_offset=0) as consumer:
            async for msg in consumer:
                print(f"offset {msg.offset}: {msg.payload}")
                await consumer.ack(msg.offset)  # at-least-once delivery

asyncio.run(main())

Sync (Django, Flask, Celery, scripts)

from streamq import SyncClient

client = SyncClient("http://localhost:8080")

client.create_topic("payments")

result = client.publish("payments", b'{"amount": 100}')
print(f"published at offset {result.offset}")

with client.subscribe("payments", group="billing", from_offset=0) as consumer:
    for msg in consumer:
        print(f"offset {msg.offset}: {msg.payload}")
        consumer.ack(msg.offset)

client.close()

API

AsyncClient(base_url, **options)

from streamq import AsyncClient

async with AsyncClient(
    "http://localhost:8080",
    timeout=10.0,                  # HTTP call timeout in seconds (default: 10)
    reconnect_delay=2.0,           # seconds between reconnects (default: 2)
    max_reconnect_attempts=0,      # 0 = retry forever (default: 0)
) as client:
    ...

SyncClient(base_url, **options)

from streamq import SyncClient

client = SyncClient("http://localhost:8080",
    timeout=10.0,
    reconnect_delay=2.0,
    max_reconnect_attempts=0,
)

Topic management

# Create
await client.create_topic("events")

# List all topics
topics = await client.list_topics()
for t in topics:
    print(f"{t.name}: {t.message_count} messages, latest offset {t.latest_offset}")

# Get single topic stats
info = await client.get_topic("events")

# Delete
await client.delete_topic("events")

Publishing

# Payload is bytes — encode however you like
result = await client.publish("payments", b'{"amount": 100}')
print(f"offset: {result.offset}, timestamp: {result.timestamp}")
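
Because the payload is raw bytes, a common pattern is to serialize structured data with `json` before publishing. A minimal sketch of the encode step only, using the standard library (the `encode_payload` helper is illustrative, not part of the SDK):

```python
import json

def encode_payload(event: dict) -> bytes:
    """Serialize an event dict to the bytes payload streamq expects."""
    # Compact separators keep the wire payload small
    return json.dumps(event, separators=(",", ":")).encode("utf-8")

payload = encode_payload({"amount": 100, "currency": "GHS"})
# payload is now ready for client.publish("payments", payload)
```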

Subscribing

# Broadcast — receive every message (no group)
async with client.subscribe("payments") as consumer:
    async for msg in consumer:
        print(msg.payload)

# Consumer group — work queue (one message → one subscriber in group)
async with client.subscribe("payments", group="billing") as consumer:
    ...

# Replay from the beginning of retained history
async with client.subscribe("payments", from_offset=0) as consumer:
    ...

# Resume from a specific offset
async with client.subscribe("payments", from_offset=42) as consumer:
    ...

# SSE instead of WebSocket (no ack support, works through more proxies)
async with client.subscribe("payments", protocol="sse") as consumer:
    ...
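
One way to use `from_offset` is to resume a consumer across restarts by persisting the last processed offset yourself. A sketch of such checkpointing with a local file — the helpers and the `payments.checkpoint` filename are hypothetical, not part of the SDK:

```python
import json
from pathlib import Path

# Hypothetical checkpoint file -- the SDK does not manage this for you
CHECKPOINT = Path("payments.checkpoint")

def load_offset() -> int:
    """Return the next offset to consume (0 if no checkpoint exists)."""
    if CHECKPOINT.exists():
        return json.loads(CHECKPOINT.read_text())["next_offset"]
    return 0

def save_offset(processed_offset: int) -> None:
    """Record that everything up to processed_offset has been handled."""
    CHECKPOINT.write_text(json.dumps({"next_offset": processed_offset + 1}))

# With the real client this would look like:
#   async with client.subscribe("payments", from_offset=load_offset()) as consumer:
#       async for msg in consumer:
#           handle(msg)
#           save_offset(msg.offset)
```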

Error handling

from streamq import (
    AsyncClient,
    StreamqConflict,
    StreamqNotFound,
    StreamqError,
)

try:
    await client.create_topic("payments")
except StreamqConflict:
    pass  # already exists — fine
except StreamqNotFound:
    print("not found")
except StreamqError as e:
    print(f"broker error: {e}")

Delivery guarantees

Setup                       Guarantee      Notes
WebSocket + group + ack()   At-least-once  Broker redelivers from last committed offset on reconnect
WebSocket, no group         At-most-once   Auto-committed on send
SSE (any)                   At-most-once   Unidirectional — no ack channel
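
At-least-once delivery means a message can be redelivered after a reconnect, so handlers should tolerate duplicates. One simple approach, sketched below in plain Python (the `IdempotentHandler` class is illustrative, not part of the SDK), is to skip any offset at or below the highest one already processed:

```python
class IdempotentHandler:
    """Skips redelivered messages by tracking the highest processed offset."""

    def __init__(self) -> None:
        self.last_processed = -1  # highest offset already handled
        self.handled = []

    def handle(self, offset: int, payload: bytes) -> bool:
        """Process a message once; return False for a duplicate."""
        if offset <= self.last_processed:
            return False  # redelivery of something already done
        self.handled.append(payload)
        self.last_processed = offset
        return True

h = IdempotentHandler()
h.handle(0, b"a")
h.handle(1, b"b")
h.handle(1, b"b")  # redelivered duplicate: ignored
```

This relies on offsets being monotonically increasing per topic, which holds for the offset-based replay shown above.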

Message type

@dataclass
class Message:
    id: str
    offset: int
    topic: str
    payload: bytes      # base64 decoded automatically
    timestamp: datetime
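
Since `payload` arrives already base64-decoded, turning it back into structured data is just a `json.loads` away. A self-contained sketch — the `Message` below is a local copy of the shape shown above so the example runs without the package installed:

```python
import json
from dataclasses import dataclass
from datetime import datetime, timezone

# Local stand-in for streamq's Message, copied from the shape above
@dataclass
class Message:
    id: str
    offset: int
    topic: str
    payload: bytes
    timestamp: datetime

msg = Message(
    id="m-1",
    offset=7,
    topic="payments",
    payload=b'{"amount": 100, "currency": "GHS"}',
    timestamp=datetime.now(timezone.utc),
)

# json.loads accepts bytes directly, so no explicit decode is needed
event = json.loads(msg.payload)
print(event["amount"], event["currency"])  # → 100 GHS
```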

Broker

The broker is at github.com/GordenArcher/streamq.

Project details


Download files


Source Distribution

streamq_python-0.1.0.tar.gz (11.6 kB)

Uploaded Source

Built Distribution


streamq_python-0.1.0-py3-none-any.whl (14.8 kB)

Uploaded Python 3

File details

Details for the file streamq_python-0.1.0.tar.gz.

File metadata

  • Download URL: streamq_python-0.1.0.tar.gz
  • Upload date:
  • Size: 11.6 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.13.0

File hashes

Hashes for streamq_python-0.1.0.tar.gz
Algorithm     Hash digest
SHA256        5a65e8fda18f4b7a9bf25459ce68f62c884652e42068b0acbc6098e6073e6a78
MD5           c0c9352582da80dbb9550105157647a1
BLAKE2b-256   13f6462a87095dc585c6a321ab17288a74f1ce389a20258c3899c51589dbb0a5


File details

Details for the file streamq_python-0.1.0-py3-none-any.whl.

File metadata

  • Download URL: streamq_python-0.1.0-py3-none-any.whl
  • Upload date:
  • Size: 14.8 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.13.0

File hashes

Hashes for streamq_python-0.1.0-py3-none-any.whl
Algorithm     Hash digest
SHA256        e6f8870a0166f1d718b5ca3326bd8619d6ce7586b6f076cd656ec085a0199fa9
MD5           5dcc208e2778fbab56d3ba776402f53f
BLAKE2b-256   e4969d49d767c40ec31b487a3a0c410ac9df432368daf4400f1ce74215e303c7

