Skip to main content

A lightweight task scheduler

Project description

Queueing tasks in Python doesn’t have to be complicated.

Overview

simplepyq is a lightweight task queuing library designed for small Python projects that require background task execution without the complexity of heavy tools like Celery, Airflow, or Redis. It uses SQLite for task persistence, ensuring tasks survive application restarts, and provides features like task channels, automatic retries, and dynamic task deferral. With minimal dependencies, simplepyq is easy to set up and ideal for applications needing simple, reliable task queuing.

Features

  • Channels: Organize tasks by associating them with specific functions, enabling grouped task processing.

  • Persistence: Store tasks in a SQLite database to ensure they are not lost during application restarts or crashes.

  • Retries: Automatically retry failed tasks a specified number of times, improving resilience for transient errors.

  • DelayException: Dynamically defer tasks for a specified duration, allowing flexible scheduling based on runtime conditions.

  • Simple Setup: Minimal dependencies and straightforward API, requiring only Python and msgpack.

  • Task Management: Tools to clear failed tasks, requeue them, or remove entire channels, providing control over task lifecycle.

Concepts

Channels

Channels in simplepyq allow you to group tasks by their purpose or associated function. Each channel is linked to a specific Python function that processes tasks, and you can configure the number of worker threads for each channel. This is useful for separating different types of tasks, such as “email” for sending emails and “image_processing” for handling image uploads, ensuring organized and parallel task execution.

Persistence

Tasks are stored in a SQLite database, which provides lightweight persistence without requiring external systems. Each task is saved with its channel, arguments, status (pending, running, delayed, done, or failed), retries, and optional delay timestamp. This ensures tasks are not lost if the application restarts, making simplepyq reliable for long-running operations.

Retries

When a task raises an exception, simplepyq can automatically retry it based on a specified retry count. This is particularly useful for handling transient failures. If retries are exhausted, the task is marked as “failed” for later inspection or requeuing.

DelayException

The DelayException allows tasks to be deferred dynamically by raising an exception with a specified delay in seconds. This is useful for scenarios like rate-limited APIs, where a task needs to wait before retrying, or for scheduling tasks to run at a later time. The task is marked as “delayed” and automatically requeued when the delay period expires.

Task Management

simplepyq provides methods to manage tasks effectively: - clear_failed: Removes failed tasks from the database. - requeue_failed: Requeues failed tasks with their original or a new retry count. - remove_channel: Deletes a channel and all its tasks. - stop and run_until_complete: Control the scheduler’s execution, either running tasks in the background or until all tasks are complete.

Installation

Install simplepyq via pip:

pip install simplepyq

Usage Examples

Below are examples demonstrating each feature of simplepyq, designed to highlight its capabilities in real-world scenarios.

1. Basic Task Queuing with Channels

Organize tasks into a channel for web scraping, processing URLs in the background.

from simplepyq import SimplePyQ

def scrape_url(args):
    url = args["url"]
    print(f"Scraping {url}")

scheduler = SimplePyQ("tasks.db")
scheduler.add_channel("scrape", scrape_url, max_workers=2)  # Two workers for parallel scraping
scheduler.enqueue("scrape", {"url": "https://example.com"})
scheduler.enqueue("scrape", {"url": "https://example.org"})
scheduler.start()  # Runs in the background
# Tasks are processed concurrently by two worker threads

Explanation: The scrape channel is created with a function to process URLs, and two workers allow parallel execution. Tasks are enqueued with arguments and processed asynchronously.

2. Task Retries for Resilience

Handle transient failures, such as network issues, with automatic retries.

from simplepyq import SimplePyQ
import requests

def fetch_data(args):
    url = args["url"]
    response = requests.get(url)
    if response.status_code != 200:
        raise Exception("Failed to fetch data")
    print(f"Fetched data from {url}")

scheduler = SimplePyQ("tasks.db")
scheduler.add_channel("fetch", fetch_data)
scheduler.enqueue("fetch", {"url": "https://api.example.com/data"}, retries=3)  # Retry up to 3 times
scheduler.run_until_complete()  # Runs until all tasks are complete

Explanation: If the API call fails, the task is retried up to three times before being marked as failed, ensuring resilience against temporary issues.

3. Dynamic Task Deferral with DelayException

Defer tasks dynamically, useful for rate-limited APIs.

from simplepyq import SimplePyQ, DelayException

def call_api(args):
    url = args["url"]
    response = requests.get(url)
    if response.status_code == 403:
        raise DelayException(60)  # Wait 60 seconds before retrying
    print(f"Calling {url}")

scheduler = SimplePyQ("tasks.db")
scheduler.add_channel("api", call_api)
scheduler.enqueue("api", {"url": "https://api.example.com/rate_limit"})
scheduler.start()  # Task will be deferred for 60 seconds if rate-limited

Explanation: The DelayException defers the task for 60 seconds, allowing compliance with rate limits or scheduling retries at a later time.

4. Clearing Failed Tasks

Remove failed tasks to clean up the database.

from simplepyq import SimplePyQ

def risky_task(args):
    raise Exception("Task failed intentionally")

scheduler = SimplePyQ("tasks.db")
scheduler.add_channel("risky", risky_task)
scheduler.enqueue("risky", {"data": "test"}, retries=1)
scheduler.run_until_complete()  # Task fails after one retry
scheduler.clear_failed("risky")  # Remove failed tasks for the 'risky' channel

Explanation: After the task fails and retries are exhausted, clear_failed removes it from the database, keeping it clean.

5. Requeuing Failed Tasks

Requeue failed tasks for another attempt.

from simplepyq import SimplePyQ

attempts = 0

def flaky_task(args):
    global attempts
    if attempts < 2:  # Fail on first attempt
        attempts += 1
        raise Exception("Temporary failure")
    print("Task succeeded")

scheduler = SimplePyQ("tasks.db")
scheduler.add_channel("flaky", flaky_task)
scheduler.enqueue("flaky", {}, retries=0)
scheduler.run_until_complete()  # Task fails
scheduler.requeue_failed("flaky", retries=1)  # Requeue with one retry
scheduler.run_until_complete()  # Task succeeds on second attempt

Explanation: Failed tasks are requeued with a new retry count, allowing recovery from temporary issues without manual intervention.

6. Removing a Channel

Delete a channel and its tasks when no longer needed.

from simplepyq import SimplePyQ

def temp_task(args):
    print(f"Processing {args['data']}")

scheduler = SimplePyQ("tasks.db")
scheduler.add_channel("temp", temp_task)
scheduler.enqueue("temp", {"data": "test"})
scheduler.run_until_complete()
scheduler.remove_channel("temp")  # Removes channel and all its tasks

Explanation: The temp channel and its tasks are removed, useful for cleanup when a task type is no longer needed.

7. Running Until Completion

Process all tasks synchronously until complete.

from simplepyq import SimplePyQ

def process_data(args):
    print(f"Processing {args['data']}")

scheduler = SimplePyQ("tasks.db")
scheduler.add_channel("data", process_data)
scheduler.enqueue("data", {"data": "item1"})
scheduler.enqueue("data", {"data": "item2"})
scheduler.run_until_complete()  # Blocks until all tasks are done

Explanation: run_until_complete processes all tasks and stops the scheduler, ideal for scripts or batch processing.

Testing

To run the included unit tests:

python -m unittest discover -s tests

This executes all tests in the tests/ directory, covering task execution, retries, delays, and task management.

License

Licensed under the Apache License 2.0. See the LICENSE file for details.


SimplePyQ is a project powered by The California Open Source Company.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

simplepyq-0.1.3.tar.gz (11.1 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

simplepyq-0.1.3-py3-none-any.whl (12.4 kB view details)

Uploaded Python 3

File details

Details for the file simplepyq-0.1.3.tar.gz.

File metadata

  • Download URL: simplepyq-0.1.3.tar.gz
  • Upload date:
  • Size: 11.1 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.12.9

File hashes

Hashes for simplepyq-0.1.3.tar.gz
Algorithm Hash digest
SHA256 41df2f8c62bae4ead6657a73ea95446d45288cf5e8ee56f905dbde1e02462b76
MD5 48aac2c10d625f0e4308ac1330262462
BLAKE2b-256 0269b734ff747a59be68d6dad5e64735d277015e2b2abd4aad09b2e4b5547036

See more details on using hashes here.

Provenance

The following attestation bundles were made for simplepyq-0.1.3.tar.gz:

Publisher: python-publish.yml on kdewald/simplepyq

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file simplepyq-0.1.3-py3-none-any.whl.

File metadata

  • Download URL: simplepyq-0.1.3-py3-none-any.whl
  • Upload date:
  • Size: 12.4 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.12.9

File hashes

Hashes for simplepyq-0.1.3-py3-none-any.whl
Algorithm Hash digest
SHA256 4c9cfd8ec23b8c6517f8f5ff81c3fcf04c74e98659f03173bdd426adb89377fc
MD5 b0c5e2e23dd5049d050582caacaad289
BLAKE2b-256 17883ef21e3c0a07cf875f51d3293222cfbeca55c3a307da10d5f3aa5f0bb84e

See more details on using hashes here.

Provenance

The following attestation bundles were made for simplepyq-0.1.3-py3-none-any.whl:

Publisher: python-publish.yml on kdewald/simplepyq

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page