Extensible task queue used in DVC.

Project description


dvc-task is a library for queuing, running and managing background jobs (processes) from standalone Python applications. dvc-task is built on Celery, but does not require a full AMQP messaging server (or any other “heavy” servers which are traditionally used as Celery brokers).

Features

  • dvc_task.proc module for running and managing background processes in Celery tasks

  • Preconfigured Celery app intended for use in standalone desktop applications

    • Uses Kombu filesystem transport as the message broker, and the standard filesystem Celery results backend

    • Allows standalone applications to make use of Celery without the use of additional messaging and results backend servers

  • Preconfigured “temporary” Celery worker which will automatically terminate itself when the Celery queue is empty

    • Allows standalone applications to start Celery workers as needed directly from Python code (as opposed to requiring a “run-forever” daemonized CLI celery worker)

Requirements

  • Celery 5.3 or later

  • Kombu 5.3 or later

Note: Windows is not officially supported in Celery, but dvc-task is tested on Windows (and used in DVC on Windows).

Installation

You can install dvc-task via pip from PyPI:

$ pip install dvc-task

Usage

Processes (dvc_task.proc)

The process module provides a simple API for running and managing background processes. Background processes are run in Celery tasks, but process state is stored separately from Celery, so information about managed processes can be accessed from outside of the Celery producer or consumer application.

After you have configured a Celery application, jobs can be queued (and run) via ProcessManager.run (which returns a signature for the proc.tasks.run Celery task):

from dvc_task.proc import ProcessManager
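# app is your configured Celery application (e.g. the FSApp described below)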

@app.task
def my_task():
    manager = ProcessManager(wdir=".")
    manager.run(["echo", "hello world"], name="foo").delay()

The ProcessManager will create a subdirectory in wdir for each managed process.

$ tree .
.
└── 25mYD6MyLNewXXdMVYCCr3
    ├── 25mYD6MyLNewXXdMVYCCr3.json
    ├── 25mYD6MyLNewXXdMVYCCr3.out
    └── 25mYD6MyLNewXXdMVYCCr3.pid
1 directory, 3 files

At a minimum, the directory will contain <id>.pid and <id>.json files.

  • <id>.json: A JSON file describing the process, containing the following keys:
    • pid: Process PID

    • stdout: Redirected stdout file path for the process (redirected to <id>.out by default)

    • stderr: Redirected stderr file path for the process (stderr is redirected to stdout by default)

    • stdin: Redirected stdin file path for the process (interactive processes are not yet supported, stdin is currently always null)

    • returncode: Return code for the process (null if the process has not exited)

  • <id>.pid: A standard pidfile containing only the process PID
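
Because <id>.json is plain JSON, it can also be inspected directly with the standard library. A quick sketch, reusing the example process directory from above (the pid and returncode values shown are illustrative):

>>> import json
>>> with open("25mYD6MyLNewXXdMVYCCr3/25mYD6MyLNewXXdMVYCCr3.json") as f:
...     info = json.load(f)
...
>>> info["pid"], info["returncode"]
(12345, 0)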

ProcessManager instances can be created outside of a Celery task to manage and monitor processes as needed:

>>> from dvc_task.proc import ProcessManager
>>> manager = ProcessManager(wdir=".")
>>> names = [name for name, _info in manager.processes()]
>>> names
['25mYD6MyLNewXXdMVYCCr3']
>>> for line in manager.follow(names[0]):
...     print(line)
...
hello world

Celery Workers (dvc_task.worker)

dvc-task includes a pre-configured Celery worker (TemporaryWorker) which can be started from Python code. The TemporaryWorker will consume Celery tasks until the queue is empty. Once the queue is empty, the worker will wait up to a specified timeout for new tasks to be added to the queue. If the queue remains empty after the timeout expires, the worker will exit.

To instantiate a worker with a 60-second timeout and start it under the Celery worker name my-worker-1 (my_app is your Celery application, such as the FSApp described below):

>>> from dvc_task.worker import TemporaryWorker
>>> worker = TemporaryWorker(my_app, timeout=60)
>>> worker.start("my-worker-1")

Note that worker.start runs the Celery worker within the calling thread.
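
Because worker.start blocks, a standalone application will typically launch the worker outside of its main thread of control, for example in a separate process. A rough sketch using the FSApp application described in the next section (the wdir path, timeout, and worker name are illustrative):

import multiprocessing

from dvc_task.app import FSApp
from dvc_task.worker import TemporaryWorker

def run_worker():
    # Recreate the app and worker inside the child process; start() blocks until
    # the queue has stayed empty for the full timeout, then the worker exits.
    app = FSApp(wdir=".")
    worker = TemporaryWorker(app, timeout=60)
    worker.start("my-worker-1")

if __name__ == "__main__":
    worker_proc = multiprocessing.Process(target=run_worker)
    worker_proc.start()
    # ... queue tasks from the main process here ...
    worker_proc.join()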

Celery Applications (dvc_task.app)

dvc-task includes a pre-configured Celery application (FSApp) which uses the Kombu filesystem transport as the Celery broker along with the Celery filesystem results storage backend. FSApp is intended to be used in standalone Python applications where a traditional Celery producer/consumer setup (with the appropriate messaging and storage backends) is unavailable.

>>> from dvc_task.app import FSApp
>>> my_app = FSApp(wdir=".")
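
Under the hood, this corresponds to a Celery application configured with the Kombu filesystem transport as the broker and the file-based results backend. The exact settings FSApp applies are an implementation detail, but a hand-rolled equivalent would look roughly like this (paths are illustrative):

from celery import Celery

app = Celery("my-app")
app.conf.update(
    broker_url="filesystem://",
    broker_transport_options={
        # The filesystem transport passes messages through a shared directory
        "data_folder_in": "./broker",
        "data_folder_out": "./broker",
    },
    # Task results are written to files instead of a results server
    result_backend="file:///tmp/results",
)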

FSApp provides iterators for accessing Kombu messages which are either waiting in the queue or have already been processed. This allows the caller to access Celery task information without using the Celery inspect API (which is only functional when a Celery worker is actively running).

>>> for msg in my_app.iter_processed():
...     msg
<Message object at 0x102e7f0d0 with details {'state': 'RECEIVED', 'content_type': 'application/json', 'delivery_tag': '0244c11a-1bcc-47fc-8587-66909a55fdc6', ...}>
<Message object at 0x1027fd4c0 with details {'state': 'RECEIVED', 'content_type': 'application/json', 'delivery_tag': '491415d1-9527-493a-a5d7-88ed355da77c', ...}>
<Message object at 0x102e6f160 with details {'state': 'RECEIVED', 'content_type': 'application/json', 'delivery_tag': 'ea6ab7a4-0398-42ab-9f12-8da1f8e12a8a', ...}>
<Message object at 0x102e6f310 with details {'state': 'RECEIVED', 'content_type': 'application/json', 'delivery_tag': '77c4a335-2102-4bee-9cb8-ef4d8ef9713f', ...}>
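
Each message yielded is a plain Kombu Message object. With Celery's default task message protocol (v2), metadata such as the task name and ID is carried in the message headers, so it can be read without a running worker. A sketch under that assumption:

for msg in my_app.iter_processed():
    headers = msg.headers or {}
    # Protocol v2 stores the task UUID under "id" and the task name under "task"
    print(headers.get("id"), headers.get("task"))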

Contributing

Contributions are very welcome. To learn more, see the Contributor Guide.

License

Distributed under the terms of the Apache 2.0 license, dvc-task is free and open source software.

Issues

If you encounter any problems, please file an issue along with a detailed description.

Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

dvc-task-0.1.5.tar.gz (36.3 kB)

Built Distribution

dvc_task-0.1.5-py3-none-any.whl (23.1 kB)

File details

Details for the file dvc-task-0.1.5.tar.gz.

File metadata

  • Download URL: dvc-task-0.1.5.tar.gz
  • Upload date:
  • Size: 36.3 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/4.0.1 CPython/3.9.15

File hashes

Hashes for dvc-task-0.1.5.tar.gz

  • SHA256: 7ed64b1d83df8be5523852e33485c5c245582cbfc40201733ffc9133de9a60dd
  • MD5: 18f8e113f4cc6184ba427c2a4167d3df
  • BLAKE2b-256: 486570e4d9d92dc13ebdcc0575ca77e4e1d8bad06b3021992707425f6b3db95c


File details

Details for the file dvc_task-0.1.5-py3-none-any.whl.

File metadata

  • Download URL: dvc_task-0.1.5-py3-none-any.whl
  • Upload date:
  • Size: 23.1 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/4.0.1 CPython/3.9.15

File hashes

Hashes for dvc_task-0.1.5-py3-none-any.whl

  • SHA256: 69a022d4842af62a655da258a7203f9594e19e7bac211abb2cbf6b7ad6a97ded
  • MD5: a4e2d4d4ed9d90e2f792f18956bca0ee
  • BLAKE2b-256: 2341cb28e41d24deadabfcd3729925513e6ea9b6f4b4e4f0c261c1492ca0f93d

