plain.jobs

Process background jobs with a database-driven job queue.

Overview

Jobs are defined by subclassing the Job base class and implementing, at a minimum, a run() method.

from plain.jobs import Job, register_job
from plain.email import send_mail


@register_job
class WelcomeUserJob(Job):
    def __init__(self, user):
        self.user = user

    def run(self):
        send_mail(
            subject="Welcome!",
            message=f"Hello from Plain, {self.user}",
            from_email="welcome@plainframework.com",
            recipient_list=[self.user.email],
        )

You can then create an instance of the job and call run_in_worker() to enqueue it for a background worker to pick up.

user = User.query.get(id=1)
WelcomeUserJob(user).run_in_worker()

Workers are run using the plain jobs worker command:
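
plain jobs worker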

Jobs can be defined in any Python file, but app/jobs.py or app/{pkg}/jobs.py is recommended, since those modules are imported automatically and the @register_job decorator will fire.

Run database migrations after installation:

plain migrate

Local development

In development, you will typically want to run the worker alongside your app with auto-reloading enabled. With plain.dev you can do this by adding worker commands to the [tool.plain.dev.run] section of your pyproject.toml file.

# pyproject.toml
[tool.plain.dev.run]
worker = {cmd = "plain jobs worker --reload --stats-every 0 --max-processes 2"}
worker-slow = {cmd = "plain jobs worker --reload --queue slow --stats-every 0 --max-processes 2"}

The --reload flag watches .py and .env* files and automatically restarts the worker when changes are detected.

Job parameters

When calling run_in_worker(), you can specify several parameters to control job execution:

job.run_in_worker(
    queue="slow",  # Target a specific queue (default: "default")
    delay=60,  # Delay in seconds (or timedelta/datetime)
    priority=10,  # Higher numbers run first (default: 0, use negatives for lower priority)
    retries=3,  # Number of retry attempts (default: 0)
    concurrency_key="user-123-welcome",  # Identifier for grouping/deduplication
)
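
The delay parameter also accepts a timedelta or a datetime instead of an integer number of seconds. For example:

from datetime import timedelta

# Equivalent to delay=3600
job.run_in_worker(delay=timedelta(hours=1))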

For more advanced parameter options, see Job.run_in_worker().

Job methods

The Job base class provides several methods you can override to customize behavior:

class MyJob(Job):
    def run(self):
        # Required: The main job logic
        pass

    # Defaults (can be overridden in run_in_worker)
    def default_queue(self) -> str:
        return "default"

    def default_priority(self) -> int:
        # Higher numbers run first: 10 > 5 > 0 > -5 > -10
        return 0

    def default_retries(self) -> int:
        return 0

    def default_concurrency_key(self) -> str:
        # Identifier for grouping/deduplication
        return ""

    # Computed values
    def calculate_retry_delay(self, attempt: int) -> int:
        # Delay in seconds before retry (attempt starts at 1)
        return 0

    # Hooks
    def should_enqueue(self, concurrency_key: str) -> bool:
        # Called before enqueueing - return False to skip
        # Use for concurrency limits, rate limits, etc.
        return True
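
For example, a job that should always run on the slow queue and retry twice only needs to override the relevant defaults (NightlyExportJob is an illustrative name):

from plain.jobs import Job, register_job


@register_job
class NightlyExportJob(Job):
    def default_queue(self) -> str:
        return "slow"

    def default_retries(self) -> int:
        return 2

    def run(self):
        ...  # export logic goes here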

Scheduled jobs

You can schedule jobs to run at specific times using the Schedule class:

from plain.jobs import Job, register_job
from plain.jobs.scheduling import Schedule

@register_job
class DailyReportJob(Job):
    schedule = Schedule.from_cron("0 9 * * *")  # Every day at 9 AM

    def run(self):
        # Generate daily report
        pass

The Schedule class supports standard cron syntax and special strings:

  • @yearly or @annually - Run once a year
  • @monthly - Run once a month
  • @weekly - Run once a week
  • @daily or @midnight - Run once a day
  • @hourly - Run once an hour
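
For example, a nightly job could use one of these shortcuts (a sketch assuming Schedule.from_cron accepts the special strings; NightlyCleanupJob is an illustrative name):

from plain.jobs import Job, register_job
from plain.jobs.scheduling import Schedule


@register_job
class NightlyCleanupJob(Job):
    schedule = Schedule.from_cron("@daily")

    def run(self):
        ...  # cleanup logic goes here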

For custom schedules, see Schedule.

Admin interface

The jobs package includes admin views for monitoring jobs under the "Jobs" section. The admin interface provides:

  • Requests: View pending jobs in the queue
  • Processes: Monitor currently running jobs
  • Results: Review completed and failed job history

Dashboard cards show at-a-glance statistics for successful, errored, lost, and retried jobs.

Job history

Job execution history is stored in the JobResult model. This includes:

  • Job class and parameters
  • Start and end times
  • Success/failure status
  • Error messages and tracebacks for failed jobs
  • Worker information
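
You can also inspect results programmatically. A rough sketch, where the import path and query methods are assumptions (check the package source for the actual model API):

from plain.jobs.models import JobResult  # import path is an assumption

# Plain models expose a .query manager (as with User.query above)
recent_results = JobResult.query.all()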

See Settings for configuring job retention and timeouts.

Monitoring

Workers report statistics and can be monitored using the --stats-every option:

# Report stats every 60 seconds
plain jobs worker --stats-every 60

The worker integrates with OpenTelemetry for distributed tracing. Spans are created for:

  • Job scheduling (run_in_worker)
  • Job execution
  • Job completion/failure

Jobs can be linked to the originating trace context, allowing you to track jobs initiated from web requests.

Settings

Setting                  Default                   Env var
JOBS_RESULTS_RETENTION   604800 seconds (7 days)   PLAIN_JOBS_RESULTS_RETENTION
JOBS_TIMEOUT             86400 seconds (1 day)     PLAIN_JOBS_TIMEOUT
JOBS_MIDDLEWARE          [...]                     PLAIN_JOBS_MIDDLEWARE
JOBS_SCHEDULE            []                        PLAIN_JOBS_SCHEDULE

See default_settings.py for more details.
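
Settings can be overridden in your settings module like any other value. For example, to keep results for 30 days:

# app/settings.py
JOBS_RESULTS_RETENTION = 60 * 60 * 24 * 30  # 30 days, in seconds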

FAQs

How do I ensure only one job runs at a time?

Set a concurrency_key to enforce uniqueness automatically; only one job with the same key can be pending or processing at a time:

from plain.jobs import Job, register_job

@register_job
class ProcessUserJob(Job):
    def __init__(self, user_id):
        self.user_id = user_id

    def default_concurrency_key(self):
        return f"user-{self.user_id}"

    def run(self):
        process_user(self.user_id)

# Usage
ProcessUserJob(123).run_in_worker()  # Enqueued
ProcessUserJob(123).run_in_worker()  # Returns None (blocked - job already pending/processing)

Alternatively, pass concurrency_key as a parameter to run_in_worker() instead of overriding the method.
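
For example:

ProcessUserJob(123).run_in_worker(concurrency_key="user-123")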

How do I implement custom concurrency limits?

Use the should_enqueue() hook to implement custom concurrency control:

class ProcessUserDataJob(Job):
    def __init__(self, user_id):
        self.user_id = user_id

    def default_concurrency_key(self):
        return f"user-{self.user_id}"

    def should_enqueue(self, concurrency_key):
        # Only allow 1 job per user at a time
        processing = self.get_processing_jobs(concurrency_key).count()
        pending = self.get_requested_jobs(concurrency_key).count()
        return processing == 0 and pending == 0

For more patterns like rate limiting and global limits, see should_enqueue() in the source code.

How are race conditions prevented?

Plain uses PostgreSQL's advisory locks to ensure should_enqueue() checks are atomic with job creation. The lock is acquired during the transaction and automatically released when the transaction completes. This eliminates race conditions where multiple threads might simultaneously pass the should_enqueue() check.

For custom locking behavior (Redis, etc.), override get_enqueue_lock().
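
A minimal sketch of swapping in a Redis lock, assuming get_enqueue_lock() receives the concurrency key and returns a context manager held while the enqueue check runs (verify the exact signature and contract in the source):

import redis

from plain.jobs import Job


class RedisLockedJob(Job):
    def get_enqueue_lock(self, concurrency_key):  # signature is an assumption
        client = redis.Redis()
        # redis-py's lock() returns a context manager; the timeout guards
        # against a lock being held forever if a process dies mid-enqueue
        return client.lock(f"enqueue:{concurrency_key}", timeout=10)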

Can I run multiple workers?

Yes, you can run multiple worker processes:

plain jobs worker --max-processes 4

Or run workers for specific queues:

plain jobs worker --queue slow --max-processes 2

How do I handle job failures?

Set the number of retries and implement retry delays:

class MyJob(Job):
    def default_retries(self):
        return 3

    def calculate_retry_delay(self, attempt):
        # Exponential backoff: 1s, 2s, 4s
        return 2 ** (attempt - 1)

Idempotency

Jobs may be retried after a failure, so design them to be safe to re-execute.

# Bad — sends duplicate emails on retry
@register_job
class WelcomeUserJob(Job):
    def __init__(self, user):
        self.user = user

    def run(self):
        send_welcome_email(self.user)

# Good — check before acting
@register_job
class WelcomeUserJob(Job):
    def __init__(self, user):
        self.user = user

    def run(self):
        if not self.user.welcome_email_sent:
            send_welcome_email(self.user)
            self.user.welcome_email_sent = True
            self.user.save()

Installation

Install the plain.jobs package from PyPI:

uv add plain.jobs

Add to your INSTALLED_PACKAGES:

# app/settings.py
INSTALLED_PACKAGES = [
    ...
    "plain.jobs",
]
