A high-level Cloudflare Queues consumer library for Python
Project description
CFQ
A high-level Python client for consuming messages from Cloudflare Queues with async workers.
from cfq import CFQ
client = CFQ(
api_token="your_cloudflare_api_token",
account_id="your_account_id",
)
@client.consumer(queue_id="your_queue_id")
async def process_messages(message):
# Messages will be automatically ACKed on success
# or sent back to the queue to be retried on exceptions.
await client.start()
CFQ's API design is inspired by TaskIQ and Celery, adapted for Cloudflare Queues.
Installation
uv add cfq
CFQ Parameters
| Parameter | Type | Default | Description |
|---|---|---|---|
api_token |
str |
required | Cloudflare API token with Queues permissions |
account_id |
str |
required | Your Cloudflare account identifier |
max_workers |
int |
10 |
Maximum concurrent message handlers (async workers) |
polling_interval_ms |
float |
1000 |
Polling interval in milliseconds when queue is empty |
flush_interval_ms |
float |
1000 |
Interval in milliseconds to send acks / retries to Cloudflare |
max_batch_size |
int |
10 |
Messages to pull per request |
allow_retry |
bool |
True |
Whether to retry failed messages |
retry_delay_seconds |
int |
0 |
Delay before retrying failed messages |
heartbeat_interval_seconds |
int |
0 |
Heartbeat logging interval (0 = disabled) |
logger |
Logger |
None |
Custom logger (defaults to "cfq" logger) |
httpx_logs |
bool |
False |
Enable httpx debug logs (disabled by default) |
Consumer Decorator Parameters
from cloudflare.types.queues.message_pull_response import Message
@client.consumer(queue_id="queue_id", visibility_timeout_ms=60000)
async def my_consumer(message: Message):
# Your message processing logic
pass
| Parameter | Type | Default | Description |
|---|---|---|---|
queue_id |
str |
required | The Cloudflare Queue ID to consume from |
visibility_timeout_ms |
int |
60000 |
Message visibility timeout in milliseconds |
Multiple Queue Consumers
client = CFQ(
api_token="your_token",
account_id="your_account_id",
max_workers=20, # Shared across all consumers
)
@client.consumer(queue_id="email_queue_id")
async def handle_emails(message: Message):
# Process email messages
await send_email(message.body)
@client.consumer(queue_id="webhook_queue_id", visibility_timeout_ms=30000)
async def handle_webhooks(message: Message):
# Process webhook messages with shorter timeout
await process_webhook(message.body)
await client.start()
Custom Configuration Example
client = CFQ(
api_token="your_token",
account_id="your_account_id",
max_workers=5,
polling_interval_ms=500, # Poll every 500ms
max_batch_size=20, # Pull up to 20 messages at once
allow_retry=True,
retry_delay_seconds=30, # Wait 30s before retry
heartbeat_interval_seconds=60, # Log heartbeat every minute
)
Error Handling
CFQ automatically handles message acknowledgment and retries:
- Success: Messages are automatically ACKed after successful processing
- Failure with retry enabled: Failed messages are retried with configurable delay
- Failure with retry disabled: Failed messages are discarded
- Worker limits: New messages wait for available workers when
max_workersis reached
Monitoring
Enable heartbeat logging to monitor processing rates:
client = CFQ(
# ... other config
heartbeat_interval_seconds=30, # Log every 30 seconds
)
This will output logs like:
INFO:cfq:Heartbeat | Processed 42 messages in last 30 seconds
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file cfq-0.1.2.tar.gz.
File metadata
- Download URL: cfq-0.1.2.tar.gz
- Upload date:
- Size: 5.5 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: uv/0.8.3
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
2a5eb7d55dea8e9037c8d526088ec616122c20a1dd237feeb2a3cc69f8b518ce
|
|
| MD5 |
26dad551de102a20ed61b9c5c0f446c0
|
|
| BLAKE2b-256 |
656cdcaefe6616d84de07874bcd5e07418439d9987e85be7a6b32ad23b660a21
|
File details
Details for the file cfq-0.1.2-py3-none-any.whl.
File metadata
- Download URL: cfq-0.1.2-py3-none-any.whl
- Upload date:
- Size: 5.7 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: uv/0.8.3
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
983991f603c7dcbcb499b8d319d75f8db67c1c1d27cec678a15c3e6dec80c593
|
|
| MD5 |
77dea2aee0d980bc7873987fe783fcbf
|
|
| BLAKE2b-256 |
b8e2e64d90f3b01f354ba3d4eb78799404291ece3f70a2511b79cce91c91987a
|