plain.worker
Process background jobs with a database-driven worker.
- Overview
- Local development
- Job parameters
- Job methods
- Scheduled jobs
- Admin interface
- Job history
- Monitoring
- FAQs
- Installation
Overview
Define a job by subclassing the Job base class and implementing, at a minimum, the run() method.
from plain.worker import Job, register_job
from plain.email import send_mail


@register_job
class WelcomeUserJob(Job):
    def __init__(self, user):
        self.user = user

    def run(self):
        send_mail(
            subject="Welcome!",
            message=f"Hello from Plain, {self.user}",
            from_email="welcome@plainframework.com",
            recipient_list=[self.user.email],
        )
You can then create an instance of the job and call run_in_worker() to enqueue it for a background worker to pick up.
user = User.objects.get(id=1)
WelcomeUserJob(user).run_in_worker()
Start a worker with the plain worker run command.
Jobs can be defined in any Python file, but it is suggested to use app/jobs.py or app/{pkg}/jobs.py as those will be imported automatically so the @register_job decorator will fire.
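The automatic import matters because a class decorator only runs when the module containing the class is imported. The following is a minimal sketch of the general registry pattern, not plain.worker's actual implementation; `JOB_REGISTRY` and `register_job_sketch` are hypothetical names used only for illustration.

```python
# Illustrative registry pattern. This is NOT plain.worker's internal code;
# JOB_REGISTRY and register_job_sketch are hypothetical names.
JOB_REGISTRY: dict[str, type] = {}


def register_job_sketch(cls: type) -> type:
    """Record the class under its name and return it unchanged."""
    JOB_REGISTRY[cls.__name__] = cls
    return cls


@register_job_sketch
class ExampleJob:
    def run(self) -> str:
        return "done"


# The lookup succeeds only because the class definition above has executed,
# which is what importing a jobs module does.
job_class = JOB_REGISTRY["ExampleJob"]
result = job_class().run()
```

If a module that defines jobs is never imported, its decorators never fire and a worker would have no way to look those classes up by name.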
Run database migrations after installation:
plain migrate
Local development
In development, you will typically want to run the worker alongside your app. With plain.dev you can do this by adding it to the [tool.plain.dev.run] section of your pyproject.toml file. Currently, you will need to use something like watchfiles to add auto-reloading to the worker.
# pyproject.toml
[tool.plain.dev.run]
worker = {cmd = "watchfiles --filter python \"plain worker run --stats-every 0 --max-processes 2\" ."}
worker-slow = {cmd = "watchfiles --filter python \"plain worker run --queue slow --stats-every 0 --max-processes 2\" ."}
Job parameters
When calling run_in_worker(), you can specify several parameters to control job execution:
job.run_in_worker(
    queue="slow",  # Target a specific queue (default: "default")
    delay=60,  # Delay in seconds (or timedelta/datetime)
    priority=10,  # Higher priority jobs run first (default: 0)
    retries=3,  # Number of retry attempts (default: 0)
    unique_key="user-123-welcome",  # Prevent duplicate jobs
)
For more advanced parameter options, see Job.run_in_worker().
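Since delay accepts seconds, a timedelta, or a datetime, it can help to see how those three forms might resolve to one absolute start time. This is a sketch under assumptions, not the library's code: `resolve_start_at` is a hypothetical helper, and how plain.worker actually normalizes the value is not shown here.

```python
from datetime import datetime, timedelta, timezone


def resolve_start_at(delay, now):
    """Hypothetical helper: turn a delay value into an absolute start time."""
    if delay is None:
        return now  # no delay: eligible immediately
    if isinstance(delay, datetime):
        return delay  # already an absolute time
    if isinstance(delay, timedelta):
        return now + delay
    return now + timedelta(seconds=delay)  # plain number of seconds


now = datetime(2025, 1, 1, 12, 0, tzinfo=timezone.utc)
in_a_minute = resolve_start_at(60, now)
in_an_hour = resolve_start_at(timedelta(hours=1), now)
at_midnight = resolve_start_at(datetime(2025, 1, 2, tzinfo=timezone.utc), now)
```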
Job methods
The Job base class provides several methods you can override to customize behavior:
class MyJob(Job):
    def run(self):
        # Required: The main job logic
        pass

    def get_queue(self) -> str:
        # Specify the default queue for this job type
        return "default"

    def get_priority(self) -> int:
        # Set the default priority (higher runs first)
        return 0

    def get_retries(self) -> int:
        # Number of retry attempts on failure
        return 0

    def get_retry_delay(self, attempt: int) -> int:
        # Delay in seconds before retry (attempt starts at 1)
        return 0

    def get_unique_key(self) -> str:
        # Return a key to prevent duplicate jobs
        return ""
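To make "higher runs first" concrete, here is a small standalone sketch of how pending jobs could be ordered by priority. It does not use plain.worker; `PendingJob`, `pick_order`, and the first-in-first-out tie-breaker are assumptions made for illustration, and the real worker may break ties differently.

```python
from dataclasses import dataclass


@dataclass
class PendingJob:
    name: str
    priority: int
    enqueued_order: int  # hypothetical tie-breaker: lower means enqueued earlier


def pick_order(jobs):
    """Sort by priority descending, then by enqueue order ascending."""
    return sorted(jobs, key=lambda job: (-job.priority, job.enqueued_order))


pending = [
    PendingJob("cleanup", priority=0, enqueued_order=1),
    PendingJob("send-receipt", priority=10, enqueued_order=2),
    PendingJob("resize-image", priority=0, enqueued_order=3),
]
ordered_names = [job.name for job in pick_order(pending)]
```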
Scheduled jobs
You can schedule jobs to run at specific times using the Schedule class:
from plain.worker import Job, register_job
from plain.worker.scheduling import Schedule


@register_job
class DailyReportJob(Job):
    schedule = Schedule.from_cron("0 9 * * *")  # Every day at 9 AM

    def run(self):
        # Generate daily report
        pass
The Schedule class supports standard cron syntax and special strings:
- @yearly or @annually - Run once a year
- @monthly - Run once a month
- @weekly - Run once a week
- @daily or @midnight - Run once a day
- @hourly - Run once an hour
For custom schedules, see Schedule.
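For reference, the special strings conventionally correspond to the five-field cron expressions below. The mapping follows common cron conventions; that Schedule resolves them to exactly these expressions is an assumption, and `CRON_ALIASES` and `expand_cron` are hypothetical names rather than part of plain.worker.

```python
# Conventional cron meanings of the special strings (assumed, not taken from
# plain.worker's source). Fields: minute hour day-of-month month day-of-week.
CRON_ALIASES = {
    "@yearly": "0 0 1 1 *",
    "@annually": "0 0 1 1 *",
    "@monthly": "0 0 1 * *",
    "@weekly": "0 0 * * 0",
    "@daily": "0 0 * * *",
    "@midnight": "0 0 * * *",
    "@hourly": "0 * * * *",
}


def expand_cron(expression: str) -> str:
    """Return the five-field form of a special string, or the input unchanged."""
    expression = expression.strip()
    return CRON_ALIASES.get(expression, expression)
```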
Admin interface
The worker package includes admin views for monitoring jobs. The admin interface provides:
- Job Requests: View pending jobs in the queue
- Jobs: Monitor currently running jobs
- Job Results: Review completed and failed job history
Dashboard cards show at-a-glance statistics for successful and errored jobs.
Job history
Job execution history is stored in the JobResult model. This includes:
- Job class and parameters
- Start and end times
- Success/failure status
- Error messages and tracebacks for failed jobs
- Worker information
History retention can be configured in your settings:
# app/settings.py
WORKER_JOB_HISTORY_DAYS = 30
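As a rough illustration of what a retention window means, the sketch below computes the cutoff before which results would be considered expired. It assumes results older than the configured number of days become eligible for cleanup; when and how plain.worker actually removes them is not described here, and `history_cutoff` is a hypothetical helper.

```python
from datetime import datetime, timedelta, timezone

WORKER_JOB_HISTORY_DAYS = 30  # mirrors the setting above


def history_cutoff(now, days=WORKER_JOB_HISTORY_DAYS):
    """Hypothetical helper: results that finished before this time are expired."""
    return now - timedelta(days=days)


now = datetime(2025, 3, 31, tzinfo=timezone.utc)
cutoff = history_cutoff(now)
```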
Monitoring
Workers report statistics and can be monitored using the --stats-every option:
# Report stats every 60 seconds
plain worker run --stats-every 60
The worker integrates with OpenTelemetry for distributed tracing. Spans are created for:
- Job scheduling (run_in_worker)
- Job execution
- Job completion/failure
Jobs can be linked to the originating trace context, allowing you to track jobs initiated from web requests.
FAQs
How do I ensure a job only runs once?
Return a unique key from the get_unique_key() method:
class ProcessUserDataJob(Job):
    def __init__(self, user_id):
        self.user_id = user_id

    def get_unique_key(self):
        return f"process-user-{self.user_id}"
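The idea behind a unique key can be sketched with a small in-memory queue. This is an illustration only: `InMemoryQueue` is hypothetical, and whether plain.worker skips, rejects, or otherwise handles a duplicate, and whether the check covers jobs that are already running, are not specified here.

```python
class InMemoryQueue:
    """Hypothetical queue that refuses a second pending job with the same key."""

    def __init__(self):
        self.pending = []  # (unique_key, payload) tuples
        self._keys = set()

    def enqueue(self, payload, unique_key=""):
        # An empty key means "no uniqueness constraint".
        if unique_key and unique_key in self._keys:
            return False
        if unique_key:
            self._keys.add(unique_key)
        self.pending.append((unique_key, payload))
        return True


queue = InMemoryQueue()
first = queue.enqueue({"user_id": 123}, unique_key="process-user-123")
second = queue.enqueue({"user_id": 123}, unique_key="process-user-123")
other = queue.enqueue({"user_id": 456}, unique_key="process-user-456")
```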
Can I run multiple workers?
Yes, you can run multiple worker processes:
plain worker run --max-processes 4
Or run workers for specific queues:
plain worker run --queue slow --max-processes 2
How do I handle job failures?
Set the number of retries and implement retry delays:
class MyJob(Job):
    def get_retries(self):
        return 3

    def get_retry_delay(self, attempt):
        # Exponential backoff: 1s, 2s, 4s
        return 2 ** (attempt - 1)
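With a larger retry count, uncapped exponential backoff grows quickly, so a ceiling is often useful. The standalone function below is one possible variation; `backoff_delay`, `base`, and `cap` are hypothetical names and defaults, not part of plain.worker.

```python
def backoff_delay(attempt: int, base: int = 1, cap: int = 300) -> int:
    """Exponential backoff in seconds, capped at `cap` (attempt starts at 1)."""
    if attempt < 1:
        raise ValueError("attempt starts at 1")
    return min(cap, base * 2 ** (attempt - 1))


# The first three attempts match the 1s, 2s, 4s schedule above.
delays = [backoff_delay(attempt) for attempt in range(1, 4)]
```

A job could return `backoff_delay(attempt)` from its get_retry_delay() method to keep the same schedule while bounding the longest wait.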
Installation
Install the plain.worker package from PyPI:
uv add plain.worker
Add to your INSTALLED_PACKAGES:
# app/settings.py
INSTALLED_PACKAGES = [
    ...
    "plain.worker",
]