# FlowerPower

*Simple Workflow Framework: Hamilton + APScheduler = FlowerPower*
## 🔍 Overview

FlowerPower is a simple workflow framework based on two fantastic Python libraries:

- **Hamilton**: Creates DAGs from your pipeline functions
- **APScheduler**: Handles pipeline scheduling
### Key Features
- 🔄 Pipeline Workflows: Create and execute complex DAG-based workflows
- ⏰ Scheduling: Run pipelines at specific times or intervals
- ⚙️ Parameterization: Easily configure pipeline parameters
- 📊 Tracking: Monitor executions with Hamilton UI
- 🛠️ Flexible Configuration: Simple YAML-based setup
- 📡 Distributed Execution: Support for distributed environments
## 📦 Installation

```bash
# Basic installation
pip install flowerpower

# With scheduling support
pip install "flowerpower[scheduler]"

# Additional components
pip install "flowerpower[scheduler,mqtt]"    # MQTT event broker
pip install "flowerpower[scheduler,redis]"   # Redis event broker
pip install "flowerpower[scheduler,mongodb]" # MongoDB data store
pip install "flowerpower[scheduler,ray]"     # Ray distributed computing
pip install "flowerpower[scheduler,dask]"    # Dask distributed computing
```
## 🚀 Getting Started

### Initialize Project

**Option 1: Command Line**

```bash
flowerpower init new-project
cd new-project
```

**Option 2: Python**

```python
from flowerpower import init

init("new-project")
```

This creates the basic config files:

- `conf/project.yml`
### 📦 Optional: Project Management with uv (Recommended)

It is recommended to use the project manager uv to manage your project dependencies.

#### Installation

```bash
pip install uv
```

For more installation options, visit: https://docs.astral.sh/uv/getting-started/installation/

#### Project Initialization

```bash
uv init --app --no-readme --vcs git
```
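With uv managing the environment, FlowerPower itself can then be added as a project dependency; a minimal example (pick the extras you need, as in the Installation section above):

```bash
uv add "flowerpower[scheduler]"
```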
### Pipeline Management

#### Creating a New Pipeline

**Option 1: Command Line**

```bash
flowerpower add my_flow
# or
flowerpower new my_flow
```

**Option 2: Python**

```python
# Using the PipelineManager class
from flowerpower.pipeline import PipelineManager

pm = PipelineManager()
pm.new("my_flow")

# Or using the new function directly
from flowerpower.pipeline import new

new("my_flow")
```

This creates the new pipeline and its configuration file:

- `pipelines/my_flow.py`
- `conf/pipelines/my_flow.yml`
#### Setting Up a Pipeline

**1. Add Pipeline Functions**

Build your pipeline by adding the functions (nodes) that make up the DAG to `pipelines/my_flow.py`, following the Hamilton paradigm; a minimal sketch follows below.
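In Hamilton, each function defines a node, and a parameter named after another function receives that function's result. The names below (`raw_df`, `final_df`, `data_path`) are illustrative assumptions, not part of the generated template:

```python
import pandas as pd


def raw_df(data_path: str) -> pd.DataFrame:
    # 'data_path' is a pipeline input; this node loads the raw data.
    return pd.read_csv(data_path)


def final_df(raw_df: pd.DataFrame) -> pd.DataFrame:
    # Depends on the 'raw_df' node via the matching parameter name.
    return raw_df.dropna()
```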
**2. Parameterize Functions**

You can parameterize functions in two ways:

**Method 1: Default Values**

```python
import pandas as pd


def add_int_col(
    df: pd.DataFrame,
    col_name: str = "foo",
    values: int = 1,
) -> pd.DataFrame:
    # Adds a constant-valued column to the dataframe.
    return df.assign(**{col_name: values})
```
**Method 2: Configuration File**

In `conf/pipelines/my_flow.yml`:

```yaml
...
func:
  add_int_col:
    col_name: foo
    values: 1
...
```
Then add the `@parameterize` decorator to the function in your pipeline file:

```python
import pandas as pd
from hamilton.function_modifiers import parameterize


# PARAMS is defined in the generated pipeline module and carries the
# function parameters from conf/pipelines/my_flow.yml.
@parameterize(**PARAMS.add_int_col)
def add_int_col(
    df: pd.DataFrame,
    col_name: str,
    values: int,
) -> pd.DataFrame:
    return df.assign(**{col_name: values})
```
### Running Pipelines

#### Configuration

You can set the pipeline parameters `inputs` and `final_vars`, as well as other options, in the pipeline configuration file `conf/pipelines/my_flow.yml`, or pass them directly to the pipeline execution function.

**Using the Pipeline Configuration**

```yaml
...
run:
  inputs:
    data_path: path/to/data.csv
    fs_protocol: local
  final_vars: [add_int_col, final_df]
  # optional parameters
  with_tracker: false
  executor: threadpool # or processpool, ray, dask
...
```
#### Execution Methods

There are three ways to execute a pipeline:

1. **Direct Execution**
   - Runs in the current process
   - No data store required
2. **Job Execution**
   - Runs as an APScheduler job
   - Returns the job result
   - Requires a data store and an event broker
3. **Async Job Addition**
   - Adds the pipeline as a job to the APScheduler data store
   - Returns the job ID
   - Results are retrievable from the data store
#### Command Line Usage

```bash
# Note: add --inputs, --final-vars, and other optional parameters if they are not specified in the config file

# Direct execution
flowerpower run my_flow

# Job execution
flowerpower run-job my_flow

# Add as scheduled job
flowerpower add-job my_flow
```

You can also use the `--inputs` and `--final-vars` flags to override the configuration file parameters, or to supply them if they are not specified in the configuration file:

```bash
flowerpower run my_flow \
    --inputs data_path=path/to/data.csv,fs_protocol=local \
    --final-vars final_df \
    --executor threadpool \
    --without-tracker
```
#### Python Usage

```python
from flowerpower.pipeline import Pipeline, run, run_job, add_job

# Using the Pipeline class
# Note: pass inputs, final_vars, and other optional arguments if they are not specified in the config file
p = Pipeline("my_flow")
result = p.run()
result = p.run_job()
job_id = p.add_job()

# Using the functions directly
result = run("my_flow")
result = run_job("my_flow")
job_id = add_job("my_flow")
```

You can also use the `inputs` and `final_vars` arguments to override the configuration file parameters, or to supply them if they are not specified in the configuration file:

```python
result = run(
    "my_flow",
    inputs={
        "data_path": "path/to/data.csv",
        "fs_protocol": "local",
    },
    final_vars=["final_df"],
    executor="threadpool",
    with_tracker=False,
)
```
---

## ⏰ Scheduling Pipelines

### Setting Up Schedules

#### Command Line Options

```bash
# Run every 30 seconds
flowerpower schedule my_flow \
    --type interval \
    --interval-params seconds=30

# Run at a specific date/time
flowerpower schedule my_flow \
    --type date \
    --date-params year=2022,month=1,day=1,hour=0,minute=0,second=0

# Run with cron parameters
flowerpower schedule my_flow \
    --type cron \
    --cron-params second=0,minute=0,hour=0,day=1,month=1,day_of_week=0

# Run with a crontab expression
flowerpower schedule my_flow \
    --type cron \
    --crontab "0 0 1 1 0"
```
#### Python Usage

```python
from flowerpower.scheduler import schedule, Pipeline

# Using the Pipeline class
p = Pipeline("my_flow")
p.schedule("interval", seconds=30)

# Using the schedule function
schedule("my_flow", "interval", seconds=30)
```
## 👷 Worker Management

### Starting a Worker

#### Command Line

```bash
flowerpower start-worker
```

#### Python

```python
# Using the SchedulerManager class
from flowerpower.scheduler import SchedulerManager

sm = SchedulerManager()
sm.start_worker()

# Using the start_worker function
from flowerpower.scheduler import start_worker

start_worker()
```
### Worker Configuration

Configure the worker's data store and event broker in `conf/project.yml`:

```yaml
# PostgreSQL Configuration
data_store:
  type: postgres
  uri: postgresql+asyncpg://user:password@localhost:5432/flowerpower

# Redis Event Broker
event_broker:
  type: redis
  uri: redis://localhost:6379
  # Alternative configuration:
  # host: localhost
  # port: 6379
```
### Alternative Data Store Options

**SQLite**

```yaml
data_store:
  type: sqlite
  uri: sqlite+aiosqlite:///flowerpower.db
```

**MySQL**

```yaml
data_store:
  type: mysql
  uri: mysql+aiomysql://user:password@localhost:3306/flowerpower
```

**MongoDB**

```yaml
data_store:
  type: mongodb
  uri: mongodb://localhost:27017/flowerpower
```

**In-Memory**

```yaml
data_store:
  type: memory
```
### Alternative Event Broker Options

**MQTT**

```yaml
event_broker:
  type: mqtt
  host: localhost
  port: 1883
  username: edge # optional
  password: edge # optional
```

**Redis**

```yaml
event_broker:
  type: redis
  uri: redis://localhost:6379
  # Alternative configuration:
  # host: localhost
  # port: 6379
```

**In-Memory**

```yaml
event_broker:
  type: memory
```

Note that the in-memory data store and event broker keep state only within a single process, so they are mainly useful for local testing.
## 📊 Pipeline Tracking

### Hamilton UI Setup

**Local Installation**

```bash
# Install the UI package
pip install "flowerpower[ui]"

# Start the UI server
flowerpower hamilton-ui
```

Access the UI at: http://localhost:8241

**Docker Installation**

```bash
# Clone the Hamilton repository
git clone https://github.com/dagworks-inc/hamilton
cd hamilton/ui

# Start the UI server
./run.sh
```

Access the UI at: http://localhost:8242
### Tracker Configuration

Configure tracking in `conf/project.yml`:

```yaml
username: my_email@example.com
api_url: http://localhost:8241
ui_url: http://localhost:8242
api_key: optional_key
```

Then specify the `tracker` parameters in the pipeline configuration `conf/pipelines/my_flow.yml`:

```yaml
...
tracker:
  project_id: 1
  tags:
    environment: dev
    version: 1.0
  dag_name: my_flow_123
...
```
## 🛠️ Development Services

### Local Development Setup

Download the docker-compose configuration:

```bash
curl -O https://raw.githubusercontent.com/legout/flowerpower/main/docker/docker-compose.yml
```

### Starting Services

```bash
# MQTT Broker (EMQX)
docker-compose up mqtt -d

# Redis
docker-compose up redis -d

# MongoDB
docker-compose up mongodb -d

# PostgreSQL
docker-compose up postgres -d
```
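Once the containers are running, you can point `conf/project.yml` at them, for example using the PostgreSQL container as the data store and the Redis container as the event broker. The `user:password` below is a placeholder; the actual credentials depend on the settings in the downloaded docker-compose file:

```yaml
data_store:
  type: postgres
  uri: postgresql+asyncpg://user:password@localhost:5432/flowerpower

event_broker:
  type: redis
  uri: redis://localhost:6379
```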
## 📝 License

## 🤝 Contributing

Contributions are welcome! Please feel free to submit a Pull Request.

## 📫 Support

For support, please open an issue in the GitHub repository.