生产者-消费者工厂模块,支持多生产者、多消费者、多队列、资源控制、磁盘溢出
Project description
Factory Queue
生产者-消费者工厂模块,支持多生产者、多消费者、多队列、资源控制、磁盘溢出。
功能特性
- ✅ 多生产者/多消费者 - 支持并发处理,自由配置线程数
- ✅ 多队列管理 - 一个生产者可输出到多个队列
- ✅ 自动绑定 - 消费者自动绑定生产者,简化配置
- ✅ 资源控制 - 可设置内存上限、队列大小
- ✅ 磁盘溢出 - 内存不足时自动写入磁盘,防止OOM
- ✅ 优雅退出 - 完整的生产者-消费者同步机制
- ✅ 批量消费 - 支持按批次处理数据
- ✅ 实时监控 - 定时输出队列和工作者状态
- ✅ 彩色日志 - 不同级别日志使用不同颜色显示
安装
pip install factory-queue
快速开始
from factory_queue import Factory, ResourceConfig
import logging
# 设置日志
logging.basicConfig(level=logging.INFO)
# 定义处理函数
def my_process(data, producer):
"""生产者处理函数"""
result = {"consumer_a": None, "consumer_b": None}
processed = data * 2
if processed % 2 == 0:
result["consumer_a"] = processed
else:
result["consumer_b"] = processed
return result
def my_consume_a(data, consumer):
"""消费者A处理函数"""
print(f"消费者A处理: {data}")
def my_consume_b(data, consumer):
"""消费者B处理函数"""
print(f"消费者B处理: {data}")
# 创建工厂
config = ResourceConfig(
max_memory_mb=512,
max_queue_size=1000,
temp_dir="./temp_queue"
)
with Factory(resource_config=config) as factory:
# 创建输入队列
factory.create_queue("input")
# 创建生产者组
factory.create_producer_group(
name="main_producer",
input_queue_name="input",
output_consumer_names=["consumer_a", "consumer_b"],
process_func=my_process,
num_workers=2
)
# 创建消费者组(自动创建队列,自动绑定生产者)
factory.create_consumer_group(
name="consumer_a",
consume_func=my_consume_a,
num_workers=2,
batch_size=5000
)
factory.create_consumer_group(
name="consumer_b",
consume_func=my_consume_b,
num_workers=1
)
# 启动工厂
factory.start()
# 投放数据
for i in range(100):
factory.feed("input", i)
# 通知生产者:没有更多数据了
factory.end_feed(name="main_producer")
# 等待完成
factory.wait_complete()
主要类说明
Factory
工厂主类,管理整个生产消费流程。
主要方法:
create_queue(name)- 创建队列create_producer_group(...)- 创建生产者组create_consumer_group(...)- 创建消费者组feed(queue_name, data)- 投放数据end_feed(name)- 通知生产者结束start()- 启动工厂wait_complete()- 等待完成
ResourceConfig
资源配置类。
参数:
max_memory_mb- 最大内存使用量(MB),默认1024max_queue_size- 队列最大长度,默认10000disk_overflow_threshold- 磁盘溢出阈值,默认0.8temp_dir- 临时目录,默认系统临时目录
高级功能
共享属性
# 设置所有工作者共享的属性
factory.set_shared_attr("multiplier", 3)
# 在处理函数中获取
def my_process(data, producer):
multiplier = producer.get_attr("multiplier", 1)
return {"result": data * multiplier}
本地属性
# 设置单个生产者的本地属性
producer.set_attr("name", "producer_1")
# 本地属性优先于共享属性
value = producer.get_attr("name")
批量消费
factory.create_consumer_group(
name="consumer",
consume_func=my_consume,
batch_size=5000, # 每5000条批量处理
batch_timeout=5.0 # 超时5秒也处理
)
许可证
MIT License
作者
stabvale
贡献
欢迎提交 Issue 和 Pull Request!
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
factory_queue-0.1.1.tar.gz
(16.1 kB
view details)
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file factory_queue-0.1.1.tar.gz.
File metadata
- Download URL: factory_queue-0.1.1.tar.gz
- Upload date:
- Size: 16.1 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.9.13
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
0a52bb8d41f11d9f40d92df11d6e128bf21a0a8da37acca314fdcd0bd9b9159b
|
|
| MD5 |
0d6d8838b58970c4a9d4a315e14b2844
|
|
| BLAKE2b-256 |
b32e24ff4f1b59ebab948e924c70c4862091f42c6dd2569e6ee23a4396814660
|
File details
Details for the file factory_queue-0.1.1-py3-none-any.whl.
File metadata
- Download URL: factory_queue-0.1.1-py3-none-any.whl
- Upload date:
- Size: 14.2 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.9.13
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
beb0452e5c9acc0858e154d48ba82aae23bf828f8dbb785d50fc3c18941b991b
|
|
| MD5 |
1c7a75dce809bbc9d819bc0d649d3001
|
|
| BLAKE2b-256 |
efc70df96916a1d70cfcf850debb72629d7a52df128a83c52f1d0e6195a0397b
|