Async task manager with Celery-like features and type checking. Fork of arq.
Project description
[ ~ Dependencies scanned by PyUp.io ~ ]
Async task manager with Celery-like features. Fork of arq.
Features
Celery-like @task decorator, adds .delay() to enqueue job
Proper mypy type checking: all arguments passed to .delay() will be checked against the original function signature
Graceful shutdown: waits until running tasks are finished
Installation
Darq uses aioredis 1.x as Redis client. Unfortunately, this library has been abandoned, and does not support Python 3.11. I made a fork with compatability fixes: evo-aioredis (https://github.com/evo-company/aioredis-py).
Because of this, aioredis is not currently added as Darq dependency, and you must install it yourself:
For Python<3.11 you can use:
pip install aioredis<2.0.0
For Python 3.11 (and older versions too) you can use fork:
pip install evo-aioredis<2.0.0
Quick start
# some_project/darq_app.py
import asyncio
import darq
darq = darq.Darq(redis_settings=darq.RedisSettings(host='redis'))
@darq.task
async def add_to_42(a: int) -> int:
return 42 + a
async def main():
# Before adding tasks to queue we should connect darq instance to redis
await darq.connect()
# Direct call job as function:
result = await add_to_42(5) # result == 47
# Celery-like add task to queue:
await add_to_42.delay(a=5)
await darq.disconnect()
if __name__ == '__main__':
asyncio.run(main())
And start worker:
python3 -m darq.cli -A some_project.darq_app.darq worker
Worker output:
15:24:42: Starting worker for 1 functions: some_project.darq_app.add_to_42
15:24:42: redis_version=5.0.7 mem_usage=834.87K clients_connected=1 db_keys=2
15:25:08: 0.22s → 1315f27608e9408392bf5d3310bca38c:darq_app.add_to_42(a=5)
15:25:08: 0.00s ← 1315f27608e9408392bf5d3310bca38c:darq_app.add_to_42 ● 47
Changelog
0.11.1 (2022-11-30)
Add Python 3.11 support (with evo-aioredis dependency instead of aioredis)
Remove pydantic dependency
Remove aioredis from dependencies to allow choose between aioredis and evo-aioredis - fork with Python 3.11 compatability
0.11.0 (2022-08-03)
Added ability to optionally pass ctx to the task, like this:
@task(with_ctx=True)
def foobar(ctx):
log.info('Foobar try %s', ctx['job_try'])
ctx contains: job_id, job_try, enqueue_time, score, metadata + all worker’s ctx (including custom context which can be passed via on_startup). Thanks to @kindermax (https://github.com/seedofjoy/darq/pull/426) !
0.10.2 (2022-02-03)
Add proper typing for functions wrapped with the @task decorator. Mypy will now check that parameters are passed correctly when calling func() and func.delay()
0.10.1 (2021-07-29)
Add sentinel_timeout (defaults to 0.2) param to RedisSettings
0.10.0 (2021-07-09)
Breaking change: Rename darq.worker.Function to darq.worker.Task
Made job to task naming migration
Add max_jobs parameter to CLI (thanks to @antonmyronyuk)
Fixed bug with expires argument: default_job_expires could not be replaced with None in @task or .apply_async
0.9.0 (2020-06-24)
Breaking change: Add scheduler_ctx param to on_scheduler_startup and on_scheduler_shutdown to share data between this callbacks. It already has ctx['redis'] - instance of ArqRedis
0.8.0 (2020-06-22)
Breaking change: Changed CLI command format. Before: darq some_project.darq_app.darq. Now: darq -A some_project.darq_app.darq worker
Breaking change: Scheduler (cron jobs) now run’s seperate from worker (see darq scheduler command)
Breaking change: Changed some function signatures (rename arguments)
Breaking change: Remove redis_pool param from Darq app
Add on_scheduler_startup and on_scheduler_shutdown callbacks
0.7.2 (2020-06-18)
Fix some types (cron, OnJobPrepublishType)
on_job_prerun now runs before “task started” log and on_job_postrun now runs after “task finished” log
0.7.1 (2020-05-25)
.apply_async: Make args and kwargs arguments optional
0.7.0 (2020-05-25)
Fork arq to project and merge it with darq (It was easier to rewrite arq than to write a wrapper)
Breaking change: Remove “magic” params from .delay. For enqueue job with special params added .apply_async.
Add watch-mode to CLI.
Fix: Now worker will not run cronjob if it’s functions queue not match with worker’s
0.6.0 (2020-03-08)
Breaking change: Changed Darq constructor from single config param to separate params.
arq_function.coroutine now has .delay method.
0.5.0 (2020-03-03)
Add on_job_prepublish(metadata, arq_function, args, kwargs) callback. metadata is mutable dict, which will be available at ctx['metadata'].
0.4.0 (2020-03-03)
Add default_job_expires param to Darq (if the job still hasn’t started after this duration, do not run it). Default - 1 day
Add expires param to @task (if set - overwrites default_job_expires)
0.3.1 (2020-03-02)
Rewrite warm shutdown: now during warm shutdown cron is disabled, on second signal the warm shutdown will be canceled
0.3.0 (2020-02-27)
Breaking change: on_job_prerun and on_job_postrun now accepts arq.worker.Function instead of the original function (it can still be accessed at arq_function.coroutine)
0.2.1 (2020-02-26)
Fix add_cron_jobs method. Tests added.
0.2.0 (2020-02-26)
Add on_job_prerun(ctx, function, args, kwargs) and on_job_postrun(ctx, function, args, kwargs, result) callbacks.
0.1.0 (2020-02-26)
Breaking change: Jobs no longer explicitly get JobCtx as the first argument, as in 99.9% cases it doesn’t need it. In future release will be possible to optionally pass JobCtx in some way.
Breaking change: All cron jobs should be wrapped in @task decorator
Directly pass functions to arq.Worker, not names.
0.0.3 (2020-02-25)
.delay() now returns arq_redis.enqueue_job result (Optional[Job])
Add py.typed file
Fixed add_cron_jobs typing
0.0.2 (2020-02-24)
Add add_cron_jobs method
0.0.1 (2020-02-21)
First release
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.