Reflowfy
A horizontally scalable data movement and transformation framework
Reflowfy lets you build pipelines that fetch data from sources, apply custom transformations, and send results to destinations, scaling horizontally to millions of records.
🎯 Key Features
- Horizontally Scalable: Process millions of records in parallel
- Kafka-Based: Reliable message queue for job distribution
- Kubernetes-Native: KEDA autoscaling from 0 to N workers
- Order-Independent: Maximum parallelism without coordination overhead
- User-Extensible: Plugin architecture for sources, destinations, and transformations
- Two Execution Modes: Local testing and distributed production execution
🏗 Architecture
User Request
↓ HTTP
API (FastAPI) ────→ ReflowManager Service (port 8001)
│ ↓
│ PostgreSQL (state + checkpoints)
│ ↓
│ Kafka Producer (rate limited) → Kafka Topic (reflow.jobs)
│ ↓
│ Worker Pool (KEDA scaled)
│ ↓
└─→ Execution Tracking Destinations
Components:
- API: Orchestration, job splitting, route generation
- ReflowManager: Rate limiting, state management, checkpointing
- PostgreSQL: Persistent state storage for executions and checkpoints
- Kafka: Job queue and load balancing
- Workers: Generic executors that process jobs
- KEDA: Kafka lag-based autoscaling
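To make the rate-limiting step concrete, here is a minimal token-bucket sketch of how a producer could be paced to a jobs-per-second budget. It is illustrative only: the `TokenBucket` class, its methods, and the injectable `clock` are hypothetical names, not part of Reflowfy's API, and ReflowManager's actual implementation may work differently.

```python
import time


class TokenBucket:
    """Hypothetical token bucket; not Reflowfy's actual rate limiter."""

    def __init__(self, rate_per_second, capacity=None, clock=time.monotonic):
        self.rate = float(rate_per_second)
        # By default allow a burst of at most one second's worth of jobs.
        self.capacity = float(capacity if capacity is not None else rate_per_second)
        self.clock = clock
        self.tokens = self.capacity  # start with a full bucket
        self.last_refill = clock()

    def try_acquire(self):
        """Return True if one job may be published now, else False."""
        now = self.clock()
        elapsed = now - self.last_refill
        self.last_refill = now
        # Refill in proportion to elapsed time, never beyond capacity.
        self.tokens = min(self.capacity, self.tokens + elapsed * self.rate)
        if self.tokens >= 1.0:
            self.tokens -= 1.0
            return True
        return False
```

A producer loop would call `try_acquire()` before each publish and sleep briefly when it returns `False`. Passing the clock in as a parameter keeps the limiter easy to test without real waiting.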
📚 Documentation
- Fresh Project Guide: Start here to build a new project.
- ReflowManager Architecture: Deep dive into the rate-limiting core.
- OpenShift Deployment: Enterprise deployment guide.
- E2E Testing: How to test your pipelines.
🚀 Quick Start
1. Define a Custom Transformation
from reflowfy import BaseTransformation


class XmlToJson(BaseTransformation):
    name = "xml_to_json"

    def apply(self, records, context):
        # Your transformation logic; parse_xml stands in for your own helper
        return [parse_xml(r) for r in records]
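The transformation above relies on a `parse_xml` helper that you supply yourself. As a rough sketch, assuming each record is a flat XML string whose child elements map directly to fields (an assumption about your data, not something Reflowfy prescribes), it could look like this using only the standard library:

```python
import xml.etree.ElementTree as ET


def parse_xml(record):
    """Turn a flat XML string into a dict of child tag -> text.

    Assumed record shape: one root element with simple, non-nested children.
    """
    root = ET.fromstring(record)
    return {child.tag: child.text for child in root}


def apply(records, context=None):
    # Same list-in, list-out shape as XmlToJson.apply.
    return [parse_xml(r) for r in records]
```

Nested elements, attributes, and namespaces would need extra handling. Do not feed untrusted XML to `xml.etree.ElementTree` without reviewing the security notes in the Python documentation.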
2. Build a Pipeline
from reflowfy import build_pipeline, pipeline_registry
from reflowfy import elastic_source, kafka_destination
# Configure source with runtime parameters
source = elastic_source(
    url="http://elasticsearch:9200",
    index="logs-*",
    base_query={
        "query": {
            "range": {
                "@timestamp": {
                    "gte": "{{ start_time }}",  # Runtime parameter
                    "lte": "{{ end_time }}"
                }
            }
        }
    }
)
# Configure destination
destination = kafka_destination(
    bootstrap_servers="kafka:9092",
    topic="processed-logs"
)
# Build and register
pipeline = build_pipeline(
    name="elastic_xml_pipeline",
    source=source,
    transformations=[XmlToJson()],
    destination=destination,
    rate_limit={"jobs_per_second": 50}
)
pipeline_registry.register(pipeline)
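The `{{ start_time }}` and `{{ end_time }}` placeholders in `base_query` are filled in from `runtime_params` when the pipeline runs. How Reflowfy performs that substitution internally is not described here. The sketch below only illustrates the idea, using a hypothetical `render_params` helper built on a regular expression:

```python
import re

# Matches "{{ name }}" with optional whitespace inside the braces.
_PLACEHOLDER = re.compile(r"\{\{\s*(\w+)\s*\}\}")


def render_params(value, params):
    """Recursively replace {{ name }} placeholders in strings, dicts, and lists.

    Hypothetical helper for illustration; raises KeyError for a missing parameter.
    """
    if isinstance(value, str):
        return _PLACEHOLDER.sub(lambda m: str(params[m.group(1)]), value)
    if isinstance(value, dict):
        return {k: render_params(v, params) for k, v in value.items()}
    if isinstance(value, list):
        return [render_params(v, params) for v in value]
    return value  # numbers, booleans, None pass through unchanged
```

Failing loudly on a missing parameter is a deliberate choice in this sketch: a query sent with an unresolved placeholder would be harder to debug than an early error.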
3. Start the API
# In your main.py
from reflowfy.api.app import main
import examples.xml_to_json_pipeline  # Import to trigger registration

if __name__ == "__main__":
    main()
4. Execute Pipeline
Run Distributed (async via Kafka):
curl -X POST http://localhost:8001/run \
  -H "Content-Type: application/json" \
  -d '{
    "pipeline_name": "elastic_xml_pipeline",
    "runtime_params": {
      "start_time": "2024-01-01",
      "end_time": "2024-01-02"
    }
  }'
Dry Run (Preview jobs without executing):
curl -X POST http://localhost:8001/run \
  -H "Content-Type: application/json" \
  -d '{
    "pipeline_name": "elastic_xml_pipeline",
    "runtime_params": {
      "start_time": "2024-01-01",
      "end_time": "2024-01-02"
    },
    "dry_run": true
  }'
Returns a preview of the job execution plan, sample records, and configuration.
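If you prefer to trigger runs from Python rather than curl, the same request can be built with the standard library. This is a sketch: `build_run_request` and `run_pipeline` are hypothetical helper names, the base URL is copied from the curl examples and may differ in your deployment, and treating the response body as JSON is an assumption.

```python
import json
import urllib.request


def build_run_request(pipeline_name, runtime_params, dry_run=False,
                      base_url="http://localhost:8001"):
    """Build (but do not send) the POST /run request from the curl examples."""
    payload = {"pipeline_name": pipeline_name, "runtime_params": runtime_params}
    if dry_run:
        payload["dry_run"] = True
    return urllib.request.Request(
        url=f"{base_url}/run",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def run_pipeline(pipeline_name, runtime_params, dry_run=False):
    """Send the request; assumes the API answers with a JSON body."""
    request = build_run_request(pipeline_name, runtime_params, dry_run)
    with urllib.request.urlopen(request) as response:
        return json.load(response)
```

Keeping request construction separate from sending makes it possible to inspect or log the payload before anything reaches the API.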
📦 Installation
# Using pip (editable install from a source checkout)
pip install -e .
# Using Docker
docker build -f Dockerfile.api -t reflowfy-api .
docker build -f Dockerfile.worker -t reflowfy-worker .
🔌 Built-in Connectors
Sources
- Elasticsearch: Scroll-based pagination with runtime parameters
- SQL: ID range and offset-based splitting (Postgres, MySQL, etc.)
- HTTP API: Offset/cursor pagination with authentication
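To illustrate what ID range splitting means for the SQL source, the sketch below cuts an inclusive ID range into fixed-size chunks, each of which could become one independent job. The function name `split_id_range` and its parameters are hypothetical; the connector's real splitting logic and options may differ.

```python
def split_id_range(min_id, max_id, chunk_size):
    """Split the inclusive range [min_id, max_id] into inclusive (lo, hi) chunks."""
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    chunks = []
    lo = min_id
    while lo <= max_id:
        hi = min(lo + chunk_size - 1, max_id)  # last chunk may be shorter
        chunks.append((lo, hi))
        lo = hi + 1
    return chunks
```

Each `(lo, hi)` pair maps to a query such as `WHERE id BETWEEN lo AND hi`. Because the chunks do not overlap, workers can process them in any order, which fits the order-independent design.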
Destinations
- Kafka: Batching, compression, health checks
- HTTP: Webhooks with retry logic
⚙️ Configuration
Environment Variables
API:
API_HOST=0.0.0.0
API_PORT=8000
KAFKA_BOOTSTRAP_SERVERS=kafka:9092
KAFKA_TOPIC=reflow.jobs
Worker:
KAFKA_BOOTSTRAP_SERVERS=kafka:9092
KAFKA_TOPIC=reflow.jobs
KAFKA_GROUP_ID=reflowfy-workers
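As a sketch of how a process might read the worker variables listed above, the snippet below loads them into a small settings object. `WorkerSettings` and `load_worker_settings` are hypothetical names, and using the values shown above as fallbacks is an assumption; Reflowfy's own defaults may differ.

```python
import os
from dataclasses import dataclass


@dataclass(frozen=True)
class WorkerSettings:
    bootstrap_servers: str
    topic: str
    group_id: str


def load_worker_settings(env=os.environ):
    """Read the worker variables, falling back to the example values (assumed defaults)."""
    return WorkerSettings(
        bootstrap_servers=env.get("KAFKA_BOOTSTRAP_SERVERS", "kafka:9092"),
        topic=env.get("KAFKA_TOPIC", "reflow.jobs"),
        group_id=env.get("KAFKA_GROUP_ID", "reflowfy-workers"),
    )
```

The API and the workers must agree on `KAFKA_TOPIC`, otherwise jobs are published to a topic that no worker consumes.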
| Mode | Endpoint | Use Case | Kafka | Workers |
|---|---|---|---|---|
| Distributed | POST /run | Production execution | ✅ | ✅ |
| Dry Run | POST /run (dry_run=true) | Preview/Testing | ❌ | ❌ |
📊 Monitoring
Reflowfy exposes Prometheus metrics:
- reflowfy_jobs_processed_total: Total jobs processed
- reflowfy_job_processing_duration_seconds: Job processing time
- reflowfy_records_processed_total: Total records processed
- reflowfy_active_workers: Active worker count
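For quick checks from a script, scraped metrics in the Prometheus text format can be parsed with a few lines of Python. This is a simplified sketch: `parse_metrics` is a hypothetical helper, it assumes samples carry no trailing timestamp, and the path of the metrics endpoint is not specified here, so fetching the text is left out.

```python
def parse_metrics(text):
    """Parse simple Prometheus text-format samples into {name: value}.

    Labels, if present, stay part of the key. Comment lines
    (# HELP, # TYPE) are skipped. Assumes no timestamp column.
    """
    samples = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.rpartition(" ")
        samples[key] = float(value)
    return samples
```

For anything beyond ad hoc checks, a Prometheus server with queries and alerts over these metrics is the more robust route.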
🐳 Kubernetes Deployment
# Deploy with Helm (using bundled charts)
# Note: For production, we recommend using the CLI 'reflowfy deploy'
helm install reflowfy-api ./reflowfy/helm/reflowfy-api
helm install reflowfy-worker ./reflowfy/helm/reflowfy-worker
KEDA will automatically scale workers based on Kafka lag.
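The arithmetic behind lag-based scaling can be approximated as one worker per fixed amount of consumer lag, capped by a maximum. The sketch below is a simplification for building intuition, not KEDA's actual algorithm: `desired_workers` and its parameters are hypothetical names, and real scaling also involves polling intervals, cooldown periods, and Horizontal Pod Autoscaler behavior.

```python
import math


def desired_workers(total_lag, lag_threshold, max_workers, partitions=None):
    """Approximate lag-based scaling: one worker per `lag_threshold` messages of lag."""
    if lag_threshold <= 0:
        raise ValueError("lag_threshold must be positive")
    if total_lag <= 0:
        return 0  # scale to zero once the topic is drained
    wanted = math.ceil(total_lag / lag_threshold)
    if partitions is not None:
        # Consumers beyond the partition count would sit idle in a consumer group.
        wanted = min(wanted, partitions)
    return min(wanted, max_workers)
```

The partition cap matters in practice: a topic with few partitions limits how many workers can consume in parallel, so size the partition count of `reflow.jobs` with the intended worker ceiling in mind.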
📝 License
MIT
🤝 Contributing
Contributions welcome! This is a production-grade framework designed for real-world data processing at scale.