A task processor based on multiprocessing and asynchronous I/O

Project description

Multiprocess tasks

For CPU-bound applications

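# Instantiate the multiprocess task handler
import time
from kordar_task import task, processmgr

# work_size = number of worker processes (set it lower than the CPU count)
# queue_buff_len = maximum number of queued tasks; send blocks when the queue is full
handler = processmgr.ProcessTaskHandler(work_size=5, queue_buff_len=20)
# Register the task logic
handler.add_task(task.DefaultTask())
# Start the task processor
handler.start_work_pool()

# Send task data to the process queue
handler.send(task.DefaultBody())
handler.send(task.DefaultBody())

# Keep the main process alive while the workers handle the tasks
time.sleep(10)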
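
The two parameters above can be illustrated with the standard library alone. The following is a minimal sketch, not kordar_task's implementation: `suggest_work_size` and `try_send` are hypothetical helpers, and `queue.Queue` stands in for the process queue to show what "blocks when full" means for a bounded queue.

```python
import os
import queue


def suggest_work_size(cpu_count=None):
    """Return a worker count below the CPU count, but never less than 1.

    Hypothetical helper, not part of kordar_task.
    """
    if cpu_count is None:
        cpu_count = os.cpu_count() or 1  # os.cpu_count() may return None
    return max(1, cpu_count - 1)


def try_send(q, item, timeout=0.1):
    """Put an item on a bounded queue.

    put() blocks while the queue is full; here we give up after `timeout`
    seconds and report False instead of blocking forever.
    """
    try:
        q.put(item, timeout=timeout)
        return True
    except queue.Full:
        return False


# A bounded queue with room for two items (stand-in for queue_buff_len=2).
demo_queue = queue.Queue(maxsize=2)

# Nothing consumes from the queue, so the third send finds it full.
results = [try_send(demo_queue, n) for n in range(3)]
```

With no consumer running, the first two sends succeed and the third one waits for the timeout and then fails. In a real worker pool the workers drain the queue, so a blocked send resumes as soon as a slot frees up.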

Tasks based on asynchronous I/O

For I/O-bound applications

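# Instantiate the asynchronous task handler
import time
from kordar_task import task, asyncmgr

# work_size = size of the worker pool; tune it to your workload, larger values allow more concurrency
# queue_buff_len = maximum number of queued tasks; send raises an exception when the queue is full
h = asyncmgr.AsyncTaskHandler(work_size=5, queue_buff_len=20)
# Register the asynchronous task
# NOTE: the task's execute method must be an async function
h.add_task(task.DefaultAsyncTask())

# Start the asynchronous worker pool
h.start_work_pool()

# Send data to the asynchronous queue
h.send(task.DefaultAsyncBody(1))
h.send(task.DefaultAsyncBody(2))

# Keep the main thread alive while the tasks run
time.sleep(10)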
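
Unlike the multiprocess handler, the asynchronous handler is described as raising an exception when the queue is full. The sketch below shows that behavior with `asyncio.Queue` from the standard library. It is an illustration under assumptions, not kordar_task's internals: `worker` and `run_pool` are hypothetical names.

```python
import asyncio


async def worker(name, q, done):
    """Pull items from the queue forever and record which worker handled them."""
    while True:
        item = await q.get()
        await asyncio.sleep(0)  # stand-in for real awaitable I/O
        done.append((name, item))
        q.task_done()


async def run_pool(items, work_size=2, queue_buff_len=2):
    """Feed items to a pool of coroutine workers through a bounded queue."""
    q = asyncio.Queue(maxsize=queue_buff_len)
    done, rejected = [], []
    workers = [
        asyncio.create_task(worker("w%d" % i, q, done)) for i in range(work_size)
    ]

    for item in items:
        try:
            # put_nowait never waits: it raises QueueFull on a full queue.
            q.put_nowait(item)
        except asyncio.QueueFull:
            rejected.append(item)

    await q.join()  # wait until every accepted item has been processed
    for w in workers:
        w.cancel()
    await asyncio.gather(*workers, return_exceptions=True)
    return sorted(item for _, item in done), rejected


processed, rejected = asyncio.run(run_pool([1, 2, 3]))
```

Because the producer loop never yields to the event loop, the workers cannot drain the queue while items are being submitted, so the third item is rejected. A caller that sends in bursts should either size the queue for the burst or catch the exception and retry.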

Custom tasks

from kordar_task.task import ibody, itask
from kordar_task import write_logger
import asyncio

# Synchronous task
class DefaultBody(ibody):
    def task_id(self):
        return "default-kordar_task"


class DefaultTask(itask):
    def id(self):
        return "default-kordar_task"

    def execute(self, body):
        write_logger("defaultTask, name = %s" % body.task_id())

# Asynchronous task
class DefaultAsyncBody(ibody):
    def __init__(self, tid):
        self.id = tid

    def task_id(self):
        return "default-kordar_async_task"


class DefaultAsyncTask(itask):
    def id(self):
        return "default-kordar_async_task"

    async def execute(self, body):
        i = 0
        while i < 5:
            write_logger("DefaultAsyncTask, id = %s" % body.id)
            i = i + 1
            await asyncio.sleep(1)
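
In the classes above, each body's `task_id()` returns the same string as the matching task's `id()`. A plausible reading is that the handler uses this string to route a body to its task. The sketch below illustrates that idea with stand-in classes. `Body`, `Task`, `Registry`, `ResizeBody` and `ResizeTask` are all hypothetical, and the routing rule is an assumption, not confirmed behavior of kordar_task.

```python
class Body:
    """Stand-in for ibody (assumed interface: task_id())."""

    def task_id(self):
        raise NotImplementedError


class Task:
    """Stand-in for itask (assumed interface: id() and execute(body))."""

    def id(self):
        raise NotImplementedError

    def execute(self, body):
        raise NotImplementedError


class ResizeBody(Body):
    """Payload for a hypothetical resize job."""

    def __init__(self, width):
        self.width = width

    def task_id(self):
        return "resize"


class ResizeTask(Task):
    def id(self):
        return "resize"

    def execute(self, body):
        return "resized to %d" % body.width


class Registry:
    """Route each body to the task whose id() equals body.task_id()."""

    def __init__(self):
        self._tasks = {}

    def add_task(self, task):
        self._tasks[task.id()] = task

    def dispatch(self, body):
        task = self._tasks.get(body.task_id())
        if task is None:
            raise KeyError("no task registered for %r" % body.task_id())
        return task.execute(body)


registry = Registry()
registry.add_task(ResizeTask())
outcome = registry.dispatch(ResizeBody(64))
```

Under this assumption, a body whose `task_id()` matches no registered task cannot be processed, so it is worth keeping the two strings in one shared constant when writing custom tasks.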

Download files

Source Distribution

kordar-task-0.0.3.tar.gz (4.7 kB)

Built Distribution

kordar_task-0.0.3-py3-none-any.whl (6.4 kB), Python 3
