plain.worker

Process background jobs with a database-driven worker.

Overview

Define a job by subclassing the Job base class and implementing, at a minimum, the run() method.

from plain.worker import Job, register_job
from plain.email import send_mail


@register_job
class WelcomeUserJob(Job):
    def __init__(self, user):
        self.user = user

    def run(self):
        send_mail(
            subject="Welcome!",
            message=f"Hello from Plain, {self.user}",
            from_email="welcome@plainframework.com",
            recipient_list=[self.user.email],
        )

You can then create an instance of the job and call run_in_worker() to enqueue it for a background worker to pick up.

user = User.query.get(id=1)
WelcomeUserJob(user).run_in_worker()

Start a worker with the plain worker run command.

Jobs can be defined in any Python file, but app/jobs.py or app/{pkg}/jobs.py is recommended: those modules are imported automatically, which ensures the @register_job decorator runs.

Run database migrations after installation:

plain migrate

Local development

In development, you will typically want to run the worker alongside your app. With plain.dev you can do this by adding it to the [tool.plain.dev.run] section of your pyproject.toml file. The worker does not currently reload on code changes by itself, so wrap the command in a tool such as watchfiles to get auto-reloading.

# pyproject.toml
[tool.plain.dev.run]
worker = {cmd = "watchfiles --filter python \"plain worker run --stats-every 0 --max-processes 2\" ."}
worker-slow = {cmd = "watchfiles --filter python \"plain worker run --queue slow --stats-every 0 --max-processes 2\" ."}

Job parameters

When calling run_in_worker(), you can specify several parameters to control job execution:

job.run_in_worker(
    queue="slow",  # Target a specific queue (default: "default")
    delay=60,  # Delay in seconds (or timedelta/datetime)
    priority=10,  # Higher numbers run first (default: 0, use negatives for lower priority)
    retries=3,  # Number of retry attempts (default: 0)
    unique_key="user-123-welcome",  # Prevent duplicate jobs
)

For more advanced parameter options, see Job.run_in_worker().
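
The delay parameter accepts three shapes: a number of seconds, a timedelta, or a datetime. As a rough sketch of how such a value can be normalized into a single start time, consider the helper below. The name resolve_start_at and its behavior are hypothetical and are not part of plain.worker; the library's own handling may differ.

```python
from datetime import datetime, timedelta, timezone


def resolve_start_at(delay, now=None):
    """Turn a delay value into an absolute start time.

    Hypothetical helper for illustration only; not a plain.worker API.
    """
    now = now or datetime.now(timezone.utc)
    if delay is None:
        # No delay: the job may start immediately.
        return now
    if isinstance(delay, datetime):
        # Already an absolute point in time.
        return delay
    if isinstance(delay, timedelta):
        return now + delay
    if isinstance(delay, (int, float)):
        # Plain numbers are interpreted as seconds.
        return now + timedelta(seconds=delay)
    raise TypeError(f"Unsupported delay type: {type(delay).__name__}")


reference = datetime(2025, 1, 1, 12, 0, tzinfo=timezone.utc)
print(resolve_start_at(60, now=reference))
print(resolve_start_at(timedelta(minutes=5), now=reference))
```

All three forms end up as a single comparable datetime, which is the kind of value a worker can check against the current time before picking up a job.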

Job methods

The Job base class provides several methods you can override to customize behavior:

from plain.worker import Job


class MyJob(Job):
    def run(self):
        # Required: The main job logic
        pass

    def get_queue(self) -> str:
        # Specify the default queue for this job type
        return "default"

    def get_priority(self) -> int:
        # Set the default priority
        # Higher numbers run first: 10 > 5 > 0 > -5 > -10
        # Use positive numbers for high priority, negative for low priority
        return 0

    def get_retries(self) -> int:
        # Number of retry attempts on failure
        return 0

    def get_retry_delay(self, attempt: int) -> int:
        # Delay in seconds before retry (attempt starts at 1)
        return 0

    def get_unique_key(self) -> str:
        # Return a key to prevent duplicate jobs
        return ""
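
To make the "higher numbers run first" rule concrete, here is a small standalone sketch that orders pending jobs by priority. PendingJob and pick_order are hypothetical names, and breaking ties by enqueue order is an assumption made for this example; the source only states that higher priorities run first.

```python
from dataclasses import dataclass


@dataclass
class PendingJob:
    """Toy record for illustration; not a plain.worker model."""

    name: str
    priority: int
    enqueue_order: int  # lower means enqueued earlier


def pick_order(pending):
    # Highest priority first; ties fall back to enqueue order (an assumption).
    return sorted(pending, key=lambda job: (-job.priority, job.enqueue_order))


jobs = [
    PendingJob("cleanup", -5, 0),
    PendingJob("welcome-email", 0, 1),
    PendingJob("password-reset", 10, 2),
    PendingJob("newsletter", 0, 3),
]
print([job.name for job in pick_order(jobs)])
# ['password-reset', 'welcome-email', 'newsletter', 'cleanup']
```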

Scheduled jobs

You can schedule jobs to run at specific times using the Schedule class:

from plain.worker import Job, register_job
from plain.worker.scheduling import Schedule

@register_job
class DailyReportJob(Job):
    schedule = Schedule.from_cron("0 9 * * *")  # Every day at 9 AM

    def run(self):
        # Generate daily report
        pass

The Schedule class supports standard cron syntax and special strings:

  • @yearly or @annually - Run once a year
  • @monthly - Run once a month
  • @weekly - Run once a week
  • @daily or @midnight - Run once a day
  • @hourly - Run once an hour

For custom schedules, see Schedule.
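
For reference, the special strings correspond to five-field cron expressions (minute, hour, day of month, month, day of week) in conventional cron implementations. The sketch below spells out that conventional mapping. CRON_ALIASES, expand_alias, and split_fields are hypothetical names for this example, and it is an assumption that Schedule resolves the strings to exactly these expressions.

```python
# Conventional cron nicknames and their five-field equivalents.
CRON_ALIASES = {
    "@yearly": "0 0 1 1 *",
    "@annually": "0 0 1 1 *",
    "@monthly": "0 0 1 * *",
    "@weekly": "0 0 * * 0",
    "@daily": "0 0 * * *",
    "@midnight": "0 0 * * *",
    "@hourly": "0 * * * *",
}

FIELD_NAMES = ("minute", "hour", "day_of_month", "month", "day_of_week")


def expand_alias(expression):
    """Return the five-field form of a nickname, or the input unchanged."""
    expression = expression.strip()
    return CRON_ALIASES.get(expression, expression)


def split_fields(expression):
    """Label each field of a five-field cron expression."""
    parts = expand_alias(expression).split()
    if len(parts) != len(FIELD_NAMES):
        raise ValueError(f"Expected 5 fields, got {len(parts)}: {expression!r}")
    return dict(zip(FIELD_NAMES, parts))


print(split_fields("0 9 * * *"))
print(split_fields("@weekly"))
```

Read this way, "0 9 * * *" has minute 0 and hour 9 with every other field left as a wildcard, which is why it fires once a day at 9 AM.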

Admin interface

The worker package includes admin views for monitoring jobs. The admin interface provides:

  • Job Requests: View pending jobs in the queue
  • Jobs: Monitor currently running jobs
  • Job Results: Review completed and failed job history

Dashboard cards show at-a-glance statistics for successful and errored jobs.

Job history

Job execution history is stored in the JobResult model. This includes:

  • Job class and parameters
  • Start and end times
  • Success/failure status
  • Error messages and tracebacks for failed jobs
  • Worker information

History retention can be configured in your settings:

# app/settings.py
WORKER_JOB_HISTORY_DAYS = 30

Monitoring

Workers report statistics periodically. Set the reporting interval, in seconds, with the --stats-every option:

# Report stats every 60 seconds
plain worker run --stats-every 60

The worker integrates with OpenTelemetry for distributed tracing. Spans are created for:

  • Job scheduling (run_in_worker)
  • Job execution
  • Job completion/failure

Jobs can be linked to the originating trace context, allowing you to track jobs initiated from web requests.

FAQs

How do I ensure a job only runs once?

Return a unique key from the get_unique_key() method:

from plain.worker import Job


class ProcessUserDataJob(Job):
    def __init__(self, user_id):
        self.user_id = user_id

    def get_unique_key(self):
        return f"process-user-{self.user_id}"
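
The idea behind a unique key can be illustrated with a toy in-memory queue that refuses to enqueue a second job while one with the same key is still outstanding. This is only a sketch: ToyQueue is hypothetical, and both the return values and the choice to release the key once the job finishes are assumptions, not a description of how plain.worker stores or checks keys.

```python
class ToyQueue:
    """In-memory stand-in for a job queue; not plain.worker's implementation."""

    def __init__(self):
        self.jobs = []
        self._outstanding_keys = set()

    def enqueue(self, name, unique_key=""):
        # An empty key means "no uniqueness constraint".
        if unique_key:
            if unique_key in self._outstanding_keys:
                return False  # duplicate: skip it
            self._outstanding_keys.add(unique_key)
        self.jobs.append((name, unique_key))
        return True

    def finish(self, unique_key):
        # Assumption: the key becomes available again once the job is done.
        self._outstanding_keys.discard(unique_key)


queue = ToyQueue()
print(queue.enqueue("ProcessUserDataJob", "process-user-1"))  # True
print(queue.enqueue("ProcessUserDataJob", "process-user-1"))  # False
print(queue.enqueue("ProcessUserDataJob", "process-user-2"))  # True
```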

Can I run multiple workers?

Yes, you can run multiple worker processes:

plain worker run --max-processes 4

Or run workers for specific queues:

plain worker run --queue slow --max-processes 2

How do I handle job failures?

Set the number of retries and implement retry delays:

from plain.worker import Job


class MyJob(Job):
    def get_retries(self):
        return 3

    def get_retry_delay(self, attempt):
        # Exponential backoff: 1s, 2s, 4s
        return 2 ** (attempt - 1)
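
To check what the formula above produces, the same arithmetic can be run outside of a job class. The standalone sketch below lists the delay for each attempt and adds a capped variant. The function names and the 60-second cap are illustrative choices, not plain.worker settings.

```python
def retry_delay(attempt: int) -> int:
    # Same formula as get_retry_delay above; attempt starts at 1.
    return 2 ** (attempt - 1)


def capped_retry_delay(attempt: int, cap: int = 60) -> int:
    # Variant that stops growing once the delay reaches `cap` seconds.
    return min(cap, retry_delay(attempt))


def backoff_schedule(retries: int, delay_fn=retry_delay) -> list[int]:
    """Delays in seconds for attempts 1..retries."""
    return [delay_fn(attempt) for attempt in range(1, retries + 1)]


print(backoff_schedule(3))  # [1, 2, 4]
print(backoff_schedule(8, capped_retry_delay))  # [1, 2, 4, 8, 16, 32, 60, 60]
```

A cap is worth considering when the retry count is high, since an uncapped exponential delay doubles with every attempt.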

Installation

Install the plain.worker package from PyPI:

uv add plain.worker

Add to your INSTALLED_PACKAGES:

# app/settings.py
INSTALLED_PACKAGES = [
    ...
    "plain.worker",
]
