A simple queue for Python tasks

pytask

pytask is a simple sqlite3-based job queue with a worker. Its main purpose is to run jobs from a queue. Jobs are not popped from the queue after processing (unless the `pop_after_processing` flag is set), so the queue can also serve as a history.
Usage

The worker runs the function `func` for each job, passing it a `Job` object. Because the function receives the `Job` itself, it can mutate the job, and the updated job is saved back to the queue.
```python
from pytask import Queue, Worker, Job, SQLDataType, SQLColumnConditions

def func(job: Job):
    # Do something with the job
    job.data["foo"] += 1

queue = Queue(schema=[
    ("foo", SQLDataType.INTEGER, [SQLColumnConditions.NOT_NULL]),
    ("bar", SQLDataType.TEXT, [SQLColumnConditions.NOT_NULL]),
    ("baz", SQLDataType.JSON, [SQLColumnConditions.NOT_NULL]),
])

worker = Worker(queue, func)

queue.insert(Job(data={"foo": 1, "bar": "test", "baz": {"foo": "bar"}}))
worker.run()
```
You can create multiple queues or multiple workers. Creating a new `Queue` object does not create a new underlying queue; it opens a new connection to it. This means multiple `Queue` objects can point to the same queue, and a single `Queue` object can be shared by multiple workers.

Be careful to avoid race conditions when sharing the same queue object between multiple workers.
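pytask's internal locking is not documented here, but the usual way to let several workers share one sqlite-backed queue without processing the same job twice is an atomic claim: select a pending row and flip its status in a single transaction, guarded by a status check. A minimal sketch with plain sqlite3 (the `jobs` table and `status` column are illustrative, not pytask's actual schema):

```python
import sqlite3

def claim_next_job(conn):
    """Atomically claim the oldest pending job; return its id, or None."""
    with conn:  # one transaction: commit on success, rollback on error
        row = conn.execute(
            "SELECT id FROM jobs WHERE status = 'pending' ORDER BY id LIMIT 1"
        ).fetchone()
        if row is None:
            return None
        # The status guard in the WHERE clause makes the claim safe: if
        # another worker already took this job, rowcount is 0 and we claim nothing.
        cur = conn.execute(
            "UPDATE jobs SET status = 'running' WHERE id = ? AND status = 'pending'",
            (row[0],),
        )
        if cur.rowcount == 1:
            return row[0]
    return None

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE jobs (id INTEGER PRIMARY KEY, status TEXT)")
conn.executemany("INSERT INTO jobs (status) VALUES (?)", [("pending",), ("pending",)])

first = claim_next_job(conn)   # claims job 1
second = claim_next_job(conn)  # claims job 2
third = claim_next_job(conn)   # no pending jobs left, returns None
```

Because the claimed row stays in the table with an updated status rather than being deleted, this pattern also preserves the queue-as-history behavior described above.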
Flags

Flags configure the behavior of the queue and worker.

Current flags:

- `auto_convert_json_keys`: If `True`, the queue automatically converts JSON keys to strings. Useful for retrieving and manipulating JSON data.
- `pop_after_processing`: If `True`, the job is popped from the queue after processing.
```python
from pytask import Queue, Worker, Job, SQLDataType, SQLColumnConditions, Flags

flags = Flags(auto_convert_json_keys=True, pop_after_processing=True)

queue = Queue(schema=[("foo", SQLDataType.INTEGER, [SQLColumnConditions.NOT_NULL])], flags=flags)

worker = Worker(queue, func, logger=logger)
worker.run()
```
Concurrent Worker

The concurrent worker runs jobs in parallel, using a thread pool.
```python
from pytask import Queue, ConcurrentWorker, Job, SQLDataType, SQLColumnConditions

worker = ConcurrentWorker(queue, func, logger=logger, interval=1, max_workers=16)
worker.run()
```
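`ConcurrentWorker`'s internals aren't shown above, but the thread-pool dispatch it describes can be illustrated with the standard library's `concurrent.futures`; `process` here is a hypothetical stand-in for the real job function:

```python
from concurrent.futures import ThreadPoolExecutor

def process(job):
    # Stand-in for the real job function; real work would mutate a Job.
    return job * 2

jobs = [1, 2, 3, 4]

# Run up to four jobs at once; pool.map preserves input order in its results.
with ThreadPoolExecutor(max_workers=4) as pool:
    results = list(pool.map(process, jobs))
# results == [2, 4, 6, 8]
```

A thread pool suits I/O-bound job functions; CPU-bound work in Python is limited by the GIL and may not speed up this way.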
File details
Details for the file pytask_queue-0.1.6.tar.gz.
File metadata
- Download URL: pytask_queue-0.1.6.tar.gz
- Upload date:
- Size: 8.1 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: pdm/2.18.2 CPython/3.10.12 Linux/5.15.167.4-microsoft-standard-WSL2
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 1cb3379d62d5d67f27a95deff9655e0ab30d7e33458d7cc9b1e8043f5c169553 |
| MD5 | 2c9ebc255096f95e79777f276a4c39f7 |
| BLAKE2b-256 | 6fa9f6fc181d22c991f999fdf75bbdad20bd3da3c822dd3fc476f72fff1a8bd4 |
File details
Details for the file pytask_queue-0.1.6-py3-none-any.whl.
File metadata
- Download URL: pytask_queue-0.1.6-py3-none-any.whl
- Upload date:
- Size: 10.3 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: pdm/2.18.2 CPython/3.10.12 Linux/5.15.167.4-microsoft-standard-WSL2
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 1ba04e78665651c1225bf069644e8530c399a23b13abeab117fdf6b5fc633c32 |
| MD5 | a998325febb185cb35e70d01231197eb |
| BLAKE2b-256 | 7614bdbdb83ea23033017072e6adf0b145a828c40da26611b9ddd678666d7624 |