plain.worker

Process background jobs with a database-driven worker.

Overview

A job is a subclass of the Job base class that implements, at a minimum, the run() method.

from plain.worker import Job, register_job
from plain.email import send_mail


@register_job
class WelcomeUserJob(Job):
    def __init__(self, user):
        self.user = user

    def run(self):
        send_mail(
            subject="Welcome!",
            message=f"Hello from Plain, {self.user}",
            from_email="welcome@plainframework.com",
            recipient_list=[self.user.email],
        )

You can then create an instance of the job and call run_in_worker() to enqueue it for a background worker to pick up.

user = User.query.get(id=1)
WelcomeUserJob(user).run_in_worker()

Workers are run using the plain worker run command.

Jobs can be defined in any Python file, but app/jobs.py or app/{pkg}/jobs.py is recommended: those modules are imported automatically, so the @register_job decorator runs without any extra imports.
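To see why the import matters, here is a minimal sketch of a decorator-based registry. It is not plain.worker's actual code: JOB_REGISTRY, register, and ExampleJob are hypothetical names used only to illustrate that a class is registered at the moment its module is executed.

```python
# Hypothetical stand-in for a job registry; not plain.worker's implementation.
JOB_REGISTRY: dict[str, type] = {}


def register(cls: type) -> type:
    """Record the class under its name and return it unchanged."""
    JOB_REGISTRY[cls.__name__] = cls
    return cls


@register
class ExampleJob:
    def run(self) -> str:
        return "done"


# The lookup succeeds only because the module defining ExampleJob has run.
job_cls = JOB_REGISTRY["ExampleJob"]
result = job_cls().run()
```

If the module holding a decorated class is never imported, the decorator never runs, and a registry like this one has no entry for the class.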

Run database migrations after installation:

plain migrate

Local development

In development, you will typically want to run the worker alongside your app. With plain.dev, add it to the [tool.plain.dev.run] section of your pyproject.toml file. The worker does not currently auto-reload by itself, so wrap the command in a tool such as watchfiles.

# pyproject.toml
[tool.plain.dev.run]
worker = {cmd = "watchfiles --filter python \"plain worker run --stats-every 0 --max-processes 2\" ."}
worker-slow = {cmd = "watchfiles --filter python \"plain worker run --queue slow --stats-every 0 --max-processes 2\" ."}

Job parameters

When calling run_in_worker(), you can specify several parameters to control job execution:

job.run_in_worker(
    queue="slow",  # Target a specific queue (default: "default")
    delay=60,  # Delay in seconds (or timedelta/datetime)
    priority=10,  # Higher numbers run first (default: 0, use negatives for lower priority)
    retries=3,  # Number of retry attempts (default: 0)
    unique_key="user-123-welcome",  # Prevent duplicate jobs
)

For more advanced parameter options, see Job.run_in_worker().
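The delay parameter accepts seconds, a timedelta, or a datetime. As a rough sketch of how those three forms could be normalized into one absolute start time (resolve_start_at is a hypothetical helper, and the real logic in Job.run_in_worker() may differ):

```python
from datetime import datetime, timedelta, timezone


def resolve_start_at(delay, now=None):
    """Turn a delay (seconds, timedelta, or datetime) into an absolute start time."""
    now = now or datetime.now(timezone.utc)
    if delay is None:
        return now  # no delay: eligible to run immediately
    if isinstance(delay, datetime):
        return delay  # already an absolute time
    if isinstance(delay, timedelta):
        return now + delay
    return now + timedelta(seconds=delay)  # plain number of seconds
```

For example, resolve_start_at(60) and resolve_start_at(timedelta(minutes=1)) describe the same moment, one minute from now.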

Job methods

The Job base class provides several methods you can override to customize behavior:

class MyJob(Job):
    def run(self):
        # Required: The main job logic
        pass

    def get_queue(self) -> str:
        # Specify the default queue for this job type
        return "default"

    def get_priority(self) -> int:
        # Set the default priority
        # Higher numbers run first: 10 > 5 > 0 > -5 > -10
        # Use positive numbers for high priority, negative for low priority
        return 0

    def get_retries(self) -> int:
        # Number of retry attempts on failure
        return 0

    def get_retry_delay(self, attempt: int) -> int:
        # Delay in seconds before retry (attempt starts at 1)
        return 0

    def get_unique_key(self) -> str:
        # Return a key to prevent duplicate jobs
        return ""
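The priority rule ("higher numbers run first") can be illustrated with a small standalone sketch. PendingJob and execution_order are hypothetical names, and how the real worker orders jobs with equal priority is not stated here; this sketch simply keeps their original order.

```python
from dataclasses import dataclass


@dataclass
class PendingJob:  # hypothetical record, not a plain.worker model
    name: str
    priority: int = 0


def execution_order(pending):
    """Sort so higher priority numbers come first; ties keep their original order."""
    return sorted(pending, key=lambda job: job.priority, reverse=True)


queue = [
    PendingJob("cleanup", priority=-5),
    PendingJob("email"),
    PendingJob("payment", priority=10),
    PendingJob("report"),
]
ordered_names = [job.name for job in execution_order(queue)]
```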

Scheduled jobs

You can schedule jobs to run at specific times using the Schedule class:

from plain.worker import Job, register_job
from plain.worker.scheduling import Schedule

@register_job
class DailyReportJob(Job):
    schedule = Schedule.from_cron("0 9 * * *")  # Every day at 9 AM

    def run(self):
        # Generate daily report
        pass

The Schedule class supports standard cron syntax and special strings:

  • @yearly or @annually - Run once a year
  • @monthly - Run once a month
  • @weekly - Run once a week
  • @daily or @midnight - Run once a day
  • @hourly - Run once an hour

For custom schedules, see Schedule.
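In conventional crontab usage, each special string is shorthand for a five-field expression. The table below reflects that convention; whether Schedule resolves them in exactly the same way is an assumption, and CRON_ALIASES and expand_alias are hypothetical names.

```python
# Conventional crontab meanings of the special strings (minute hour day month weekday).
CRON_ALIASES = {
    "@yearly": "0 0 1 1 *",
    "@annually": "0 0 1 1 *",
    "@monthly": "0 0 1 * *",
    "@weekly": "0 0 * * 0",
    "@daily": "0 0 * * *",
    "@midnight": "0 0 * * *",
    "@hourly": "0 * * * *",
}


def expand_alias(expression: str) -> str:
    """Return the five-field form of a special string; pass other input through."""
    return CRON_ALIASES.get(expression, expression)
```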

Admin interface

The worker package includes admin views for monitoring jobs. The admin interface provides:

  • Job Requests: View pending jobs in the queue
  • Jobs: Monitor currently running jobs
  • Job Results: Review completed and failed job history

Dashboard cards show at-a-glance statistics for successful and errored jobs.

Job history

Job execution history is stored in the JobResult model. This includes:

  • Job class and parameters
  • Start and end times
  • Success/failure status
  • Error messages and tracebacks for failed jobs
  • Worker information

History retention can be configured in your settings:

# app/settings.py
WORKER_JOB_HISTORY_DAYS = 30
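As a rough illustration of what a day-based retention window means, the sketch below computes the cutoff before which results would be considered expired. history_cutoff is a hypothetical helper, and how and when plain.worker actually prunes old results is an assumption here.

```python
from datetime import datetime, timedelta, timezone

WORKER_JOB_HISTORY_DAYS = 30  # same value as the settings example


def history_cutoff(now, days=WORKER_JOB_HISTORY_DAYS):
    """Return the oldest timestamp still inside the retention window."""
    return now - timedelta(days=days)


cutoff = history_cutoff(datetime(2024, 1, 31, tzinfo=timezone.utc))
```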

Monitoring

Workers report statistics at a regular interval, which you set with the --stats-every option:

# Report stats every 60 seconds
plain worker run --stats-every 60

The worker integrates with OpenTelemetry for distributed tracing. Spans are created for:

  • Job scheduling (run_in_worker)
  • Job execution
  • Job completion/failure

Jobs can be linked to the originating trace context, allowing you to track jobs initiated from web requests.

FAQs

How do I ensure a job only runs once?

Return a unique key from the get_unique_key() method:

class ProcessUserDataJob(Job):
    def __init__(self, user_id):
        self.user_id = user_id

    def get_unique_key(self):
        return f"process-user-{self.user_id}"
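The idea behind a unique key can be sketched with an in-memory queue. This is only an illustration: InMemoryQueue is hypothetical, the real worker tracks jobs in the database, and treating an empty key as "no uniqueness check" is an assumption based on the default return value of get_unique_key().

```python
class InMemoryQueue:
    """Hypothetical queue that rejects a job whose unique key is already pending."""

    def __init__(self):
        self.keyed = {}  # unique_key -> payload
        self.unkeyed = []  # payloads enqueued without a key

    def enqueue(self, payload, unique_key=""):
        if not unique_key:
            # Assumption: an empty key disables duplicate detection.
            self.unkeyed.append(payload)
            return True
        if unique_key in self.keyed:
            return False  # duplicate: a job with this key is already pending
        self.keyed[unique_key] = payload
        return True


queue = InMemoryQueue()
first = queue.enqueue({"user_id": 1}, unique_key="process-user-1")
second = queue.enqueue({"user_id": 1}, unique_key="process-user-1")
```

Here first is True and second is False, because the second call reuses a key that is still pending.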

Can I run multiple workers?

Yes, you can run multiple worker processes:

plain worker run --max-processes 4

Or run workers for specific queues:

plain worker run --queue slow --max-processes 2

How do I handle job failures?

Override get_retries() to set how many times a failed job is retried, and get_retry_delay() to set how long to wait before each retry:

class MyJob(Job):
    def get_retries(self):
        return 3

    def get_retry_delay(self, attempt):
        # Exponential backoff: 1s, 2s, 4s
        return 2 ** (attempt - 1)
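To check the backoff arithmetic outside the worker, the same formula can be evaluated for each attempt. retry_schedule and capped_retry_delay are hypothetical helpers; the cap is an optional variation, not something plain.worker is known to provide.

```python
def get_retry_delay(attempt: int) -> int:
    """Same formula as the job method: attempt starts at 1."""
    return 2 ** (attempt - 1)


def retry_schedule(retries: int) -> list[int]:
    """Delay in seconds before each retry, for attempts 1..retries."""
    return [get_retry_delay(attempt) for attempt in range(1, retries + 1)]


def capped_retry_delay(attempt: int, cap: int = 60) -> int:
    """Variation that never waits longer than `cap` seconds."""
    return min(get_retry_delay(attempt), cap)


delays = retry_schedule(3)  # [1, 2, 4]
```

With many retries the uncapped delay doubles each time, so a cap keeps late attempts from waiting unreasonably long.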

Installation

Install the plain.worker package from PyPI:

uv add plain.worker

Add to your INSTALLED_PACKAGES:

# app/settings.py
INSTALLED_PACKAGES = [
    ...
    "plain.worker",
]
