Chunked result management for Celery tasks

celery-streaming-result

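Chunked result management for Celery tasks: a task appends result chunks while it runs, and a client reads them as they arrive.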

Installation

pip install celery-streaming-result

Usage

Server

import redis
from celery.app import app_or_default
from celery_streaming_result import CeleryStreamingResultManager

app = app_or_default()
redis_instance = redis.Redis()
csrm = CeleryStreamingResultManager(redis_instance)

@app.task(bind=True)
def task1(celery_task):
    result = []
    for i in range(10):
        # Append each partial result as a chunk as soon as it is available.
        csrm.append_result_chunk(celery_task, i)
        result.append(i)
    # Append the end chunk so readers know no more chunks will follow.
    csrm.append_ended_chunk(celery_task)
    return result

Client (synchronous)

import redis
from celery.app import app_or_default
from celery_streaming_result import CeleryStreamingResultManager

# Adjust this import to match where your task is defined.
from test_server import task1

app = app_or_default()
redis_instance = redis.Redis()
csrm = CeleryStreamingResultManager(redis_instance)

# Start an asynchronous task.
atask1 = task1.delay()
# Read the result chunks of that task.
for chunk in csrm.get_result_chunks(atask1):
    print(chunk, end="-", flush=True)
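
The server and synchronous client above follow a producer/consumer pattern: the task appends chunks, then an end marker, and the reader iterates until it sees that marker. The following is a minimal sketch of that pattern only, not the library's implementation. It swaps Redis for an in-memory `queue.Queue`, and `InMemoryChunkStream`, `_END`, `produce`, and the `timeout` parameter are hypothetical names introduced for illustration. The method names mirror the ones used above but take a plain task id instead of a Celery task object.

```python
import queue
import threading

# Hypothetical end-of-stream marker; the real library's storage format may differ.
_END = object()


class InMemoryChunkStream:
    """Toy stand-in for a Redis-backed chunk store, keyed by task id."""

    def __init__(self):
        self._queues = {}
        self._lock = threading.Lock()

    def _queue_for(self, task_id):
        # Create the per-task queue on first use, from either side.
        with self._lock:
            return self._queues.setdefault(task_id, queue.Queue())

    def append_result_chunk(self, task_id, chunk):
        self._queue_for(task_id).put(chunk)

    def append_ended_chunk(self, task_id):
        self._queue_for(task_id).put(_END)

    def get_result_chunks(self, task_id, timeout=5.0):
        # Block for each chunk and stop once the end marker arrives.
        q = self._queue_for(task_id)
        while True:
            chunk = q.get(timeout=timeout)
            if chunk is _END:
                return
            yield chunk


def produce(stream, task_id):
    # Plays the role of the Celery task: ten chunks, then the end marker.
    for i in range(10):
        stream.append_result_chunk(task_id, i)
    stream.append_ended_chunk(task_id)


stream = InMemoryChunkStream()
worker = threading.Thread(target=produce, args=(stream, "task-1"))
worker.start()
chunks = list(stream.get_result_chunks("task-1"))
worker.join()
```

The explicit end marker is what lets the reader stop without polling the task state; without it the consumer could not tell a slow producer from a finished one.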

Client (asynchronous)

import asyncio

from redis import asyncio as aioredis
from celery.app import app_or_default
from celery_streaming_result import CeleryStreamingResultManager
from celery_streaming_result import start_celery_task_async
from celery_streaming_result import get_celery_task_result_async
from test_server import task1  # Adjust this import to match where your task is defined.

app = app_or_default()
redis_instance = aioredis.Redis()
csrm = CeleryStreamingResultManager(redis_instance)


async def on_finished(
    celery_task,
):
    print("on finished...")
    task_result = await get_celery_task_result_async(
        celery_task,
    )
    # task_result is the return value of the Celery task.
    # Usually that is the collection of all result chunks, but it depends
    # entirely on how the Celery task is implemented.


async def main():
    # task1.delay() is a synchronous function; it has to be converted
    # with `sync_to_async` before it can be awaited.
    atask1 = await start_celery_task_async(task1)
    # Read the result chunks of that task. When the task finishes,
    # the on_finished callback is invoked.
    async for chunk in csrm.get_result_chunks(
        atask1,
        on_finished=on_finished,
    ):
        print(chunk, end="-", flush=True)


# `await` and `async for` are only valid inside a coroutine, so run main() in an event loop.
asyncio.run(main())
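
The asynchronous client combines two ideas: moving a blocking call off the event loop, and an async iterator that invokes a callback once the stream is exhausted. Here is a rough, self-contained sketch of both, independent of Celery and Redis. It is not how the library is implemented. It uses the standard library's `asyncio.to_thread` (Python 3.9+) where the comment above mentions `sync_to_async`, and `start_task_blocking` and `stream_chunks` are hypothetical names made up for this example.

```python
import asyncio
import time


def start_task_blocking(n):
    """Hypothetical stand-in for a blocking call such as task1.delay()."""
    time.sleep(0.01)
    return list(range(n))


async def stream_chunks(items, on_finished=None):
    """Yield items one by one, then await the optional callback."""
    for item in items:
        await asyncio.sleep(0)  # Give other coroutines a chance to run.
        yield item
    if on_finished is not None:
        await on_finished(items)


async def main():
    finished = []

    async def on_finished(result):
        # Record the full result passed in when the stream ends.
        finished.append(result)

    # Run the blocking call in a worker thread so the event loop stays free.
    items = await asyncio.to_thread(start_task_blocking, 5)
    received = [
        chunk async for chunk in stream_chunks(items, on_finished=on_finished)
    ]
    return received, finished


received, finished = asyncio.run(main())
```

In this sketch the callback runs inside the generator after the last chunk has been yielded, so it fires only when the consumer iterates the stream to the end.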

Release history

v0.1.0

  1. First release.

v0.1.1

  1. Added asyncio support.
  2. Reading results now supports an on_finished callback.
