Celery task result chunk management
celery-streaming-result
Chunked (streaming) result management for Celery tasks.
Installation
pip install celery-streaming-result
Usage
Server
import time
import redis
from celery.app import app_or_default
from celery_streaming_result import CeleryStreamingResultManager
app = app_or_default()
redis_instance = redis.Redis()
csrm = CeleryStreamingResultManager(redis_instance)
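@app.task(bind=True)
def task1(celery_task):
    result = []
    for i in range(10):
        # Publish each partial result as a chunk as soon as it is available
        csrm.append_result_chunk(celery_task, i)
        result.append(i)
    # Mark the end of the chunk stream
    csrm.append_ended_chunk(celery_task)
    return result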
Client (synchronous)
import redis
from celery.app import app_or_default
from celery_streaming_result import CeleryStreamingResultManager
# Adjust this import to match where your task is defined
from test_server import task1
app = app_or_default()
redis_instance = redis.Redis()
csrm = CeleryStreamingResultManager(redis_instance)
# Start an asynchronous task
atask1 = task1.delay()
# Read the result chunks of that task
for chunk in csrm.get_result_chunks(atask1):
    print(chunk, end="-", flush=True)
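The server and synchronous client above follow a producer/consumer pattern: the task appends chunks, then appends an end marker, and the reader iterates until it sees that marker. The sketch below illustrates the pattern with an in-memory queue instead of Redis. It is not the library's implementation: `InMemoryChunkStream`, `END_OF_STREAM`, `produce`, and `run_demo` are hypothetical names, and only the method names mirror the ones used above.

```python
import queue
import threading

# Hypothetical sentinel marking the end of a stream. How the real library
# encodes its end-of-stream chunk is not described in this document.
END_OF_STREAM = object()


class InMemoryChunkStream:
    """Toy stand-in for a Redis-backed chunk list (illustration only)."""

    def __init__(self):
        self._chunks = queue.Queue()

    def append_result_chunk(self, chunk):
        # Producer side: make one partial result available to readers.
        self._chunks.put(chunk)

    def append_ended_chunk(self):
        # Producer side: signal that no more chunks will follow.
        self._chunks.put(END_OF_STREAM)

    def get_result_chunks(self, timeout=5.0):
        # Consumer side: yield chunks in order until the end marker arrives.
        # Raises queue.Empty if nothing arrives within `timeout` seconds.
        while True:
            chunk = self._chunks.get(timeout=timeout)
            if chunk is END_OF_STREAM:
                return
            yield chunk


def produce(stream):
    # Mirrors the shape of task1 above: stream each item, then end the stream.
    result = []
    for i in range(10):
        stream.append_result_chunk(i)
        result.append(i)
    stream.append_ended_chunk()
    return result


def run_demo():
    stream = InMemoryChunkStream()
    worker = threading.Thread(target=produce, args=(stream,))
    worker.start()
    received = list(stream.get_result_chunks())
    worker.join()
    return received
```

The end marker is what lets the reader stop without knowing in advance how many chunks the task will produce, which is why the task calls `append_ended_chunk` before returning.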
Client (asynchronous)
import asyncio

from redis import asyncio as aioredis
from celery.app import app_or_default
from celery_streaming_result import CeleryStreamingResultManager
from celery_streaming_result import start_celery_task_async
from celery_streaming_result import get_celery_task_result_async
from test_server import task1  # Adjust this import to match where your task is defined

app = app_or_default()
redis_instance = aioredis.Redis()
csrm = CeleryStreamingResultManager(redis_instance)


async def on_finished(celery_task, break_flag=False):
    print("on finished...")
    task_result = await get_celery_task_result_async(
        celery_task,
    )
    # task_result is the return value of the Celery task.
    # Usually that is the collection of all result chunks,
    # but it depends entirely on how the task is implemented.


# `await` and `async for` are only valid inside a coroutine,
# so the client code runs inside main().
async def main():
    # task1.delay() is a synchronous function and needs to be
    # converted with `sync_to_async`.
    atask1 = await start_celery_task_async(task1)
    # Read the result chunks of the task; on_finished is called back
    # when the task finishes.
    async for chunk in csrm.get_result_chunks(
        atask1,
        on_finished=on_finished,
    ):
        print(chunk, end="-", flush=True)


asyncio.run(main())
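The comment above notes that `task1.delay()` is synchronous and has to be converted before it can be awaited. As a rough illustration of that idea, and not of how `start_celery_task_async` is actually implemented, the sketch below uses the standard library's `asyncio.to_thread` (Python 3.9+) in place of `sync_to_async`. `blocking_delay`, `start_async`, and `run_async_demo` are hypothetical names, and `blocking_delay` merely simulates a blocking call.

```python
import asyncio
import time


def blocking_delay(x):
    """Hypothetical stand-in for a blocking call such as task.delay()."""
    time.sleep(0.05)  # simulate the time spent talking to the broker
    return f"task-{x}"


async def start_async(func, *args, **kwargs):
    # Run the blocking call in a worker thread so the event loop
    # stays free to serve other coroutines in the meantime.
    return await asyncio.to_thread(func, *args, **kwargs)


async def demo():
    # Both blocking calls run concurrently in separate threads;
    # gather returns the results in the order the awaitables were passed.
    return await asyncio.gather(
        start_async(blocking_delay, 1),
        start_async(blocking_delay, 2),
    )


def run_async_demo():
    return asyncio.run(demo())
```

Offloading to a thread matters because a blocking call made directly inside a coroutine would stall every other coroutine on the same event loop until it returns.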
Release notes
v0.1.0
- Initial release.
v0.1.1
- Added asyncio support.
- Result retrieval supports an on_finished callback.
v0.1.3
- Added support for interrupting a stream.