Skip to main content

Background task processing for Lexigram Framework - Scheduling, workers, and job queues

Project description

lexigram-tasks

Background task processing for Lexigram Framework — Scheduling, workers, and job queues


Overview

lexigram-tasks provides background task execution, scheduling, and async job queues with support for Redis, RabbitMQ, PostgreSQL, and in-memory backends. It features configurable retry policies, dead-letter queues, priority queues, rate limiting, cron scheduling, and Prometheus metrics. All services are wired via TasksProvider, which registers task protocols with the DI container.


Install

uv add lexigram-tasks
# Optional extras
uv add "lexigram-tasks[redis]"     # Redis backend
uv add "lexigram-tasks[rabbitmq]"  # RabbitMQ backend

Quick Start

from lexigram import Application
from lexigram.di.module import Module, module

# Import the module from the package
from lexigram.tasks import TasksModule

@module(imports=[TasksModule.configure(...)])
class AppModule(Module):
    pass

app = Application(modules=[AppModule])
if __name__ == "__main__":
    app.run()

Configuration

Zero-config usage: Call TasksModule.configure() with no arguments to use defaults.

Option 1 — YAML file

# application.yaml
tasks:
  backend:
    type: redis
    redis_url: redis://localhost:6379/0
  worker:
    worker_count: 4
    max_concurrent_tasks: 10
  scheduler:
    enabled: true
    timezone: UTC

Option 2 — Profiles + Environment Variables (recommended)

export LEX_TASKS__ENABLED=true
# Environment variables for each field

Option 3 — Python

from lexigram.tasks.config import TaskConfig
from lexigram.tasks import TasksModule

config = TaskConfig(
    backend=TaskBackendConfig(type="redis", redis_url="redis://localhost:6379/0"),
    worker=TaskWorkerConfig(worker_count=4, max_concurrent_tasks=10),
)
TasksModule.configure(queue=queue, worker_count=4, enable_scheduler=True)

Config reference

Field Default Env var Description
backend.type memory LEX_TASKS__BACKEND__TYPE Queue backend (redis, rabbitmq, postgres, memory)
backend.redis_url redis://localhost:6379/0 LEX_TASKS__BACKEND__REDIS_URL Redis connection URL
backend.amqp_url amqp://localhost LEX_TASKS__BACKEND__AMQP_URL AMQP connection URL
worker.worker_count 4 LEX_TASKS__WORKER__WORKER_COUNT Number of worker processes
worker.max_concurrent_tasks 10 LEX_TASKS__WORKER__MAX_CONCURRENT_TASKS Max tasks executed in parallel per worker
worker.default_timeout 300 LEX_TASKS__WORKER__DEFAULT_TIMEOUT Task execution timeout in seconds
worker.max_retries 3 LEX_TASKS__WORKER__MAX_RETRIES Maximum retry attempts per task
scheduler.enabled true LEX_TASKS__SCHEDULER__ENABLED Enable cron-based task scheduling
scheduler.timezone UTC LEX_TASKS__SCHEDULER__TIMEZONE Timezone for cron schedule evaluation

Module Factory Methods

Method Description
TasksModule.configure(queue, worker_count, enable_scheduler) Configure with explicit queue and worker settings
TasksModule.stub() Minimal config for testing

Key Features

  • Multiple backends — Redis, RabbitMQ, PostgreSQL (transactional), in-memory
  • Retry policies — Configurable retries, backoff, max_delay per task
  • Dead-letter queue — Failed jobs routed to DLQ for inspection / replay
  • Priority queuesurgent, default, bulk built-in; custom queues supported
  • Rate limiting — Per-queue and per-task throughput caps
  • Concurrency — Bounded worker pool with backpressure
  • Cron scheduling — Cron-expression task scheduling via @scheduled decorator
  • Observability — Prometheus metrics for queue depth, latency, error rate
  • Health checks/health/tasks endpoint via lexigram-monitor

Testing

async with Application.boot(modules=[TasksModule.stub()]) as app:
    # your test code
    ...

BackgroundTaskManager

A container-injectable service for fire-and-go tasks that ensures no task handle is lost and all pending work is cancelled on framework shutdown.

from lexigram.tasks import BackgroundTaskManager

class MyService:
    def __init__(self, task_manager: BackgroundTaskManager) -> None:
        self._tasks = task_manager

    async def kick_off_work(self) -> None:
        self._tasks.track(self._do_something())

    async def kick_off_named_work(self) -> None:
        self._tasks.track_named("my-named-job", self._do_something())

    async def check_pending(self) -> int:
        return self._tasks.pending_count

# In your Provider.shutdown():
await task_manager.shutdown(timeout=30.0)

Register as a singleton in your provider:

from lexigram.tasks import BackgroundTaskManager
from lexigram.di.provider import Provider

class MyProvider(Provider):
    async def register(self, container):
        container.singleton(BackgroundTaskManager, BackgroundTaskManager())

    async def shutdown(self):
        mgr = await self._container.resolve(BackgroundTaskManager)
        await mgr.shutdown(timeout=30.0)

ScheduledWorker

A base class for services that run a cycle of work on a fixed interval — replacing hand-rolled while not stop_event loops.

from lexigram.tasks import BackgroundTaskManager, OnErrorPolicy, ScheduledWorker

class RetentionWorker(ScheduledWorker):
    interval_seconds = 3600.0          # run every hour
    initial_delay_seconds = 5.0        # wait 5 s before the first cycle
    on_error_policy = OnErrorPolicy.LOG_AND_CONTINUE  # (default)

    async def run_cycle(self) -> None:
        await self._repo.delete_expired_records()

# In your provider.boot():
task_manager = await container.resolve(BackgroundTaskManager)
self._worker = RetentionWorker(task_manager=task_manager)
await self._worker.start()

# In your provider.shutdown():
await self._worker.stop()

Override at construction time to tune per-instance without subclassing:

worker = RetentionWorker(
    task_manager=task_manager,
    interval_seconds=300.0,
    max_jitter_seconds=30.0,
    on_error_policy=OnErrorPolicy.BACKOFF,
)

OnErrorPolicy values:

Value Behaviour on run_cycle error
LOG_AND_CONTINUE Log the exception and resume on the next interval (default).
BACKOFF Log and double the sleep time (up to 10× interval).
STOP Log and stop the worker permanently.

Key Source Files

File What it contains
src/lexigram/tasks/module.py TasksModule class with factory methods
src/lexigram/tasks/di/provider.py TasksProvider — wires task protocols into DI container
src/lexigram/tasks/config.py TaskConfig and sub-config classes
src/lexigram/tasks/backends/ Backend implementations (memory, redis, rabbitmq, postgres)
src/lexigram/tasks/scheduling/ Cron scheduler and scheduled task decorators
src/lexigram/tasks/background_task_manager.py BackgroundTaskManager — lifecycle-aware task tracking (LEX-006)
src/lexigram/tasks/scheduled_worker.py ScheduledWorker + OnErrorPolicy — periodic worker base class (LEX-005)

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distributions

No source distribution files available for this release.See tutorial on generating distribution archives.

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

lexigram_tasks-0.1.1-py3-none-any.whl (141.4 kB view details)

Uploaded Python 3

File details

Details for the file lexigram_tasks-0.1.1-py3-none-any.whl.

File metadata

File hashes

Hashes for lexigram_tasks-0.1.1-py3-none-any.whl
Algorithm Hash digest
SHA256 ab02a8a39acce364ca64fdb46e9d8ea3b8e5639a8cf0f0a6a80157a909304a18
MD5 4269b387e0201bd7a33439b874e6693c
BLAKE2b-256 037e797a5346a216323fcfddaf247d83ab750cd3257ae9f3dbcc98f40d260a76

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page