Skip to main content

生产者-消费者工厂模块,支持多生产者、多消费者、多队列、资源控制、磁盘溢出

Project description

Factory Queue

PyPI version Python License

生产者-消费者工厂模块,支持多生产者、多消费者、多队列、资源控制、磁盘溢出。

功能特性

  • 多生产者/多消费者 - 支持并发处理,自由配置线程数
  • 多队列管理 - 一个生产者可输出到多个队列
  • 自动绑定 - 消费者自动绑定生产者,简化配置
  • 资源控制 - 可设置内存上限、队列大小
  • 磁盘溢出 - 内存不足时自动写入磁盘,防止OOM
  • 优雅退出 - 完整的生产者-消费者同步机制
  • 批量消费 - 支持按批次处理数据
  • 实时监控 - 定时输出队列和工作者状态
  • 彩色日志 - 不同级别日志使用不同颜色显示

安装

pip install factory-queue

快速开始

from factory_queue import Factory, ResourceConfig
import logging

# 设置日志
logging.basicConfig(level=logging.INFO)

# 定义处理函数
def my_process(data, producer):
    """生产者处理函数"""
    result = {"consumer_a": None, "consumer_b": None}
    processed = data * 2
    
    if processed % 2 == 0:
        result["consumer_a"] = processed
    else:
        result["consumer_b"] = processed
    
    return result

def my_consume_a(data, consumer):
    """消费者A处理函数"""
    print(f"消费者A处理: {data}")

def my_consume_b(data, consumer):
    """消费者B处理函数"""
    print(f"消费者B处理: {data}")

# 创建工厂
config = ResourceConfig(
    max_memory_mb=512,
    max_queue_size=1000,
    temp_dir="./temp_queue"
)

with Factory(resource_config=config) as factory:
    # 创建输入队列
    factory.create_queue("input")
    
    # 创建生产者组
    factory.create_producer_group(
        name="main_producer",
        input_queue_name="input",
        output_consumer_names=["consumer_a", "consumer_b"],
        process_func=my_process,
        num_workers=2
    )
    
    # 创建消费者组(自动创建队列,自动绑定生产者)
    factory.create_consumer_group(
        name="consumer_a",
        consume_func=my_consume_a,
        num_workers=2,
        batch_size=5000
    )
    
    factory.create_consumer_group(
        name="consumer_b",
        consume_func=my_consume_b,
        num_workers=1
    )
    
    # 启动工厂
    factory.start()
    
    # 投放数据
    for i in range(100):
        factory.feed("input", i)
    
    # 通知生产者:没有更多数据了
    factory.end_feed(name="main_producer")
    
    # 等待完成
    factory.wait_complete()

主要类说明

Factory

工厂主类,管理整个生产消费流程。

主要方法:

  • create_queue(name) - 创建队列
  • create_producer_group(...) - 创建生产者组
  • create_consumer_group(...) - 创建消费者组
  • feed(queue_name, data) - 投放数据
  • end_feed(name) - 通知生产者结束
  • start() - 启动工厂
  • wait_complete() - 等待完成

ResourceConfig

资源配置类。

参数:

  • max_memory_mb - 最大内存使用量(MB),默认1024
  • max_queue_size - 队列最大长度,默认10000
  • disk_overflow_threshold - 磁盘溢出阈值,默认0.8
  • temp_dir - 临时目录,默认系统临时目录

高级功能

共享属性

# 设置所有工作者共享的属性
factory.set_shared_attr("multiplier", 3)

# 在处理函数中获取
def my_process(data, producer):
    multiplier = producer.get_attr("multiplier", 1)
    return {"result": data * multiplier}

本地属性

# 设置单个生产者的本地属性
producer.set_attr("name", "producer_1")

# 本地属性优先于共享属性
value = producer.get_attr("name")

批量消费

factory.create_consumer_group(
    name="consumer",
    consume_func=my_consume,
    batch_size=5000,  # 每5000条批量处理
    batch_timeout=5.0  # 超时5秒也处理
)

许可证

MIT License

作者

stabvale

贡献

欢迎提交 Issue 和 Pull Request!

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

factory_queue-0.1.1.tar.gz (16.1 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

factory_queue-0.1.1-py3-none-any.whl (14.2 kB view details)

Uploaded Python 3

File details

Details for the file factory_queue-0.1.1.tar.gz.

File metadata

  • Download URL: factory_queue-0.1.1.tar.gz
  • Upload date:
  • Size: 16.1 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.9.13

File hashes

Hashes for factory_queue-0.1.1.tar.gz
Algorithm Hash digest
SHA256 0a52bb8d41f11d9f40d92df11d6e128bf21a0a8da37acca314fdcd0bd9b9159b
MD5 0d6d8838b58970c4a9d4a315e14b2844
BLAKE2b-256 b32e24ff4f1b59ebab948e924c70c4862091f42c6dd2569e6ee23a4396814660

See more details on using hashes here.

File details

Details for the file factory_queue-0.1.1-py3-none-any.whl.

File metadata

  • Download URL: factory_queue-0.1.1-py3-none-any.whl
  • Upload date:
  • Size: 14.2 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.9.13

File hashes

Hashes for factory_queue-0.1.1-py3-none-any.whl
Algorithm Hash digest
SHA256 beb0452e5c9acc0858e154d48ba82aae23bf828f8dbb785d50fc3c18941b991b
MD5 1c7a75dce809bbc9d819bc0d649d3001
BLAKE2b-256 efc70df96916a1d70cfcf850debb72629d7a52df128a83c52f1d0e6195a0397b

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page