Celery任务结果分片管理
Project description
celery-streaming-result
Celery任务结果分片管理。
安装
pip install celery-streaming-result
使用方法
服务端
import time
import redis
from celery.app import app_or_default
from celery_streaming_result import CeleryStreamingResultManager
app = app_or_default()
redis_instance = redis.Redis()
csrm = CeleryStreamingResultManager(redis_instance)
@app.task(bind=True)
def task1(celery_task):
result = []
for i in range(10):
csrm.append_result_chunk(celery_task, i)
result.append(i)
csrm.append_ended_chunk(celery_task)
return result
客户端
import redis
from celery.app import app_or_default
from celery_streaming_result import CeleryStreamingResultManager
from test_server import task1 # 根据你的task定义,正确引用
app = app_or_default()
redis_instance = redis.Redis()
csrm = CeleryStreamingResultManager(redis_instance)
atask1 = task1.delay() # 生成一个异步任务
for chunk in csrm.get_result_chunks(atask1): # 读取该异步任务的结果分片
print(chunk, end="-", flush=True)
版本记录
v0.1.0
- 首次发布。
Project details
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Close
Hashes for celery_streaming_result-0.1.0.tar.gz
Algorithm | Hash digest | |
---|---|---|
SHA256 | 9402d9bfaedd69589b9053cd689544bb6e7b5985ac81dce2b0a8a326159b1db3 |
|
MD5 | f761ef2f3efd357be4c471dd792b40a4 |
|
BLAKE2b-256 | 47a937a6aecf6c03484b3116a8766da5f163d8cae607f3187e62f2bf743ea340 |
Close
Hashes for celery_streaming_result-0.1.0-py3-none-any.whl
Algorithm | Hash digest | |
---|---|---|
SHA256 | 4a3ee0c5dd460478124174539c0c6ce27d6b3d7da9529936ad35e6746031de60 |
|
MD5 | 61391c147b6afe2f2dff5a9226a97ba2 |
|
BLAKE2b-256 | ae2de008f850c371a9bb9b7bebe62faf62a76ad24d27098018e6a6c008102f92 |