Skip to main content

Process background jobs with a database-driven worker.

Project description

plain.worker

Process background jobs with a database-driven worker.

Overview

Jobs are defined using the Job base class and the run() method at a minimum.

from plain.worker import Job, register_job
from plain.email import send_mail


@register_job
class WelcomeUserJob(Job):
    def __init__(self, user):
        self.user = user

    def run(self):
        send_mail(
            subject="Welcome!",
            message=f"Hello from Plain, {self.user}",
            from_email="welcome@plainframework.com",
            recipient_list=[self.user.email],
        )

You can then create an instance of the job and call run_in_worker() to enqueue it for a background worker to pick up.

user = User.objects.get(id=1)
WelcomeUserJob(user).run_in_worker()

Workers are run using the plain worker run command.

Jobs can be defined in any Python file, but it is suggested to use app/jobs.py or app/{pkg}/jobs.py as those will be imported automatically so the @register_job decorator will fire.

Run database migrations after installation:

plain migrate

Local development

In development, you will typically want to run the worker alongside your app. With plain.dev you can do this by adding it to the [tool.plain.dev.run] section of your pyproject.toml file. Currently, you will need to use something like watchfiles to add auto-reloading to the worker.

# pyproject.toml
[tool.plain.dev.run]
worker = {cmd = "watchfiles --filter python \"plain worker run --stats-every 0 --max-processes 2\" ."}
worker-slow = {cmd = "watchfiles --filter python \"plain worker run --queue slow --stats-every 0 --max-processes 2\" ."}

Job parameters

When calling run_in_worker(), you can specify several parameters to control job execution:

job.run_in_worker(
    queue="slow",  # Target a specific queue (default: "default")
    delay=60,  # Delay in seconds (or timedelta/datetime)
    priority=10,  # Higher numbers run first (default: 0, use negatives for lower priority)
    retries=3,  # Number of retry attempts (default: 0)
    unique_key="user-123-welcome",  # Prevent duplicate jobs
)

For more advanced parameter options, see Job.run_in_worker().

Job methods

The Job base class provides several methods you can override to customize behavior:

class MyJob(Job):
    def run(self):
        # Required: The main job logic
        pass

    def get_queue(self) -> str:
        # Specify the default queue for this job type
        return "default"

    def get_priority(self) -> int:
        # Set the default priority
        # Higher numbers run first: 10 > 5 > 0 > -5 > -10
        # Use positive numbers for high priority, negative for low priority
        return 0

    def get_retries(self) -> int:
        # Number of retry attempts on failure
        return 0

    def get_retry_delay(self, attempt: int) -> int:
        # Delay in seconds before retry (attempt starts at 1)
        return 0

    def get_unique_key(self) -> str:
        # Return a key to prevent duplicate jobs
        return ""

Scheduled jobs

You can schedule jobs to run at specific times using the Schedule class:

from plain.worker import Job, register_job
from plain.worker.scheduling import Schedule

@register_job
class DailyReportJob(Job):
    schedule = Schedule.from_cron("0 9 * * *")  # Every day at 9 AM

    def run(self):
        # Generate daily report
        pass

The Schedule class supports standard cron syntax and special strings:

  • @yearly or @annually - Run once a year
  • @monthly - Run once a month
  • @weekly - Run once a week
  • @daily or @midnight - Run once a day
  • @hourly - Run once an hour

For custom schedules, see Schedule.

Admin interface

The worker package includes admin views for monitoring jobs. The admin interface provides:

  • Job Requests: View pending jobs in the queue
  • Jobs: Monitor currently running jobs
  • Job Results: Review completed and failed job history

Dashboard cards show at-a-glance statistics for successful and errored jobs.

Job history

Job execution history is stored in the JobResult model. This includes:

  • Job class and parameters
  • Start and end times
  • Success/failure status
  • Error messages and tracebacks for failed jobs
  • Worker information

History retention can be configured in your settings:

# app/settings.py
WORKER_JOB_HISTORY_DAYS = 30

Monitoring

Workers report statistics and can be monitored using the --stats-every option:

# Report stats every 60 seconds
plain worker run --stats-every 60

The worker integrates with OpenTelemetry for distributed tracing. Spans are created for:

  • Job scheduling (run_in_worker)
  • Job execution
  • Job completion/failure

Jobs can be linked to the originating trace context, allowing you to track jobs initiated from web requests.

FAQs

How do I ensure a job only runs once?

Return a unique key from the get_unique_key() method:

class ProcessUserDataJob(Job):
    def __init__(self, user_id):
        self.user_id = user_id

    def get_unique_key(self):
        return f"process-user-{self.user_id}"

Can I run multiple workers?

Yes, you can run multiple worker processes:

plain worker run --max-processes 4

Or run workers for specific queues:

plain worker run --queue slow --max-processes 2

How do I handle job failures?

Set the number of retries and implement retry delays:

class MyJob(Job):
    def get_retries(self):
        return 3

    def get_retry_delay(self, attempt):
        # Exponential backoff: 1s, 2s, 4s
        return 2 ** (attempt - 1)

Installation

Install the plain.worker package from PyPI:

uv add plain.worker

Add to your INSTALLED_PACKAGES:

# app/settings.py
INSTALLED_PACKAGES = [
    ...
    "plain.worker",
]

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

plain_worker-0.28.1.tar.gz (25.8 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

plain_worker-0.28.1-py3-none-any.whl (30.6 kB view details)

Uploaded Python 3

File details

Details for the file plain_worker-0.28.1.tar.gz.

File metadata

  • Download URL: plain_worker-0.28.1.tar.gz
  • Upload date:
  • Size: 25.8 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: uv/0.8.15

File hashes

Hashes for plain_worker-0.28.1.tar.gz
Algorithm Hash digest
SHA256 08056edc36aee1666154cc1f4d20d8a1a54fe99977a05d2ec835d0c69d30d1de
MD5 9e8cfa2901752940669d51c2cc75c66a
BLAKE2b-256 67af314b5c2567d9bc04791bf25f596207c9b72c9c8fd3933c354cc66116e724

See more details on using hashes here.

File details

Details for the file plain_worker-0.28.1-py3-none-any.whl.

File metadata

File hashes

Hashes for plain_worker-0.28.1-py3-none-any.whl
Algorithm Hash digest
SHA256 42086ad806008288f95491543e2a4081c03a632535810246d182770554f3c2bc
MD5 a39761bdb36b615a1aabd2070533a496
BLAKE2b-256 9f18fff5307777908f3c261bc597b46e2367f8012f5170c85270d74ce1093b4c

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page