An advanced async rate limiter middleware for Taskiq (Python). Features configurable pacing strategies (adaptive smoothing vs. fixed throughput), burst control, and task throttling.
Taskiq Rate Limiter Middleware
An advanced, in-process, asynchronous rate-limiting middleware for the Taskiq framework.
This middleware offers advanced pacing strategies designed to either smooth task execution to prevent load spikes or maximize task throughput by ensuring a specific number of tasks are processed within a time window.
Key Features
- Rolling Window Rate Limiting: Restrict the number of tasks that can start within a configurable time window (e.g., 100 tasks per 60 seconds).
- Burst Control: Allow a configurable number of tasks to execute immediately before pacing is enforced. (Default is half of the limit)
- Configurable Pacing Strategies:
  - adaptive (default): Prioritizes load smoothing by dynamically spacing tasks based on the remaining time and capacity in the current window.
  - fixed: Prioritizes throughput maximization by enforcing a fixed, predictable interval between tasks to guarantee the full limit is utilized over the window.
- Task Queuing: Asynchronously queues tasks that exceed the rate limit by default, rather than rejecting them.
- Jitter: Introduces a small amount of random jitter to task start times, helping to prevent the "thundering herd" problem in distributed systems.
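The rolling-window idea from the first feature can be illustrated with a small standalone sketch. This is not the package's implementation; the class name RollingWindow and the method try_acquire are hypothetical, and timestamps are passed in explicitly so the behavior is easy to follow.

```python
from collections import deque


class RollingWindow:
    """Admit at most `limit` task starts per `window_seconds` (illustrative only)."""

    def __init__(self, limit: int, window_seconds: float) -> None:
        self.limit = limit
        self.window_seconds = window_seconds
        self._starts = deque()  # start timestamps, oldest first

    def try_acquire(self, now: float) -> bool:
        # Drop start times that have aged out of the window.
        while self._starts and now - self._starts[0] >= self.window_seconds:
            self._starts.popleft()
        # Admit the task only if the window still has a free slot.
        if len(self._starts) < self.limit:
            self._starts.append(now)
            return True
        return False


window = RollingWindow(limit=2, window_seconds=10.0)
print([window.try_acquire(t) for t in (0.0, 1.0, 2.0, 10.0)])
```

In this sketch the third call is refused because two starts are still inside the window, and the fourth succeeds once the start at 0.0 has aged out.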
Installation
pip install taskiq-rate-limiter
# OR
uv add taskiq-rate-limiter
Quick Start
Integrate the RateLimitMiddleware into your Taskiq broker and configure tasks for rate limiting using labels.
from taskiq import InMemoryBroker
from rate_limit_middleware import RateLimitMiddleware
# 1. Initialize the middleware with global defaults
# Use the 'fixed' strategy to ensure a consistent 100 tasks per minute.
rate_limiter = RateLimitMiddleware(
    default_rate_limit_enabled=True,
    default_limit=100,
    default_window_seconds=60,
    default_pacing_strategy="fixed"
)
# 2. Add the middleware to your broker
broker = InMemoryBroker().with_middlewares(rate_limiter)
# 3. This task will inherit the global rate-limiting settings
@broker.task
async def my_limited_task(i: int):
    print(f"Executing task {i}")
    return i
# ---- OR: Per-Task Configuration -----
# Disable rate limiting by default
rate_limiter = RateLimitMiddleware(default_rate_limit_enabled=False)
broker = InMemoryBroker().with_middlewares(rate_limiter)
# And enable it specifically on tasks that need it
@broker.task(
    labels={
        "rate_limit_enabled": True,
        "rate_limit_limit": 100,
        "rate_limit_window_seconds": 60,
        "rate_limit_pacing_strategy": "fixed",
    }
)
async def my_limited_task(i: int):
    print(f"Executing task {i}")
    return i
Configuration Options
Configuration is applied in two layers:
- Global Defaults: Set during RateLimitMiddleware(...) initialization.
- Per-Task Overrides: Set as key-value pairs in the labels dictionary of the @broker.task(...) decorator.
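The two-layer lookup can be pictured with the following minimal sketch. The helper resolve_options and the un-prefixed option keys are hypothetical names chosen for illustration; how the middleware resolves labels internally is not documented here.

```python
LABEL_PREFIX = "rate_limit_"


def resolve_options(defaults: dict, labels: dict) -> dict:
    """Overlay `rate_limit_*` task labels on the global defaults (hypothetical helper)."""
    resolved = dict(defaults)
    for key, value in labels.items():
        # Labels without the prefix belong to other middlewares and are ignored.
        if key.startswith(LABEL_PREFIX):
            resolved[key[len(LABEL_PREFIX):]] = value
    return resolved


# Assumed stand-ins for the values passed to RateLimitMiddleware(...).
defaults = {
    "enabled": False,
    "limit": 100,
    "window_seconds": 60.0,
    "pacing_strategy": "adaptive",
}
labels = {"rate_limit_enabled": True, "rate_limit_limit": 500, "retry_on_error": True}

options = resolve_options(defaults, labels)
print(options)
```

Only the options a task names in its labels change; everything else falls back to the global default.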
Below is a complete list of available options.
1. Core Limiting
These settings define the fundamental rate limit parameters.
- rate_limit_enabled (bool): Enables or disables rate limiting for the task.
  - Default: False (unless overridden in middleware init).
- rate_limit_limit (int): The maximum number of tasks allowed to start within the window.
  - Default: 100
- rate_limit_window_seconds (float): The duration of the rolling window in seconds.
  - Default: 60.0
# Allow 500 tasks every 5 minutes (300 seconds)
@broker.task(labels={
    "rate_limit_enabled": True,
    "rate_limit_limit": 500,
    "rate_limit_window_seconds": 300,
})
async def bulk_process(): ...
2. Pacing & Burst Control
These options control the execution timing of tasks within the window.
- rate_limit_pacing_strategy (str): Determines the pacing algorithm. Accepts "adaptive" (for load smoothing) or "fixed" (for throughput maximization).
  - Default: "adaptive"
- rate_limit_pacing_start_threshold (int): The number of tasks allowed to execute immediately (burst) at the start of a window before the pacing strategy is applied.
  - Default: limit // 2. Set to 0 to pace every task. Set to limit to disable pacing (pure burst).
# Example: Strict, even pacing for an SLA.
# Process exactly 1 task every second (60 per 60s) with no initial burst.
@broker.task(labels={
    "rate_limit_enabled": True,
    "rate_limit_limit": 60,
    "rate_limit_window_seconds": 60,
    "rate_limit_pacing_strategy": "fixed",
    "rate_limit_pacing_start_threshold": 0,  # Start pacing immediately
})
async def sla_critical_task(): ...
3. Queue Management
These settings control behavior when tasks arrive faster than the rate limit allows.
- rate_limit_max_queue_size (int | None): The maximum number of tasks permitted to wait for execution. If None, the queue is unbounded.
  - Default: None
- rate_limit_reject_when_full (bool): If True, tasks arriving when the queue is full will be rejected immediately with a RateLimitQueueFullError.
  - Default: False
# Example: Fail-fast configuration.
# If 10 tasks are already waiting, reject subsequent tasks immediately.
@broker.task(labels={
    "rate_limit_enabled": True,
    "rate_limit_limit": 10,
    "rate_limit_max_queue_size": 10,
    "rate_limit_reject_when_full": True,
})
async def realtime_request(): ...
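To make the fail-fast path concrete, here is a rough sketch of a bounded waiting counter. It is an assumption-laden illustration: WaitQueueGate is a hypothetical class, the RateLimitQueueFullError defined below is a local stand-in rather than the package's own exception, and what happens when the queue is full but rejection is disabled is not described above, so the sketch simply lets the task wait in that case.

```python
class RateLimitQueueFullError(Exception):
    """Local stand-in for the package's exception of the same name."""


class WaitQueueGate:
    """Count waiting tasks and reject new ones when a bounded queue is full."""

    def __init__(self, max_queue_size=None, reject_when_full=False):
        self.max_queue_size = max_queue_size  # None means unbounded
        self.reject_when_full = reject_when_full
        self.waiting = 0

    def enter(self) -> None:
        full = self.max_queue_size is not None and self.waiting >= self.max_queue_size
        if full and self.reject_when_full:
            raise RateLimitQueueFullError(
                f"{self.waiting} tasks already waiting (max {self.max_queue_size})"
            )
        # Assumption: without reject_when_full the task is allowed to wait.
        self.waiting += 1

    def leave(self) -> None:
        # Called when a waiting task finally starts.
        self.waiting -= 1


gate = WaitQueueGate(max_queue_size=2, reject_when_full=True)
gate.enter()
gate.enter()
try:
    gate.enter()
except RateLimitQueueFullError as exc:
    print(f"rejected: {exc}")
```

A caller that sets rate_limit_reject_when_full would typically catch the error and respond right away instead of waiting.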
4. Advanced
- rate_limit_jitter_ms (float): Adds up to this many milliseconds of random delay to each paced task. This is useful in distributed environments to prevent workers from executing in perfect lock-step.
  - Default: 0
# Add up to 50ms of random jitter to task start times.
@broker.task(labels={"rate_limit_enabled": True, "rate_limit_jitter_ms": 50})
async def distributed_task(): ...
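As a rough picture of what "up to this many milliseconds" means, the sketch below draws an extra delay per task. The helper jitter_seconds is hypothetical, and the uniform distribution is an assumption; the text above only states the upper bound.

```python
import random


def jitter_seconds(jitter_ms: float, rng: random.Random) -> float:
    """Return an extra delay in seconds, between 0 and jitter_ms milliseconds."""
    if jitter_ms <= 0:
        return 0.0  # jitter disabled (the documented default)
    # Assumption: the delay is drawn uniformly from [0, jitter_ms].
    return rng.uniform(0.0, jitter_ms) / 1000.0


rng = random.Random()
samples = [jitter_seconds(50, rng) for _ in range(5)]
print([round(s, 4) for s in samples])
```

Each worker drawing its own delay is what keeps identically configured workers from starting tasks at exactly the same instant.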
Deep Dive: Pacing Strategies
The rate_limit_pacing_strategy option determines how the limiter schedules tasks once the initial burst allowance (rate_limit_pacing_start_threshold) has been consumed.
| Strategy | Primary Goal | How it Works | Best For |
|---|---|---|---|
| adaptive | Load Smoothing | Dynamically calculates delay based on remaining time and remaining slots in the window. | Protecting downstream services from sudden traffic spikes. Use when avoiding overload is the top priority. |
| fixed | Throughput Maximization | Calculates a single, fixed interval (window / limit) and schedules tasks at this pace. | Meeting SLAs or processing a predictable volume of tasks. Use when guaranteeing capacity is the top priority. |
adaptive Pacing (Default)
- Goal: To prevent a sudden spike of tasks from overwhelming a system.
- Mechanism: It calculates the spacing between tasks dynamically based on the time remaining and slots remaining in the current window.
- Behavior: This strategy schedules tasks relative to when the previous task was scheduled. A large burst of tasks arriving early can schedule tasks for the entire window's capacity, potentially resulting in fewer tasks being executed than the limit if the window ends before all scheduled tasks can run.
rate_limiter_pacing_50_adaptive.mp4
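One plausible reading of "time remaining and slots remaining" is a simple ratio, sketched below. The function adaptive_delay and its parameters are hypothetical, and the exact formula used by the middleware may differ.

```python
def adaptive_delay(window_seconds: float, limit: int, elapsed: float, started: int) -> float:
    """Spacing before the next task: remaining time divided by remaining slots."""
    remaining_time = max(window_seconds - elapsed, 0.0)
    remaining_slots = limit - started
    if remaining_slots <= 0:
        # Assumption: with no capacity left, wait out the rest of the window.
        return remaining_time
    return remaining_time / remaining_slots


# 50 tasks per 60 s, with a burst of 25 already started at the window's start.
print(adaptive_delay(60.0, 50, elapsed=0.0, started=25))
# Halfway through the window with 40 tasks started.
print(adaptive_delay(60.0, 50, elapsed=30.0, started=40))
```

Under this reading the spacing stretches or shrinks as the window progresses, which is what makes the strategy smooth load rather than hold a constant rate.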
fixed Pacing
- Goal: To ensure a predictable number of tasks can be processed in a given window.
- Mechanism: It calculates a single, fixed pace based on the total window time and the total limit (e.g., 60 seconds / 100 tasks = 0.6s per task). It then schedules each task at its ideal, pre-calculated time slot.
- Behavior: This strategy guarantees that all limit slots are available and spaced evenly throughout the window. If a task arrives before its ideal time slot, it will be delayed until that time, ensuring a perfectly even execution rate.
rate_limiter_pacing_50_fixed.mp4
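The slot arithmetic can be sketched as follows. The function fixed_slot_offsets is hypothetical, and the treatment of burst tasks (running at offset 0, with later tasks keeping their ideal slot at index × interval) is an assumption about how the threshold interacts with the fixed schedule.

```python
def fixed_slot_offsets(limit: int, window_seconds: float, start_threshold: int = 0) -> list:
    """Return each task's start offset (seconds from window start) under fixed pacing."""
    interval = window_seconds / limit  # e.g. 60 s / 100 tasks = 0.6 s
    offsets = []
    for index in range(limit):
        if index < start_threshold:
            offsets.append(0.0)  # burst tasks start immediately
        else:
            offsets.append(index * interval)  # ideal, evenly spaced slot
    return offsets


no_burst = fixed_slot_offsets(limit=100, window_seconds=60.0)
print([round(t, 1) for t in no_burst[:4]])

half_burst = fixed_slot_offsets(limit=100, window_seconds=60.0, start_threshold=50)
print(round(half_burst[50], 1))
```

With a threshold of 0 every task lands on its own slot, which corresponds to the strict one-task-per-interval behavior described for no-burst fixed pacing.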
Use Case: No-Burst Pacing (pacing_start_threshold=0)
Setting pacing_start_threshold to 0 removes the initial burst allowance. This forces the limiter to apply its pacing strategy to every task from the beginning of the window, enabling more granular control over the execution rate.
No Burst with adaptive Pacing
- Behavior: This configuration enforces load smoothing immediately. The very first task will be subject to a calculated delay to spread execution over the available window. The delay between subsequent tasks remains dynamic.
- Use Case: Ideal for extremely sensitive downstream systems where even a small initial burst of traffic is undesirable. It ensures the smoothest possible ramp-up of tasks.
rate_limiter_no_burst_adaptive.mp4
No Burst with fixed Pacing
- Behavior: This configuration establishes a strict, metronomic execution rate. Every task is delayed to fit into its pre-calculated, evenly spaced time slot (e.g., one task is executed precisely every 0.6 seconds for a 100/60s limit).
- Use Case: Essential for meeting strict Service Level Agreements (SLAs) that require a constant, predictable processing rate. This guarantees that the system never exceeds the target throughput at any point within the window.
rate_limiter_no_burst_fixed.mp4
Full Example
This example demonstrates how to configure different tasks with different rate-limiting strategies.
import asyncio
from taskiq import InMemoryBroker
from rate_limit_middleware import RateLimitMiddleware
# Use the default 'adaptive' strategy globally for safety
broker = InMemoryBroker().with_middlewares(RateLimitMiddleware())
# This task targets a sensitive, external API. We want to smooth out calls.
# It will inherit the global 'adaptive' strategy.
@broker.task(
    labels={
        "rate_limit_enabled": True,
        "rate_limit_limit": 50,
        "rate_limit_window_seconds": 60,
        "rate_limit_pacing_start_threshold": 10,
    }
)
async def call_sensitive_api(payload: dict):
    print("Calling sensitive API with adaptive pacing...")
    await asyncio.sleep(0.1)
# This task is for high-volume internal data processing. We want to maximize throughput.
# We override the global default and set the strategy to 'fixed'.
@broker.task(
    labels={
        "rate_limit_enabled": True,
        "rate_limit_limit": 200,
        "rate_limit_window_seconds": 60,
        "rate_limit_pacing_start_threshold": 50,
        "rate_limit_pacing_strategy": "fixed",
    }
)
async def process_internal_data(item_id: int):
    print(f"Processing item {item_id} at max throughput.")
    await asyncio.sleep(0.1)
License
This project is licensed under the MIT License. See the LICENSE file for details.