Skip to main content

A high-level Cloudflare Queues consumer library for Python

Project description

CFQ

A high-level Python client for consuming messages from Cloudflare Queues with async workers.

from cfq import CFQ

client = CFQ(
    api_token="your_cloudflare_api_token",
    account_id="your_account_id",
)

@client.consumer(queue_id="your_queue_id")
async def process_messages(message):
    # Messages will be automatically ACKed on success 
    # or sent back to the queue to be retried on exceptions.

await client.start()

CFQ's API design is inspired by TaskIQ and Celery, adapted for Cloudflare Queues.

Installation

uv add cfq

CFQ Parameters

Parameter Type Default Description
api_token str required Cloudflare API token with Queues permissions
account_id str required Your Cloudflare account identifier
max_workers int 10 Maximum concurrent message handlers (async workers)
polling_interval_ms float 1000 Polling interval in milliseconds when queue is empty
max_batch_size int 10 Messages to pull per request
allow_retry bool True Whether to retry failed messages
retry_delay_seconds int 0 Delay before retrying failed messages
heartbeat_interval_seconds int 0 Heartbeat logging interval (0 = disabled)
logger Logger None Custom logger (defaults to "cfq" logger)

Consumer Decorator Parameters

from cloudflare.types.queues.message_pull_response import Message

@client.consumer(queue_id="queue_id", visibility_timeout_ms=60000)
async def my_consumer(message: Message):
    # Your message processing logic
    pass
Parameter Type Default Description
queue_id str required The Cloudflare Queue ID to consume from
visibility_timeout_ms int 60000 Message visibility timeout in milliseconds

Multiple Queue Consumers

client = CFQ(
    api_token="your_token",
    account_id="your_account_id",
    max_workers=20,  # Shared across all consumers
)

@client.consumer(queue_id="email_queue_id")
async def handle_emails(message: Message):
    # Process email messages
    await send_email(message.body)

@client.consumer(queue_id="webhook_queue_id", visibility_timeout_ms=30000)
async def handle_webhooks(message: Message):
    # Process webhook messages with shorter timeout
    await process_webhook(message.body)

await client.start()

Custom Configuration Example

client = CFQ(
    api_token="your_token",
    account_id="your_account_id",
    max_workers=5,
    polling_interval_ms=500,  # Poll every 500ms
    max_batch_size=20,        # Pull up to 20 messages at once
    allow_retry=True,
    retry_delay_seconds=30,   # Wait 30s before retry
    heartbeat_interval_seconds=60,  # Log heartbeat every minute
)

Error Handling

CFQ automatically handles message acknowledgment and retries:

  • Success: Messages are automatically ACKed after successful processing
  • Failure with retry enabled: Failed messages are retried with configurable delay
  • Failure with retry disabled: Failed messages are discarded
  • Worker limits: New messages wait for available workers when max_workers is reached

Monitoring

Enable heartbeat logging to monitor processing rates:

client = CFQ(
    # ... other config
    heartbeat_interval_seconds=30,  # Log every 30 seconds
)

This will output logs like:

INFO:cfq:Heartbeat | Processed 42 messages in last 30 seconds

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

cfq-0.1.1.tar.gz (4.8 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

cfq-0.1.1-py3-none-any.whl (5.0 kB view details)

Uploaded Python 3

File details

Details for the file cfq-0.1.1.tar.gz.

File metadata

  • Download URL: cfq-0.1.1.tar.gz
  • Upload date:
  • Size: 4.8 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.8.3

File hashes

Hashes for cfq-0.1.1.tar.gz
Algorithm Hash digest
SHA256 5684641151fab22d655e2bef6bd4d7e3527e481f90610cbd507f51f8cc46fe94
MD5 c1166ff34a25946292ff2f18cb404433
BLAKE2b-256 d71902f91c44a18b8ffd64e9ba2a9ebf32200b555ddc47ec7c8a65dd8a4ccc2c

See more details on using hashes here.

File details

Details for the file cfq-0.1.1-py3-none-any.whl.

File metadata

  • Download URL: cfq-0.1.1-py3-none-any.whl
  • Upload date:
  • Size: 5.0 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.8.3

File hashes

Hashes for cfq-0.1.1-py3-none-any.whl
Algorithm Hash digest
SHA256 3a4b175462978dd30889108d7d77ea3d2b4bbed2baf4b220236dd135b5a7010c
MD5 232fc68700dae6376264d9ee326b5a18
BLAKE2b-256 a419eaa67d66fd09bbf2ccba6c33b1db617592e6135e966abab11bfb0b5f3233

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page