The task queue that works out of the box — no Redis, no RabbitMQ, just pip install and go.
Project description
⚡ FlashQ
The task queue that works out of the box — no Redis, no RabbitMQ, just pip install flashq and go.
Why FlashQ?
Every Python developer has been there: you need background tasks, you look at Celery, and suddenly you need Redis or RabbitMQ running, a separate broker config, and 200 lines of boilerplate before your first task runs.
FlashQ changes that. SQLite is the default backend — zero external dependencies, zero config. Your tasks persist across restarts, and you can scale to PostgreSQL or Redis when you need to.
from flashq import FlashQ
app = FlashQ() # That's it. Uses SQLite by default.
@app.task()
def send_email(to: str, subject: str) -> None:
print(f"Sending email to {to}: {subject}")
# Enqueue a task
send_email.delay(to="user@example.com", subject="Welcome!")
Features
| Feature | FlashQ | Celery | Dramatiq | Huey | TaskIQ |
|---|---|---|---|---|---|
| Zero-config setup | ✅ | ❌ | ❌ | ⚠️ | ❌ |
| SQLite backend | ✅ | ❌ | ❌ | ✅ | ❌ |
| PostgreSQL backend | ✅ | ❌ | ❌ | ❌ | ⚠️ |
| Redis backend | ✅ | ✅ | ✅ | ✅ | ✅ |
| Async + Sync tasks | ✅ | ❌ | ❌ | ❌ | Async only |
Type-safe .delay() |
✅ | ❌ | ⚠️ | ❌ | ✅ |
| Task chains/groups | ✅ | ✅ | ❌ | ❌ | ❌ |
| Middleware system | ✅ | ✅ | ✅ | ❌ | ✅ |
| Rate limiting | ✅ | ✅ | ❌ | ❌ | ❌ |
| Web dashboard | ✅ | ⚠️ | ❌ | ❌ | ❌ |
| Dead letter queue | ✅ | ❌ | ❌ | ❌ | ❌ |
| Task timeouts | ✅ | ✅ | ✅ | ❌ | ✅ |
| Periodic/cron scheduler | ✅ | ⚠️ | ❌ | ✅ | ⚠️ |
| Zero dependencies | ✅ | ❌ | ❌ | ❌ | ❌ |
Installation
# Core (SQLite only — zero dependencies!)
pip install flashq
# With Redis
pip install "flashq[redis]"
# With PostgreSQL
pip install "flashq[postgres]"
# Development
pip install "flashq[dev]"
Try It Now!
See FlashQ in action with the built-in demo — zero config needed:
pip install flashq
python -m flashq.demo
You'll see 20 tasks being created, enqueued, processed by workers, and results printed in real-time.
Quick Start
1. Define tasks
# tasks.py
from flashq import FlashQ
app = FlashQ()
@app.task()
def add(x: int, y: int) -> int:
return x + y
@app.task(queue="emails", max_retries=5, retry_delay=30.0)
def send_email(to: str, subject: str, body: str) -> dict:
return {"status": "sent", "to": to}
@app.task(timeout=120.0) # Kill if takes >2 min
async def process_image(url: str) -> str:
# Async tasks just work™
result = await download_and_resize(url)
return result
2. Enqueue tasks
from tasks import add, send_email
# Simple dispatch
handle = add.delay(2, 3)
# With options
handle = send_email.apply(
kwargs={"to": "user@example.com", "subject": "Hi", "body": "Hello!"},
countdown=60, # delay by 60 seconds
)
# Check result
result = handle.get_result()
if result and result.is_success:
print(f"Result: {result.result}")
3. Start the worker
flashq worker tasks:app
⚡ FlashQ Worker
├─ name: worker-12345
├─ backend: SQLiteBackend
├─ queues: default
├─ concurrency: 4
├─ tasks: 3
│ └─ tasks.add
│ └─ tasks.send_email
│ └─ tasks.process_image
└─ Ready! Waiting for tasks...
Task Composition
Chain tasks sequentially, run them in parallel, or combine both:
from flashq import chain, group, chord
# Chain: sequential — result of each passed to next
pipe = chain(
download.s("https://example.com/data.csv"),
parse_csv.s(),
store_results.s(table="imports"),
)
pipe.delay(app)
# Group: parallel execution
batch = group(
send_email.s(to="alice@test.com", subject="Hi"),
send_email.s(to="bob@test.com", subject="Hi"),
send_email.s(to="carol@test.com", subject="Hi"),
)
handle = batch.delay(app)
results = handle.get_results(timeout=30)
# Chord: parallel + callback when all complete
workflow = chord(
group(fetch_price.s("AAPL"), fetch_price.s("GOOG"), fetch_price.s("MSFT")),
aggregate_prices.s(),
)
workflow.delay(app)
Middleware
Intercept task lifecycle events for logging, monitoring, or custom logic:
from flashq import FlashQ, Middleware
class MetricsMiddleware(Middleware):
def before_execute(self, message):
self.start = time.time()
return message
def after_execute(self, message, result):
duration = time.time() - self.start
statsd.timing(f"task.{message.task_name}.duration", duration)
def on_error(self, message, exc):
sentry.capture_exception(exc)
return False # Don't suppress
def on_dead(self, message, exc):
alert_ops_team(f"Task {message.task_name} permanently failed: {exc}")
app = FlashQ()
app.add_middleware(MetricsMiddleware())
Built-in middlewares: LoggingMiddleware, TimeoutMiddleware, RateLimiter.
Rate Limiting
from flashq.ratelimit import RateLimiter
limiter = RateLimiter(default_rate="100/m") # 100 tasks/minute global
limiter.configure("send_email", rate="10/m") # 10 emails/minute
limiter.configure("api_call", rate="60/h") # 60 API calls/hour
app.add_middleware(limiter)
Web Dashboard
Built-in monitoring UI — no extra services needed:
flashq dashboard myapp:app --port 5555
Open http://localhost:5555 to see:
- Real-time task counts by state (pending, running, success, failure)
- Task list with filtering by state, queue, and search
- Task detail modal with args, result, error, traceback
- Actions: cancel, revoke, purge queue
- Auto-refreshing every 5 seconds
# Or embed in your own ASGI app
from flashq.dashboard import create_dashboard
dashboard = create_dashboard(app, prefix="/admin/tasks")
# Mount alongside your FastAPI/Starlette app
Dead Letter Queue
Inspect and replay permanently failed tasks:
from flashq.dlq import DeadLetterQueue
dlq = DeadLetterQueue(app)
app.add_middleware(dlq.middleware()) # Auto-capture dead tasks
# Later...
for task in dlq.list():
print(f"{task.task_name}: {task.error}")
dlq.replay(task_id="abc123") # Re-enqueue with reset retries
dlq.replay_all() # Replay everything
dlq.purge() # Clear DLQ
Periodic Tasks
from flashq import FlashQ, every, cron
from flashq.scheduler import Scheduler
app = FlashQ()
@app.task(name="cleanup")
def cleanup_old_data():
delete_old_records(days=30)
@app.task(name="daily_report")
def daily_report():
generate_and_send_report()
scheduler = Scheduler(app)
scheduler.add("cleanup", every(hours=6))
scheduler.add("daily_report", cron("0 9 * * 1-5")) # 9 AM weekdays
scheduler.start()
Retry & Error Handling
@app.task(max_retries=5, retry_delay=30.0, retry_backoff=True)
def flaky_task():
# Retries: 30s → 60s → 120s → 240s → 480s (exponential backoff)
response = requests.get("https://unreliable-api.com")
response.raise_for_status()
from flashq.exceptions import TaskRetryError
@app.task(max_retries=10)
def smart_retry():
try:
do_something()
except TemporaryError:
raise TaskRetryError(countdown=5.0) # Custom retry delay
Backends
SQLite (Default — Zero Config)
app = FlashQ() # Creates flashq.db in current dir
app = FlashQ(backend=SQLiteBackend(path="/var/lib/flashq/tasks.db"))
app = FlashQ(backend=SQLiteBackend(path=":memory:")) # For testing
PostgreSQL
Uses LISTEN/NOTIFY for instant task delivery + FOR UPDATE SKIP LOCKED for atomic dequeue.
from flashq.backends.postgres import PostgresBackend
app = FlashQ(backend=PostgresBackend("postgresql://localhost/mydb"))
Redis
Uses sorted sets for scheduling and Lua scripts for atomic operations.
from flashq.backends.redis import RedisBackend
app = FlashQ(backend=RedisBackend("redis://localhost:6379/0"))
CLI
flashq worker myapp:app # Start worker
flashq worker myapp:app -q emails,sms # Specific queues
flashq worker myapp:app -c 16 # 16 concurrent threads
flashq dashboard myapp:app # Start web dashboard
flashq dashboard myapp:app -p 8080 # Custom port
flashq info myapp:app # Queue stats
flashq purge myapp:app -f # Purge queue
Benchmarks
Measured on Apple Silicon (M-series), Python 3.12, SQLite backend:
| Benchmark | Tasks | Time | Throughput | Avg Latency |
|---|---|---|---|---|
| Enqueue (write only) | 10,000 | 0.52s | 19,298/s | 0.05ms |
| Roundtrip (c=4) | 1,000 | 25.7s | 39/s | 0.02ms |
| I/O-bound (c=8) | 500 | 6.7s | 75/s | 11.6ms |
| CPU-bound (c=4) | 500 | 12.8s | 39/s | 0.03ms |
Concurrency scaling (500 tasks):
| Workers | Throughput |
|---|---|
| 1 | 10 tasks/s |
| 2 | 19 tasks/s |
| 4 | 38 tasks/s |
| 8 | 100 tasks/s |
💡 Throughput scales linearly with concurrency. For I/O-bound workloads, use higher concurrency.
Run benchmarks yourself:
python benchmarks/bench.py
Architecture
Your App → FlashQ → Backend (SQLite/PG/Redis) → Worker(s)
↕
Middleware Stack
Rate Limiter
Scheduler
Dead Letter Queue
FlashQ uses a clean, modular architecture:
- Backend: Pluggable storage (SQLite, PostgreSQL, Redis)
- Worker: Thread pool executor with graceful shutdown
- Middleware: Intercepts every stage of task lifecycle
- Scheduler: Interval and cron-based periodic dispatch
- Canvas: Task composition (chain, group, chord)
Roadmap
- Core engine with SQLite backend
- PostgreSQL backend (LISTEN/NOTIFY)
- Redis backend (Lua scripts)
- Task timeouts (non-blocking)
- Middleware system
- Rate limiting (token bucket)
- Dead letter queue
- Task chains, groups, chords
- Periodic/cron scheduler
- CLI (worker, info, purge)
- 240 tests, 95% core coverage
- Web dashboard (real-time monitoring)
- Task result streaming
- PyPI publish
Contributing
See CONTRIBUTING.md for development setup and guidelines.
License
MIT License. See LICENSE for details.
Project details
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file flashq-0.2.0.tar.gz.
File metadata
- Download URL: flashq-0.2.0.tar.gz
- Upload date:
- Size: 106.3 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.14.2
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
be7b11da0a1800b4bbe32a8b333121dd37b09c4daccc872640ab5dc17275bc8b
|
|
| MD5 |
2e762edc92202c8f3b879c7376913f8f
|
|
| BLAKE2b-256 |
b28344a6e6ffc382c6d6584e8e4272e550a8544e7ad356e5a9d9cf11e27ba546
|
File details
Details for the file flashq-0.2.0-py3-none-any.whl.
File metadata
- Download URL: flashq-0.2.0-py3-none-any.whl
- Upload date:
- Size: 59.9 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.14.2
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
1c3cf2a574a5acb6e8a921c88d084118fb964ccefbd0bce6975b074228607195
|
|
| MD5 |
1956908e222dc95c3f292d9e13f6547f
|
|
| BLAKE2b-256 |
b0b6fe2a7dfa8712201ddcb7d21f3efe5aee2cb6df15b8b04ef58e33cebe6400
|