
A simple queue for Python tasks

Project description


pytask

A simple sqlite3-based job queue with a worker. Its main purpose is to run jobs from a queue. Jobs are not popped from the queue after processing, so the queue can also act as a history.
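The exact table layout pytask uses isn't shown here, but the non-popping design can be sketched with plain sqlite3 (hypothetical table and column names; pytask's actual schema may differ):

```python
import sqlite3

# In-memory database for illustration; pytask uses an sqlite3 file in practice.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE jobs (id INTEGER PRIMARY KEY, payload TEXT, status TEXT DEFAULT 'pending')"
)

# Enqueue two jobs.
conn.execute("INSERT INTO jobs (payload) VALUES ('job-1'), ('job-2')")

# "Process" a job: mark it done instead of deleting the row,
# so the same table doubles as a processing history.
conn.execute("UPDATE jobs SET status = 'done' WHERE id = 1")

pending = conn.execute("SELECT payload FROM jobs WHERE status = 'pending'").fetchall()
history = conn.execute("SELECT payload, status FROM jobs").fetchall()
print(pending)   # [('job-2',)]
print(history)   # [('job-1', 'done'), ('job-2', 'pending')]
```

Because processed rows stay in the table, "what has run and with what result" is a plain SQL query rather than separate bookkeeping.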

Installation

pip install pytask-queue

Usage

The worker will run the function func for each job, passing it a Job object. You can alter the job object inside the function, and the updated job will be saved back to the queue.

# python process 1
from pytask import Queue, Job, SQLDataType, SQLColumnConditions

queue = Queue(schema=[
    ("foo", SQLDataType.INTEGER, [SQLColumnConditions.NOT_NULL]), 
    ("bar", SQLDataType.TEXT, [SQLColumnConditions.NOT_NULL]), 
    ("baz", SQLDataType.JSON, [SQLColumnConditions.NOT_NULL])
])
queue.insert(Job(data={"foo": 1, "bar": "test", "baz": {"foo": "bar"}}))
# python process 2
from <relative_file> import queue
from pytask import Job, Worker

def func(job: Job):
    # Do something with job
    job.data["foo"] += 1

worker = Worker(queue, func)
worker.run()

Creating multiple queues or multiple workers is possible. Creating a new queue object won't actually create a new queue; it just creates a new connection to an existing one. This means you can have multiple queue objects pointing to the same queue, or use the same queue object for multiple workers.

Be careful to avoid race conditions when using the same queue object for multiple workers.
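How pytask guards against double-processing internally isn't documented here, but the usual pattern for sqlite-backed queues is to claim a job with a single atomic UPDATE, so that only one worker can win a given row (a generic sketch with hypothetical table and status values, not pytask's API):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE jobs (id INTEGER PRIMARY KEY, payload TEXT, status TEXT DEFAULT 'pending')"
)
conn.execute("INSERT INTO jobs (payload) VALUES ('job-1')")

def claim(worker_name: str) -> bool:
    # The WHERE clause only matches while the job is still pending, so the
    # UPDATE succeeds for exactly one worker; rowcount reports who won.
    cur = conn.execute(
        "UPDATE jobs SET status = ? WHERE id = 1 AND status = 'pending'",
        (f"claimed:{worker_name}",),
    )
    return cur.rowcount == 1

print(claim("worker-a"))  # True  -> worker-a processes the job
print(claim("worker-b"))  # False -> worker-b skips it
```

The key point is that the check ("is it pending?") and the claim ("mark it mine") happen in one statement, leaving no window for a second worker to grab the same job.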

Flags

Flags are used to configure the behavior of the queue and worker.

Current flags:

  • auto_convert_json_keys: If True, the queue will automatically convert JSON keys to strings. Useful for retrieving and manipulating JSON data.
  • pop_after_processing: If True, the job will be popped from the queue after processing.
import logging

from pytask import Queue, Worker, Job, SQLDataType, SQLColumnConditions, Flags

logger = logging.getLogger(__name__)

def func(job: Job):
    # Do something with job
    job.data["foo"] += 1

flags = Flags(auto_convert_json_keys=True, pop_after_processing=True)
queue = Queue(schema=[("foo", SQLDataType.INTEGER, [SQLColumnConditions.NOT_NULL])], flags=flags)

worker = Worker(queue, func, logger=logger)
worker.run()
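Assuming the auto_convert_json_keys flag addresses JSON's own key semantics: JSON object keys are always strings, so non-string Python keys come back as strings after a round trip through the database. The standard library's json module shows the effect:

```python
import json

# An integer key going in...
stored = json.dumps({1: "one"})   # '{"1": "one"}'

# ...comes back as a string key after deserialization.
restored = json.loads(stored)
print(restored)                   # {'1': 'one'}
```

This is why code that stores JSON data and later looks it up by the original (non-string) key needs some form of key normalization.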

Concurrent Worker

The concurrent worker runs jobs in parallel, using a thread pool to execute them.

import logging
from pytask import ConcurrentWorker

logger = logging.getLogger(__name__)

# `queue` and `func` are defined as in the earlier examples
worker = ConcurrentWorker(queue, func, logger=logger, interval=1, max_workers=16)
worker.run()
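ConcurrentWorker's internals aren't shown here, but the thread-pool pattern it describes can be sketched with the standard library (hypothetical job dicts and handler; not pytask's implementation):

```python
from concurrent.futures import ThreadPoolExecutor

def func(job: dict) -> dict:
    # Stand-in for the user-supplied job handler.
    job["foo"] += 1
    return job

jobs = [{"foo": i} for i in range(4)]

# Run up to max_workers jobs at once, mirroring ConcurrentWorker(max_workers=16).
with ThreadPoolExecutor(max_workers=16) as pool:
    results = list(pool.map(func, jobs))

print(results)  # [{'foo': 1}, {'foo': 2}, {'foo': 3}, {'foo': 4}]
```

With I/O-bound handlers (HTTP calls, database access) threads give real parallelism despite the GIL; for CPU-bound work a process pool would be the usual alternative.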

Download files

Download the file for your platform.

Source Distribution

pytask_queue-1.0.3.tar.gz (9.1 kB)

Uploaded Source

Built Distribution


pytask_queue-1.0.3-py3-none-any.whl (12.0 kB)

Uploaded Python 3

File details

Details for the file pytask_queue-1.0.3.tar.gz.

File metadata

  • Download URL: pytask_queue-1.0.3.tar.gz
  • Upload date:
  • Size: 9.1 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: pdm/2.22.1 CPython/3.10.5 Linux/6.5.0-1025-azure

File hashes

Hashes for pytask_queue-1.0.3.tar.gz
Algorithm Hash digest
SHA256 e39be0d57ea067f463df757e35901c98db5b009cd9b3eb41fd707d6eaa61b5ec
MD5 5f3b18bcf4b55efb76be12f8662ea3fe
BLAKE2b-256 20f3837fcf7c4c337baa2124cc83583640f24de804eb947435ba182ab5228596


File details

Details for the file pytask_queue-1.0.3-py3-none-any.whl.

File metadata

  • Download URL: pytask_queue-1.0.3-py3-none-any.whl
  • Upload date:
  • Size: 12.0 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: pdm/2.22.1 CPython/3.10.5 Linux/6.5.0-1025-azure

File hashes

Hashes for pytask_queue-1.0.3-py3-none-any.whl
Algorithm Hash digest
SHA256 a49fb1749ab0497b10e417d1c07d60a8cb36c0045a52bfc4e5c0c51a12435d7c
MD5 e79b860eff9e911d06cf51b2feade1e5
BLAKE2b-256 a335693d63d66fd203b9ac92a24ae7d06eb805c661f123800c9aa5795fa0d473

