Background task processing for Lexigram Framework - Scheduling, workers, and job queues
lexigram-tasks
Overview
lexigram-tasks provides background task execution, scheduling, and async job queues with support for Redis, RabbitMQ, PostgreSQL, and in-memory backends. It features configurable retry policies, dead-letter queues, priority queues, rate limiting, cron scheduling, and Prometheus metrics. All services are wired via TasksProvider, which registers task protocols with the DI container.
Install
uv add lexigram-tasks
# Optional extras
uv add "lexigram-tasks[redis]" # Redis backend
uv add "lexigram-tasks[rabbitmq]" # RabbitMQ backend
Quick Start
from lexigram import Application
from lexigram.di.module import Module, module
# Import the module from the package
from lexigram.tasks import TasksModule

@module(imports=[TasksModule.configure(...)])
class AppModule(Module):
    pass

app = Application(modules=[AppModule])

if __name__ == "__main__":
    app.run()
Configuration
Zero-config usage: call TasksModule.configure() with no arguments to use the defaults.
Option 1 — YAML file
# application.yaml
tasks:
  backend:
    type: redis
    redis_url: redis://localhost:6379/0
  worker:
    worker_count: 4
    max_concurrent_tasks: 10
  scheduler:
    enabled: true
    timezone: UTC
Option 2 — Profiles + Environment Variables (recommended)
export LEX_TASKS__ENABLED=true
# Environment variables for each field
Option 3 — Python
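from lexigram.tasks import TasksModule
# The sub-config classes are assumed to live next to TaskConfig in config.py.
from lexigram.tasks.config import TaskBackendConfig, TaskConfig, TaskWorkerConfig

# Same settings as the YAML example, expressed as a config object.
config = TaskConfig(
    backend=TaskBackendConfig(type="redis", redis_url="redis://localhost:6379/0"),
    worker=TaskWorkerConfig(worker_count=4, max_concurrent_tasks=10),
)

# Explicit factory arguments. `queue` must be a queue instance you have
# already created; it is not defined in this snippet.
TasksModule.configure(queue=queue, worker_count=4, enable_scheduler=True)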
Config reference
| Field | Default | Env var | Description |
|---|---|---|---|
| backend.type | memory | LEX_TASKS__BACKEND__TYPE | Queue backend (redis, rabbitmq, postgres, memory) |
| backend.redis_url | redis://localhost:6379/0 | LEX_TASKS__BACKEND__REDIS_URL | Redis connection URL |
| backend.amqp_url | amqp://localhost | LEX_TASKS__BACKEND__AMQP_URL | AMQP connection URL |
| worker.worker_count | 4 | LEX_TASKS__WORKER__WORKER_COUNT | Number of worker processes |
| worker.max_concurrent_tasks | 10 | LEX_TASKS__WORKER__MAX_CONCURRENT_TASKS | Max tasks executed in parallel per worker |
| worker.default_timeout | 300 | LEX_TASKS__WORKER__DEFAULT_TIMEOUT | Task execution timeout in seconds |
| worker.max_retries | 3 | LEX_TASKS__WORKER__MAX_RETRIES | Maximum retry attempts per task |
| scheduler.enabled | true | LEX_TASKS__SCHEDULER__ENABLED | Enable cron-based task scheduling |
| scheduler.timezone | UTC | LEX_TASKS__SCHEDULER__TIMEZONE | Timezone for cron schedule evaluation |
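The env var names in the table follow a visible pattern: the LEX_TASKS__ prefix, then the field path in upper case with a double underscore in place of each dot. The sketch below only illustrates that naming rule; it is not the framework's own loader, and the helper name env_to_nested is hypothetical. Values are left as strings, since how the framework coerces types is not described here.

```python
from typing import Any, Dict, Mapping

PREFIX = "LEX_TASKS__"  # prefix shared by every env var in the config reference


def env_to_nested(environ: Mapping[str, str], prefix: str = PREFIX) -> Dict[str, Any]:
    """Turn LEX_TASKS__A__B=value into {"a": {"b": "value"}} (hypothetical helper)."""
    result: Dict[str, Any] = {}
    for name, value in environ.items():
        if not name.startswith(prefix):
            continue  # ignore unrelated environment variables
        # Split the remainder on the double underscore and lowercase each part.
        parts = [part.lower() for part in name[len(prefix):].split("__")]
        node = result
        for part in parts[:-1]:
            node = node.setdefault(part, {})
        node[parts[-1]] = value  # values stay strings; no type coercion here
    return result
```

For example, passing os.environ after `export LEX_TASKS__BACKEND__TYPE=redis` would yield a dict whose "backend" entry contains "type" set to "redis".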
Module Factory Methods
| Method | Description |
|---|---|
| TasksModule.configure(queue, worker_count, enable_scheduler) | Configure with explicit queue and worker settings |
| TasksModule.stub() | Minimal config for testing |
Key Features
- Multiple backends: Redis, RabbitMQ, PostgreSQL (transactional), in-memory
- Retry policies: configurable retries, backoff, max_delay per task
- Dead-letter queue: failed jobs routed to a DLQ for inspection or replay
- Priority queues: urgent, default, bulk built in; custom queues supported
- Rate limiting: per-queue and per-task throughput caps
- Concurrency: bounded worker pool with backpressure
- Cron scheduling: cron-expression task scheduling via the @scheduled decorator
- Observability: Prometheus metrics for queue depth, latency, error rate
- Health checks: /health/tasks endpoint via lexigram-monitor
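To make the retry-policy knobs concrete, here is a minimal sketch of how retries, backoff, and max_delay commonly interact in an exponential backoff scheme. This is an assumption about the general technique, not lexigram-tasks' actual formula: the function names, the base_delay parameter, and the 1-based attempt numbering are all hypothetical.

```python
from typing import List


def retry_delay(attempt: int, backoff: float = 2.0,
                base_delay: float = 1.0, max_delay: float = 60.0) -> float:
    """Seconds to wait before retry number `attempt` (1-based, hypothetical)."""
    if attempt < 1:
        raise ValueError("attempt must be >= 1")
    # Grow the delay geometrically, then cap it at max_delay.
    return min(base_delay * backoff ** (attempt - 1), max_delay)


def retry_schedule(retries: int, backoff: float = 2.0,
                   base_delay: float = 1.0, max_delay: float = 60.0) -> List[float]:
    """Delays for every retry a task with `retries` attempts would go through."""
    return [retry_delay(n, backoff, base_delay, max_delay)
            for n in range(1, retries + 1)]
```

With retries=5 and max_delay=10.0 the schedule is 1, 2, 4, 8, 10 seconds: the cap takes effect on the fifth attempt. Once the retries are exhausted, the feature list above suggests the job would go to the dead-letter queue.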
Testing
async with Application.boot(modules=[TasksModule.stub()]) as app:
    # your test code
    ...
BackgroundTaskManager
A container-injectable service for fire-and-forget tasks. It keeps a reference to every task handle so none is lost, and cancels all pending work on framework shutdown.
from lexigram.tasks import BackgroundTaskManager

class MyService:
    def __init__(self, task_manager: BackgroundTaskManager) -> None:
        self._tasks = task_manager

    async def kick_off_work(self) -> None:
        self._tasks.track(self._do_something())

    async def kick_off_named_work(self) -> None:
        self._tasks.track_named("my-named-job", self._do_something())

    async def check_pending(self) -> int:
        return self._tasks.pending_count

# In your Provider.shutdown():
await task_manager.shutdown(timeout=30.0)
Register as a singleton in your provider:
from lexigram.tasks import BackgroundTaskManager
from lexigram.di.provider import Provider

class MyProvider(Provider):
    async def register(self, container):
        container.singleton(BackgroundTaskManager, BackgroundTaskManager())

    async def shutdown(self):
        mgr = await self._container.resolve(BackgroundTaskManager)
        await mgr.shutdown(timeout=30.0)
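The reason a manager like this matters: asyncio only keeps weak references to tasks, so a task created and then dropped can be garbage-collected mid-flight. The sketch below shows the general pattern with plain asyncio. It is an illustrative stand-in, not the BackgroundTaskManager source; the class name TrackedTasks is hypothetical, and the choice to wait up to timeout seconds before cancelling is an assumption about what shutdown(timeout=...) means.

```python
import asyncio
from typing import Any, Coroutine, Optional, Set


class TrackedTasks:
    """Illustrative stand-in; not lexigram's BackgroundTaskManager."""

    def __init__(self) -> None:
        self._tasks: Set["asyncio.Task[Any]"] = set()

    def track(self, coro: Coroutine[Any, Any, Any],
              name: Optional[str] = None) -> "asyncio.Task[Any]":
        task = asyncio.create_task(coro, name=name)
        self._tasks.add(task)  # strong reference keeps the task alive
        task.add_done_callback(self._tasks.discard)  # forget it once finished
        return task

    @property
    def pending_count(self) -> int:
        return len(self._tasks)

    async def shutdown(self, timeout: float = 30.0) -> None:
        if not self._tasks:
            return
        # Assumption: give running tasks up to `timeout` seconds to finish.
        _, pending = await asyncio.wait(set(self._tasks), timeout=timeout)
        for task in pending:
            task.cancel()
        # Wait until the cancellations have actually been delivered.
        await asyncio.gather(*pending, return_exceptions=True)
```

The done callback is what keeps pending_count accurate without any polling: each task removes itself from the set when it completes, fails, or is cancelled.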
ScheduledWorker
A base class for services that run a cycle of work on a fixed interval — replacing hand-rolled while not stop_event loops.
from lexigram.tasks import BackgroundTaskManager, OnErrorPolicy, ScheduledWorker

class RetentionWorker(ScheduledWorker):
    interval_seconds = 3600.0  # run every hour
    initial_delay_seconds = 5.0  # wait 5 s before the first cycle
    on_error_policy = OnErrorPolicy.LOG_AND_CONTINUE  # (default)

    async def run_cycle(self) -> None:
        await self._repo.delete_expired_records()

# In your provider.boot():
task_manager = await container.resolve(BackgroundTaskManager)
self._worker = RetentionWorker(task_manager=task_manager)
await self._worker.start()

# In your provider.shutdown():
await self._worker.stop()
Override at construction time to tune per-instance without subclassing:
worker = RetentionWorker(
    task_manager=task_manager,
    interval_seconds=300.0,
    max_jitter_seconds=30.0,
    on_error_policy=OnErrorPolicy.BACKOFF,
)
OnErrorPolicy values:
| Value | Behaviour on run_cycle error |
|---|---|
| LOG_AND_CONTINUE | Log the exception and resume on the next interval (default). |
| BACKOFF | Log and double the sleep time (up to 10× interval). |
| STOP | Log and stop the worker permanently. |
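The three policies can be read as a rule for choosing the next sleep duration. The sketch below restates the table in code under stated assumptions: ErrorPolicy and next_sleep are hypothetical names (not the library's OnErrorPolicy), and resetting to the base interval after a successful cycle is an assumption the table does not confirm.

```python
from enum import Enum
from typing import Optional


class ErrorPolicy(Enum):
    """Hypothetical mirror of the three documented policy values."""
    LOG_AND_CONTINUE = "log_and_continue"
    BACKOFF = "backoff"
    STOP = "stop"


def next_sleep(policy: ErrorPolicy, interval: float,
               current_sleep: float, failed: bool) -> Optional[float]:
    """Seconds to sleep before the next cycle, or None to stop the worker."""
    if not failed:
        return interval  # assumption: a successful cycle resets any backoff
    if policy is ErrorPolicy.STOP:
        return None
    if policy is ErrorPolicy.BACKOFF:
        # Double the sleep, capped at 10x the configured interval.
        return min(current_sleep * 2, interval * 10)
    return interval  # LOG_AND_CONTINUE: resume on the normal interval
```

With interval_seconds=300 and repeated failures under BACKOFF, the sleep goes 600, 1200, 2400, then stays at 3000 seconds.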
Key Source Files
| File | What it contains |
|---|---|
| src/lexigram/tasks/module.py | TasksModule class with factory methods |
| src/lexigram/tasks/di/provider.py | TasksProvider: wires task protocols into the DI container |
| src/lexigram/tasks/config.py | TaskConfig and sub-config classes |
| src/lexigram/tasks/backends/ | Backend implementations (memory, redis, rabbitmq, postgres) |
| src/lexigram/tasks/scheduling/ | Cron scheduler and scheduled task decorators |
| src/lexigram/tasks/background_task_manager.py | BackgroundTaskManager: lifecycle-aware task tracking (LEX-006) |
| src/lexigram/tasks/scheduled_worker.py | ScheduledWorker + OnErrorPolicy: periodic worker base class (LEX-005) |