Project description

py-HiveFlow

A scalable, distributed producer/consumer framework for Python that simplifies building resilient task processing systems.

License: MIT | Python 3.12+ | Docker | PyPI

Overview

py-HiveFlow provides a flexible architecture for distributing and processing tasks across multiple workers. Built on modern Python and containerization practices, it enables developers to create highly available, fault-tolerant data processing pipelines with minimal boilerplate code.

Key Features

  • Modular Architecture: Easily extendable coordinator and worker components
  • Flexible Task Processing: Define custom task types and processing logic
  • Real-time Monitoring: Web dashboard for system health and performance metrics
  • Scalable Design: Seamlessly add workers to increase processing capacity
  • Fault Tolerance: Automatic task recovery and worker health monitoring
  • Containerized: Docker and docker-compose support for simple deployment
  • Resource Management: Smart resource allocation and rate limiting

Quick Start

Using Docker

All Docker-related configurations are maintained in the ./docker directory:

# Clone the repository
git clone https://github.com/changyy/py-HiveFlow.git
cd py-HiveFlow

# Start the basic system using the docker-compose file in the docker directory
docker-compose -f docker/docker-compose.yml up -d

# For development environment
docker-compose -f docker/docker-compose.dev.yml up -d

# For production environment
docker-compose -f docker/docker-compose.prod.yml up -d

Visit http://localhost:8080 to access the monitoring dashboard.
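
A quick way to confirm the stack came up is to probe the dashboard. A minimal check, assuming the dashboard serves plain HTTP on the port above:

import urllib.request

# Simple liveness probe for the monitoring dashboard started above
with urllib.request.urlopen("http://localhost:8080", timeout=5) as resp:
    print(resp.status)  # expect 200 once the dashboard is running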

Using Python Package

# Install the package
pip install py-hiveflow

# Initialize a new project
hiveflow init my-project
cd my-project

# Create a custom worker
hiveflow create worker my-custom-worker

# Start the system
hiveflow start

Example Usage

Defining Tasks

from hiveflow import Task

class DataProcessingTask(Task):
    def __init__(self, data_source, parameters=None):
        super().__init__()  # initialize the Task base class
        self.data_source = data_source
        self.parameters = parameters or {}
        
    def process(self, worker):
        # Processing logic here
        result = worker.process_data(self.data_source, **self.parameters)
        return {"status": "completed", "result": result}

Creating Custom Workers

from hiveflow import Worker

class DataProcessor(Worker):
    def setup(self):
        # Initialize resources; load_processor() is assumed to be
        # supplied by your implementation
        self.processor = self.load_processor()
        
    def can_process(self, task):
        return isinstance(task, DataProcessingTask)
        
    def process_task(self, task):
        # Process the task
        return task.process(self)
        
    def process_data(self, data_source, **kwargs):
        # Implementation of data processing
        return self.processor.process(data_source, **kwargs)
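
To see how the task and worker contracts above fit together, here is a minimal local sketch that drives a worker by hand, with no coordinator involved. The EchoProcessor subclass and its stub processor are illustrative assumptions, as is constructing the worker with no arguments:

class EchoProcessor(DataProcessor):
    def load_processor(self):
        # Placeholder "processor" that just echoes its input (illustrative)
        class _Echo:
            def process(self, data_source, **kwargs):
                return {"source": data_source, **kwargs}
        return _Echo()

worker = EchoProcessor()  # assumes Worker needs no constructor arguments
worker.setup()

task = DataProcessingTask("s3://my-bucket/data.csv", {"format": "csv"})
if worker.can_process(task):
    print(worker.process_task(task))
    # {'status': 'completed', 'result': {'source': 's3://my-bucket/data.csv', 'format': 'csv'}}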

Submitting Tasks

from hiveflow import Coordinator

# Connect to the coordinator
coordinator = Coordinator("redis://localhost:6379")

# Submit tasks
task = DataProcessingTask("s3://my-bucket/data.csv", {"format": "csv"})
task_id = coordinator.submit_task(task)

# Check task status
status = coordinator.get_task_status(task_id)
print(f"Task status: {status}")

# Get task result when completed
result = coordinator.get_task_result(task_id)
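
A simple way to wait for completion is to poll the status, assuming it eventually becomes the "completed" string used in the task example above:

import time

# Poll until the coordinator reports the task as finished
while coordinator.get_task_status(task_id) != "completed":
    time.sleep(1)

result = coordinator.get_task_result(task_id)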

Architecture

py-HiveFlow consists of the following components:

  • Coordinator: Manages task distribution and worker coordination
  • Workers: Process assigned tasks based on capability
  • Storage: Persists task data and system state (Redis & PostgreSQL)
  • Monitor: Web interface for system visibility and management

Use Cases

  • Web crawling and data extraction
  • Batch processing and ETL workflows
  • Distributed data analysis
  • Background job processing
  • Service integration and webhooks processing
  • Scheduled task execution

Docker Container Structure

docker/
├── coordinator/                   # Coordinator service
│   ├── Dockerfile
│   └── requirements.txt
├── worker/                        # Generic worker
│   ├── Dockerfile
│   └── requirements.txt
├── monitor/                       # Web monitoring interface
│   ├── Dockerfile
│   └── requirements.txt
├── postgres/                      # PostgreSQL configuration
│   └── init.sql
├── redis/                         # Redis configuration
│   └── redis.conf
├── docker-compose.yml             # Main compose file
├── docker-compose.dev.yml         # Development setup
└── docker-compose.prod.yml        # Production setup

Documentation

For full documentation, see the docs/ directory in the repository or the project's official documentation site.

Development

Prerequisites

  • Python 3.12+
  • Docker and Docker Compose (optional)
  • Redis
  • PostgreSQL

Setting Up Development Environment

# Clone the repository
git clone https://github.com/changyy/py-HiveFlow.git
cd py-HiveFlow

# Create a virtual environment
python -m venv venv
source venv/bin/activate  # On Windows: venv\Scripts\activate

# Install development dependencies
pip install -e ".[dev]"

# Run tests
pytest

Using Docker for Development

# Run the development environment
docker-compose -f docker/docker-compose.dev.yml up -d

# Run tests in Docker
docker-compose -f docker/docker-compose.test.yml up

Contributing

Contributions are welcome! Please feel free to submit a Pull Request.

  1. Fork the repository
  2. Create your feature branch (git checkout -b feature/amazing-feature)
  3. Commit your changes (git commit -m 'Add some amazing feature')
  4. Push to the branch (git push origin feature/amazing-feature)
  5. Open a Pull Request

License

This project is licensed under the MIT License - see the LICENSE file for details.

Acknowledgments

py-HiveFlow was inspired by various distributed systems and task processing frameworks, aiming to provide a simpler yet powerful alternative for Python developers.

Download files

Download the file for your platform.

Source Distribution

hiveflow-1.0.1.tar.gz (19.2 kB)

Built Distribution

hiveflow-1.0.1-py3-none-any.whl (27.0 kB)

File details

Details for the file hiveflow-1.0.1.tar.gz.

File metadata

  • Download URL: hiveflow-1.0.1.tar.gz
  • Upload date:
  • Size: 19.2 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.12.9

File hashes

Hashes for hiveflow-1.0.1.tar.gz

Algorithm      Hash digest
SHA256         544eaf8cdd48d0a8ac2947b01d4a3e198137052dd5884ff67a2fb9887619726d
MD5            a9c70cb794001cf92ac4c0240ac04c4c
BLAKE2b-256    2634c2ad61beb2728a773198c6121fe70a38a052cf3acf48b4b94e1531f8fefc

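To check a downloaded archive against the SHA256 digest above, a minimal verification with Python's standard library (adjust the path to wherever you saved the file); the same check applies to the wheel below:

import hashlib

EXPECTED_SHA256 = "544eaf8cdd48d0a8ac2947b01d4a3e198137052dd5884ff67a2fb9887619726d"

# Hash the downloaded sdist and compare against the published digest
with open("hiveflow-1.0.1.tar.gz", "rb") as f:
    digest = hashlib.sha256(f.read()).hexdigest()

assert digest == EXPECTED_SHA256, "SHA256 mismatch: file may be corrupted or tampered with"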

Provenance

The following attestation bundles were made for hiveflow-1.0.1.tar.gz:

Publisher: python-publish.yml on changyy/py-HiveFlow


File details

Details for the file hiveflow-1.0.1-py3-none-any.whl.

File metadata

  • Download URL: hiveflow-1.0.1-py3-none-any.whl
  • Upload date:
  • Size: 27.0 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.12.9

File hashes

Hashes for hiveflow-1.0.1-py3-none-any.whl

Algorithm      Hash digest
SHA256         cf18d2c377873cecc54b3de05a2aa027fd6e0caef050dd943c2c94297096a32f
MD5            e116c5f53574d8aa78dcc18c3f1a0b6c
BLAKE2b-256    554fc430915260799e0b780c554b0f1115e558ca8847a624d0fed5ba10a5e69e


Provenance

The following attestation bundles were made for hiveflow-1.0.1-py3-none-any.whl:

Publisher: python-publish.yml on changyy/py-HiveFlow

