
FlowerPower

Simple Workflow Framework - Hamilton + APScheduler = FlowerPower


📚 Table of Contents

  1. Overview
  2. Installation
  3. Getting Started
  4. Development

🔍 Overview

FlowerPower is a simple workflow framework built on two excellent Python libraries: Hamilton, for defining DAG-based dataflows, and APScheduler, for scheduling them.

Key Features

  • 🔄 Pipeline Workflows: Create and execute complex DAG-based workflows
  • ⏰ Scheduling: Run pipelines at specific times or intervals
  • ⚙️ Parameterization: Easily configure pipeline parameters
  • 📊 Tracking: Monitor executions with Hamilton UI
  • 🛠️ Flexible Configuration: Simple YAML-based setup
  • 📡 Distributed Execution: Support for distributed environments

More details can be found in the Hamilton docs.


📦 Installation

# Basic installation
pip install flowerpower

# With scheduling support
pip install "flowerpower[scheduler]"

# Additional components
pip install "flowerpower[scheduler,mqtt]"    # MQTT broker
pip install "flowerpower[scheduler,redis]"   # Redis broker
pip install "flowerpower[scheduler,mongodb]" # MongoDB store
pip install "flowerpower[scheduler,ray]"     # Ray computing
pip install "flowerpower[scheduler,dask]"    # Dask computing

🚀 Getting Started

Initialize Project

Option 1: Command Line

flowerpower init new-project
cd new-project

Option 2: Python

from flowerpower import init
init("new-project")

This creates the basic project configuration file:

  • conf/project.yml

📦 Optional: Project Management with UV (Recommended)

It is recommended to use uv to manage your project and its dependencies.

Installation

pip install uv

For more installation options, visit: https://docs.astral.sh/uv/getting-started/installation/

Project Initialization

uv init --app --no-readme --vcs git

Pipeline Management

Creating a New Pipeline

Option 1: Command Line

flowerpower add my_flow
# or
flowerpower new my_flow

Option 2: Python

# Using PipelineManager
from flowerpower.pipeline import PipelineManager
pm = PipelineManager()
pm.new("my_flow")

# Or using the new function directly
from flowerpower.pipeline import new
new("my_flow")

This creates the new pipeline module and its configuration file:

  • pipelines/my_flow.py
  • conf/pipelines/my_flow.yml

Setting Up a Pipeline

  1. Add Pipeline Functions: Build your pipeline by adding functions (nodes) to pipelines/my_flow.py that define the DAG, following the Hamilton paradigm (a minimal module sketch follows the parameterization examples below).

  2. Parameterize Functions

You can parameterize functions in two ways:

Method 1: Default Values

import pandas as pd


def add_int_col(
    df: pd.DataFrame,
    col_name: str = "foo",
    values: str = "bar"
) -> pd.DataFrame:
    return df.assign(**{col_name: values})

Method 2: Configuration File

In conf/pipelines/my_flow.yml:

...
func:
  add_int_col:
    col_name: foo
    values: bar
...

Add the @parameterize decorator to the function in your pipeline file:

from hamilton.function_modifiers import parameterize

# PARAMS carries the values defined under func: in conf/pipelines/my_flow.yml
@parameterize(**PARAMS.add_int_col)
def add_int_col(
    df: pd.DataFrame,
    col_name: str,
    values: str
) -> pd.DataFrame:
    return df.assign(**{col_name: values})
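
For orientation, a complete pipeline module for the configuration used in this README might look roughly like the sketch below. The node names (df, add_int_col, final_df) and the data_path input mirror the examples in this document; everything else is illustrative, not a prescribed FlowerPower API. In Hamilton, a function depends on another node simply by naming it as a parameter.

# pipelines/my_flow.py -- illustrative sketch of a Hamilton-style pipeline module
import pandas as pd


def df(data_path: str) -> pd.DataFrame:
    # "data_path" is supplied at run time via the pipeline's inputs
    return pd.read_csv(data_path)


def add_int_col(df: pd.DataFrame, col_name: str = "foo", values: str = "bar") -> pd.DataFrame:
    # depends on the df node by naming it as a parameter
    return df.assign(**{col_name: values})


def final_df(add_int_col: pd.DataFrame) -> pd.DataFrame:
    # downstream node; request it by listing "final_df" in final_vars
    return add_int_col.drop_duplicates()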

Running Pipelines

Configuration

You can configure the pipeline parameters (inputs, final_vars, and other options) in the pipeline configuration file conf/pipelines/my_flow.yml or directly in the pipeline execution function.

Using the Pipeline Configuration

...
run:
  inputs:
    data_path: path/to/data.csv
    fs_protocol: local
  final_vars: [add_int_col, final_df]
  # optional parameters
  with_tracker: false
  executor: threadpool # or processpool, ray, dask
...
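
For context, inputs and final_vars are Hamilton concepts: final_vars lists the nodes whose outputs you want returned, and inputs supplies values for parameters that no node computes (such as data_path above). In plain Hamilton, independent of FlowerPower, the equivalent call looks roughly like this:

# plain-Hamilton sketch of what inputs and final_vars mean
from hamilton import driver

import my_flow  # the module from pipelines/my_flow.py; import path depends on your layout

dr = driver.Builder().with_modules(my_flow).build()
result = dr.execute(
    final_vars=["add_int_col", "final_df"],
    inputs={"data_path": "path/to/data.csv"},
)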

Execution Methods

There are three ways to execute a pipeline:

  1. Direct Execution

    • Runs in current process
    • No data store required
  2. Job Execution

    • Runs as APScheduler job
    • Returns job results
    • Requires data store and event broker
  3. Async Job Addition

    • Adds to APScheduler
    • Returns job ID
    • Results retrievable from data store

Command Line Usage

# Note: add --inputs and --final-vars and other optional parameters if not specified in the config file
# Direct execution
flowerpower run my_flow
# Job execution
flowerpower run-job my_flow

# Add as scheduled job
flowerpower add-job my_flow

You can also use the --inputs and --final-vars flags to override values from the configuration file, or to supply them if they are not set there.

flowerpower run my_flow \
    --inputs data_path=path/to/data.csv,fs_protocol=local \
    --final-vars final_df \
    --executor threadpool \
    --without-tracker

Python Usage

from flowerpower.pipeline import Pipeline, run, run_job, add_job

# Using Pipeline class
p = Pipeline("my_flow")
# Note: add inputs, final_vars, and other optional arguments if not specified in the config file
result = p.run()
result = p.run_job()
job_id = p.add_job()

# Using functions
result = run("my_flow")
result = run_job("my_flow")
job_id = add_job("my_flow")

You can also use the inputs and final_vars arguments to override values from the configuration file, or to supply them if they are not set there.

result = run(
    "my_flow",
    inputs={
        "data_path": "path/to/data.csv",
        "fs_protocol": "local"
    },
    final_vars=["final_df"],
    executor="threadpool",
    with_tracker=False
)


⏰ Scheduling Pipelines

Setting Up Schedules

Command Line Options

# Run every 30 seconds
flowerpower schedule my_flow \
    --type interval \
    --interval-params seconds=30

# Run at specific date/time
flowerpower schedule my_flow \
    --type date \
    --date-params year=2022,month=1,day=1,hour=0,minute=0,second=0

# Run with cron parameters
flowerpower schedule my_flow \
    --type cron \
    --cron-params second=0,minute=0,hour=0,day=1,month=1,day_of_week=0

# Run with crontab expression
flowerpower schedule my_flow \
    --type cron \
    --crontab "0 0 1 1 0"

Python Usage

from flowerpower.scheduler import schedule, Pipeline

# Using Pipeline class
p = Pipeline("my_flow")
p.schedule("interval", seconds=30)

# Using schedule function
schedule("my_flow", "interval", seconds=30)

👷 Worker Management

Starting a Worker

Command Line

flowerpower start-worker

Python

# Using the SchedulerManager class
from flowerpower.scheduler import SchedulerManager
sm = SchedulerManager()
sm.start_worker()

# Using the start_worker function
from flowerpower.scheduler import start_worker
start_worker()

Worker Configuration

Configure your worker in conf/project.yml:

# PostgreSQL Configuration
data_store:
  type: postgres
  uri: postgresql+asyncpg://user:password@localhost:5432/flowerpower

# Redis Event Broker
event_broker:
  type: redis
  uri: redis://localhost:6379
  # Alternative configuration:
  # host: localhost
  # port: 6379

Alternative Data Store Options

SQLite

data_store:
  type: sqlite
  uri: sqlite+aiosqlite:///flowerpower.db

MySQL

data_store:
  type: mysql
  uri: mysql+aiomysql://user:password@localhost:3306/flowerpower

MongoDB

data_store:
  type: mongodb
  uri: mongodb://localhost:27017/flowerpower

In-Memory

data_store:
  type: memory

Alternative Event Broker Options

MQTT

event_broker:
  type: mqtt
  host: localhost
  port: 1883
  username: edge  # optional
  password: edge  # optional

Redis

event_broker:
  type: redis
  uri: redis://localhost:6379
  # Alternative configuration:
  # host: localhost
  # port: 6379

In-Memory

event_broker:
  type: memory

📊 Pipeline Tracking

Hamilton UI Setup

Local Installation

# Install UI package
pip install "flowerpower[ui]"

# Start UI server
flowerpower hamilton-ui

Access the UI at: http://localhost:8241

Docker Installation

# Clone Hamilton repository
git clone https://github.com/dagworks-inc/hamilton
cd hamilton/ui

# Start UI server
./run.sh

Access the UI at: http://localhost:8242

Tracker Configuration

Configure tracking in conf/project.yml:

username: my_email@example.com
api_url: http://localhost:8241
ui_url: http://localhost:8242
api_key: optional_key

And specify the tracker parameters in the pipeline configuration file conf/pipelines/my_flow.yml:

...
tracker:
  project_id: 1
  tags:
    environment: dev
    version: 1.0
  dag_name: my_flow_123
...
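
For reference, these fields correspond to Hamilton's tracking adapter from the hamilton_sdk package, which is what the Hamilton UI consumes; FlowerPower presumably wires this up on your behalf when with_tracker is enabled. In plain Hamilton the same settings would look roughly like this:

# plain-Hamilton sketch of the tracking adapter these fields map onto
from hamilton_sdk import adapters

tracker = adapters.HamiltonTracker(
    project_id=1,                                   # tracker.project_id
    username="my_email@example.com",                # username from conf/project.yml
    dag_name="my_flow_123",                         # tracker.dag_name
    tags={"environment": "dev", "version": "1.0"},  # tracker.tags
    hamilton_api_url="http://localhost:8241",       # api_url
    hamilton_ui_url="http://localhost:8242",        # ui_url
)
# in plain Hamilton this adapter is attached via driver.Builder().with_adapters(tracker)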

🛠️ Development Services

Local Development Setup

Download the docker-compose configuration:

curl -O https://raw.githubusercontent.com/legout/flowerpower/main/docker/docker-compose.yml

Starting Services

# MQTT Broker (EMQX)
docker-compose up -d mqtt

# Redis
docker-compose up -d redis

# MongoDB
docker-compose up -d mongodb

# PostgreSQL
docker-compose up -d postgres

📝 License

MIT License


🤝 Contributing

Contributions are welcome! Please feel free to submit a Pull Request.


📫 Support

For support, please open an issue in the GitHub repository.
