EZThrottle Python SDK
The API Dam for rate-limited services. Queue and execute HTTP requests with smart retry logic, multi-region racing, and webhook delivery.
Get Your API Key
👉 Get started at ezthrottle.network
Pay for delivery through outages and rate limiting. Unlimited free concurrency.
No need to manage Lambda functions, SQS queues, DynamoDB, or complex retry logic. EZThrottle handles webhook fanout, distributed queuing, and multi-region orchestration for you. Just grab an API key and start shipping reliable API calls.
The End of Serverless Infrastructure
RIP OPS. Hello serverless without maintenance.
The era of managing serverless infrastructure is over. No more Lambda functions to deploy, SQS queues to configure, DynamoDB tables to provision, or CloudWatch alarms to tune. EZThrottle replaces your entire background job infrastructure with a single API call. Just code your business logic—we handle the rest.
Speed & Reliability Through Multi-Region Racing
Execute requests across multiple geographic regions simultaneously (IAD, LAX, ORD, etc.). The fastest region wins—delivering sub-second response times. When a region experiences issues, requests automatically route to healthy regions with zero configuration. Geographic distribution + intelligent routing = blazing-fast reliable delivery, every time.
Installation
pip install ezthrottle
Quick Start
from ezthrottle import EZThrottle, Step, StepType
client = EZThrottle(api_key="your_api_key")
# Simple job submission
result = (
Step(client)
.url("https://api.example.com/endpoint")
.method("POST")
.type(StepType.PERFORMANCE)
.webhooks([{"url": "https://your-app.com/webhook"}])
.execute()
)
print(f"Job ID: {result['job_id']}")
Pricing
Free Tier - 1 Million Requests/Month Forever
No credit card required. All features included.
- 1,000,000 requests per month FREE
- Multi-region racing, webhook fanout, retry logic - everything
- ~30,000 requests/day (covers most production apps)
- Perfect for indie devs, startups, side projects
Early Adopter Pricing (Subject to Change)
| Tier | Included Requests | Monthly Price | Overage (per 100k) | Hard Cap |
|---|---|---|---|---|
| Free | 1M requests/month | $0 | N/A | 1M (upgrade to continue) |
| Indie | 2M requests/month | $50 | $50/100k | 5M (upgrade to continue) |
| Growth | 5M requests/month | $200 | $40/100k | 10M (upgrade to continue) |
| Pro | 10M requests/month | $500 | $25/100k | 25M (upgrade to continue) |
Hard caps protect you from surprise bills. When you hit your tier's cap, requests pause until you upgrade or the month resets.
Overage pricing: Pay only for what you use beyond your included requests, up to your tier's hard cap.
Example: Indie tier uses 3M requests = $50 (base) + $500 (1M overage at $50/100k) = $550 total
Smart Upgrade Incentives
The math makes upgrading obvious:
Scenario: Using 8M requests/month
| Option | Calculation | Total Cost |
|---|---|---|
| Stay on Indie (hit cap) | Service stops at 5M | ❌ Lost revenue |
| Pay Indie overages | $50 + ($50 × 30) = $50 + $1,500 | ❌ $1,550/month |
| Upgrade to Growth | $200 base + ($40 × 30) = $200 + $1,200 | ⚠️ $1,400/month |
| Upgrade to Pro | $500 base (includes 10M) | ✅ $500/month |
Upgrading to Pro saves you $900-1,050/month vs paying overages.
The tiers are designed so you WANT to upgrade - overage pricing is intentionally expensive to make the next tier a no-brainer.
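To sanity-check the scenario above, here is a minimal sketch of the tier math in plain Python. The rates are copied straight from the early-adopter pricing table; the helper itself is illustrative and not part of the SDK.

# Illustrative only - reproduces the pricing table above, not part of the SDK.
TIERS = {
    "Indie":  {"base": 50,  "included": 2_000_000,  "overage_per_100k": 50, "cap": 5_000_000},
    "Growth": {"base": 200, "included": 5_000_000,  "overage_per_100k": 40, "cap": 10_000_000},
    "Pro":    {"base": 500, "included": 10_000_000, "overage_per_100k": 25, "cap": 25_000_000},
}

def monthly_cost(tier: str, requests: int) -> float:
    """Base price plus overage, billed only up to the tier's hard cap."""
    t = TIERS[tier]
    billable = min(requests, t["cap"])
    overage = max(0, billable - t["included"])
    return t["base"] + (overage / 100_000) * t["overage_per_100k"]

for tier in TIERS:
    print(tier, monthly_cost(tier, 8_000_000))
# Indie 1550.0 (and requests pause at the 5M cap), Growth 1400.0, Pro 500.0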
Need 25M+ requests/month, no caps, or custom SLAs? 👉 Contact us for enterprise pricing
Early Adopter Benefits
Lock in these rates by signing up now. Pricing subject to change for new customers. Early adopters keep their tier pricing even as we adjust rates.
Questions? 👉 Pricing FAQ | Contact sales
Ready to stop debugging Lambda at 3am? 👉 Start free with 1M requests/month
Why This Pricing Makes Sense
What's a Good Night's Sleep Worth?
3am PagerDuty alert: "Stripe API down. Retry storm taking down prod. Revenue stopped."
You wake up. Laptop. VPN. SSH into servers. Lambda logs scrolling. DynamoDB throttling. SQS backlog exploding. IAM policies denying for no reason. Concurrent execution limits hit. CloudWatch costs spiking.
You spend 2 hours debugging. Fix the immediate issue. Write a post-mortem. Promise to "build better retry logic."
Three months later, same alert. Different API.
The AWS Nightmare Nobody Talks About
Building retry infrastructure on AWS means:
Lambda Hell:
- Concurrent execution limits (1000 by default, need to request increases)
- Cold starts killing performance (500ms+ latency spikes)
- IAM policies that randomly deny for no apparent reason
- CloudWatch logs costing more than the Lambdas themselves
- Debugging distributed traces across 47 Lambda invocations
SQS Madness:
- Dead letter queues filling up
- Visibility timeout confusion (did it process? who knows!)
- FIFO vs Standard (wrong choice = data loss)
- Poison messages breaking your workers
- No built-in retry logic for 429/500 errors
DynamoDB Pain:
- Provisioned throughput math (always wrong)
- Hot partition keys throttling randomly
- GSI limits (20 max, need to plan carefully)
- Point-in-time recovery costing $$$
- Read/write capacity units (what even are these?)
The Real Kicker:
- AWS has no built-in tool for queueing 429 and 500 errors at scale
- You have to build it yourself
- With Lambda + SQS + DynamoDB + Step Functions + EventBridge
- And debug the whole mess when it breaks at 3am
Why AWS Can't Do This (And EZThrottle Can)
Performance:
- EZThrottle core: Written in Gleam (compiles to Erlang/OTP)
- Actor-based concurrency: Millions of jobs, zero race conditions
- Sub-millisecond job routing: Faster than Lambda cold starts
- Multi-region racing: Native to our architecture (not bolted on)
AWS Stack:
- Lambda: Cold starts, concurrent execution limits, IAM hell
- SQS: No native retry logic, visibility timeout confusion
- DynamoDB: Hot partitions, throughput throttling
- Step Functions: $0.025 per 1000 state transitions (adds up fast)
You can't build this on AWS serverless and get the same performance. We tried. It doesn't work. That's why we built EZThrottle.
The Hidden Cost of Retry Storms
What happens when Stripe/OpenAI/Anthropic has an outage?
Without EZThrottle:
5-minute API outage causes:
1000 req/sec × 5 retries = 5000 req/sec retry storm
5000 req/sec × 300 seconds = 1.5M failed requests
1.5M × 10KB payload = 15GB egress
15GB × $0.09/GB = $1,350 in AWS egress fees
Plus:
- Lambda concurrent execution limit hit (all new requests fail)
- SQS queues backing up (visibility timeout chaos)
- DynamoDB throttling (hot partition from retry attempts)
- CloudWatch logs exploding ($200+ in 5 minutes)
- Your servers maxed out (can't serve real users)
Total cost: $1,550 + 2 hours of engineer time + lost revenue
With EZThrottle:
Same 5-minute outage:
1000 req/sec × 1 submit to EZThrottle = 1000 req/sec
300k requests × $0.50/1k = $150 total
Plus:
- Your servers stay healthy (serving real users)
- No retry storm (EZThrottle handles retries)
- No egress fees (one request out, webhook back)
- No debugging at 3am
- No lost revenue
Total cost: $150 + 0 engineer time + 0 lost revenue
Savings: $1,400 per outage (and your sanity)
The Hidden Cost of Building This Yourself
You're about to hire 2 engineers to build retry infrastructure. Let's do the math.
DIY Cost (AWS + Engineers):
| Component | Year 1 | Ongoing |
|---|---|---|
| Infrastructure | ||
| Lambda (retries + webhooks) | $1,200 | $1,200/year |
| SQS (job queues) | $1,200 | $1,200/year |
| DynamoDB (state tracking) | $3,000 | $3,000/year |
| CloudWatch (logs) | $1,200 | $1,200/year |
| Data transfer (egress fees) | $12,000 | $12,000/year |
| Infrastructure subtotal | $18,600 | $18,600/year |
| Engineering | ||
| Initial build (3 months, 2 engineers @ $150k) | $75,000 | - |
| Ongoing maintenance (30% time, 2 engineers) | $45,000 | $90,000/year |
| On-call rotation (outage response) | $15,000 | $30,000/year |
| Engineering subtotal | $135,000 | $120,000/year |
| TOTAL DIY COST | $153,600 | $138,600/year |
EZThrottle Cost:
| Component | Year 1 | Ongoing |
|---|---|---|
| Free tier (1M requests/month) | $0 | $0/year |
| Pro tier (10M requests/month) | $6,000 | $6,000/year |
| Engineer time to integrate | $5,000 | $0/year |
| TOTAL EZTHROTTLE COST | $11,000 | $6,000/year |
Savings: $142,600 in Year 1, $132,600/year ongoing
Or put another way: You save an entire senior engineer's salary every year.
FRUGAL vs PERFORMANCE: Choose Your Strategy
| Feature | FRUGAL | PERFORMANCE |
|---|---|---|
| Execution | Client-side first | Server-side distributed |
| When to use | High success rate (95%+) | Mission-critical / high traffic |
| Cost | Only pay when forwarded | Always uses EZThrottle |
| During API outages | Retry storm (melts your servers) | Servers stay healthy |
| Egress fees | High (every retry = AWS egress) | Low (one request to EZThrottle) |
| Lambda limits | Hit concurrent execution cap | Never hit limits |
| IAM debugging | Your problem | Not your problem |
| Good night's sleep | Nope | Yes |
Rate Limiting: 2 RPS Per Domain
EZThrottle throttles at 2 requests per second PER TARGET DOMAIN:
- api.stripe.com → 2 RPS
- api.openai.com → 2 RPS
- api.anthropic.com → 2 RPS
All domains run concurrently. The limit is per destination, not per account.
Need higher limits? Return X-EZTHROTTLE-RPS header or request custom defaults.
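For illustration, here is a sketch that fans jobs out to three different providers using the Step builder from the Quick Start. Each destination domain gets its own 2 RPS budget, so the three queues drain in parallel (endpoints and webhook URL are placeholders):

from ezthrottle import EZThrottle, Step, StepType

client = EZThrottle(api_key="your_api_key")

for target in [
    "https://api.stripe.com/v1/charges",
    "https://api.openai.com/v1/chat/completions",
    "https://api.anthropic.com/v1/messages",
]:
    (
        Step(client)
        .url(target)
        .type(StepType.PERFORMANCE)
        .webhooks([{"url": "https://your-app.com/webhook"}])
        .execute()  # throttled at 2 RPS per destination domain, not per account
    )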
Real-World Example: Payment Processor
Before EZThrottle (AWS Lambda + SQS):
- Stripe outage: 15 minutes
- Retry storm: 2M failed requests
- AWS egress fees: $1,800
- Lambda concurrent execution limit hit: 45 minutes total downtime
- Lost revenue: $50,000
- Engineer time debugging: 6 hours (including 3am wake-up)
- CloudWatch logs: $400
- Customer support tickets: 200
- Total cost per outage: $52,200
After EZThrottle:
- Same Stripe outage: 15 minutes
- Submitted to EZThrottle: 300k requests
- EZThrottle cost: $150
- Servers stayed online: 0 minutes downtime
- Lost revenue: $0
- Engineer time: 0 hours (slept through it)
- Customer support tickets: 5
- Total cost per outage: $150
ROI: 348x cost reduction per outage
Plus ongoing savings:
- 60% reduction in AWS egress fees ($7,200/year saved)
- Zero Lambda IAM debugging (priceless)
- No more 3am pages (actually priceless)
- One less engineer needed ($150k/year saved)
What You're Really Paying For
❌ Wrong comparison: "EZThrottle ($500/1M) vs Lambda ($0.20/1M)" → This ignores SQS, DynamoDB, egress, IAM hell, and engineers
✅ Right comparison: "EZThrottle ($6k/year) vs DIY ($139k/year)" → Lambda + SQS + DynamoDB + engineers + sanity
You're not paying for request proxying. You're paying to never debug Lambda IAM policies at 3am again.
What you get:
- ✅ No retry storms during API outages
- ✅ No Lambda concurrent execution limits
- ✅ No IAM policy debugging hell
- ✅ No SQS dead letter queue mysteries
- ✅ No DynamoDB hot partition throttling
- ✅ Multi-region racing (3+ regions, fastest wins)
- ✅ Webhook reliability (automatic retries)
- ✅ Built in Gleam/OTP (actor-based, zero race conditions)
- ✅ Sleep through outages (we handle it)
AWS can't do this at this scale. That's why EZThrottle exists.
SDK Documentation
Step Types
StepType.PERFORMANCE (Server-side execution)
Submit jobs to EZThrottle for distributed execution with multi-region racing and webhook delivery.
Step(client)
.url("https://api.stripe.com/charges")
.type(StepType.PERFORMANCE)
.webhooks([{"url": "https://app.com/webhook"}])
.regions(["iad", "lax", "ord"]) # Multi-region racing
.execution_mode("race") # First completion wins
.execute()
StepType.FRUGAL (Client-side first)
Execute locally first, only forward to EZThrottle on specific error codes. Saves money!
Step(client)
.url("https://api.example.com")
.type(StepType.FRUGAL)
.fallback_on_error([429, 500, 503]) # Forward to EZThrottle on these codes
.execute()
Idempotent Key Strategies
Critical concept: Idempotent keys prevent duplicate job execution. Choose the right strategy for your use case.
IdempotentStrategy.HASH (Default)
Backend generates deterministic hash of (url, method, body, customer_id). Prevents duplicates.
Use when:
- Payment processing (don't charge twice!)
- Critical operations (create user, send notification)
- You want automatic deduplication
Example:
from ezthrottle import IdempotentStrategy
# Prevents duplicate charges - same request = rejected as duplicate
Step(client)
.url("https://api.stripe.com/charges")
.body('{"amount": 1000, "currency": "usd"}')
.idempotent_strategy(IdempotentStrategy.HASH) # Default
.execute()
# Second call with same params → "duplicate" (not charged twice!)
IdempotentStrategy.UNIQUE
SDK generates unique UUID per request. Allows duplicates.
Use when:
- Polling endpoints (same URL, different data each time)
- Webhooks (want to send every time)
- Scheduled jobs (run every minute/hour)
- GET requests that return changing data
Example:
import time

# Poll API every minute - each request gets unique UUID
while True:
    (
        Step(client)
        .url("https://api.example.com/status")
        .idempotent_strategy(IdempotentStrategy.UNIQUE)  # New UUID each time
        .execute()
    )
    time.sleep(60)
Without UNIQUE strategy, polling would fail:
# BAD - Second request rejected as duplicate!
Step(client).url("https://api.com/status").execute() # Works
Step(client).url("https://api.com/status").execute() # Rejected! Same hash
Custom Keys
Provide your own business logic keys.
Use when:
- You have existing ID system (order ID, transaction ID)
- Want custom deduplication logic
Example:
# Custom key based on order ID
Step(client)
.url("https://api.example.com/process")
.idempotent_key(f"order-{order_id}") # Dedup per order
.execute()
Workflow Chaining
Chain steps together with .on_success(), .on_failure(), and .fallback():
# Analytics step (cheap)
analytics = Step(client).url("https://analytics.com/track").type(StepType.FRUGAL)
# Notification (fast, distributed)
notification = (
Step(client)
.url("https://notify.com")
.type(StepType.PERFORMANCE)
.webhooks([{"url": "https://app.com/webhook"}])
.regions(["iad", "lax"])
.on_success(analytics)
)
# Primary API call (cheap local execution)
result = (
Step(client)
.url("https://api.example.com")
.type(StepType.FRUGAL)
.fallback_on_error([429, 500])
.on_success(notification)
.execute()
)
Fallback Chains
Handle failures with automatic fallback execution:
backup_api = Step(client).url("https://backup-api.com")
result = (
Step(client)
.url("https://primary-api.com")
.fallback(backup_api, trigger_on_error=[500, 502, 503])
.execute()
)
Multi-Region Racing
Submit jobs to multiple regions, fastest wins:
Step(client)
.url("https://api.example.com")
.regions(["iad", "lax", "ord"]) # Try all 3 regions
.region_policy("fallback") # Auto-route if region down
.execution_mode("race") # First completion wins
.webhooks([{"url": "https://app.com/webhook"}])
.execute()
Webhook Fanout (Multiple Webhooks)
Deliver job results to multiple services simultaneously:
Step(client)
.url("https://api.stripe.com/charges")
.method("POST")
.webhooks([
# Primary webhook (must succeed)
{"url": "https://app.com/payment-complete", "has_quorum_vote": True},
# Analytics webhook (optional)
{"url": "https://analytics.com/track", "has_quorum_vote": False},
# Notification service (must succeed)
{"url": "https://notify.com/alert", "has_quorum_vote": True},
# Multi-region webhook racing
{"url": "https://backup.com/webhook", "regions": ["iad", "lax"], "has_quorum_vote": True}
])
.webhook_quorum(2) # At least 2 webhooks with has_quorum_vote=true must succeed
.execute()
Webhook Options:
- url - Webhook endpoint URL
- regions - (Optional) Deliver webhook from specific regions
- has_quorum_vote - (Optional) Counts toward quorum (default: true)
Use Cases:
- Notify multiple services (payment processor + analytics + CRM)
- Redundancy (multiple backup webhooks)
- Multi-region delivery (low latency globally)
Retry Policies
Customize retry behavior:
Step(client)
.url("https://api.example.com")
.retry_policy({
"max_retries": 5,
"max_reroutes": 3,
"retry_codes": [429, 503], # Retry in same region
"reroute_codes": [500, 502, 504] # Try different region
})
.execute()
Rate Limiting & Tuning
EZThrottle intelligently manages rate limits for your API calls. By default, requests are throttled at 2 RPS (requests per second) to smooth rate limiting across distributed workers and prevent API overload.
Dynamic Rate Limiting via Response Headers
Your API can communicate rate limits back to EZThrottle using response headers:
# Your API responds with these headers:
X-EZTHROTTLE-RPS: 5 # Allow 5 requests per second
X-EZTHROTTLE-MAX-CONCURRENT: 10 # Allow 10 concurrent requests
Header Details:
- X-EZTHROTTLE-RPS: Requests per second (e.g., 0.5 = 1 request per 2 seconds, 5 = 5 requests per second)
- X-EZTHROTTLE-MAX-CONCURRENT: Maximum concurrent requests (default: 2 per machine)
EZThrottle automatically adjusts its rate limiting based on these headers, ensuring optimal throughput without overwhelming your APIs.
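For example, an API you control can advertise its own limits back to EZThrottle by setting these headers on its responses. A minimal Flask sketch, assuming only the header names documented above (the endpoint itself is a placeholder):

from flask import Flask, jsonify

app = Flask(__name__)

@app.route("/resource")
def resource():
    response = jsonify({"ok": True})
    # Tell EZThrottle this endpoint tolerates 5 requests/second
    # and up to 10 concurrent requests.
    response.headers["X-EZTHROTTLE-RPS"] = "5"
    response.headers["X-EZTHROTTLE-MAX-CONCURRENT"] = "10"
    return response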
Performance Note: Server-side retry handling is significantly faster than client-side retry loops. EZThrottle's distributed architecture eliminates connection overhead and retry latency. Benchmarks coming soon.
Requesting Custom Defaults
Need different default rate limits for your account? Submit a configuration request:
👉 Request custom defaults at github.com/rjpruitt16/ezconfig
Webhook Payload
When EZThrottle completes your job, it sends a POST request to your webhook URL with the following JSON payload:
{
"job_id": "job_1763674210055_853341",
"idempotent_key": "custom_key_or_generated_hash",
"status": "success",
"response": {
"status_code": 200,
"headers": {
"content-type": "application/json"
},
"body": "{\"result\": \"data\"}"
},
"metadata": {}
}
Fields:
- job_id - Unique identifier for this job
- idempotent_key - Your custom key or auto-generated hash
- status - "success" or "failed"
- response.status_code - HTTP status code from the target API
- response.headers - Response headers from the target API
- response.body - Response body from the target API (as string)
- metadata - Custom metadata you provided during job submission
Example webhook handler (Flask):
from flask import Flask, request
app = Flask(__name__)
@app.route('/webhook', methods=['POST'])
def handle_webhook():
payload = request.json
job_id = payload['job_id']
status = payload['status']
if status == 'success':
response_body = payload['response']['body']
# Process successful result
print(f"Job {job_id} succeeded: {response_body}")
else:
# Handle failure
print(f"Job {job_id} failed")
return {'ok': True}
Webhook Security (HMAC Signatures)
Protect your webhooks from spoofing attacks with HMAC-SHA256 signature verification. EZThrottle signs all webhook requests when you configure a webhook secret.
Quick Start
- Create a webhook secret:
from ezthrottle import EZThrottle
client = EZThrottle(api_key="your_api_key")
# Create secret (min 16 characters)
client.create_webhook_secret(
primary_secret="your_secure_random_secret_min_16_chars"
)
- Verify signatures in your webhook handler:
from flask import Flask, request
from ezthrottle import verify_webhook_signature_strict, WebhookVerificationError
app = Flask(__name__)
WEBHOOK_SECRET = "your_secure_random_secret_min_16_chars"
@app.route('/webhook', methods=['POST'])
def webhook():
# Get signature from header
signature = request.headers.get('X-EZThrottle-Signature', '')
payload = request.get_data()
# Verify signature (raises exception if invalid)
try:
verify_webhook_signature_strict(payload, signature, WEBHOOK_SECRET)
except WebhookVerificationError as e:
return {'error': str(e)}, 401
# Signature valid - process webhook
data = request.json
print(f"Job {data['job_id']} completed: {data['status']}")
return {'ok': True}
Signature Format
EZThrottle includes an X-EZThrottle-Signature header with each webhook request:
X-EZThrottle-Signature: t=1234567890,v1=abc123def456...
Format: t=timestamp,v1=signature
- t - Unix timestamp when signature was generated
- v1 - HMAC-SHA256 hex signature of {timestamp}.{json_body}
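The SDK ships verification helpers (see below), but because the scheme is plain HMAC-SHA256 you can also recompute the signature yourself, for example from another language or for debugging. A minimal sketch based on the format above; the helper names are ours, not part of the SDK:

import hashlib
import hmac

def compute_signature(secret: str, timestamp: str, raw_body: bytes) -> str:
    # v1 = HMAC-SHA256 over "{timestamp}.{json_body}" using your webhook secret
    message = f"{timestamp}.".encode() + raw_body
    return hmac.new(secret.encode(), message, hashlib.sha256).hexdigest()

def signature_matches(header: str, secret: str, raw_body: bytes) -> bool:
    # header looks like "t=1234567890,v1=abc123def456..."
    parts = dict(item.split("=", 1) for item in header.split(","))
    expected = compute_signature(secret, parts["t"], raw_body)
    return hmac.compare_digest(expected, parts.get("v1", ""))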
Verification Methods
1. Strict Verification (Recommended)
Raises WebhookVerificationError if signature is invalid:
from ezthrottle import verify_webhook_signature_strict, WebhookVerificationError
try:
verify_webhook_signature_strict(
payload=request.get_data(),
signature_header=request.headers.get('X-EZThrottle-Signature', ''),
secret=WEBHOOK_SECRET,
tolerance=300 # 5 minutes (default)
)
# Signature valid!
except WebhookVerificationError as e:
return {'error': f'Invalid signature: {str(e)}'}, 401
2. Boolean Verification
Returns (verified: bool, reason: str):
from ezthrottle import verify_webhook_signature
verified, reason = verify_webhook_signature(
payload=request.get_data(),
signature_header=request.headers.get('X-EZThrottle-Signature', ''),
secret=WEBHOOK_SECRET
)
if not verified:
print(f"Verification failed: {reason}")
return {'error': 'Invalid signature'}, 401
Failure reasons:
- no_signature_header - Missing X-EZThrottle-Signature header
- missing_v1_signature - Malformed signature header
- timestamp_expired (diff=350s, tolerance=300s) - Timestamp too old
- signature_mismatch - Signature doesn't match payload
- verification_error: {details} - Parsing or crypto error
Secret Rotation
Use primary + secondary secrets for zero-downtime rotation:
# Step 1: Add new secret as primary, keep old as secondary
client.create_webhook_secret(
primary_secret="new_secret_after_rotation_min_16",
secondary_secret="old_secret_before_rotation_min_16"
)
# Step 2: Update webhook handlers to verify with both secrets
from ezthrottle import try_verify_with_secrets
verified, reason = try_verify_with_secrets(
payload=request.get_data(),
signature_header=request.headers.get('X-EZThrottle-Signature', ''),
primary_secret="new_secret_after_rotation_min_16",
secondary_secret="old_secret_before_rotation_min_16"
)
if not verified:
return {'error': f'Invalid signature: {reason}'}, 401
print(f"Verified with: {reason}") # "valid_primary" or "valid_secondary"
# Step 3: After verifying all webhooks work with new secret,
# remove secondary secret
client.create_webhook_secret("new_secret_after_rotation_min_16")
Or use the convenience method:
# Automatically rotates: new → primary, old → secondary
client.rotate_webhook_secret("new_secret_min_16_chars")
Manage Secrets
# Create or update secrets
client.create_webhook_secret(
primary_secret="your_secret_min_16_chars",
secondary_secret="optional_backup_min_16_chars" # For rotation
)
# Get secrets (masked for security)
secrets = client.get_webhook_secret()
print(secrets)
# {
# "customer_id": "cust_XXX",
# "primary_secret": "your****ars",
# "secondary_secret": "opti****ars",
# "has_secondary": True
# }
# Delete secrets
client.delete_webhook_secret()
Quick Commands (One-Liners)
Manage secrets from command line without writing a script:
# Create secret
python -c "from ezthrottle import EZThrottle; EZThrottle(api_key='your_api_key').create_webhook_secret('your_secret_min_16_chars')"
# Get secrets (view masked)
python -c "from ezthrottle import EZThrottle; import json; print(json.dumps(EZThrottle(api_key='your_api_key').get_webhook_secret(), indent=2))"
# Rotate secret
python -c "from ezthrottle import EZThrottle; EZThrottle(api_key='your_api_key').rotate_webhook_secret('new_secret_min_16_chars')"
# Delete secrets
python -c "from ezthrottle import EZThrottle; print(EZThrottle(api_key='your_api_key').delete_webhook_secret())"
Tip: Store your API key in an environment variable:
export EZTHROTTLE_API_KEY="your_api_key"
# Then use in commands
python -c "import os; from ezthrottle import EZThrottle; EZThrottle(api_key=os.getenv('EZTHROTTLE_API_KEY')).create_webhook_secret('secret')"
Best Practices
- Always verify signatures in production - Prevent webhook spoofing attacks
- Use strong secrets - Generate random 32+ character secrets (see the sketch below)
- Rotate secrets periodically - Use primary + secondary for zero-downtime rotation
- Set tolerance appropriately - Default 5 minutes (300s) prevents replay attacks
- Secure secret storage - Store secrets in environment variables, never in code
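A minimal sketch for generating and registering a strong secret with Python's standard secrets module (the environment-variable name is just an example):

import os
import secrets

from ezthrottle import EZThrottle

# 32 random bytes -> 64 hex characters, comfortably above the 16-character minimum
new_secret = secrets.token_hex(32)
print(f"export WEBHOOK_SECRET_PRIMARY={new_secret}")  # store it in your secret manager, never in code

client = EZThrottle(api_key=os.environ["EZTHROTTLE_API_KEY"])
client.create_webhook_secret(primary_secret=new_secret)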
Example: Production-Ready Webhook Handler
from flask import Flask, request
from ezthrottle import try_verify_with_secrets, WebhookVerificationError
import os
app = Flask(__name__)
PRIMARY_SECRET = os.environ.get('WEBHOOK_SECRET_PRIMARY')
SECONDARY_SECRET = os.environ.get('WEBHOOK_SECRET_SECONDARY') # Optional
@app.route('/webhook', methods=['POST'])
def webhook():
# Verify signature
verified, reason = try_verify_with_secrets(
payload=request.get_data(),
signature_header=request.headers.get('X-EZThrottle-Signature', ''),
primary_secret=PRIMARY_SECRET,
secondary_secret=SECONDARY_SECRET
)
if not verified:
app.logger.warning(f"Invalid webhook signature: {reason}")
return {'error': 'Invalid signature'}, 401
# Signature valid - process webhook
data = request.json
job_id = data.get('job_id')
status = data.get('status')
app.logger.info(f"Webhook verified ({reason}): job_id={job_id}, status={status}")
# Your business logic here
if status == 'success':
response_data = data.get('response', {})
# Process successful result
else:
# Handle failure
pass
return {'ok': True}
Mixed Workflow Chains (FRUGAL ↔ PERFORMANCE)
Mix FRUGAL and PERFORMANCE steps in the same workflow to optimize for both cost and speed:
Example 1: FRUGAL → PERFORMANCE (Save money, then fast delivery)
# Primary API call is cheap (local execution)
# But notification needs speed (multi-region racing)
result = (
Step(client)
.url("https://api.openai.com/v1/chat/completions")
.type(StepType.FRUGAL) # Execute locally first
.fallback_on_error([429, 500])
.on_success(
# Chain to PERFORMANCE for fast webhook delivery
Step(client)
.url("https://api.sendgrid.com/send")
.type(StepType.PERFORMANCE) # Distributed execution
.webhooks([{"url": "https://app.com/email-sent"}])
.regions(["iad", "lax", "ord"])
)
.execute()
)
Example 2: PERFORMANCE → FRUGAL (Fast payment, then cheap analytics)
# Critical payment needs speed (racing)
# But analytics is cheap (local execution when webhook arrives)
payment = (
Step(client)
.url("https://api.stripe.com/charges")
.type(StepType.PERFORMANCE) # Fast distributed execution
.webhooks([{"url": "https://app.com/payment-complete"}])
.regions(["iad", "lax"])
.on_success(
# Analytics doesn't need speed - save money!
Step(client)
.url("https://analytics.com/track")
.type(StepType.FRUGAL) # Client executes when webhook arrives
)
.execute()
)
Example 3: Complex Mixed Workflow
# Optimize every step for its requirements
workflow = (
Step(client)
.url("https://cheap-api.com")
.type(StepType.FRUGAL) # Try locally first
.fallback_on_error([429, 500])
.fallback(
Step().url("https://backup-api.com"), # Still FRUGAL
trigger_on_error=[500]
)
.on_success(
# Critical notification needs PERFORMANCE
Step(client)
.url("https://critical-webhook.com")
.type(StepType.PERFORMANCE)
.webhooks([{"url": "https://app.com/webhook"}])
.regions(["iad", "lax", "ord"])
.on_success(
# Analytics is cheap again
Step(client)
.url("https://analytics.com/track")
.type(StepType.FRUGAL)
)
)
.on_failure(
# Simple Slack alert doesn't need PERFORMANCE
Step(client)
.url("https://hooks.slack.com/webhook")
.type(StepType.FRUGAL)
)
.execute()
)
Why mix workflows?
- ✅ Cost optimization - Only pay for what needs speed
- ✅ Performance where it matters - Critical paths get multi-region racing
- ✅ Flexibility - Every step optimized for its specific requirements
@auto_forward Decorator (Legacy Code Integration)
The killer feature: Integrate EZThrottle into existing code without rewriting error handling!
import requests

from ezthrottle import auto_forward, ForwardToEZThrottle
@auto_forward(client)
def process_payment(order_id):
"""
Legacy payment processing code.
Just raise ForwardToEZThrottle on errors - decorator handles the rest!
"""
try:
response = requests.post(
"https://api.stripe.com/charges",
headers={"Authorization": "Bearer sk_live_..."},
json={"amount": 1000, "currency": "usd"}
)
if response.status_code == 429:
# Decorator catches this and auto-forwards to EZThrottle!
raise ForwardToEZThrottle(
url="https://api.stripe.com/charges",
method="POST",
headers={"Authorization": "Bearer sk_live_..."},
body='{"amount": 1000, "currency": "usd"}',
idempotent_key=f"order_{order_id}",
metadata={"order_id": order_id, "customer_id": "cust_123"},
webhooks=[{"url": "https://app.com/payment-complete"}]
)
return response.json()
except requests.RequestException as e:
# Network errors also auto-forwarded
raise ForwardToEZThrottle(
url="https://api.stripe.com/charges",
method="POST",
idempotent_key=f"order_{order_id}",
metadata={"error": str(e)}
)
# Call your legacy function - works exactly the same!
result = process_payment("order_12345")
# Returns: {"job_id": "...", "status": "queued"}
Why this is amazing:
- ✅ No code refactoring required
- ✅ Drop-in replacement for existing error handling
- ✅ Keep your existing function signatures
- ✅ Gradual migration path
- ✅ Works with any HTTP library (requests, httpx, urllib)
Production Ready ✅
This SDK is production-ready with working examples validated in CI on every push.
Reference Implementation: test-app/
The test-app/ directory contains real, working code you can learn from. These aren't toy examples - this is production code we run in automated tests against the live EZThrottle backend.
Multi-Region Racing (test-app/app.py:134-145)
Step(client)
.url("https://httpbin.org/delay/1")
.type(StepType.PERFORMANCE)
.webhooks([{"url": f"{APP_URL}/webhook"}])
.regions(["iad", "lax", "ord"]) # Race across 3 regions
.execution_mode("race") # First completion wins
.execute()
Idempotent HASH (Deduplication) (test-app/app.py:274-281)
# Same request twice = same job_id (deduplicated)
Step(client)
.url(f"https://httpbin.org/get?run={run_id}")
.idempotent_strategy(IdempotentStrategy.HASH)
.execute()
Fallback Chain (test-app/app.py:168-182)
Step(client)
.url("https://httpbin.org/status/500")
.fallback(
Step().url("https://httpbin.org/status/200"),
trigger_on_error=[500, 502, 503]
)
.execute()
On-Success Workflow (test-app/app.py:198-213)
Step(client)
.url("https://httpbin.org/status/200")
.on_success(
Step().url("https://httpbin.org/delay/1")
)
.execute()
Auto-Forward Decorator (test-app/app.py:246-256)
@auto_forward(client, fallback_on_error=[429, 500])
def legacy_api_call():
response = requests.get("https://httpbin.org/status/429")
response.raise_for_status() # Raises on 429
return response.json()
# Automatically forwards to EZThrottle on error!
Validated in CI:
- ✅ GitHub Actions runs these examples against the live backend on every push
- ✅ 7 integration tests covering all SDK features
- ✅ Proves the code actually works, not just documentation
Asyncio Streaming (Non-Blocking Webhook Waiting)
Wait for webhook results asynchronously using Python's asyncio. Perfect for workflows that need to continue processing while waiting for EZThrottle to complete jobs.
Basic Asyncio Example
import asyncio
from ezthrottle import EZThrottle, Step, StepType
client = EZThrottle(api_key="your_api_key")
async def process_with_webhook():
# Submit job to EZThrottle
result = (
Step(client)
.url("https://api.example.com/endpoint")
.method("POST")
.type(StepType.PERFORMANCE)
.webhooks([{"url": "https://app.com/webhook", "has_quorum_vote": True}])
.idempotent_key("async_job_123")
.execute()
)
print(f"Job submitted: {result['job_id']}")
# Continue processing while EZThrottle executes the job
# Your webhook endpoint will receive the result asynchronously
# Run async function
asyncio.run(process_with_webhook())
Concurrent Job Submission with asyncio.gather
Submit multiple jobs concurrently and process results as they arrive:
import asyncio
from ezthrottle import EZThrottle, Step, StepType
client = EZThrottle(api_key="your_api_key")
async def submit_job(order):
"""Submit a single job asynchronously"""
result = (
Step(client)
.url("https://api.example.com/process")
.method("POST")
.body(str(order))
.type(StepType.PERFORMANCE)
.webhooks([{"url": "https://app.com/webhook", "has_quorum_vote": True}])
.idempotent_key(f"order_{order['id']}")
.execute()
)
return {
"order_id": order["id"],
"job_id": result["job_id"],
"idempotent_key": result.get("idempotent_key")
}
async def process_batch_concurrently(orders):
# Submit all jobs concurrently
tasks = [submit_job(order) for order in orders]
submissions = await asyncio.gather(*tasks)
print(f"Submitted {len(submissions)} jobs concurrently")
for s in submissions:
print(f"Order {s['order_id']} → Job {s['job_id']}")
# Webhook results will arrive asynchronously at https://app.com/webhook
return submissions
# Example usage
orders = [
{"id": "order_1", "amount": 1000},
{"id": "order_2", "amount": 2000},
{"id": "order_3", "amount": 3000}
]
asyncio.run(process_batch_concurrently(orders))
Fault-Tolerant Batch Processing
Handle failures gracefully with asyncio exception handling:
import asyncio
from ezthrottle import EZThrottle, Step, StepType
client = EZThrottle(api_key="your_api_key")
async def submit_job_with_error_handling(order):
"""Submit job with exception handling"""
try:
result = (
Step(client)
.url("https://api.example.com/process")
.method("POST")
.body(str(order))
.type(StepType.PERFORMANCE)
.webhooks([{"url": "https://app.com/webhook"}])
.idempotent_key(f"order_{order['id']}")
.execute()
)
return {"order_id": order["id"], "job_id": result["job_id"], "success": True}
except Exception as e:
return {"order_id": order["id"], "error": str(e), "success": False}
async def process_batch_with_error_handling(orders):
tasks = [submit_job_with_error_handling(order) for order in orders]
results = await asyncio.gather(*tasks, return_exceptions=True)
succeeded = [r for r in results if isinstance(r, dict) and r.get("success")]
failed = [r for r in results if isinstance(r, dict) and not r.get("success")] + \
[r for r in results if isinstance(r, Exception)]
print(f"Succeeded: {len(succeeded)}, Failed: {len(failed)}")
return {"succeeded": succeeded, "failed": failed}
# Example usage
orders = [
{"id": "order_1", "amount": 1000},
{"id": "order_2", "amount": 2000},
{"id": "order_3", "amount": 3000}
]
asyncio.run(process_batch_with_error_handling(orders))
Integration with FastAPI Webhook Handler
from fastapi import FastAPI, Request
from ezthrottle import EZThrottle, Step, StepType
import asyncio
import time
from datetime import datetime
app = FastAPI()
client = EZThrottle(api_key="your_api_key")
# In-memory store for webhook results (use Redis/DB in production)
webhook_results = {}
@app.post("/webhook")
async def webhook_receiver(request: Request):
"""Receive webhooks from EZThrottle"""
data = await request.json()
job_id = data.get("job_id")
idempotent_key = data.get("idempotent_key")
status = data.get("status")
response = data.get("response")
# Store result
webhook_results[idempotent_key] = {
"job_id": job_id,
"status": status,
"response": response,
"received_at": datetime.now()
}
print(f"Webhook received for {idempotent_key}: {status}")
return {"ok": True}
@app.post("/submit")
async def submit_job():
"""Submit job and return immediately"""
idempotent_key = f"job_{int(time.time() * 1000)}"
result = (
Step(client)
.url("https://api.example.com/endpoint")
.method("POST")
.type(StepType.PERFORMANCE)
.webhooks([{"url": "https://app.com/webhook", "has_quorum_vote": True}])
.idempotent_key(idempotent_key)
.execute()
)
# Return immediately, don't wait for webhook
return {
"job_id": result["job_id"],
"idempotent_key": idempotent_key,
"message": "Job submitted, webhook will arrive asynchronously"
}
@app.get("/result/{idempotent_key}")
async def get_result(idempotent_key: str):
"""Poll for webhook result"""
result = webhook_results.get(idempotent_key)
if result:
return {"found": True, "result": result}
else:
return {"found": False, "message": "Webhook not yet received"}
Background Task Processing with asyncio
Process multiple jobs in the background while serving requests:
import asyncio
from ezthrottle import EZThrottle, Step, StepType
client = EZThrottle(api_key="your_api_key")
async def background_job_processor(queue):
"""Process jobs from a queue in the background"""
while True:
if queue.empty():
await asyncio.sleep(1)
continue
order = await queue.get()
try:
result = (
Step(client)
.url("https://api.example.com/process")
.method("POST")
.body(str(order))
.type(StepType.PERFORMANCE)
.webhooks([{"url": "https://app.com/webhook"}])
.idempotent_key(f"order_{order['id']}")
.execute()
)
print(f"Submitted job {result['job_id']} for order {order['id']}")
except Exception as e:
print(f"Failed to submit order {order['id']}: {e}")
finally:
queue.task_done()
async def main():
queue = asyncio.Queue()
# Start background processor
processor = asyncio.create_task(background_job_processor(queue))
# Add jobs to queue
orders = [
{"id": "order_1", "amount": 1000},
{"id": "order_2", "amount": 2000},
{"id": "order_3", "amount": 3000}
]
for order in orders:
await queue.put(order)
# Wait for all jobs to be processed
await queue.join()
# Cancel background processor
processor.cancel()
try:
await processor
except asyncio.CancelledError:
pass
print("All jobs processed!")
asyncio.run(main())
Legacy API (Deprecated)
For backward compatibility, the old queue_request() method is still available:
client.queue_request(
url="https://api.example.com",
webhook_url="https://your-app.com/webhook", # Note: singular
method="POST"
)
Prefer the new Step builder API for all new code!
Appendix
Environment Variables
EZTHROTTLE_API_KEY=your_api_key_here
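For example, read the key from the environment instead of hard-coding it:

import os

from ezthrottle import EZThrottle

client = EZThrottle(api_key=os.environ["EZTHROTTLE_API_KEY"])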
License
MIT