A lightweight task scheduler
Queueing tasks in Python doesn’t have to be complicated.
Overview
simplepyq is a lightweight task queuing library designed for small Python projects that require background task execution without the complexity of heavy tools like Celery, Airflow, or Redis. It uses SQLite for task persistence, ensuring tasks survive application restarts, and provides features like task channels, automatic retries, and dynamic task deferral. With minimal dependencies, simplepyq is easy to set up and ideal for applications needing simple, reliable task queuing.
Features
Channels: Organize tasks by associating them with specific functions, enabling grouped task processing.
Persistence: Store tasks in a SQLite database to ensure they are not lost during application restarts or crashes.
Retries: Automatically retry failed tasks a specified number of times, improving resilience for transient errors.
DelayException: Dynamically defer tasks for a specified duration, allowing flexible scheduling based on runtime conditions.
Simple Setup: Minimal dependencies and straightforward API, requiring only Python and msgpack.
Task Management: Tools to clear failed tasks, requeue them, or remove entire channels, providing control over task lifecycle.
Concepts
Channels
Channels in simplepyq allow you to group tasks by their purpose or associated function. Each channel is linked to a specific Python function that processes tasks, and you can configure the number of worker threads for each channel. This is useful for separating different types of tasks, such as “email” for sending emails and “image_processing” for handling image uploads, ensuring organized and parallel task execution.
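As a rough mental model, a channel can be pictured as a name mapped to a handler function and a worker count. The sketch below uses only the standard library and is not simplepyq's implementation; `ChannelRegistry`, `run_all`, and the lambda handlers are hypothetical names chosen for illustration.

```python
from concurrent.futures import ThreadPoolExecutor


class ChannelRegistry:
    """Hypothetical illustration: maps channel names to handlers and worker counts."""

    def __init__(self):
        self._channels = {}

    def add_channel(self, name, handler, max_workers=1):
        self._channels[name] = (handler, max_workers)

    def run_all(self, name, task_args_list):
        """Run every task for one channel on that channel's own thread pool."""
        handler, max_workers = self._channels[name]
        with ThreadPoolExecutor(max_workers=max_workers) as pool:
            # map() returns results in the same order as the inputs
            return list(pool.map(handler, task_args_list))


registry = ChannelRegistry()
registry.add_channel("email", lambda args: f"sent to {args['to']}", max_workers=2)
registry.add_channel("image_processing", lambda args: f"resized {args['file']}")

email_results = registry.run_all(
    "email", [{"to": "a@example.com"}, {"to": "b@example.com"}]
)
image_results = registry.run_all("image_processing", [{"file": "cat.png"}])
```

Keeping one pool per channel means a slow channel cannot starve the workers of another one, which is the separation the paragraph above describes.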
Persistence
Tasks are stored in a SQLite database, which provides lightweight persistence without requiring external systems. Each task is saved with its channel, arguments, status (pending, running, delayed, done, or failed), retries, and optional delay timestamp. This ensures tasks are not lost if the application restarts, making simplepyq reliable for long-running operations.
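To make the stored fields concrete, here is a minimal sketch of what such a task table could look like using Python's built-in sqlite3 module. The table name, column names, and the `enqueue` and `pending` helpers are assumptions for illustration; simplepyq's real schema may differ. JSON stands in for msgpack so the sketch has no third-party dependency.

```python
import json
import sqlite3

# Hypothetical schema for illustration; simplepyq's real table layout may differ.
conn = sqlite3.connect(":memory:")  # pass a file path such as "tasks.db" to persist
conn.execute(
    """
    CREATE TABLE tasks (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        channel TEXT NOT NULL,
        args BLOB NOT NULL,
        status TEXT NOT NULL DEFAULT 'pending',
        retries INTEGER NOT NULL DEFAULT 0,
        delay_until REAL
    )
    """
)


def enqueue(channel, args, retries=0):
    # JSON stands in for msgpack here so the sketch needs no extra package.
    payload = json.dumps(args).encode("utf-8")
    cur = conn.execute(
        "INSERT INTO tasks (channel, args, retries) VALUES (?, ?, ?)",
        (channel, payload, retries),
    )
    conn.commit()
    return cur.lastrowid


def pending(channel):
    rows = conn.execute(
        "SELECT id, args FROM tasks "
        "WHERE channel = ? AND status = 'pending' ORDER BY id",
        (channel,),
    ).fetchall()
    return [(task_id, json.loads(blob)) for task_id, blob in rows]


task_id = enqueue("email", {"to": "a@example.com"}, retries=3)
pending_tasks = pending("email")
```

Because every state change is a committed row update, a process that restarts can simply query for pending rows and continue where it left off.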
Retries
When a task raises an exception, simplepyq can automatically retry it based on a specified retry count. This is particularly useful for handling transient failures. If retries are exhausted, the task is marked as “failed” for later inspection or requeuing.
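The retry behaviour described above can be sketched as a small loop. This is an illustration under one assumption, namely that a retry count of N allows N further attempts after the first failure; `run_with_retries` and `flaky` are hypothetical names and not part of simplepyq.

```python
def run_with_retries(func, args, retries):
    """Hypothetical helper: call func(args), retrying up to `retries` extra times.

    Returns (status, attempts), where status is "done" or "failed".
    """
    attempts = 0
    while True:
        attempts += 1
        try:
            func(args)
            return "done", attempts
        except Exception:
            # Give up once the first attempt plus all retries have failed.
            if attempts > retries:
                return "failed", attempts


calls = {"count": 0}


def flaky(args):
    # Fails on the first two calls, then succeeds.
    calls["count"] += 1
    if calls["count"] < 3:
        raise RuntimeError("transient error")


result = run_with_retries(flaky, {}, retries=3)
always_fails = run_with_retries(lambda args: 1 / 0, {}, retries=2)
```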
DelayException
The DelayException allows tasks to be deferred dynamically by raising an exception with a specified delay in seconds. This is useful for scenarios like rate-limited APIs, where a task needs to wait before retrying, or for scheduling tasks to run at a later time. The task is marked as “delayed” and automatically requeued when the delay period expires.
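The delayed state can be pictured with a small standalone sketch. The `Delay` class below is a stand-in written for this example, not simplepyq's DelayException, and `run_once`, `wake_if_due`, and the task dictionary layout are likewise hypothetical. Timestamps are passed in explicitly so the behaviour is easy to follow.

```python
class Delay(Exception):
    """Stand-in for a delay-style exception; not simplepyq's DelayException."""

    def __init__(self, seconds):
        super().__init__(f"delay for {seconds}s")
        self.seconds = seconds


def run_once(task, handler, now):
    """Run one task dict; mark it done, or delayed if the handler asks to wait."""
    try:
        handler(task["args"])
        task["status"] = "done"
    except Delay as exc:
        task["status"] = "delayed"
        task["delay_until"] = now + exc.seconds


def wake_if_due(task, now):
    """Move a delayed task back to pending once its delay has expired."""
    if task["status"] == "delayed" and now >= task["delay_until"]:
        task["status"] = "pending"
        task["delay_until"] = None


rate_limited = {"active": True}


def call_api(args):
    if rate_limited["active"]:
        raise Delay(60)


task = {"args": {}, "status": "pending", "delay_until": None}
run_once(task, call_api, now=1000.0)
status_after_delay = (task["status"], task["delay_until"])

wake_if_due(task, now=1030.0)  # too early, the task stays delayed
status_early = task["status"]

wake_if_due(task, now=1060.0)  # delay expired, the task is pending again
status_due = task["status"]

rate_limited["active"] = False
run_once(task, call_api, now=1060.0)
final_status = task["status"]
```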
Task Management
simplepyq provides methods to manage tasks effectively:
- clear_failed: Removes failed tasks from the database.
- requeue_failed: Requeues failed tasks with their original or a new retry count.
- remove_channel: Deletes a channel and all its tasks.
- stop and run_until_complete: Control the scheduler’s execution, either running tasks in the background or until all tasks are complete.
Installation
Install simplepyq via pip:
pip install simplepyq
Usage Examples
Below are examples demonstrating each feature of simplepyq, designed to highlight its capabilities in real-world scenarios.
1. Basic Task Queuing with Channels
Organize tasks into a channel for web scraping, processing URLs in the background.
from simplepyq import SimplePyQ

def scrape_url(args):
    url = args["url"]
    print(f"Scraping {url}")

scheduler = SimplePyQ("tasks.db")
scheduler.add_channel("scrape", scrape_url, max_workers=2)  # Two workers for parallel scraping
scheduler.enqueue("scrape", {"url": "https://example.com"})
scheduler.enqueue("scrape", {"url": "https://example.org"})
scheduler.start()  # Runs in the background
# Tasks are processed concurrently by two worker threads
Explanation: The scrape channel is created with a function to process URLs, and two workers allow parallel execution. Tasks are enqueued with arguments and processed asynchronously.
2. Task Retries for Resilience
Handle transient failures, such as network issues, with automatic retries.
from simplepyq import SimplePyQ
import requests

def fetch_data(args):
    url = args["url"]
    response = requests.get(url)
    if response.status_code != 200:
        raise Exception("Failed to fetch data")
    print(f"Fetched data from {url}")

scheduler = SimplePyQ("tasks.db")
scheduler.add_channel("fetch", fetch_data)
scheduler.enqueue("fetch", {"url": "https://api.example.com/data"}, retries=3)  # Retry up to 3 times
scheduler.run_until_complete()  # Runs until all tasks are complete
Explanation: If the API call fails, the task is retried up to three times before being marked as failed, ensuring resilience against temporary issues.
3. Dynamic Task Deferral with DelayException
Defer tasks dynamically, useful for rate-limited APIs.
from simplepyq import SimplePyQ, DelayException
import requests

def call_api(args):
    url = args["url"]
    response = requests.get(url)
    if response.status_code == 403:
        raise DelayException(60)  # Wait 60 seconds before retrying
    print(f"Called {url}")

scheduler = SimplePyQ("tasks.db")
scheduler.add_channel("api", call_api)
scheduler.enqueue("api", {"url": "https://api.example.com/rate_limit"})
scheduler.start()  # Task will be deferred for 60 seconds if rate-limited
Explanation: The DelayException defers the task for 60 seconds, allowing compliance with rate limits or scheduling retries at a later time.
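Many rate-limited APIs state how long to wait in the standard HTTP Retry-After header, which holds either a number of seconds or an HTTP date. The helper below is a sketch of how that value might be turned into a delay in seconds. The name `delay_from_retry_after`, its `default` fallback, and the `now` parameter are assumptions made for this example and are not part of simplepyq.

```python
from datetime import datetime, timezone
from email.utils import parsedate_to_datetime


def delay_from_retry_after(value, default=60, now=None):
    """Hypothetical helper: convert a Retry-After header value into seconds.

    Falls back to `default` when the value is missing or cannot be parsed.
    """
    if not value:
        return default
    value = value.strip()
    if value.isdigit():
        return int(value)
    try:
        target = parsedate_to_datetime(value)
    except (TypeError, ValueError):
        # Older Python versions raise TypeError, newer ones ValueError.
        return default
    if target.tzinfo is None:
        # Treat dates without an offset as UTC.
        target = target.replace(tzinfo=timezone.utc)
    now = now or datetime.now(timezone.utc)
    return max(0, int((target - now).total_seconds()))


fixed_now = datetime(2015, 10, 21, 7, 27, 0, tzinfo=timezone.utc)
from_seconds = delay_from_retry_after("120")
from_date = delay_from_retry_after("Wed, 21 Oct 2015 07:28:00 GMT", now=fixed_now)
from_missing = delay_from_retry_after(None)
from_garbage = delay_from_retry_after("garbage", default=30)
```

Inside a task function, the result could be passed to the exception in place of the fixed value, for example `raise DelayException(delay_from_retry_after(response.headers.get("Retry-After")))`.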
4. Clearing Failed Tasks
Remove failed tasks to clean up the database.
from simplepyq import SimplePyQ

def risky_task(args):
    raise Exception("Task failed intentionally")

scheduler = SimplePyQ("tasks.db")
scheduler.add_channel("risky", risky_task)
scheduler.enqueue("risky", {"data": "test"}, retries=1)
scheduler.run_until_complete()  # Task fails after one retry
scheduler.clear_failed("risky")  # Remove failed tasks for the 'risky' channel
Explanation: After the task fails and retries are exhausted, clear_failed removes it from the database, keeping it clean.
5. Requeuing Failed Tasks
Requeue failed tasks for another attempt.
from simplepyq import SimplePyQ

attempts = 0

def flaky_task(args):
    global attempts
    if attempts < 2:  # Fail on the first two attempts
        attempts += 1
        raise Exception("Temporary failure")
    print("Task succeeded")

scheduler = SimplePyQ("tasks.db")
scheduler.add_channel("flaky", flaky_task)
scheduler.enqueue("flaky", {}, retries=0)
scheduler.run_until_complete()  # Task fails (first attempt, no retries)
scheduler.requeue_failed("flaky", retries=1)  # Requeue with one retry
scheduler.run_until_complete()  # Second attempt fails, the retry succeeds
Explanation: Failed tasks are requeued with a new retry count, allowing recovery from temporary issues without manual intervention.
6. Removing a Channel
Delete a channel and its tasks when no longer needed.
from simplepyq import SimplePyQ

def temp_task(args):
    print(f"Processing {args['data']}")

scheduler = SimplePyQ("tasks.db")
scheduler.add_channel("temp", temp_task)
scheduler.enqueue("temp", {"data": "test"})
scheduler.run_until_complete()
scheduler.remove_channel("temp")  # Removes channel and all its tasks
Explanation: The temp channel and its tasks are removed, useful for cleanup when a task type is no longer needed.
7. Running Until Completion
Process all tasks synchronously until complete.
from simplepyq import SimplePyQ

def process_data(args):
    print(f"Processing {args['data']}")

scheduler = SimplePyQ("tasks.db")
scheduler.add_channel("data", process_data)
scheduler.enqueue("data", {"data": "item1"})
scheduler.enqueue("data", {"data": "item2"})
scheduler.run_until_complete()  # Blocks until all tasks are done
Explanation: run_until_complete processes all tasks and stops the scheduler, ideal for scripts or batch processing.
Testing
To run the included unit tests:
python -m unittest discover -s tests
This executes all tests in the tests/ directory, covering task execution, retries, delays, and task management.
License
Licensed under the Apache License 2.0. See the LICENSE file for details.
SimplePyQ is a project powered by The California Open Source Company.