Official Python SDK for the streamq message broker
streamq-python
Official Python SDK for streamq — a lightweight message broker with WebSocket and SSE delivery.
Installation
pip install streamq-python
Requires Python 3.9+.
Quick start
Async (FastAPI, aiohttp, async Django)
import asyncio
from streamq import AsyncClient

async def main():
    async with AsyncClient("http://localhost:8080") as client:
        # Create a topic
        await client.create_topic("payments")

        # Publish a message
        result = await client.publish("payments", b'{"amount": 100, "currency": "GHS"}')
        print(f"published at offset {result.offset}")

        # Subscribe and receive messages
        async with client.subscribe("payments", group="billing", from_offset=0) as consumer:
            async for msg in consumer:
                print(f"offset {msg.offset}: {msg.payload}")
                await consumer.ack(msg.offset)  # at-least-once delivery

asyncio.run(main())
Sync (Django, Flask, Celery, scripts)
from streamq import SyncClient
client = SyncClient("http://localhost:8080")
client.create_topic("payments")
result = client.publish("payments", b'{"amount": 100}')
print(f"published at offset {result.offset}")
with client.subscribe("payments", group="billing", from_offset=0) as consumer:
    for msg in consumer:
        print(f"offset {msg.offset}: {msg.payload}")
        consumer.ack(msg.offset)
client.close()
API
AsyncClient(base_url, **options)
from streamq import AsyncClient
async with AsyncClient(
    "http://localhost:8080",
    timeout=10.0,              # HTTP call timeout in seconds (default: 10)
    reconnect_delay=2.0,       # seconds between reconnects (default: 2)
    max_reconnect_attempts=0,  # 0 = retry forever (default: 0)
) as client:
    ...
SyncClient(base_url, **options)
from streamq import SyncClient
client = SyncClient(
    "http://localhost:8080",
    timeout=10.0,
    reconnect_delay=2.0,
    max_reconnect_attempts=0,
)
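The README describes `reconnect_delay` and `max_reconnect_attempts` only through the comments above. As a rough illustration of how "0 = retry forever" with a fixed delay could behave, here is a standalone sketch. It is not the SDK's implementation: `call_with_reconnect`, the use of `ConnectionError`, and the choice to count reconnects after the initial attempt are all assumptions made for this example.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def call_with_reconnect(
    connect: Callable[[], T],
    reconnect_delay: float = 2.0,
    max_reconnect_attempts: int = 0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call connect() until it succeeds, waiting reconnect_delay between tries.

    max_reconnect_attempts == 0 means retry forever (assumed semantics);
    otherwise give up after that many reconnects following the first attempt.
    """
    reconnects = 0
    while True:
        try:
            return connect()
        except ConnectionError:
            # Re-raise once the reconnect budget is spent (never when it is 0).
            if max_reconnect_attempts and reconnects >= max_reconnect_attempts:
                raise
            reconnects += 1
            sleep(reconnect_delay)
```

The `sleep` parameter exists only so the delay can be replaced in tests. A long-running consumer would normally keep the default of retrying forever, while a short script may prefer a small bound so that it fails fast.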
Topic management
# Create
await client.create_topic("events")
# List all topics
topics = await client.list_topics()
for t in topics:
    print(f"{t.name}: {t.message_count} messages, latest offset {t.latest_offset}")
# Get single topic stats
info = await client.get_topic("events")
# Delete
await client.delete_topic("events")
Publishing
# Payload is bytes — encode however you like
result = await client.publish("payments", b'{"amount": 100}')
print(f"offset: {result.offset}, timestamp: {result.timestamp}")
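Since `publish` takes raw bytes, JSON is one common encoding choice. The helpers below are a minimal sketch using only the standard library; `encode_payload` and `decode_payload` are hypothetical names for this example and are not part of the SDK.

```python
import json
from typing import Any


def encode_payload(data: Any) -> bytes:
    """Serialize a JSON-compatible object to UTF-8 bytes."""
    # Compact separators keep the payload small; sort_keys makes the
    # output deterministic, which helps when comparing or hashing payloads.
    return json.dumps(data, separators=(",", ":"), sort_keys=True).encode("utf-8")


def decode_payload(payload: bytes) -> Any:
    """Inverse of encode_payload: parse UTF-8 JSON bytes into Python objects."""
    return json.loads(payload.decode("utf-8"))


body = encode_payload({"amount": 100, "currency": "GHS"})
```

With these in place, a call such as `await client.publish("payments", encode_payload({"amount": 100}))` sends the encoded bytes, and a consumer can call `decode_payload(msg.payload)` on the other side.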
Subscribing
# Broadcast: receive every message (no group)
async with client.subscribe("payments") as consumer:
    async for msg in consumer:
        print(msg.payload)

# Consumer group: work queue (one message → one subscriber in group)
async with client.subscribe("payments", group="billing") as consumer:
    ...

# Replay from the beginning of retained history
async with client.subscribe("payments", from_offset=0) as consumer:
    ...

# Resume from a specific offset
async with client.subscribe("payments", from_offset=42) as consumer:
    ...

# SSE instead of WebSocket (no ack support, works through more proxies)
async with client.subscribe("payments", protocol="sse") as consumer:
    ...
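To resume from a specific offset after a restart, the application has to remember where it stopped. The sketch below keeps the last processed offset in a small JSON file. `OffsetCheckpoint` is a hypothetical helper, not part of the SDK, and it assumes that offsets start at 0, increase by one per message, and that `from_offset` is inclusive.

```python
import json
from pathlib import Path
from typing import Optional


class OffsetCheckpoint:
    """Persist the last processed offset for one topic in a JSON file."""

    def __init__(self, path: Path) -> None:
        self.path = Path(path)

    def load(self) -> Optional[int]:
        """Return the last saved offset, or None if nothing was saved yet."""
        if not self.path.exists():
            return None
        return json.loads(self.path.read_text())["offset"]

    def save(self, offset: int) -> None:
        """Write to a temp file first, then rename it over the target."""
        tmp = self.path.with_name(self.path.name + ".tmp")
        tmp.write_text(json.dumps({"offset": offset}))
        tmp.replace(self.path)

    def next_offset(self) -> int:
        """Value to pass as from_offset: 0 on first run, else last + 1."""
        last = self.load()
        return 0 if last is None else last + 1
```

A consumer could then subscribe with `from_offset=checkpoint.next_offset()` and call `checkpoint.save(msg.offset)` after handling each message. Saving after processing rather than before means a crash can replay a message but will not silently skip one.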
Error handling
from streamq import (
    AsyncClient,
    StreamqConflict,
    StreamqNotFound,
    StreamqError,
)

try:
    await client.create_topic("payments")
except StreamqConflict:
    pass  # already exists, which is fine
except StreamqNotFound:
    print("not found")
except StreamqError as e:
    print(f"broker error: {e}")
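Treating a conflict as success is a common pattern at service startup, so it can be convenient to wrap it once. The helper below is a sketch rather than an SDK function: `ensure_topic` is a hypothetical name, and the conflict exception is passed in as a parameter so the example runs without the broker or the package installed.

```python
from typing import Any, Type


async def ensure_topic(client: Any, name: str, conflict_exc: Type[Exception]) -> bool:
    """Create the topic if it is missing.

    Returns True if the topic was created and False if it already existed.
    Any other exception propagates to the caller unchanged.
    """
    try:
        await client.create_topic(name)
        return True
    except conflict_exc:
        # The topic is already there, which is the state we wanted anyway.
        return False
```

With the real client this would be called as `await ensure_topic(client, "payments", StreamqConflict)`.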
Delivery guarantees
| Setup | Guarantee | Notes |
|---|---|---|
| WebSocket + group + ack() | At-least-once | Broker redelivers from last committed offset on reconnect |
| WebSocket, no group | At-most-once | Auto-committed on send |
| SSE (any) | At-most-once | Unidirectional — no ack channel |
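At-least-once delivery means a handler can see the same message again after a reconnect, so handlers with side effects usually need to be idempotent. One simple approach is to skip offsets that were already processed. The class below is a sketch under the assumption that offsets within a topic only increase; `OffsetDeduplicator` is a hypothetical name, not part of the SDK.

```python
from typing import Callable


class OffsetDeduplicator:
    """Skip messages whose offset was already processed.

    Assumes offsets only increase, so remembering the highest processed
    offset is enough to detect a redelivery.
    """

    def __init__(self, last_processed: int = -1) -> None:
        self.last_processed = last_processed

    def process(
        self, offset: int, payload: bytes, handler: Callable[[bytes], None]
    ) -> bool:
        """Run handler at most once per offset. Returns False for a duplicate."""
        if offset <= self.last_processed:
            return False  # redelivered message that was already handled
        handler(payload)
        self.last_processed = offset
        return True
```

In a consumer loop this would look like `dedup.process(msg.offset, msg.payload, handle)` followed by `await consumer.ack(msg.offset)` in both cases, so duplicates are still acknowledged. The state here lives in memory only; surviving a process restart would need the last offset stored somewhere durable.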
Message type
@dataclass
class Message:
    id: str
    offset: int
    topic: str
    payload: bytes  # base64 decoded automatically
    timestamp: datetime
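The comment on `payload` implies the broker sends payloads base64-encoded and the SDK decodes them. To make that concrete, here is a sketch of how such a frame might be turned into a `Message`. The JSON field names and the ISO 8601 timestamp format are assumptions for illustration only (the real wire format is not documented here), and `message_from_frame` is a hypothetical helper.

```python
import base64
import json
from dataclasses import dataclass
from datetime import datetime


@dataclass
class Message:
    id: str
    offset: int
    topic: str
    payload: bytes
    timestamp: datetime


def message_from_frame(frame: str) -> Message:
    """Build a Message from a JSON text frame (field names are assumed)."""
    raw = json.loads(frame)
    return Message(
        id=raw["id"],
        offset=int(raw["offset"]),
        topic=raw["topic"],
        # The wire value is base64 text; decode it back to the original bytes.
        payload=base64.b64decode(raw["payload"]),
        # fromisoformat accepts "+00:00" offsets on Python 3.9; a trailing
        # "Z" is only parsed from Python 3.11 onwards.
        timestamp=datetime.fromisoformat(raw["timestamp"]),
    )
```

Application code never needs to do this itself, because `msg.payload` already arrives as decoded bytes.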
Broker
The broker is at github.com/GordenArcher/streamq.