A simple arq library.

Project description

ff-arq

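A simple, easy-to-use wrapper library around arq that streamlines the arq workflow.

Main features

  • Decorators for tasks and scheduled (cron) tasks
  • Automatic loading of tasks from a specified directory
  • Fast restart of tasks after the arq worker service restarts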
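
As a rough illustration of what loading tasks from a directory can involve, here is a minimal standard-library sketch. It is not ff-arq's implementation: `load_modules_from_dir`, `demo_autoload`, and the `autoloaded_` module-name prefix are hypothetical names chosen for this example, and it only imports the `.py` files found directly in the directory.

```python
import importlib.util
import sys
import tempfile
from pathlib import Path


def load_modules_from_dir(dir_path):
    """Import every .py file directly under dir_path; return {module_name: module}."""
    loaded = {}
    for py_file in sorted(Path(dir_path).glob("*.py")):
        # Hypothetical prefix, used only to avoid clashing with installed modules.
        name = f"autoloaded_{py_file.stem}"
        spec = importlib.util.spec_from_file_location(name, py_file)
        module = importlib.util.module_from_spec(spec)
        sys.modules[name] = module
        # Executing the module runs any decorators it applies at import time.
        spec.loader.exec_module(module)
        loaded[name] = module
    return loaded


def demo_autoload():
    """Create a throwaway task file, load it, and return the loaded module names."""
    with tempfile.TemporaryDirectory() as tmp:
        Path(tmp, "emails.py").write_text(
            "async def send_email(to):\n    return f'sent to {to}'\n"
        )
        return sorted(load_modules_from_dir(tmp))
```

Importing a module is enough to run its decorators, which is why a decorator-based registry and directory loading tend to go together.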

Installation

pip install ff-arq

Quick start

Define your tasks in tasks/__init__.py:

import asyncio
import logging
from ff_arq import task, cron_task

@task()
async def test_task(message: str):
    """
    Test task: sleeps for 5 seconds, then logs the message.
    """
    await asyncio.sleep(5)
    logging.debug(f"test_task completed, message: {message}")


@cron_task(second={0, 5, 10, 15, 20, 25, 30, 35, 40, 45, 50, 55})  # runs every 5 seconds
async def test_cron_task():
    """
    Every 5 seconds, log "test_cron_task executed".
    """
    logging.debug("test_cron_task executed")
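
The set of seconds in the decorator can be generated rather than typed out. The sketch below assumes that `cron_task` forwards `second` to arq's `cron`, which accepts a set of integers and fires whenever the current second is in that set. `EVERY_5_SECONDS` and `fire_seconds` are illustrative names, not part of ff-arq.

```python
# Seconds of each minute at which an "every 5 seconds" schedule fires.
EVERY_5_SECONDS = set(range(0, 60, 5))


def fire_seconds(allowed_seconds):
    """Return, in order, the seconds within one minute that match the schedule."""
    return [second for second in range(60) if second in allowed_seconds]


# 12 firings per minute, 5 seconds apart.
firings = fire_seconds(EVERY_5_SECONDS)
```

Under that assumption, passing `second=EVERY_5_SECONDS` should be equivalent to the literal set used above.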

Put the following code in main.py:

import asyncio
import logging
from ff_arq import TaskManager


logging.basicConfig(level=logging.DEBUG)

async def start_task_queue():
    """
    Start the task queue worker.
    """
    # Configure the task queues and create the task manager
    config = {
        "redis_dsn": "redis://localhost:6379/0",
        "task_dir_path": "src/tasks",  # auto-load arq tasks from this directory
        "queues": {
            # queue_name: queue_config
            "default": {
                "concurrency": 10,
                "task_timeout": 300,
                "keep_result": 3600,
                "retry_jobs": True,
                "max_tries": 5,
                "handle_signals": False,
            }
        }
    }
    await TaskManager.init(**config).run()

async def run_test_task():
    """
    Enqueue the test task.
    """
    from tasks import test_task
    queue_job = await test_task(message="hello ff-arq", enqueue=True)
    logging.debug(f"queue job: {queue_job}")
    # After about 5 seconds the log shows: test_task completed, message: hello ff-arq

async def main():

    tasks = [
        start_task_queue(),
        run_test_task()
    ]
    await asyncio.gather(*tasks)

if __name__ == "__main__":
    # asyncio.run creates and closes the event loop; get_event_loop() is deprecated for this use
    asyncio.run(main())
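
The per-queue options in the config above resemble arq `Worker` parameters. The sketch below shows one plausible correspondence. The right-hand names (`max_jobs`, `job_timeout`, `keep_result`, `retry_jobs`, `max_tries`, `handle_signals`, `queue_name`) are real arq `Worker` parameters, but the pairing itself is an assumption; ff-arq may translate the options differently. `OPTION_TO_ARQ` and `to_arq_worker_kwargs` are hypothetical helpers.

```python
# Assumed pairing of README queue options (left) with arq Worker parameters (right).
OPTION_TO_ARQ = {
    "concurrency": "max_jobs",      # assumption: maximum jobs run at the same time
    "task_timeout": "job_timeout",  # arq reads plain numbers here as seconds
    "keep_result": "keep_result",   # arq reads plain numbers here as seconds
    "retry_jobs": "retry_jobs",
    "max_tries": "max_tries",
    "handle_signals": "handle_signals",
}


def to_arq_worker_kwargs(queue_name, queue_config):
    """Translate one queue's options into keyword arguments named after arq's Worker."""
    unknown = set(queue_config) - set(OPTION_TO_ARQ)
    if unknown:
        raise ValueError(f"unsupported queue options: {sorted(unknown)}")
    kwargs = {OPTION_TO_ARQ[key]: value for key, value in queue_config.items()}
    kwargs["queue_name"] = queue_name
    return kwargs


worker_kwargs = to_arq_worker_kwargs(
    "default",
    {
        "concurrency": 10,
        "task_timeout": 300,
        "keep_result": 3600,
        "retry_jobs": True,
        "max_tries": 5,
        "handle_signals": False,
    },
)
```

Read this way, the `default` queue would run up to 10 jobs at once, time each job out after 300 seconds, keep results for an hour, and retry a failing job up to 5 times.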

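More examples

See the test cases in the tests directory.

Contributing

Issues and PRs that improve features or fix bugs are welcome. Please keep the code style consistent with the project and add the necessary tests.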

License

MIT

Download files

Source Distribution

ff_arq-0.1.0.tar.gz (9.5 kB view details)

Uploaded Source

Built Distribution

ff_arq-0.1.0-py2.py3-none-any.whl (8.1 kB view details)

Uploaded Python 2, Python 3

File details

Details for the file ff_arq-0.1.0.tar.gz.

File metadata

  • Download URL: ff_arq-0.1.0.tar.gz
  • Upload date:
  • Size: 9.5 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.12.4

File hashes

Hashes for ff_arq-0.1.0.tar.gz
Algorithm Hash digest
SHA256 b7702bd974664bd2b36bad3d3649bacc72e25be8d93f3407dec196ad3cd1d450
MD5 8385fa97dd628cfabde29429cead0ff2
BLAKE2b-256 18699da90c999c3666d68e1929bea7df4d4a0de4b086dece5046a142e5cb1674

File details

Details for the file ff_arq-0.1.0-py2.py3-none-any.whl.

File metadata

  • Download URL: ff_arq-0.1.0-py2.py3-none-any.whl
  • Upload date:
  • Size: 8.1 kB
  • Tags: Python 2, Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.12.4

File hashes

Hashes for ff_arq-0.1.0-py2.py3-none-any.whl
Algorithm Hash digest
SHA256 8af45c62d4ec742fa59c5a1e0972ff5b50a3832b3f583a4819d7d51ed3f36b2e
MD5 de277a91b7ae9ded7f1779dfb78f2eb5
BLAKE2b-256 2e37051204c331d9fcc06e62c55e2759172cae53c7a2c56511731fdf0cb5b549
