
A high-level Cloudflare Queues consumer library for Python

CFQ

A high-level Python client for consuming messages from Cloudflare Queues with async workers.

import asyncio

from cfq import CFQ

client = CFQ(
    api_token="your_cloudflare_api_token",
    account_id="your_account_id",
)

@client.consumer(queue_id="your_queue_id")
async def process_messages(message):
    # Messages are automatically ACKed on success,
    # or sent back to the queue to be retried if the handler raises.
    print(message.body)

async def main():
    # await is only valid inside a coroutine, so start the client here.
    await client.start()

asyncio.run(main())

CFQ's API design is inspired by TaskIQ and Celery, adapted for Cloudflare Queues.

Installation

uv add cfq

CFQ Parameters

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| api_token | str | required | Cloudflare API token with Queues permissions |
| account_id | str | required | Your Cloudflare account identifier |
| max_workers | int | 10 | Maximum concurrent message handlers (async workers) |
| polling_interval_ms | float | 1000 | Polling interval in milliseconds when the queue is empty |
| max_batch_size | int | 10 | Messages to pull per request |
| allow_retry | bool | True | Whether to retry failed messages |
| retry_delay_seconds | int | 0 | Delay in seconds before retrying failed messages |
| heartbeat_interval_seconds | int | 0 | Heartbeat logging interval in seconds (0 = disabled) |
| logger | Logger | None | Custom logger (defaults to the "cfq" logger) |
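
The table above lists constructor arguments, and the examples in this README pass the token as a string literal. A minimal sketch of collecting the same arguments from environment variables instead is shown here. The variable names (CLOUDFLARE_API_TOKEN, CLOUDFLARE_ACCOUNT_ID, CFQ_MAX_WORKERS, and so on) and the helper load_cfq_settings are assumptions made for illustration; CFQ itself is not documented as reading any of them.

```python
import os


def load_cfq_settings(env=os.environ):
    """Build keyword arguments for CFQ(...) from a mapping of environment variables.

    Hypothetical helper: the environment variable names are illustrative only.
    """
    settings = {
        # Required values: a missing variable raises KeyError immediately.
        "api_token": env["CLOUDFLARE_API_TOKEN"],
        "account_id": env["CLOUDFLARE_ACCOUNT_ID"],
    }
    # Optional overrides, converted to the types listed in the parameter table.
    if "CFQ_MAX_WORKERS" in env:
        settings["max_workers"] = int(env["CFQ_MAX_WORKERS"])
    if "CFQ_POLLING_INTERVAL_MS" in env:
        settings["polling_interval_ms"] = float(env["CFQ_POLLING_INTERVAL_MS"])
    if "CFQ_MAX_BATCH_SIZE" in env:
        settings["max_batch_size"] = int(env["CFQ_MAX_BATCH_SIZE"])
    return settings


# Demonstration with an explicit mapping instead of the real environment.
example = load_cfq_settings(
    {
        "CLOUDFLARE_API_TOKEN": "token-from-secret-store",
        "CLOUDFLARE_ACCOUNT_ID": "account-123",
        "CFQ_MAX_WORKERS": "5",
    }
)
```

The resulting dictionary can then be unpacked into the constructor, for example CFQ(**load_cfq_settings()), which keeps credentials out of source control.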

Consumer Decorator Parameters

from cloudflare.types.queues.message_pull_response import Message

@client.consumer(queue_id="queue_id", visibility_timeout_ms=60000)
async def my_consumer(message: Message):
    # Your message processing logic
    pass

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| queue_id | str | required | The Cloudflare Queue ID to consume from |
| visibility_timeout_ms | int | 60000 | Message visibility timeout in milliseconds |
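
With Cloudflare Queues pull consumers, a pulled message stays hidden from other pulls until the visibility timeout expires, and it is delivered again if it has not been acknowledged by then. The timeout therefore needs to cover the slowest message in a batch. The sketch below is a rough rule of thumb, not part of CFQ: the helper suggest_visibility_timeout_ms and its safety_factor are hypothetical, and it assumes a full batch is pulled at once and worked through by at most max_workers handlers at a time.

```python
import math


def suggest_visibility_timeout_ms(
    max_batch_size, max_workers, worst_case_handler_ms, safety_factor=2.0
):
    """Estimate a visibility timeout that covers a whole pulled batch.

    Hypothetical helper: assumes the batch is processed in "waves" of at most
    max_workers concurrent handlers, each taking worst_case_handler_ms.
    """
    waves = math.ceil(max_batch_size / max_workers)
    return int(waves * worst_case_handler_ms * safety_factor)


# Defaults from the parameter table: 10 messages, 10 workers -> one wave.
default_case = suggest_visibility_timeout_ms(10, 10, 5_000)

# 20 messages shared by 5 workers -> four waves of 5-second handlers.
custom_case = suggest_visibility_timeout_ms(20, 5, 5_000)
```

Because max_workers is shared across all consumers on a client, a busy second queue can stretch these waves further, so treat the result as a lower bound.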

Multiple Queue Consumers

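import asyncio

from cfq import CFQ
from cloudflare.types.queues.message_pull_response import Message

client = CFQ(
    api_token="your_token",
    account_id="your_account_id",
    max_workers=20,  # Shared across all consumers
)

# send_email and process_webhook stand in for your own coroutines.

@client.consumer(queue_id="email_queue_id")
async def handle_emails(message: Message):
    # Process email messages
    await send_email(message.body)

@client.consumer(queue_id="webhook_queue_id", visibility_timeout_ms=30000)
async def handle_webhooks(message: Message):
    # Process webhook messages with a shorter visibility timeout
    await process_webhook(message.body)

async def main():
    # A single client serves both registered consumers.
    await client.start()

asyncio.run(main())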

Custom Configuration Example

client = CFQ(
    api_token="your_token",
    account_id="your_account_id",
    max_workers=5,
    polling_interval_ms=500,  # Poll every 500 ms while the queue is empty
    max_batch_size=20,        # Pull up to 20 messages at once
    allow_retry=True,
    retry_delay_seconds=30,   # Wait 30s before retry
    heartbeat_interval_seconds=60,  # Log heartbeat every minute
)

Error Handling

CFQ automatically handles message acknowledgment and retries:

  • Success: Messages are ACKed automatically once the handler returns without raising
  • Failure with retry enabled (allow_retry=True): The message is sent back to the queue and retried after retry_delay_seconds
  • Failure with retry disabled (allow_retry=False): Failed messages are discarded
  • Worker limits: When max_workers handlers are already running, new messages wait for a free worker
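
The rules above can be illustrated with a small standalone simulation. This is a sketch of the described behaviour using only asyncio, not CFQ's actual implementation: run_batch, flaky_handler, and the "ack" / "retry" / "discard" labels are hypothetical names chosen for the example, and nothing here talks to Cloudflare.

```python
import asyncio


async def run_batch(messages, handler, max_workers=10, allow_retry=True):
    """Run handler over messages and record what would happen to each one."""
    slots = asyncio.Semaphore(max_workers)  # caps the number of concurrent handlers
    outcomes = {}

    async def handle(message):
        async with slots:  # waits here while all workers are busy
            try:
                await handler(message)
            except Exception:
                # A raised exception means the message was not processed.
                outcomes[message] = "retry" if allow_retry else "discard"
            else:
                # The handler returned normally, so the message is acknowledged.
                outcomes[message] = "ack"

    await asyncio.gather(*(handle(m) for m in messages))
    return outcomes


async def flaky_handler(message):
    """Example handler that fails on one specific message."""
    if message == "bad":
        raise ValueError("cannot process message")


with_retry = asyncio.run(run_batch(["ok", "bad"], flaky_handler))
without_retry = asyncio.run(
    run_batch(["ok", "bad"], flaky_handler, allow_retry=False)
)
```

The practical consequence for handlers is that raising an exception is the signal for failure: a handler that catches and swallows its own errors will have its messages ACKed.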

Monitoring

Enable heartbeat logging to monitor processing rates:

client = CFQ(
    # ... other config
    heartbeat_interval_seconds=30,  # Log every 30 seconds
)

This will output logs like:

INFO:cfq:Heartbeat | Processed 42 messages in last 30 seconds
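
Heartbeat lines are logged at INFO level, and Python's logging module hides INFO records unless logging is configured. The sketch below attaches a handler to the "cfq" logger named in the parameter table, using the same format string that logging.basicConfig applies by default. The helper configure_cfq_logging is a hypothetical name, and the logger.info call at the end only simulates a heartbeat record to show the resulting format; it is not CFQ code.

```python
import io
import logging

# Same layout as logging.basicConfig's default format.
BASIC_FORMAT = "%(levelname)s:%(name)s:%(message)s"


def configure_cfq_logging(stream=None, level=logging.INFO):
    """Attach a stream handler to the "cfq" logger so INFO records are shown."""
    logger = logging.getLogger("cfq")
    handler = logging.StreamHandler(stream)  # stream=None means sys.stderr
    handler.setFormatter(logging.Formatter(BASIC_FORMAT))
    logger.addHandler(handler)
    logger.setLevel(level)
    logger.propagate = False  # avoid duplicate lines via the root logger
    return logger


# Demonstration: capture output in memory and emit a simulated heartbeat.
buffer = io.StringIO()
logger = configure_cfq_logging(buffer)
logger.info("Heartbeat | Processed %d messages in last %d seconds", 42, 30)
sample = buffer.getvalue().strip()
```

In an application, calling configure_cfq_logging() once before starting the client sends the records to stderr; a plain logging.basicConfig(level=logging.INFO) would also work if the root logger is otherwise unconfigured.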
