Skip to main content

简单异步任务队列。版本[1.0.1] 更新内容:又是造轮子系列咯,Python上有很多成熟完善的异步任务队列框架可以用,比如Celery,或者RQ,不过这些都不自带消息队列服务,都需要使用Redis、RabbitMQ之类的消息队列才行,我用到小项目中又不需要附带这么多东西,于是自己动手来实现咯。

Project description

又是造轮子系列咯,Python上有很多成熟完善的异步任务队列框架可以用,比如Celery,或者RQ,不过这些都不自带消息队列服务,都需要使用Redis、RabbitMQ之类的消息队列才行,我用到小项目中又不需要附带这么多东西,于是自己动手来实现咯。

思路

  1. 将需要异步执行的任务添加到队列
  2. 自动从队列中取出任务,创建新线程执行
  3. 保存任务的执行结果和输出
  4. 任务完成,调用回调函数,处理返回的数据
  5. 使用输出重定向处理任务的输出

Demo

附上使用任务队列的demo代码:

if __name__ == '__main__':
    task_queue = TaskQueue(output_redirect=True)


    def fun1():
        time.sleep(2)
        return 1


    def fun2():
        time.sleep(3)
        return 2


    task_queue.put(Task(
        func=fun1,
        callback=lambda task, result: print(f'task1 result: {result}'),
    ))
    task_queue.put(Task(
        func=fun2,
        callback=lambda task, result: print(f'task2 result: {result}'),
    ))


    def custom_output(msg):
        print(f'[custom] {msg}')


    def fun3(num1, num2):
        print(f'num1={num1}')
        print(f'num2={num2}')
        return num1 + num2


    def callback(task_obj, result):
        print(f'task3 result={result}')
        output = task_obj.outputs
        output.custom = custom_output
        output.reset()
        output.to_custom()


    task_queue.put(Task(func=fun3, callback=callback, args=[2, 3]))

    print('task queue run')
    task_queue.run()
    print('do other things...')

    for i in range(0, 100):
        print(i * i)

wheel构建

python3.6 setup.py sdist bdist_wheel
twine upload dist/* -u "用户名" -p "密码"

下面分析一下几个关键部分的代码实现

优先级队列

Python标准库已经有实现好的优先级队列了,但是当在相同优先级下传入同样的对象时,他会自动比较这些对象,不过我们的Task类没有实现相关的运算符重载,所以无法比较。

要解决这个问题,要不就在Task里面实现运算符重载,要不就是自己实现一个优先级队列,我选择后者。

class PriorityQueue:
    """
    自己实现的优先级队列,使用了Python的堆
    """
    def __init__(self):
        self._index = 0
        self.queue = []

    def push(self, priority, val):
        heapq.heappush(self.queue, (priority, self._index, val))
        self._index += 1

    def pop(self):
        return heapq.heappop(self.queue)[-1]

    @property
    def empty(self):
        return len(self.queue) == 0

输出重定向

其实这个挺坑的了,毕竟是异步任务队列,难免涉及到线程的竞争问题,又没办法单独控制每个任务的输出,不过我还是做了,小项目的话还是可以用的(其实就是懒得移植一下已有代码)

代码如下,同样是使用队列来实现输出缓冲区,大小可以自己调整,默认支持输出到控制台、文件或者返回列表。

一般重定向输出的话需要自己实现可以输出处理函数,直接给Redirection类的custom 属性赋值即可。

class Redirection:
    def __init__(self, buffer_size=512):
        self.buffer = Queue(maxsize=512)
        self._console = sys.stdout
        # 自定义的输出端
        self.custom = None

    def write(self, output_stream):
        # 加入缓冲区队列
        self.buffer.put(output_stream)

    def to_console(self):
        sys.stdout = self._console
        # 出列
        while not self.buffer.empty():
            print(self.buffer.get())

    def to_file(self, file_path):
        with open(file_path, 'w+') as f:
            sys.stdout = f
            while not self.buffer.empty():
                print(self.buffer.get())
            f.close()

    def to_custom(self):
        while not self.buffer.empty():
            self.custom(self.buffer.get())

    def to_list(self):
        data = []
        while not self.buffer.empty():
            data.append(self.buffer.get())
        return data

    def flush(self):
        self.buffer.empty()

    def reset(self):
        sys.stdout = self._console

Task类

没什么好说的,定义任务类,Task,代码如下:

class Task:
    def __init__(self, func, callback=None, priority=Priority.MIDDLE, args=(), kwargs={}):
        """
        Args:
            func: 需要执行的函数
            callback:  执行完的回调函数
            priority: 优先级
            *args:
            **kwargs:
        """
        self._id = uuid.uuid4().hex
        self.function = func
        self.callback = callback
        self.priority = priority
        self.args = args
        self.kwargs = kwargs
        # 任务运行过程的输出,stdout的输出
        self._outputs: Redirection = None

    @property
    def id(self):
        return self._id

    @property
    def outputs(self) -> Redirection:
        return self._outputs

    @outputs.setter
    def outputs(self, value: Redirection):
        self._outputs = value

    def run(self):
        try:
            if self.callback:
                # 回调函数原型 callback(task_obj, result)
                result = self.callback(self, self.function(*self.args, **self.kwargs))
            else:
                result = self.function(*self.args, **self.kwargs)
            return result
        except Exception as e:
            if self.callback:
                result = self.callback(self, e)
            else:
                result = e
            return result

使用起来很简单,如下:

def fun1(num1, num2):
    print(f'num1={num1}')
    print(f'num2={num2}')
    return num1 + num2

Task(
    func=fun1,
    callback=lambda task, result: print(f'task result: {result}'),
)

# 也可以写成这样
Task(
    func=lambda num1, num2: num1 + num2,
    callback=lambda task, result: print(f'task result: {result}'),
    args=[2, 3]
)

任务队列

最后是实现任务队列,也很简单,根据任务的优先级进行调度即可。

class TaskQueue:
    """
    基于线程的异步任务队列
    todo 下次做一个基于进程的队列,充分利用多核CPU性能
    """

    def __init__(self, output_redirect=False):
        self.queue = PriorityQueue()
        self.output_redirect = output_redirect
        self._redirect_objs = {}
        self._results = {}

    def put(self, task: Task):
        """
        将task加入任务列表
        Args:
            task:
        Returns:返回task id
        """
        self.queue.push(task.priority, task)
        return task.id

    def get(self):
        return self.queue.pop()

    def run(self):
        while not self.queue.empty:
            task = self.get()
            # 开启新线程
            t = threading.Thread(target=self._task_wrapper, name=task.id, args=[task])
            self._log(f'Start thread {task.id}')
            t.start()

    def get_output(self, task_id: str) -> Redirection:
        return self._redirect_objs.get(task_id, None)

    def get_result(self, task_id: str):
        return self._results.get(task_id, None)

    @staticmethod
    def _log(msg: str):
        """日志输出接口,可以替换为日志组件"""
        print(f'[TaskQueue] {msg}')

    def _task_wrapper(self, task: Task):
        if self.output_redirect:
            if task.id in self._redirect_objs:
                redirect_obj = self._redirect_objs[task.id]
            else:
                redirect_obj = Redirection(2048)
                self._redirect_objs[task.id] = redirect_obj
            # 重定向输出
            sys.stdout = redirect_obj
            task.outputs = redirect_obj
            result = task.run()
            # 恢复默认输出
            redirect_obj.reset()
            self._log(f'Task finished. {task.id}')
        else:
            result = task.run()

        # 保存结果
        self._results[task.id] = result

整个项目代码就在这了,很简单,也有很多不足的地方,不过小项目用用勉强还可以吧~

过几天发布到pip,有需要的同学直接pip安装就可以使用,请关注博客更新。

参考资料

欢迎交流

我整理了一系列的技术文章和资料,在公众号「程序设计实验室」后台回复 linux、flutter、c#、netcore、android、kotlin、java、python 等可获取相关技术文章和资料,同时有任何问题都可以在公众号后台留言~

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

onecat_task_queue-1.0.1.tar.gz (6.9 kB view details)

Uploaded Source

File details

Details for the file onecat_task_queue-1.0.1.tar.gz.

File metadata

  • Download URL: onecat_task_queue-1.0.1.tar.gz
  • Upload date:
  • Size: 6.9 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/3.2.0 pkginfo/1.5.0.1 requests/2.24.0 setuptools/49.2.0 requests-toolbelt/0.9.1 tqdm/4.48.0 CPython/3.6.9

File hashes

Hashes for onecat_task_queue-1.0.1.tar.gz
Algorithm Hash digest
SHA256 61b55def064f1919e343cb827208aa07debc9cfba3aeabab0981733d7c344115
MD5 6b0e69cdc50fe90b1983f4a0f655d694
BLAKE2b-256 2697e7ab87ab4964fa6c614f7a5a2aa36d73ed73da0c7b26bcf0fb14b7dd4c4b

See more details on using hashes here.

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page