py-HiveFlow
A scalable, distributed producer/consumer framework for Python that simplifies building resilient task processing systems.
Overview
py-HiveFlow provides a flexible architecture for distributing and processing tasks across multiple workers. Built on modern Python and containerization practices, it enables developers to create highly available, fault-tolerant data processing pipelines with minimal boilerplate code.
Key Features
- Modular Architecture: Easily extendable coordinator and worker components
- Flexible Task Processing: Define custom task types and processing logic
- Real-time Monitoring: Web dashboard for system health and performance metrics
- Scalable Design: Seamlessly add workers to increase processing capacity
- Fault Tolerance: Automatic task recovery and worker health monitoring
- Containerized: Docker and docker-compose support for simple deployment
- Resource Management: Smart resource allocation and rate limiting
Quick Start
Using Docker
All Docker-related configurations are maintained in the ./docker directory:
```bash
# Clone the repository
git clone https://github.com/changyy/py-HiveFlow.git
cd py-HiveFlow

# Start the basic system using the docker-compose file in the docker directory
docker-compose -f docker/docker-compose.yml up -d

# For the development environment
docker-compose -f docker/docker-compose.dev.yml up -d

# For the production environment
docker-compose -f docker/docker-compose.prod.yml up -d
```
Visit http://localhost:8080 to access the monitoring dashboard.
Using Python Package
```bash
# Install the package
pip install py-hiveflow

# Initialize a new project
hiveflow init my-project
cd my-project

# Create a custom worker
hiveflow create worker my-custom-worker

# Start the system
hiveflow start
```
Example Usage
Defining Tasks
```python
from hiveflow import Task

class DataProcessingTask(Task):
    def __init__(self, data_source, parameters=None):
        self.data_source = data_source
        self.parameters = parameters or {}

    def process(self, worker):
        # Processing logic here
        result = worker.process_data(self.data_source, **self.parameters)
        return {"status": "completed", "result": result}
```
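Because `process` receives the worker as an argument, a task can be exercised in isolation with any object that provides the methods it calls. A minimal self-contained sketch of that contract, using plain Python stand-ins (`EchoTask` and `FakeWorker` are illustrative, not part of the hiveflow API):

```python
# Stand-in task/worker pair illustrating the process(worker) contract.
class FakeWorker:
    def process_data(self, data_source, **parameters):
        # Pretend to process: just echo what we were asked to do.
        return {"source": data_source, "options": parameters}

class EchoTask:
    def __init__(self, data_source, parameters=None):
        self.data_source = data_source
        self.parameters = parameters or {}

    def process(self, worker):
        result = worker.process_data(self.data_source, **self.parameters)
        return {"status": "completed", "result": result}

task = EchoTask("s3://my-bucket/data.csv", {"format": "csv"})
print(task.process(FakeWorker()))
```

This inversion (task logic calling back into the worker) is what lets the same task definition run against differently equipped workers.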
Creating Custom Workers
```python
from hiveflow import Worker

class DataProcessor(Worker):
    def setup(self):
        # Initialize resources
        self.processor = self.load_processor()

    def can_process(self, task):
        return isinstance(task, DataProcessingTask)

    def process_task(self, task):
        # Process the task
        return task.process(self)

    def process_data(self, data_source, **kwargs):
        # Implementation of data processing
        return self.processor.process(data_source, **kwargs)
```
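The `can_process` hook is what lets a heterogeneous worker pool route tasks by capability. The routing idea can be sketched without hiveflow itself (the class names and the `dispatch` helper below are illustrative stand-ins, not framework API):

```python
# Minimal capability-based dispatch: each worker declares which task
# types it can handle, and the first capable worker gets the task.
class CsvTask: ...
class ImageTask: ...

class CsvWorker:
    def can_process(self, task):
        return isinstance(task, CsvTask)
    def process_task(self, task):
        return "csv handled"

class ImageWorker:
    def can_process(self, task):
        return isinstance(task, ImageTask)
    def process_task(self, task):
        return "image handled"

def dispatch(task, workers):
    for worker in workers:
        if worker.can_process(task):
            return worker.process_task(task)
    raise RuntimeError("no capable worker for task")

workers = [CsvWorker(), ImageWorker()]
print(dispatch(ImageTask(), workers))  # image handled
```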
Submitting Tasks
```python
from hiveflow import Coordinator

# Connect to the coordinator
coordinator = Coordinator("redis://localhost:6379")

# Submit a task
task = DataProcessingTask("s3://my-bucket/data.csv", {"format": "csv"})
task_id = coordinator.submit_task(task)

# Check the task status
status = coordinator.get_task_status(task_id)
print(f"Task status: {status}")

# Get the task result when completed
result = coordinator.get_task_result(task_id)
```
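Since submission is asynchronous, callers typically poll the status until the task finishes before fetching the result. A sketch of such a helper is below; it assumes only the `get_task_status`/`get_task_result` methods shown above and a `"completed"` status string, while the timeout and polling interval are our own additions, not hiveflow API:

```python
import time

def wait_for_result(coordinator, task_id, timeout=60.0, interval=0.5):
    """Poll the coordinator until the task completes, then fetch its result.

    Assumes the coordinator reports "completed" via get_task_status,
    as in the example above.
    """
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if coordinator.get_task_status(task_id) == "completed":
            return coordinator.get_task_result(task_id)
        time.sleep(interval)
    raise TimeoutError(f"task {task_id} did not complete within {timeout}s")
```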
Architecture
py-HiveFlow consists of the following components:
- Coordinator: Manages task distribution and worker coordination
- Workers: Process assigned tasks based on capability
- Storage: Persists task data and system state (Redis & PostgreSQL)
- Monitor: Web interface for system visibility and management
Use Cases
- Web crawling and data extraction
- Batch processing and ETL workflows
- Distributed data analysis
- Background job processing
- Service integration and webhooks processing
- Scheduled task execution
Docker Container Structure
```text
docker/
├── coordinator/               # Coordinator service
│   ├── Dockerfile
│   └── requirements.txt
├── worker/                    # Generic worker
│   ├── Dockerfile
│   └── requirements.txt
├── monitor/                   # Web monitoring interface
│   ├── Dockerfile
│   └── requirements.txt
├── postgres/                  # PostgreSQL configuration
│   └── init.sql
├── redis/                     # Redis configuration
│   └── redis.conf
├── docker-compose.yml         # Main compose file
├── docker-compose.dev.yml     # Development setup
└── docker-compose.prod.yml    # Production setup
```
Documentation
For full documentation, see the docs/ directory or the official documentation site.
Development
Prerequisites
- Python 3.12+
- Docker and Docker Compose (optional)
- Redis
- PostgreSQL
Setting Up Development Environment
```bash
# Clone the repository
git clone https://github.com/changyy/py-HiveFlow.git
cd py-HiveFlow

# Create a virtual environment
python -m venv venv
source venv/bin/activate  # On Windows: venv\Scripts\activate

# Install development dependencies
pip install -e ".[dev]"

# Run tests
pytest
```
Using Docker for Development
```bash
# Run the development environment
docker-compose -f docker/docker-compose.dev.yml up -d

# Run tests in Docker
docker-compose -f docker/docker-compose.test.yml up
```
Contributing
Contributions are welcome! Please feel free to submit a Pull Request.
- Fork the repository
- Create your feature branch (`git checkout -b feature/amazing-feature`)
- Commit your changes (`git commit -m 'Add some amazing feature'`)
- Push to the branch (`git push origin feature/amazing-feature`)
- Open a Pull Request
License
This project is licensed under the MIT License - see the LICENSE file for details.
Acknowledgments
py-HiveFlow was inspired by various distributed systems and task processing frameworks, aiming to provide a simpler yet powerful alternative for Python developers.
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
File details
Details for the file hiveflow-1.0.1.tar.gz.
File metadata
- Download URL: hiveflow-1.0.1.tar.gz
- Upload date:
- Size: 19.2 kB
- Tags: Source
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.12.9
File hashes

| Algorithm | Hash digest |
|---|---|
| SHA256 | `544eaf8cdd48d0a8ac2947b01d4a3e198137052dd5884ff67a2fb9887619726d` |
| MD5 | `a9c70cb794001cf92ac4c0240ac04c4c` |
| BLAKE2b-256 | `2634c2ad61beb2728a773198c6121fe70a38a052cf3acf48b4b94e1531f8fefc` |
Provenance
The following attestation bundles were made for hiveflow-1.0.1.tar.gz:

Publisher: python-publish.yml on changyy/py-HiveFlow

- Statement type: https://in-toto.io/Statement/v1
- Predicate type: https://docs.pypi.org/attestations/publish/v1
- Subject name: hiveflow-1.0.1.tar.gz
- Subject digest: 544eaf8cdd48d0a8ac2947b01d4a3e198137052dd5884ff67a2fb9887619726d
- Sigstore transparency entry: 177975988
- Sigstore integration time:
- Permalink: changyy/py-HiveFlow@074b3418498dcf6f26a1a864709a8dd71efcb75f
- Branch / Tag: refs/tags/1.0.1
- Owner: https://github.com/changyy
- Access: public
- Token Issuer: https://token.actions.githubusercontent.com
- Runner Environment: github-hosted
- Publication workflow: python-publish.yml@074b3418498dcf6f26a1a864709a8dd71efcb75f
- Trigger Event: release
File details
Details for the file hiveflow-1.0.1-py3-none-any.whl.
File metadata
- Download URL: hiveflow-1.0.1-py3-none-any.whl
- Upload date:
- Size: 27.0 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.12.9
File hashes

| Algorithm | Hash digest |
|---|---|
| SHA256 | `cf18d2c377873cecc54b3de05a2aa027fd6e0caef050dd943c2c94297096a32f` |
| MD5 | `e116c5f53574d8aa78dcc18c3f1a0b6c` |
| BLAKE2b-256 | `554fc430915260799e0b780c554b0f1115e558ca8847a624d0fed5ba10a5e69e` |
Provenance
The following attestation bundles were made for hiveflow-1.0.1-py3-none-any.whl:

Publisher: python-publish.yml on changyy/py-HiveFlow

- Statement type: https://in-toto.io/Statement/v1
- Predicate type: https://docs.pypi.org/attestations/publish/v1
- Subject name: hiveflow-1.0.1-py3-none-any.whl
- Subject digest: cf18d2c377873cecc54b3de05a2aa027fd6e0caef050dd943c2c94297096a32f
- Sigstore transparency entry: 177975989
- Sigstore integration time:
- Permalink: changyy/py-HiveFlow@074b3418498dcf6f26a1a864709a8dd71efcb75f
- Branch / Tag: refs/tags/1.0.1
- Owner: https://github.com/changyy
- Access: public
- Token Issuer: https://token.actions.githubusercontent.com
- Runner Environment: github-hosted
- Publication workflow: python-publish.yml@074b3418498dcf6f26a1a864709a8dd71efcb75f
- Trigger Event: release