Chunked result management for Celery tasks
celery-streaming-result
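Manages Celery task results as a stream of chunks, so a client can read partial results while the task is still running.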
Installation
pip install celery-streaming-result
Usage
Server
import time
import redis
from celery.app import app_or_default
from celery_streaming_result import CeleryStreamingResultManager

app = app_or_default()
redis_instance = redis.Redis()
csrm = CeleryStreamingResultManager(redis_instance)


@app.task(bind=True)
def task1(celery_task):
    result = []
    for i in range(10):
        # Publish each partial result as soon as it is produced.
        csrm.append_result_chunk(celery_task, i)
        result.append(i)
    # Signal that no more chunks will follow.
    csrm.append_ended_chunk(celery_task)
    return result
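To make the chunk-then-end-marker pattern in the task above easier to picture, here is a minimal in-memory sketch. It is not the library's implementation: `InMemoryChunkStream`, `_END`, and `produce` are hypothetical names, and a `queue.Queue` stands in for the Redis storage, whose actual layout is not described here.

```python
import queue
import threading

_END = object()  # hypothetical sentinel standing in for the "ended" chunk


class InMemoryChunkStream:
    """Toy stand-in for a chunk store (illustration only, no Redis involved)."""

    def __init__(self):
        self._chunks = queue.Queue()

    def append_result_chunk(self, chunk):
        self._chunks.put(chunk)

    def append_ended_chunk(self):
        self._chunks.put(_END)

    def get_result_chunks(self, timeout=5.0):
        # Block for the next chunk and stop once the end marker arrives.
        while True:
            chunk = self._chunks.get(timeout=timeout)
            if chunk is _END:
                return
            yield chunk


def produce(stream, n):
    # Mirrors the shape of task1: emit each chunk, then the end marker.
    result = []
    for i in range(n):
        stream.append_result_chunk(i)
        result.append(i)
    stream.append_ended_chunk()
    return result


stream = InMemoryChunkStream()
worker = threading.Thread(target=produce, args=(stream, 10))
worker.start()
received = list(stream.get_result_chunks())
worker.join()
```

The reader never needs to know in advance how many chunks exist; it simply iterates until the end marker shows up, which is why the task should always emit the ended chunk before returning.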
Client (synchronous)
import redis
from celery.app import app_or_default
from celery_streaming_result import CeleryStreamingResultManager

# Adjust this import to match where your task is defined.
from test_server import task1

app = app_or_default()
redis_instance = redis.Redis()
csrm = CeleryStreamingResultManager(redis_instance)

# Start an asynchronous task.
atask1 = task1.delay()

# Read the task's result chunks as they arrive.
for chunk in csrm.get_result_chunks(atask1):
    print(chunk, end="-", flush=True)
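If the client also wants to keep the chunks after displaying them, the loop above can be wrapped in a small helper. The sketch below is an assumption-laden illustration: `drain_chunks` and `fake_chunks` are hypothetical names, and a plain generator stands in for `csrm.get_result_chunks(atask1)` so the example runs without Celery or Redis.

```python
from typing import Callable, Iterable, List, TypeVar

T = TypeVar("T")


def drain_chunks(chunks: Iterable[T], on_chunk: Callable[[T], None]) -> List[T]:
    """Hand each chunk to on_chunk as it arrives and return all of them."""
    collected = []
    for chunk in chunks:
        on_chunk(chunk)
        collected.append(chunk)
    return collected


def fake_chunks():
    # Stand-in for the real chunk iterator (assumption for this sketch).
    yield from range(10)


shown = []
collected = drain_chunks(fake_chunks(), shown.append)
```

In a real client, `on_chunk` could be something like `lambda c: print(c, end="-", flush=True)`, and `collected` could then be compared with the task's final return value if the task returns the full list.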
Client (asynchronous)
import asyncio
from redis import asyncio as aioredis
from celery.app import app_or_default
from celery_streaming_result import CeleryStreamingResultManager
from celery_streaming_result import start_celery_task_async
from celery_streaming_result import get_celery_task_result_async
from test_server import task1  # Adjust this import to match where your task is defined.

app = app_or_default()
redis_instance = aioredis.Redis()
csrm = CeleryStreamingResultManager(redis_instance)


async def on_finished(
    celery_task,
):
    print("on finished...")
    task_result = await get_celery_task_result_async(
        celery_task,
    )
    # task_result is the return value of the Celery task.
    # Usually it is the collection of all result chunks,
    # but that depends entirely on how the task is implemented.


async def main():
    # task1.delay() is a synchronous function and has to be
    # converted with `sync_to_async`.
    atask1 = await start_celery_task_async(
        task1
    )
    # Read the task's result chunks; when the task finishes,
    # the on_finished callback is invoked.
    async for chunk in csrm.get_result_chunks(
        atask1,
        on_finished=on_finished,
    ):
        print(chunk, end="-", flush=True)


asyncio.run(main())
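The two ideas in the asynchronous client, starting a blocking call without stalling the event loop and running a callback once the chunk stream ends, can be sketched with the standard library alone. This is only an illustration under stated assumptions: `blocking_start` and `stream_chunks` are hypothetical stand-ins, `asyncio.to_thread` (Python 3.9+) is used here in place of `sync_to_async`, and nothing below claims to reflect how `start_celery_task_async` or `get_result_chunks` are actually implemented.

```python
import asyncio
import time


def blocking_start(name):
    # Stand-in for a blocking call such as task1.delay() (assumption).
    time.sleep(0.01)
    return f"handle-for-{name}"


async def stream_chunks(handle, chunks, on_finished=None):
    # Yield each chunk, then await the callback once the stream is exhausted.
    for chunk in chunks:
        await asyncio.sleep(0)  # let other coroutines run between chunks
        yield chunk
    if on_finished is not None:
        await on_finished(handle)


async def main():
    finished = []

    async def on_finished(handle):
        finished.append(handle)

    # Run the blocking starter in a worker thread so the loop stays responsive.
    handle = await asyncio.to_thread(blocking_start, "task1")
    received = [c async for c in stream_chunks(handle, range(3), on_finished)]
    return handle, received, finished


handle, received, finished = asyncio.run(main())
```

Note that the callback only fires when the consumer iterates to the end of the stream; a consumer that breaks out of the loop early would never trigger it in this sketch.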
Release notes
v0.1.0
- Initial release.
v0.1.1
- Added asyncio support.
- Result retrieval now supports an on_finished callback.