Process background jobs with a database-driven worker.

plain.worker

Overview

Define a job by subclassing the Job base class and implementing, at a minimum, the run() method.

from plain.worker import Job, register_job
from plain.email import send_mail


@register_job
class WelcomeUserJob(Job):
    def __init__(self, user):
        self.user = user

    def run(self):
        send_mail(
            subject="Welcome!",
            message=f"Hello from Plain, {self.user}",
            from_email="welcome@plainframework.com",
            recipient_list=[self.user.email],
        )

You can then create an instance of the job and call run_in_worker() to enqueue it for a background worker to pick up.

user = User.objects.get(id=1)
WelcomeUserJob(user).run_in_worker()

Start a worker with the plain worker run command.

Jobs can be defined in any Python file, but app/jobs.py or app/{pkg}/jobs.py is recommended: those modules are imported automatically, so the @register_job decorator is guaranteed to run.

Run database migrations after installation:

plain migrate

Local development

In development, you will typically want to run the worker alongside your app. With plain.dev, you can do this by adding the worker command to the [tool.plain.dev.run] section of your pyproject.toml file. The worker does not yet reload on code changes by itself, so for now wrap the command in something like watchfiles.

# pyproject.toml
[tool.plain.dev.run]
worker = {cmd = "watchfiles --filter python \"plain worker run --stats-every 0 --max-processes 2\" ."}
worker-slow = {cmd = "watchfiles --filter python \"plain worker run --queue slow --stats-every 0 --max-processes 2\" ."}

Job parameters

When calling run_in_worker(), you can specify several parameters to control job execution:

job.run_in_worker(
    queue="slow",  # Target a specific queue (default: "default")
    delay=60,  # Delay in seconds (or timedelta/datetime)
    priority=10,  # Higher numbers run first (default: 0, use negatives for lower priority)
    retries=3,  # Number of retry attempts (default: 0)
    unique_key="user-123-welcome",  # Prevent duplicate jobs
)

For more advanced parameter options, see Job.run_in_worker().
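The delay parameter accepts a number of seconds, a timedelta, or a datetime. A minimal sketch of how those three forms could be normalized into one absolute start time is shown below. The resolve_start_at helper is hypothetical and written only for illustration; it is not part of plain.worker, and the library's own handling may differ.

```python
from datetime import datetime, timedelta, timezone


def resolve_start_at(delay, now=None):
    """Turn a delay (seconds, timedelta, or datetime) into an absolute start time.

    Hypothetical helper for illustration; not part of plain.worker.
    """
    now = now or datetime.now(timezone.utc)
    if delay is None:
        # No delay: the job is eligible to start immediately
        return now
    if isinstance(delay, datetime):
        # Already an absolute time
        return delay
    if isinstance(delay, timedelta):
        return now + delay
    if isinstance(delay, (int, float)):
        # Plain numbers are treated as seconds
        return now + timedelta(seconds=delay)
    raise TypeError(f"Unsupported delay type: {type(delay).__name__}")


now = datetime(2025, 1, 1, 12, 0, tzinfo=timezone.utc)
print(resolve_start_at(60, now))
print(resolve_start_at(timedelta(minutes=5), now))
```

Normalizing early means the rest of a queue only ever has to compare one kind of value against the current time.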

Job methods

The Job base class provides several methods you can override to customize behavior:

class MyJob(Job):
    def run(self):
        # Required: The main job logic
        pass

    def get_queue(self) -> str:
        # Specify the default queue for this job type
        return "default"

    def get_priority(self) -> int:
        # Set the default priority
        # Higher numbers run first: 10 > 5 > 0 > -5 > -10
        # Use positive numbers for high priority, negative for low priority
        return 0

    def get_retries(self) -> int:
        # Number of retry attempts on failure
        return 0

    def get_retry_delay(self, attempt: int) -> int:
        # Delay in seconds before retry (attempt starts at 1)
        return 0

    def get_unique_key(self) -> str:
        # Return a key to prevent duplicate jobs
        return ""
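To make the priority rule concrete, here is a small sketch of the ordering "higher numbers run first". PendingJob and pick_order are hypothetical names used only for this illustration, and the tie-break (earlier enqueued first among equal priorities) is an assumption rather than documented plain.worker behavior.

```python
from dataclasses import dataclass


@dataclass
class PendingJob:
    # Hypothetical stand-in for a queued job; not a plain.worker model
    name: str
    priority: int
    enqueued_order: int


def pick_order(jobs):
    # Higher priority first; among equal priorities,
    # earlier enqueued first (assumed tie-break)
    return sorted(jobs, key=lambda j: (-j.priority, j.enqueued_order))


jobs = [
    PendingJob("cleanup", -10, 1),
    PendingJob("welcome-email", 0, 2),
    PendingJob("password-reset", 10, 3),
    PendingJob("newsletter", 0, 4),
]
print([j.name for j in pick_order(jobs)])
```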

Scheduled jobs

You can schedule jobs to run at specific times using the Schedule class:

from plain.worker import Job, register_job
from plain.worker.scheduling import Schedule

@register_job
class DailyReportJob(Job):
    schedule = Schedule.from_cron("0 9 * * *")  # Every day at 9 AM

    def run(self):
        # Generate daily report
        pass

The Schedule class supports standard cron syntax and special strings:

  • @yearly or @annually - Run once a year
  • @monthly - Run once a month
  • @weekly - Run once a week
  • @daily or @midnight - Run once a day
  • @hourly - Run once an hour

For custom schedules, see Schedule.
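For reference, the special strings conventionally correspond to the five-field cron expressions in the sketch below. The CRON_ALIASES table and expand_cron function are hypothetical and exist only to show the mapping; whether Schedule resolves the strings to exactly these expressions internally is an assumption.

```python
# Conventional cron meanings of the special strings (illustrative table)
CRON_ALIASES = {
    "@yearly": "0 0 1 1 *",
    "@annually": "0 0 1 1 *",
    "@monthly": "0 0 1 * *",
    "@weekly": "0 0 * * 0",
    "@daily": "0 0 * * *",
    "@midnight": "0 0 * * *",
    "@hourly": "0 * * * *",
}

FIELD_NAMES = ("minute", "hour", "day_of_month", "month", "day_of_week")


def expand_cron(expression):
    """Expand an alias if present, then split into the five named cron fields."""
    expression = expression.strip()
    expanded = CRON_ALIASES.get(expression, expression)
    fields = expanded.split()
    if len(fields) != 5:
        raise ValueError(f"Expected 5 cron fields, got {len(fields)}: {expanded!r}")
    return dict(zip(FIELD_NAMES, fields))


print(expand_cron("@daily"))
print(expand_cron("0 9 * * *"))
```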

Admin interface

The worker package includes admin views for monitoring jobs. The admin interface provides:

  • Job Requests: View pending jobs in the queue
  • Jobs: Monitor currently running jobs
  • Job Results: Review completed and failed job history

Dashboard cards show at-a-glance statistics for successful and errored jobs.

Job history

Job execution history is stored in the JobResult model. This includes:

  • Job class and parameters
  • Start and end times
  • Success/failure status
  • Error messages and tracebacks for failed jobs
  • Worker information

History retention can be configured in your settings:

# app/settings.py
WORKER_JOB_HISTORY_DAYS = 30
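As a rough illustration of what a retention window in days means, the sketch below computes a cutoff timestamp and checks whether a result falls outside it. The history_cutoff and is_expired helpers are hypothetical, and how plain.worker actually prunes old results is not described here, so treat this as an assumption-based example only.

```python
from datetime import datetime, timedelta, timezone

WORKER_JOB_HISTORY_DAYS = 30


def history_cutoff(now, days=WORKER_JOB_HISTORY_DAYS):
    # Anything that finished before this moment is outside the window
    return now - timedelta(days=days)


def is_expired(finished_at, now, days=WORKER_JOB_HISTORY_DAYS):
    # Hypothetical check; not plain.worker's actual cleanup logic
    return finished_at < history_cutoff(now, days)


now = datetime(2025, 3, 1, 12, 0, tzinfo=timezone.utc)
print(history_cutoff(now))
print(is_expired(datetime(2025, 1, 15, tzinfo=timezone.utc), now))
print(is_expired(datetime(2025, 2, 15, tzinfo=timezone.utc), now))
```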

Monitoring

Workers report statistics and can be monitored using the --stats-every option:

# Report stats every 60 seconds
plain worker run --stats-every 60

The worker integrates with OpenTelemetry for distributed tracing. Spans are created for:

  • Job scheduling (run_in_worker)
  • Job execution
  • Job completion/failure

Jobs can be linked to the originating trace context, allowing you to track jobs initiated from web requests.

FAQs

How do I ensure a job only runs once?

Return a unique key from the get_unique_key() method:

class ProcessUserDataJob(Job):
    def __init__(self, user_id):
        self.user_id = user_id

    def get_unique_key(self):
        # Jobs that return the same key are treated as duplicates
        return f"process-user-{self.user_id}"
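The idea behind a unique key can be sketched with a toy in-memory queue. InMemoryQueue is hypothetical and is not how plain.worker stores jobs. The sketch also assumes that an empty key means "no uniqueness constraint" and that a key is released once its job completes; neither is confirmed here.

```python
class InMemoryQueue:
    """Toy queue that rejects duplicates by unique key (illustration only)."""

    def __init__(self):
        self.items = []
        self._active_keys = set()

    def enqueue(self, payload, unique_key=""):
        # Assumption: an empty key means no uniqueness constraint
        if unique_key:
            if unique_key in self._active_keys:
                return False  # a job with this key is already queued
            self._active_keys.add(unique_key)
        self.items.append((unique_key, payload))
        return True

    def complete(self, unique_key):
        # Assumption: finishing a job frees its key for reuse
        self._active_keys.discard(unique_key)


queue = InMemoryQueue()
print(queue.enqueue({"user_id": 1}, unique_key="process-user-1"))
print(queue.enqueue({"user_id": 1}, unique_key="process-user-1"))
print(queue.enqueue({"user_id": 2}, unique_key="process-user-2"))
```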

Can I run multiple workers?

Yes, you can run multiple worker processes:

plain worker run --max-processes 4

Or run workers for specific queues:

plain worker run --queue slow --max-processes 2

How do I handle job failures?

Set the number of retries and implement retry delays:

class MyJob(Job):
    def get_retries(self):
        return 3

    def get_retry_delay(self, attempt):
        # Exponential backoff: 1s, 2s, 4s
        return 2 ** (attempt - 1)
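To check a backoff formula before relying on it, it can help to print the full delay schedule. The standalone sketch below uses the same formula as above and adds an optional cap; retry_delay and retry_schedule are hypothetical helper names, and the cap is an addition for illustration, not a plain.worker feature.

```python
def retry_delay(attempt, base=1, cap=None):
    # Exponential backoff: base * 2^(attempt - 1), with attempt starting at 1
    delay = base * 2 ** (attempt - 1)
    return delay if cap is None else min(delay, cap)


def retry_schedule(retries, base=1, cap=None):
    # Delay in seconds before each retry attempt, in order
    return [retry_delay(attempt, base, cap) for attempt in range(1, retries + 1)]


print(retry_schedule(3))          # [1, 2, 4]
print(retry_schedule(6, cap=10))  # [1, 2, 4, 8, 10, 10]
```

A cap keeps later retries from drifting arbitrarily far into the future when the retry count is large.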

Installation

Install the plain.worker package from PyPI:

uv add plain.worker

Add to your INSTALLED_PACKAGES:

# app/settings.py
INSTALLED_PACKAGES = [
    ...
    "plain.worker",
]
