Skip to main content

The task queue that works out of the box — no Redis, no RabbitMQ, just pip install and go.

Project description

⚡ FlashQ

The task queue that works out of the box — no Redis, no RabbitMQ, just pip install flashq and go.

CI Python License: MIT Tests


Why FlashQ?

Every Python developer has been there: you need background tasks, you look at Celery, and suddenly you need Redis or RabbitMQ running, a separate broker config, and 200 lines of boilerplate before your first task runs.

FlashQ changes that. SQLite is the default backend — zero external dependencies, zero config. Your tasks persist across restarts, and you can scale to PostgreSQL or Redis when you need to.

from flashq import FlashQ

app = FlashQ()  # That's it. Uses SQLite by default.

@app.task()
def send_email(to: str, subject: str) -> None:
    print(f"Sending email to {to}: {subject}")

# Enqueue a task
send_email.delay(to="user@example.com", subject="Welcome!")

Features

Feature FlashQ Celery Dramatiq Huey TaskIQ
Zero-config setup ⚠️
SQLite backend
PostgreSQL backend ⚠️
Redis backend
Async + Sync tasks Async only
Type-safe .delay() ⚠️
Task chains/groups
Middleware system
Rate limiting
Web dashboard ⚠️
Dead letter queue
Task timeouts
Periodic/cron scheduler ⚠️ ⚠️
Zero dependencies

Installation

# Core (SQLite only — zero dependencies!)
pip install flashq

# With Redis
pip install "flashq[redis]"

# With PostgreSQL
pip install "flashq[postgres]"

# Development
pip install "flashq[dev]"

Try It Now!

See FlashQ in action with the built-in demo — zero config needed:

pip install flashq
python -m flashq.demo

You'll see 20 tasks being created, enqueued, processed by workers, and results printed in real-time.

Quick Start

1. Define tasks

# tasks.py
from flashq import FlashQ

app = FlashQ()

@app.task()
def add(x: int, y: int) -> int:
    return x + y

@app.task(queue="emails", max_retries=5, retry_delay=30.0)
def send_email(to: str, subject: str, body: str) -> dict:
    return {"status": "sent", "to": to}

@app.task(timeout=120.0)  # Kill if takes >2 min
async def process_image(url: str) -> str:
    # Async tasks just work™
    result = await download_and_resize(url)
    return result

2. Enqueue tasks

from tasks import add, send_email

# Simple dispatch
handle = add.delay(2, 3)

# With options
handle = send_email.apply(
    kwargs={"to": "user@example.com", "subject": "Hi", "body": "Hello!"},
    countdown=60,  # delay by 60 seconds
)

# Check result
result = handle.get_result()
if result and result.is_success:
    print(f"Result: {result.result}")

3. Start the worker

flashq worker tasks:app
 ⚡ FlashQ Worker
 ├─ name:        worker-12345
 ├─ backend:     SQLiteBackend
 ├─ queues:      default
 ├─ concurrency: 4
 ├─ tasks:       3
 │    └─ tasks.add
 │    └─ tasks.send_email
 │    └─ tasks.process_image
 └─ Ready! Waiting for tasks...

Task Composition

Chain tasks sequentially, run them in parallel, or combine both:

from flashq import chain, group, chord

# Chain: sequential — result of each passed to next
pipe = chain(
    download.s("https://example.com/data.csv"),
    parse_csv.s(),
    store_results.s(table="imports"),
)
pipe.delay(app)

# Group: parallel execution
batch = group(
    send_email.s(to="alice@test.com", subject="Hi"),
    send_email.s(to="bob@test.com", subject="Hi"),
    send_email.s(to="carol@test.com", subject="Hi"),
)
handle = batch.delay(app)
results = handle.get_results(timeout=30)

# Chord: parallel + callback when all complete
workflow = chord(
    group(fetch_price.s("AAPL"), fetch_price.s("GOOG"), fetch_price.s("MSFT")),
    aggregate_prices.s(),
)
workflow.delay(app)

Middleware

Intercept task lifecycle events for logging, monitoring, or custom logic:

from flashq import FlashQ, Middleware

class MetricsMiddleware(Middleware):
    def before_execute(self, message):
        self.start = time.time()
        return message

    def after_execute(self, message, result):
        duration = time.time() - self.start
        statsd.timing(f"task.{message.task_name}.duration", duration)

    def on_error(self, message, exc):
        sentry.capture_exception(exc)
        return False  # Don't suppress

    def on_dead(self, message, exc):
        alert_ops_team(f"Task {message.task_name} permanently failed: {exc}")

app = FlashQ()
app.add_middleware(MetricsMiddleware())

Built-in middlewares: LoggingMiddleware, TimeoutMiddleware, RateLimiter.

Rate Limiting

from flashq.ratelimit import RateLimiter

limiter = RateLimiter(default_rate="100/m")  # 100 tasks/minute global
limiter.configure("send_email", rate="10/m")  # 10 emails/minute
limiter.configure("api_call", rate="60/h")    # 60 API calls/hour

app.add_middleware(limiter)

Web Dashboard

Built-in monitoring UI — no extra services needed:

flashq dashboard myapp:app --port 5555

Open http://localhost:5555 to see:

  • Real-time task counts by state (pending, running, success, failure)
  • Task list with filtering by state, queue, and search
  • Task detail modal with args, result, error, traceback
  • Actions: cancel, revoke, purge queue
  • Auto-refreshing every 5 seconds
# Or embed in your own ASGI app
from flashq.dashboard import create_dashboard

dashboard = create_dashboard(app, prefix="/admin/tasks")
# Mount alongside your FastAPI/Starlette app

Dead Letter Queue

Inspect and replay permanently failed tasks:

from flashq.dlq import DeadLetterQueue

dlq = DeadLetterQueue(app)
app.add_middleware(dlq.middleware())  # Auto-capture dead tasks

# Later...
for task in dlq.list():
    print(f"{task.task_name}: {task.error}")

dlq.replay(task_id="abc123")  # Re-enqueue with reset retries
dlq.replay_all()              # Replay everything
dlq.purge()                   # Clear DLQ

Periodic Tasks

from flashq import FlashQ, every, cron
from flashq.scheduler import Scheduler

app = FlashQ()

@app.task(name="cleanup")
def cleanup_old_data():
    delete_old_records(days=30)

@app.task(name="daily_report")
def daily_report():
    generate_and_send_report()

scheduler = Scheduler(app)
scheduler.add("cleanup", every(hours=6))
scheduler.add("daily_report", cron("0 9 * * 1-5"))  # 9 AM weekdays
scheduler.start()

Retry & Error Handling

@app.task(max_retries=5, retry_delay=30.0, retry_backoff=True)
def flaky_task():
    # Retries: 30s → 60s → 120s → 240s → 480s (exponential backoff)
    response = requests.get("https://unreliable-api.com")
    response.raise_for_status()
from flashq.exceptions import TaskRetryError

@app.task(max_retries=10)
def smart_retry():
    try:
        do_something()
    except TemporaryError:
        raise TaskRetryError(countdown=5.0)  # Custom retry delay

Backends

SQLite (Default — Zero Config)

app = FlashQ()  # Creates flashq.db in current dir
app = FlashQ(backend=SQLiteBackend(path="/var/lib/flashq/tasks.db"))
app = FlashQ(backend=SQLiteBackend(path=":memory:"))  # For testing

PostgreSQL

Uses LISTEN/NOTIFY for instant task delivery + FOR UPDATE SKIP LOCKED for atomic dequeue.

from flashq.backends.postgres import PostgresBackend
app = FlashQ(backend=PostgresBackend("postgresql://localhost/mydb"))

Redis

Uses sorted sets for scheduling and Lua scripts for atomic operations.

from flashq.backends.redis import RedisBackend
app = FlashQ(backend=RedisBackend("redis://localhost:6379/0"))

CLI

flashq worker myapp:app                  # Start worker
flashq worker myapp:app -q emails,sms    # Specific queues
flashq worker myapp:app -c 16            # 16 concurrent threads
flashq dashboard myapp:app               # Start web dashboard
flashq dashboard myapp:app -p 8080       # Custom port
flashq info myapp:app                    # Queue stats
flashq purge myapp:app -f                # Purge queue

Benchmarks

Measured on Apple Silicon (M-series), Python 3.12, SQLite backend:

Benchmark Tasks Time Throughput Avg Latency
Enqueue (write only) 10,000 0.52s 19,298/s 0.05ms
Roundtrip (c=4) 1,000 25.7s 39/s 0.02ms
I/O-bound (c=8) 500 6.7s 75/s 11.6ms
CPU-bound (c=4) 500 12.8s 39/s 0.03ms

Concurrency scaling (500 tasks):

Workers Throughput
1 10 tasks/s
2 19 tasks/s
4 38 tasks/s
8 100 tasks/s

💡 Throughput scales linearly with concurrency. For I/O-bound workloads, use higher concurrency.

Run benchmarks yourself:

python benchmarks/bench.py

Architecture

Your App → FlashQ → Backend (SQLite/PG/Redis) → Worker(s)
                ↕
          Middleware Stack
          Rate Limiter
          Scheduler
          Dead Letter Queue

FlashQ uses a clean, modular architecture:

  • Backend: Pluggable storage (SQLite, PostgreSQL, Redis)
  • Worker: Thread pool executor with graceful shutdown
  • Middleware: Intercepts every stage of task lifecycle
  • Scheduler: Interval and cron-based periodic dispatch
  • Canvas: Task composition (chain, group, chord)

Roadmap

  • Core engine with SQLite backend
  • PostgreSQL backend (LISTEN/NOTIFY)
  • Redis backend (Lua scripts)
  • Task timeouts (non-blocking)
  • Middleware system
  • Rate limiting (token bucket)
  • Dead letter queue
  • Task chains, groups, chords
  • Periodic/cron scheduler
  • CLI (worker, info, purge)
  • 240 tests, 95% core coverage
  • Web dashboard (real-time monitoring)
  • Task result streaming
  • PyPI publish

Contributing

See CONTRIBUTING.md for development setup and guidelines.

License

MIT License. See LICENSE for details.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

flashq-0.2.2.tar.gz (113.8 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

flashq-0.2.2-py3-none-any.whl (60.1 kB view details)

Uploaded Python 3

File details

Details for the file flashq-0.2.2.tar.gz.

File metadata

  • Download URL: flashq-0.2.2.tar.gz
  • Upload date:
  • Size: 113.8 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.14.3

File hashes

Hashes for flashq-0.2.2.tar.gz
Algorithm Hash digest
SHA256 dc1e45681eb832a97c33d6d217e6b563ce8c4a1e1f0d57655d698b9195c54ba3
MD5 4cbf892096ca1a43cff1c9b21183457b
BLAKE2b-256 a19889a8b3849d970f914cc791aeb514986d84319ee4cd34aeb4d325f5b79ed9

See more details on using hashes here.

File details

Details for the file flashq-0.2.2-py3-none-any.whl.

File metadata

  • Download URL: flashq-0.2.2-py3-none-any.whl
  • Upload date:
  • Size: 60.1 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.14.3

File hashes

Hashes for flashq-0.2.2-py3-none-any.whl
Algorithm Hash digest
SHA256 3c234df2d44c749debd378577273517961c8c3963f0e80d447ce0dda3168955b
MD5 b7087d16c7b5c142d84fd17d170abe89
BLAKE2b-256 e4bc5a9a144a4c389e6bcb1e1e98e628e2fd6023dbe974f3718a2d81fca07fda

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page