Skip to main content

A simple queue for Python tasks

Project description

pytask

A simple sqlite3-based job queue with a worker. Main purpose is to run jobs in a queue. Jobs are not popped from the queue, which means the queue can act as a history.

Usage

The worker will run the function func for each job. The function will be passed a Job object. Which means that you can alter the job object in the function, and the newly updated job will be saved to the queue.

from pytask import Queue, Worker, Job, SQLDataType, SQLColumnConditions

def func(job: Job):
    # Do something with the job
    job.data["foo"] += 1

queue = Queue(schema=[
    ("foo", SQLDataType.INTEGER, [SQLColumnConditions.NOT_NULL]), 
    ("bar", SQLDataType.TEXT, [SQLColumnConditions.NOT_NULL]), 
    ("baz", SQLDataType.JSON, [SQLColumnConditions.NOT_NULL])
])
worker = Worker(queue, func)

queue.insert(Job(data={"foo": 1, "bar": "test", "baz": {"foo": "bar"}}))

worker.run()

Creating multiple queues or multiple workers is possible. Creating a new queue object won't actually create a new queue, it just creates a new connection to the queue. Which means you can have multiple queue objects pointing to the same queue, or you can use the same queue object for multiple workers.

Be careful to avoid race conditions when using the same queue object for multiple workers.

Flags

Flags are used to configure the behavior of the queue and worker.

Current flags:

  • auto_convert_json_keys: If True, the queue will automatically convert JSON keys to strings. Useful for retrieving and manipulating JSON data.
  • pop_after_processing: If True, the job will be popped from the queue after processing.
from pytask import Queue, Worker, Job, SQLDataType, SQLColumnConditions, Flags

flags = Flags(auto_convert_json_keys=True, pop_after_processing=True)
queue = Queue(schema=[("foo", SQLDataType.INTEGER, [SQLColumnConditions.NOT_NULL])], flags=flags)

worker = Worker(queue, func, logger=logger)
worker.run()

Concurrent Worker

The concurrent worker is a worker that runs jobs in parallel. It uses a thread pool to run the jobs.

from pytask import Queue, ConcurrentWorker, Job, SQLDataType, SQLColumnConditions

worker = ConcurrentWorker(queue, func, logger=logger, interval=1, max_workers=16)
worker.run()

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

pytask_queue-1.0.0.tar.gz (8.1 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

pytask_queue-1.0.0-py3-none-any.whl (10.3 kB view details)

Uploaded Python 3

File details

Details for the file pytask_queue-1.0.0.tar.gz.

File metadata

  • Download URL: pytask_queue-1.0.0.tar.gz
  • Upload date:
  • Size: 8.1 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: pdm/2.18.2 CPython/3.10.12 Linux/5.15.167.4-microsoft-standard-WSL2

File hashes

Hashes for pytask_queue-1.0.0.tar.gz
Algorithm Hash digest
SHA256 92b88ac741e0c74e6990cbd80d30c7068c6b43cf46a3390ec1ac14d7fb6bf1e1
MD5 51d5c081b9bbe04e9bdfa9307991506d
BLAKE2b-256 6f6e8afba960e2fe5d3a9d0a0036fa50001e9364ba6219d2ae038b6c7c7d771d

See more details on using hashes here.

File details

Details for the file pytask_queue-1.0.0-py3-none-any.whl.

File metadata

  • Download URL: pytask_queue-1.0.0-py3-none-any.whl
  • Upload date:
  • Size: 10.3 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: pdm/2.18.2 CPython/3.10.12 Linux/5.15.167.4-microsoft-standard-WSL2

File hashes

Hashes for pytask_queue-1.0.0-py3-none-any.whl
Algorithm Hash digest
SHA256 67ca897012885c1f4d98e71d66795397be5089a4c60b558070e1a2eb7e9542a6
MD5 2087fb91ee3f1403e7da6b62861299d1
BLAKE2b-256 969274c63a9efb6af3600c187a5fac434d52a5dbae5dec19dd300b7bd51dd0e6

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page