Python client for the queue-ti message queue service
queue-ti Python Client
A Python client library for queue-ti, providing high-level Producer and Consumer APIs for async and sync applications.
- Async first — Native async/await with automatic reconnect and token refresh
- Sync wrapper — Drop-in synchronous API that runs async code on a background thread
- Type-safe — Full type hints and mypy-strict compatible
- Python 3.11+ — Requires Python 3.11 or later
Installation
pip install queue-ti-client
Quick Start
Async Producer
import asyncio
from queueti import connect, ConnectOptions
async def main():
    # Connect to the server
    client = await connect("localhost:50051", options=ConnectOptions(insecure=True))
    producer = client.producer()

    # Publish a message
    msg_id = await producer.publish(
        topic="orders",
        payload=b'{"amount": 99.99}',
    )
    print(f"Published: {msg_id}")
    await client.close()

asyncio.run(main())
Async Consumer
import asyncio
from queueti import connect, ConnectOptions, ConsumerOptions
async def main():
    client = await connect("localhost:50051", options=ConnectOptions(insecure=True))

    # Consume messages (blocks until cancelled)
    consumer = client.consumer(
        topic="orders",
        options=ConsumerOptions(concurrency=4),
    )

    async def handler(msg):
        print(f"[{msg.id}] {msg.payload}")
        # Return normally to auto-ack; raise to auto-nack

    await consumer.consume(handler)

asyncio.run(main())
Sync Producer
from queueti import connect_sync, ConnectOptions
client = connect_sync("localhost:50051", options=ConnectOptions(insecure=True))
producer = client.producer()
msg_id = producer.publish(
topic="orders",
payload=b'{"amount": 99.99}',
)
print(f"Published: {msg_id}")
client.close()
Sync Consumer
from queueti import connect_sync, ConnectOptions, ConsumerOptions
client = connect_sync("localhost:50051", options=ConnectOptions(insecure=True))
consumer = client.consumer(
topic="orders",
options=ConsumerOptions(concurrency=4),
)
def handler(msg):
    print(f"[{msg.id}] {msg.payload}")
    # Return normally to auto-ack; raise to auto-nack

# Blocks until interrupted (Ctrl+C)
consumer.consume(handler)
Connection
Basic Connection
from queueti import connect
client = await connect("localhost:50051")
Insecure (Development)
from queueti import connect, ConnectOptions
client = await connect(
"localhost:50051",
options=ConnectOptions(insecure=True),
)
With Bearer Token
from queueti import connect, ConnectOptions
client = await connect(
"localhost:50051",
options=ConnectOptions(token="your-jwt-token"),
)
With Token Refresh
When your token expires, you can provide a refresher function to obtain a new token automatically:
from queueti import connect, ConnectOptions
async def refresh_token() -> str:
    # Fetch a new token (e.g., from your auth service)
    new_token = await fetch_fresh_token()
    return new_token

client = await connect(
    "localhost:50051",
    options=ConnectOptions(
        token="initial-token",
        token_refresher=refresh_token,
    ),
)
# Token will refresh automatically before expiry
You can also manually set a new token:
client.set_token("new-token")
ConnectOptions
All fields are optional.
| Field | Type | Description |
|---|---|---|
| token | str \| None | Bearer token for JWT authentication |
| token_refresher | Callable[[], Awaitable[str]] \| None | Async function to refresh the token before expiry |
| insecure | bool | Disable TLS (for development only; default: False) |
Authentication
When the server has auth.enabled = true, every RPC call requires a valid JWT. Tokens are issued by the server's HTTP API and expire after 15 minutes.
Using QueueTiAuth (recommended)
The QueueTiAuth helper automatically checks if authentication is required and handles login and token refresh:
import asyncio
import queueti
auth = queueti.QueueTiAuth.login("http://localhost:8080", "admin", "secret")
async def main():
    opts = queueti.ConnectOptions(
        token=auth.token,
        token_refresher=auth.async_refresh,
    )
    client = await queueti.connect("localhost:50051", opts)
    try:
        producer = client.producer()
        msg_id = await producer.publish("orders", b"...")
        print(f"Published: {msg_id}")
    finally:
        await client.close()

    async with queueti.AsyncAdminClient(
        "http://localhost:8080",
        queueti.AdminOptions(token=auth.token),
    ) as admin:
        configs = await admin.list_topic_configs()

asyncio.run(main())
For the synchronous client, use connect_sync and refresh() (the sync variant of async_refresh()):
import queueti
auth = queueti.QueueTiAuth.login("http://localhost:8080", "admin", "secret")
client = queueti.connect_sync("localhost:50051", queueti.ConnectOptions(
    token=auth.token,
    token_refresher=auth.refresh,
))
try:
    producer = client.producer()
    msg_id = producer.publish("orders", b"...")
    print(f"Published: {msg_id}")
finally:
    client.close()
The QueueTiAuth helper:
- Calls GET /api/auth/status to check if authentication is required
- If auth is disabled, returns a no-op instance with a null token
- If auth is enabled, calls POST /api/auth/login with the provided credentials
- Exposes .token (str or None) for the current JWT, .async_refresh() for async clients, and .refresh() for sync clients
Option 1 — Obtaining a token manually
TOKEN=$(curl -s -X POST http://localhost:8080/api/auth/login \
-H "Content-Type: application/json" \
-d '{"username":"admin","password":"secret"}' \
| jq -r '.token')
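The same login request can be built from Python with the standard library. This sketch mirrors the curl call above: a JSON POST to /api/auth/login whose response carries a `token` field. The function names are hypothetical, and error handling (HTTP errors, missing fields) is left out for brevity.

```python
import json
import urllib.request


def build_login_request(base_url: str, username: str, password: str) -> urllib.request.Request:
    """Build the POST request equivalent to the curl command."""
    body = json.dumps({"username": username, "password": password}).encode("utf-8")
    return urllib.request.Request(
        url=f"{base_url.rstrip('/')}/api/auth/login",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def extract_token(response_body: bytes) -> str:
    """Pull the `token` field out of the JSON response, like `jq -r '.token'`."""
    return json.loads(response_body)["token"]


def login(base_url: str, username: str, password: str) -> str:
    """Send the request; requires a running server, so it is not called here."""
    request = build_login_request(base_url, username, password)
    with urllib.request.urlopen(request, timeout=10) as response:
        return extract_token(response.read())


request = build_login_request("http://localhost:8080", "admin", "secret")
print(request.get_method(), request.full_url)
```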
Option 2 — Automatic refresh with custom fetcher
When your token expires, you can provide a refresher function to obtain a new token automatically:
from queueti import connect, ConnectOptions
async def refresh_token() -> str:
    # Fetch a new token (e.g., from your auth service)
    new_token = await fetch_fresh_token()
    return new_token

client = await connect(
    "localhost:50051",
    options=ConnectOptions(
        token="initial-token",
        token_refresher=refresh_token,
    ),
)
# Token will refresh automatically before expiry
Option 3 — Manual update
Call client.set_token() to swap the token on a live connection. The new token takes effect on the very next RPC call; no reconnection is needed.
client = await connect(
"localhost:50051",
options=ConnectOptions(token="initial-token"),
)
# Later, when you have a fresh token:
client.set_token("new-token")
This is useful when token lifecycle is managed externally (e.g. a shared token store, a sidecar, or an existing refresh loop in your application).
Option 4 — Static token (short-lived processes)
For scripts or batch jobs that complete within the 15-minute token window, a static token is sufficient:
from queueti import connect, ConnectOptions
import os
client = await connect(
"localhost:50051",
options=ConnectOptions(token=os.getenv("QUEUETI_TOKEN")),
)
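Note that `os.getenv` returns `None` when the variable is unset, and `ConnectOptions.token` accepts `None`, so a missing variable would not fail at this point. A small guard that fails fast may be preferable in batch jobs. `require_token` below is a hypothetical helper, not part of the library.

```python
import os


def require_token(env_var: str = "QUEUETI_TOKEN", environ=None) -> str:
    """Return the token from the environment or raise if it is missing or blank."""
    env = os.environ if environ is None else environ
    token = env.get(env_var, "").strip()
    if not token:
        raise RuntimeError(f"{env_var} is not set; obtain a token before starting")
    return token


# A plain dict stands in for os.environ so the sketch runs anywhere.
token = require_token(environ={"QUEUETI_TOKEN": "demo-token"})
print(token)
```

The returned value can then be passed as `ConnectOptions(token=...)`.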
Producer API
AsyncProducer.publish()
msg_id: str = await producer.publish(
topic: str,
payload: bytes,
options: PublishOptions | None = None,
) -> str
Publish a message and return its assigned ID.
Parameters:
- topic (str): Topic name
- payload (bytes): Message content
- options (PublishOptions | None): Optional metadata and publishing settings
Returns: Message ID (str)
Raises: PublishError if the RPC fails
Example:
msg_id = await producer.publish(
topic="orders",
payload=b'{"sku": "ABC123", "qty": 5}',
options=PublishOptions(metadata={"source": "api"}),
)
Producer.publish() (Sync)
Identical to AsyncProducer.publish() but blocks instead of awaiting.
PublishOptions
| Field | Type | Description |
|---|---|---|
| metadata | dict[str, str] | Optional metadata key-value pairs (default: {}) |
Consumer API
AsyncConsumer.consume()
await consumer.consume(
handler: Callable[[Message], Awaitable[None]],
) -> None
Stream messages from the topic, calling the handler for each message. Runs until cancelled. Auto-acks on success; auto-nacks on exception.
Parameters:
- handler: Async function called for each message. Raise an exception to nack.
Behavior:
- Reconnects with exponential backoff on stream errors
- Concurrency controlled via ConsumerOptions.concurrency
- Visibility timeout overridable per-call via ConsumerOptions.visibility_timeout_seconds
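To illustrate what reconnecting with exponential backoff typically means, here is a sketch of a delay schedule. The base delay, growth factor, cap, and jitter are assumed values chosen for the example; the delays the client actually uses are not documented here.

```python
import itertools
import random


def backoff_delays(base=0.5, factor=2.0, cap=30.0, jitter=0.0, rng=None):
    """Yield an endless sequence of retry delays that grow up to `cap` seconds."""
    rng = rng or random.Random()
    delay = base
    while True:
        # Optional jitter spreads out reconnect attempts from many clients.
        spread = delay * jitter
        yield min(cap, delay + rng.uniform(-spread, spread))
        delay = min(cap, delay * factor)


# With jitter disabled the schedule is deterministic.
first_six = list(itertools.islice(backoff_delays(base=0.5, cap=8.0), 6))
print(first_six)  # [0.5, 1.0, 2.0, 4.0, 8.0, 8.0]
```

A reconnect loop would sleep for each yielded delay between attempts and start a fresh generator once the stream is healthy again.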
Example:
import json
from queueti import Message

async def process_order(msg: Message):
    order = json.loads(msg.payload)
    print(f"Processing order {order['id']} (retry #{msg.retry_count})")
    if order["amount"] < 0:
        raise ValueError("invalid amount")  # Raising nacks the message

await consumer.consume(process_order)
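The auto-ack and auto-nack behavior, together with the concurrency limit, can be pictured with the following self-contained sketch. `FakeMessage` and `dispatch` are hypothetical stand-ins written for this example; they show the general idea rather than the client's actual internals.

```python
import asyncio
from dataclasses import dataclass, field


@dataclass
class FakeMessage:
    """In-memory stand-in for a queue message; records what happened to it."""
    id: str
    payload: bytes
    outcome: list = field(default_factory=list)

    async def ack(self) -> None:
        self.outcome.append("ack")

    async def nack(self, reason: str = "") -> None:
        self.outcome.append(f"nack:{reason}")


async def dispatch(messages, handler, concurrency: int = 1) -> None:
    """Run `handler` for each message, at most `concurrency` at a time."""
    limit = asyncio.Semaphore(concurrency)

    async def run_one(msg):
        async with limit:
            try:
                await handler(msg)
            except Exception as exc:
                await msg.nack(str(exc))  # handler raised: nack
            else:
                await msg.ack()  # handler returned normally: ack

    await asyncio.gather(*(run_one(m) for m in messages))


async def check_amount(msg):
    if msg.payload == b"bad":
        raise ValueError("invalid amount")


msgs = [FakeMessage("1", b"ok"), FakeMessage("2", b"bad")]
asyncio.run(dispatch(msgs, check_amount, concurrency=2))
print([m.outcome for m in msgs])  # [['ack'], ['nack:invalid amount']]
```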
AsyncConsumer.consume_batch()
await consumer.consume_batch(
options: BatchOptions,
handler: Callable[[list[Message]], Awaitable[None]],
) -> None
Poll batches from the topic, calling the handler with all messages in the batch. Runs until cancelled. Handler is responsible for acking/nacking each message.
Parameters:
- options (BatchOptions): Batch size and optional visibility timeout override
- handler: Async function called with a list of Message objects
Behavior:
- Polls with exponential backoff when the queue is empty
- Each message is individually locked and can be acked/nacked independently
- Handler errors do not prevent ack/nack of individual messages
Example:
import json
from queueti import BatchOptions, Message

async def handle_batch(messages: list[Message]):
    for msg in messages:
        try:
            order = json.loads(msg.payload)
            await process_order(order)  # Application-defined
            await msg.ack()
        except Exception as e:
            await msg.nack(f"processing failed: {e}")

await consumer.consume_batch(
    options=BatchOptions(batch_size=10, visibility_timeout_seconds=60),
    handler=handle_batch,
)
Consumer.consume() (Sync)
Blocks on the calling thread and processes messages one at a time. Identical behavior to async version.
def handler(msg: SyncMessage):
    # Process message; raise to nack
    pass

consumer.consume(handler)  # Blocks until interrupted
Consumer.consume_batch() (Sync)
Blocks on the calling thread and processes message batches.
def handler(messages: list[SyncMessage]):
    for msg in messages:
        try:
            # Process...
            msg.ack()
        except Exception as e:
            msg.nack(f"error: {e}")

consumer.consume_batch(
    options=BatchOptions(batch_size=10),
    handler=handler,
)
ConsumerOptions
All fields are optional.
| Field | Type | Default | Description |
|---|---|---|---|
| concurrency | int | 1 | Number of messages to process in parallel (must be ≥ 1) |
| visibility_timeout_seconds | int \| None | None | Override server default visibility timeout for this consumer (seconds) |
| consumer_group | str \| None | None | Consumer group name for independent message consumption; see Consumer Groups |
BatchOptions
| Field | Type | Description |
|---|---|---|
| batch_size | int | Maximum messages to dequeue in one call (must be ≥ 1) |
| visibility_timeout_seconds | int \| None | Optional visibility timeout override (seconds) |
| consumer_group | str \| None | Consumer group name for independent message consumption |
Message
Fields
Received from consume() or consume_batch().
| Field | Type | Description |
|---|---|---|
| id | str | Unique message identifier |
| topic | str | Topic the message belongs to |
| payload | bytes | Message content |
| metadata | dict[str, str] | User-supplied metadata |
| created_at | datetime | Enqueue timestamp (UTC) |
| retry_count | int | Current retry count (0 = first attempt) |
Methods
await msg.ack() -> None — Acknowledge the message (removes it from the queue).
Raises AckError if the RPC fails.
await msg.nack(reason: str = "") -> None — Return the message to the queue for retry (or to DLQ if max retries exceeded).
Raises NackError if the RPC fails.
Note: When using consume(), ack/nack are called automatically. Only call them directly with consume_batch().
SyncMessage
Identical to Message, but with synchronous ack() and nack() methods. Used with Consumer.consume() and Consumer.consume_batch().
Consumer Groups
Consumer groups enable independent consumption of the same messages by multiple systems. Each group tracks its own delivery state, allowing parallel processing of the same message by different applications without interference.
When a consumer group is specified, the client sends all RPCs scoped to that group and receives all messages enqueued to the topic. Each message is delivered independently to each group. A message is only deleted from the queue when all registered groups have acknowledged it.
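The deletion rule (a message is removed only after every registered group has acknowledged it) can be modeled with a few lines of in-memory Python. `GroupedTopic` and the `billing` group are hypothetical and exist only to illustrate the rule; the server's real bookkeeping is not described in this document.

```python
class GroupedTopic:
    """Toy model: each message tracks which groups still have to ack it."""

    def __init__(self, groups):
        self.groups = set(groups)
        self.pending = {}  # message id -> set of groups that have not acked yet

    def enqueue(self, msg_id):
        # Every registered group gets its own independent delivery.
        self.pending[msg_id] = set(self.groups)

    def ack(self, msg_id, group):
        waiting = self.pending[msg_id]
        waiting.discard(group)
        if not waiting:
            # The last group has acked, so the message can be deleted.
            del self.pending[msg_id]


topic = GroupedTopic(["warehouse", "billing"])
topic.enqueue("m1")

topic.ack("m1", "warehouse")
still_queued = "m1" in topic.pending  # billing has not acked yet

topic.ack("m1", "billing")
deleted = "m1" not in topic.pending

print(still_queued, deleted)  # True True
```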
Registering a Consumer Group
Consumer groups must be registered on the server before use:
curl -X POST http://localhost:8080/api/topics/orders/consumer-groups \
-H "Content-Type: application/json" \
-d '{"consumer_group": "warehouse"}'
Once registered, the group automatically receives all pending messages enqueued before registration (backfill), plus all future messages.
Async Consumer
import asyncio
from queueti import connect, ConnectOptions, ConsumerOptions
async def main():
    client = await connect("localhost:50051", options=ConnectOptions(insecure=True))
    consumer = client.consumer(
        "orders",
        options=ConsumerOptions(consumer_group="warehouse", concurrency=4),
    )

    async def handler(msg):
        print(f"[warehouse] processing {msg.id}")
        # Return normally to Ack; raise to Nack

    await consumer.consume(handler)

asyncio.run(main())
Async Batch Consumer
from queueti import BatchOptions
async def handle_batch(messages):
    for msg in messages:
        try:
            # Process...
            await msg.ack()
        except Exception as e:
            await msg.nack(f"error: {e}")

await consumer.consume_batch(
    options=BatchOptions(batch_size=50, consumer_group="warehouse"),
    handler=handle_batch,
)
Sync Consumer
from queueti import connect_sync, ConnectOptions, ConsumerOptions
client = connect_sync("localhost:50051", options=ConnectOptions(insecure=True))
consumer = client.consumer(
"orders",
options=ConsumerOptions(consumer_group="warehouse", concurrency=4),
)
def handler(msg):
    print(f"[warehouse] processing {msg.id}")
    # Return normally to Ack; raise to Nack

consumer.consume(handler)  # Blocks until interrupted
Sync Batch Consumer
from queueti import BatchOptions
def handle_batch(messages):
    for msg in messages:
        try:
            # Process...
            msg.ack()
        except Exception as e:
            msg.nack(f"error: {e}")

consumer.consume_batch(
    options=BatchOptions(batch_size=50, consumer_group="warehouse"),
    handler=handle_batch,
)
Error Handling
All exceptions inherit from QueueTiError.
QueueTiError
Base exception for all queue-ti client errors.
from queueti import QueueTiError
try:
    await consumer.consume(handler)
except QueueTiError as e:
    print(f"Queue operation failed: {e}")
PublishError
Raised when a message cannot be published.
from queueti import PublishError
try:
    msg_id = await producer.publish("orders", payload)
except PublishError as e:
    print(f"Failed to publish: {e}")
AckError
Raised when acknowledging a message fails.
from queueti import AckError
try:
    await msg.ack()
except AckError as e:
    print(f"Failed to ack message {msg.id}: {e}")
NackError
Raised when nacking a message fails.
from queueti import NackError
try:
    await msg.nack("processing failed")
except NackError as e:
    print(f"Failed to nack message {msg.id}: {e}")
Examples
Robust Async Consumer with Exponential Backoff
import asyncio
import json
from queueti import connect, ConnectOptions, ConsumerOptions, Message

# send_smtp, TemporaryFailure, and PermanentFailure are application-defined
# placeholders; they are not part of queueti.

async def consume_with_backoff():
    client = await connect(
        "localhost:50051",
        options=ConnectOptions(insecure=True),
    )
    consumer = client.consumer(
        topic="emails",
        options=ConsumerOptions(concurrency=8),
    )

    async def send_email(msg: Message):
        payload = json.loads(msg.payload)
        try:
            await send_smtp(payload["to"], payload["body"])
        except TemporaryFailure:
            raise  # Nack; will retry after visibility timeout
        except PermanentFailure:
            # consume() nacks automatically when the handler raises, so do not
            # call msg.nack() here; the message goes to the DLQ once max
            # retries are exceeded.
            raise

    try:
        await consumer.consume(send_email)
    except asyncio.CancelledError:
        # asyncio.run() cancels the main task on Ctrl+C
        print("Shutting down...")
        raise
    finally:
        await client.close()

asyncio.run(consume_with_backoff())
Batch Processing with Manual Ack/Nack
import asyncio
import json
from queueti import connect, ConnectOptions, BatchOptions, Message
async def batch_processor():
    client = await connect("localhost:50051", options=ConnectOptions(insecure=True))
    consumer = client.consumer("events")

    async def process_batch(messages: list[Message]):
        # Process all messages; commit to DB once
        rows = []
        for msg in messages:
            event = json.loads(msg.payload)
            rows.append(event)
        try:
            # db_pool is an application-defined connection pool
            async with db_pool.acquire() as conn:
                await conn.executemany(
                    "INSERT INTO events (...) VALUES (...)",
                    rows,
                )
            # Commit succeeded; ack all
            for msg in messages:
                await msg.ack()
        except Exception as e:
            # Commit failed; nack all
            for msg in messages:
                await msg.nack(f"db error: {e}")

    await consumer.consume_batch(
        options=BatchOptions(batch_size=100),
        handler=process_batch,
    )

asyncio.run(batch_processor())
Sync Consumer in a Worker Thread
import threading
import json
import requests  # Third-party HTTP client used by the handler
from queueti import connect_sync, ConnectOptions, SyncMessage

def worker():
    client = connect_sync("localhost:50051", options=ConnectOptions(insecure=True))
    consumer = client.consumer("webhooks")

    def handle_webhook(msg: SyncMessage):
        payload = json.loads(msg.payload)
        requests.post(payload["url"], json=payload["data"])

    try:
        consumer.consume(handle_webhook)
    finally:
        client.close()

# Run in a separate thread
thread = threading.Thread(target=worker, daemon=True)
thread.start()
thread.join()
Development setup
macOS and some Linux distributions ship an externally-managed Python that blocks
pip install at the system level. Use a virtual environment:
# From the repo root — creates .venv and installs all dev dependencies
make setup-python
# Run the test suite
make test-python
Or manually:
cd clients/python
python3 -m venv .venv
source .venv/bin/activate
pip install -e ".[dev]"
Testing
With the virtual environment active:
# Run all tests
pytest
# Run specific test file
pytest tests/test_consumer.py
# Run with verbose output
pytest -v
# Run mypy
mypy queueti/
Logging
The library uses Python's standard logging module. To see internal debug logs:
import logging
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger("queueti")
logger.setLevel(logging.DEBUG)
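Calling `basicConfig(level=logging.DEBUG)` also enables debug output for every other library that logs through the root logger. If only the client's logs are of interest, one option is to attach a handler to the `queueti` logger alone, as sketched below with the standard library. The child logger name `queueti.consumer` is an assumption used to show level inheritance; the library's real logger names may differ.

```python
import logging

# Dedicated handler so only queueti records use this format.
stream_handler = logging.StreamHandler()
stream_handler.setFormatter(
    logging.Formatter("%(asctime)s %(name)s %(levelname)s %(message)s")
)

queueti_logger = logging.getLogger("queueti")
queueti_logger.setLevel(logging.DEBUG)
queueti_logger.addHandler(stream_handler)
# Avoid duplicate lines if the root logger also has handlers configured.
queueti_logger.propagate = False

# Child loggers inherit the DEBUG level from "queueti" unless set explicitly.
child = logging.getLogger("queueti.consumer")  # hypothetical child name
child.debug("debug logging is enabled for the queueti namespace")
```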
Troubleshooting
Connection refused
Ensure the queue-ti server is running on the correct host and port:
# Development (insecure, local)
client = await connect("localhost:50051", options=ConnectOptions(insecure=True))
# Production (TLS required)
client = await connect("queue-ti.example.com:50051")
Token refresh not working
Ensure your token_refresher function returns a valid JWT string and handles errors:
async def refresh_token() -> str:
    try:
        response = await auth_service.refresh()
        return response.token
    except Exception as e:
        logger.error(f"Token refresh failed: {e}")
        raise  # Exponential backoff will apply
Messages not being processed
Check that:
- Messages are being published (publish() succeeded)
- Consumer handler is not raising unexpected exceptions
- Visibility timeout is long enough for your processing (increase via ConsumerOptions.visibility_timeout_seconds)
- Topic exists and has messages (use admin UI or check logs)
License
MIT