
Run your workflows with a lightweight library. This is how broflow flows.


broflow

A workflow-agnostic Python library for building readable, maintainable pipelines and workflows. broflow provides an intuitive framework for orchestrating complex data processing, ML pipelines, web scraping, automation tasks, and more.

Learn once, use everywhere: from data science to web automation, broflow adapts to your workflow needs with a consistent, readable syntax.

What is broflow?

broflow is a lightweight workflow orchestration library that lets you:

  • Chain actions using the intuitive >> operator
  • Run tasks in parallel with built-in concurrency support
  • Branch conditionally based on action results
  • Share state across all workflow steps
  • Visualize workflows with automatic Mermaid diagram generation
  • Debug easily with built-in logging and state inspection

Unlike heavy workflow engines, broflow focuses on simplicity and readability while providing powerful orchestration capabilities.

Key Features

  • 🔄 Sequential Workflows: Chain actions with the simple >> operator
  • ⚡ Parallel Execution: Run independent actions simultaneously with ParallelAction
  • 🔀 Conditional Branching: Route workflows based on action results
  • 🌐 Global State Management: Share data across actions with built-in state
  • 🛠️ Tool Integration: Built-in utilities for parameter validation and extraction
  • 📊 Visual Flow Charts: Generate Mermaid diagrams of your workflows

Quick Start

Basic Sequential Workflow

from broflow import Action, Flow, Start, End

# Define custom actions
class LoadData(Action):
    def run(self, shared):
        # Simulate data loading
        shared['data'] = [1, 2, 3, 4, 5]
        print(f"Loaded {len(shared['data'])} items")
        return shared

class ProcessData(Action):
    def run(self, shared):
        # Process the data
        shared['processed'] = [x * 2 for x in shared['data']]
        print(f"Processed data: {shared['processed']}")
        return shared

class SaveResult(Action):
    def run(self, shared):
        # Save results
        print(f"Saved {len(shared['processed'])} processed items")
        shared['saved'] = True
        return shared

# Build and execute workflow
start = Start("🚀 Starting data pipeline")
load = LoadData()
process = ProcessData()
save = SaveResult()
end = End("✅ Pipeline completed successfully")

# Chain actions with >> operator
start >> load >> process >> save >> end

# Execute the workflow
flow = Flow(start, name="DataPipeline")
result = flow.run({})

Output:

🚀 Starting data pipeline
Loaded 5 items
Processed data: [2, 4, 6, 8, 10]
Saved 5 processed items
✅ Pipeline completed successfully
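
The >> chaining above relies on Python operator overloading. As a rough illustration only, here is a stdlib-only sketch of how that kind of chaining can be built with __rshift__. The Step class and run_chain function are hypothetical names for this sketch and are not broflow's internals.

```python
class Step:
    """Hypothetical pipeline node wrapping a function that transforms a shared dict."""

    def __init__(self, name, func):
        self.name = name
        self.func = func
        self.successor = None  # next Step in the chain, if any

    def __rshift__(self, other):
        # Link self -> other and return other, so `a >> b >> c` links a->b, then b->c
        self.successor = other
        return other


def run_chain(first, shared):
    """Walk the chain from `first`, passing the same shared dict through every step."""
    node = first
    while node is not None:
        shared = node.func(shared)
        node = node.successor
    return shared


def load(shared):
    shared["data"] = [1, 2, 3, 4, 5]
    return shared


def double(shared):
    shared["processed"] = [x * 2 for x in shared["data"]]
    return shared


load_step = Step("load", load)
double_step = Step("double", double)
load_step >> double_step

result = run_chain(load_step, {})
print(result["processed"])  # [2, 4, 6, 8, 10]
```

Python evaluates a >> b >> c as (a >> b) >> c, so returning the right-hand operand from __rshift__ is what lets one line link every neighbouring pair.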

Parallel Processing

Run independent tasks simultaneously to improve performance:

from broflow import Action, Flow, Start, End
from broflow.parallel_action import ParallelAction
import time

class FetchUserData(Action):
    def run(self, shared):
        time.sleep(1)  # Simulate API call
        return {'users': ['alice', 'bob', 'charlie']}

class FetchProductData(Action):
    def run(self, shared):
        time.sleep(1)  # Simulate API call
        return {'products': ['laptop', 'mouse', 'keyboard']}

class FetchOrderData(Action):
    def run(self, shared):
        time.sleep(1)  # Simulate API call
        return {'orders': [101, 102, 103]}

class MergeData(Action):
    def run(self, shared):
        # Access parallel results
        parallel_results = shared.get('parallel', {})
        shared['merged'] = {
            'total_users': len(parallel_results.get('fetchuserdata', {}).get('users', [])),
            'total_products': len(parallel_results.get('fetchproductdata', {}).get('products', [])),
            'total_orders': len(parallel_results.get('fetchorderdata', {}).get('orders', []))
        }
        print(f"Merged data: {shared['merged']}")
        return shared

# Create parallel action
parallel_fetch = ParallelAction(
    FetchUserData(),
    FetchProductData(), 
    FetchOrderData(),
    result_key='parallel'
)

# Build workflow
start = Start("🔄 Fetching data in parallel")
merge = MergeData()
end = End("📊 Data aggregation complete")

start >> parallel_fetch >> merge >> end

# Execute (runs in ~1 second instead of 3)
flow = Flow(start)
flow.run({})
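
The "about 1 second" figure assumes the three fetches actually overlap. As a sketch of one way to get that behaviour with the standard library (not necessarily how ParallelAction works internally), the hypothetical run_parallel helper below submits each action's run method to a thread pool and stores results under the lowercased class name, mirroring the keys MergeData reads above.

```python
import time
from concurrent.futures import ThreadPoolExecutor


class FetchUserData:
    def run(self, shared):
        time.sleep(0.3)  # simulate an I/O-bound API call
        return {"users": ["alice", "bob", "charlie"]}


class FetchOrderData:
    def run(self, shared):
        time.sleep(0.3)  # simulate an I/O-bound API call
        return {"orders": [101, 102, 103]}


def run_parallel(actions, shared, result_key="parallel"):
    """Hypothetical helper: run each action's run(shared) in its own thread."""
    with ThreadPoolExecutor(max_workers=len(actions)) as pool:
        futures = {
            type(action).__name__.lower(): pool.submit(action.run, shared)
            for action in actions
        }
        # .result() blocks until each thread finishes and re-raises any exception
        shared[result_key] = {key: fut.result() for key, fut in futures.items()}
    return shared


start = time.perf_counter()
shared = run_parallel([FetchUserData(), FetchOrderData()], {})
elapsed = time.perf_counter() - start
print(sorted(shared["parallel"]))
print(f"elapsed ~{elapsed:.1f}s")  # roughly 0.3s rather than 0.6s
```

Threads suit this case because the work is waiting on I/O; CPU-bound steps would need processes instead. Keying by class name also means two instances of the same class would overwrite each other's result in this sketch.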

Use Cases

broflow excels in various domains:

🤖 Machine Learning

  • Feature engineering pipelines
  • Model training workflows
  • Data preprocessing chains
  • Hyperparameter optimization

🌐 Web Automation

  • Multi-site web scraping
  • API data aggregation
  • Content processing pipelines
  • Notification systems

📊 Data Processing

  • ETL pipelines
  • Data validation workflows
  • Report generation
  • Multi-source data integration

🔧 DevOps & Automation

  • Deployment pipelines
  • Testing workflows
  • Monitoring systems
  • Batch processing jobs

Advanced Features

Conditional Workflows

Route workflow execution based on action results:

from broflow import Action, Flow, Start, End

class ValidateData(Action):
    def run(self, shared):
        data_quality = shared.get('quality_score', 0)
        print(f"Data quality score: {data_quality}")
        
        if data_quality >= 0.8:
            self.next_action = 'high_quality'
            print("✅ High quality data - proceeding directly")
        else:
            self.next_action = 'needs_cleaning'
            print("⚠️ Low quality data - cleaning required")
        return shared

class CleanData(Action):
    def run(self, shared):
        print("🧹 Cleaning data...")
        shared['quality_score'] = 0.9  # Improved after cleaning
        return shared

class ProcessCleanData(Action):
    def run(self, shared):
        print("⚡ Processing clean data")
        shared['processed'] = True
        return shared

# Build conditional workflow
start = Start("🔍 Starting data validation")
validator = ValidateData()
clean = CleanData()
process = ProcessCleanData()
end = End("🎯 Processing complete")

# Set up conditional branches
validator - 'high_quality' >> process >> end
validator - 'needs_cleaning' >> clean >> process >> end

start >> validator

# Test with low quality data
flow = Flow(start)
flow.run({'quality_score': 0.5})

print("\n" + "="*50 + "\n")

# Test with high quality data  
flow2 = Flow(start)
flow2.run({'quality_score': 0.9})
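
The validator - 'high_quality' >> process syntax also comes down to operator overloading. Because - binds more tightly than >>, the expression parses as (validator - 'high_quality') >> process. The sketch below is a hypothetical stdlib-only model (Node, _Transition and run are names invented here, not broflow's API): - captures a label, >> registers the labelled successor, and each node picks its outgoing edge by setting next_action.

```python
class Node:
    """Hypothetical workflow node with labelled outgoing edges."""

    def __init__(self, name, func=None):
        self.name = name
        self.func = func or (lambda node, shared: shared)
        self.successors = {}  # label -> Node
        self.next_action = "default"

    def __rshift__(self, other):
        self.successors["default"] = other
        return other

    def __sub__(self, label):
        # `node - 'label'` returns a helper that waits for `>> target`
        return _Transition(self, label)


class _Transition:
    def __init__(self, source, label):
        self.source, self.label = source, label

    def __rshift__(self, target):
        self.source.successors[self.label] = target
        return target  # lets `a - 'x' >> b >> c` keep chaining from b


def run(start, shared):
    """Follow the edge named by each node's next_action; return visited names."""
    node, visited = start, []
    while node is not None:
        node.next_action = "default"
        shared = node.func(node, shared)
        visited.append(node.name)
        node = node.successors.get(node.next_action)
    return visited


def validate(node, shared):
    node.next_action = "high_quality" if shared["quality_score"] >= 0.8 else "needs_cleaning"
    return shared


def clean(node, shared):
    shared["quality_score"] = 0.9
    return shared


validator = Node("validate", validate)
cleaner = Node("clean", clean)
process = Node("process")
end = Node("end")

validator - "high_quality" >> process >> end
validator - "needs_cleaning" >> cleaner >> process

print(run(validator, {"quality_score": 0.5}))  # ['validate', 'clean', 'process', 'end']
print(run(validator, {"quality_score": 0.9}))  # ['validate', 'process', 'end']
```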

Global State & Configuration

Manage configuration and shared state across your entire workflow:

from broflow import Action, Flow, Start, End, state
from broflow.config import load_config, save_config

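# Define the configuration values
config_data = {
    'debug': True,
    'api_endpoint': 'https://api.example.com',
    'batch_size': 100,
    'retry_count': 3
}

# Copy the values into global state, then write them to a config file
for key, value in config_data.items():
    state.set(key, value)
save_config('workflow_config.json')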

class ConfigurableAction(Action):
    def run(self, shared):
        # Access global configuration
        endpoint = state.get('api_endpoint')
        batch_size = state.get('batch_size', 50)
        debug = state.get('debug', False)
        
        if debug:
            print(f"🔧 Using endpoint: {endpoint}")
            print(f"📦 Batch size: {batch_size}")
        
        # Update global state
        state.set('last_run', 'success')
        return shared

# Load configuration at startup
load_config('workflow_config.json')

# Use in workflow
start = Start("⚙️ Configurable workflow")
action = ConfigurableAction()
end = End("🏁 Workflow finished")

start >> action >> end

flow = Flow(start)
flow.run({})

# Check updated state
print(f"Last run status: {state.get('last_run')}")
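
A process-wide store like state can be modelled as a single module-level object with get/set plus JSON round-tripping. The GlobalState class and its load/save methods below are a hypothetical stdlib sketch of the idea, not broflow's state, load_config or save_config.

```python
import json
import os
import tempfile


class GlobalState:
    """Hypothetical process-wide key/value store shared by every action."""

    def __init__(self):
        self._data = {}

    def get(self, key, default=None):
        return self._data.get(key, default)

    def set(self, key, value):
        self._data[key] = value

    def save(self, path):
        # Persist the current values as JSON
        with open(path, "w", encoding="utf-8") as f:
            json.dump(self._data, f, indent=2)

    def load(self, path):
        # Merge values from a JSON file into the store
        with open(path, encoding="utf-8") as f:
            self._data.update(json.load(f))


state = GlobalState()  # one shared instance, imported wherever it is needed

with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "workflow_config.json")
    state.set("api_endpoint", "https://api.example.com")
    state.set("batch_size", 100)
    state.save(path)

    fresh = GlobalState()
    fresh.load(path)
    print(fresh.get("batch_size"))      # 100
    print(fresh.get("retry_count", 3))  # 3 (key missing, so the default is used)
```

Because a module-level store is shared by everything in the process, tests that touch it usually need to reset it between runs.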

Visual Workflow Documentation

Automatically generate visual documentation of your workflows:

from broflow import Action, Flow, Start, End
from broflow.parallel_action import ParallelAction

class ExtractData(Action):
    pass

class TransformData(Action):
    pass

class ValidateData(Action):
    def run(self, shared):
        # Conditional logic
        if shared.get('valid', True):
            self.next_action = 'success'
        else:
            self.next_action = 'retry'
        return shared

class LoadData(Action):
    pass

class RetryProcess(Action):
    pass

# Build complex workflow
start = Start("ETL Pipeline")
extract = ExtractData()
transform = TransformData()
validate = ValidateData()
load = LoadData()
retry = RetryProcess()
end = End("Pipeline Complete")

# Chain with conditional branching
start >> extract >> transform >> validate
validate - 'success' >> load >> end
validate - 'retry' >> retry >> transform

# Generate visual documentation
flow = Flow(start, name="ETL_Pipeline")
print(flow.to_mermaid())

# Save to file
flow.save_mermaid('etl_workflow.md')

Generated Mermaid Diagram:

flowchart TD
    ETL_Pipeline -->|default| ExtractData
    ExtractData -->|default| TransformData
    TransformData -->|default| ValidateData
    ValidateData -->|success| LoadData
    ValidateData -->|retry| RetryProcess
    LoadData -->|default| End
    RetryProcess -->|default| TransformData
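
Producing text like the diagram above mostly means walking the graph and emitting one "A -->|label| B" line per edge. Here is a hypothetical stdlib sketch; the to_mermaid function and the dict-of-dicts graph format are assumptions for illustration, not broflow's API. The seen set matters because the retry edge makes the graph cyclic.

```python
from collections import deque


def to_mermaid(start, successors, direction="TD"):
    """Breadth-first walk from `start`, emitting one Mermaid line per labelled edge.

    `successors` maps node name -> {label: target name}. The `seen` set stops
    the walk from looping forever on cycles such as RetryProcess -> TransformData.
    """
    lines = [f"flowchart {direction}"]
    seen, queue = {start}, deque([start])
    while queue:
        node = queue.popleft()
        for label, target in successors.get(node, {}).items():
            lines.append(f"    {node} -->|{label}| {target}")
            if target not in seen:
                seen.add(target)
                queue.append(target)
    return "\n".join(lines)


graph = {
    "ExtractData": {"default": "TransformData"},
    "TransformData": {"default": "ValidateData"},
    "ValidateData": {"success": "LoadData", "retry": "RetryProcess"},
    "LoadData": {"default": "End"},
    "RetryProcess": {"default": "TransformData"},
}
print(to_mermaid("ExtractData", graph))
```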

Installation

pip install broflow

Documentation

Real-World Examples

Machine Learning Pipeline

from broflow import Action, Flow, Start, End
from broflow.parallel_action import ParallelAction

class LoadDataset(Action):
    def run(self, shared):
        # Load your dataset
        shared['raw_data'] = "dataset.csv"  # Placeholder
        return shared

class FeatureEngineering(Action):
    def run(self, shared):
        return {'features': 'engineered_features'}

class ScaleFeatures(Action):
    def run(self, shared):
        return {'scaled': 'scaled_features'}

class SelectFeatures(Action):
    def run(self, shared):
        return {'selected': 'selected_features'}

class TrainModel(Action):
    def run(self, shared):
        parallel_results = shared.get('parallel', {})
        print("🤖 Training model with processed features")
        shared['model'] = 'trained_model'
        return shared

class EvaluateModel(Action):
    def run(self, shared):
        print("📊 Model accuracy: 94.5%")
        shared['accuracy'] = 0.945
        return shared

# Build ML pipeline
start = Start("🧠 ML Pipeline Started")
load = LoadDataset()

# Parallel feature processing
feature_pipeline = ParallelAction(
    FeatureEngineering(),
    ScaleFeatures(),
    SelectFeatures(),
    result_key='parallel'  # TrainModel reads these results from shared['parallel']
)

train = TrainModel()
evaluate = EvaluateModel()
end = End("🎉 ML Pipeline Complete")

start >> load >> feature_pipeline >> train >> evaluate >> end

flow = Flow(start, name="ML_Pipeline")
flow.run({})

Web Scraping Pipeline

from broflow import Action, Flow, Start, End
from broflow.parallel_action import ParallelAction

class ScrapeWebsite(Action):
    def __init__(self, index):
        super().__init__()
        self.index = index  # position of this action's URL in shared['urls']

    def run(self, shared):
        url = shared['urls'][self.index]
        print(f"🕷️ Scraping {url}")
        return {'scraped_data': f'data_from_{url}'}

class ParseContent(Action):
    def run(self, shared):
        # Results from the parallel scrapes
        parallel_results = shared.get('parallel', {})
        print("📝 Parsing scraped content")
        return shared

class SaveToDatabase(Action):
    def run(self, shared):
        print("💾 Saving to database")
        return shared

# Scrape multiple sites in parallel, one action per URL
scrape_parallel = ParallelAction(
    ScrapeWebsite(0),
    ScrapeWebsite(1),
    ScrapeWebsite(2),
    result_key='parallel'
)

start = Start("🌐 Web Scraping Started")
parse = ParseContent()
save = SaveToDatabase()
end = End("📚 Data Collection Complete")

start >> scrape_parallel >> parse >> save >> end

flow = Flow(start)
flow.run({
    'urls': ['site1.com', 'site2.com', 'site3.com']
})
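
For I/O-bound work like scraping, a thread pool is the usual standard-library tool. In this hypothetical sketch, fake_fetch stands in for a real HTTP request so it runs offline, and scrape_all is a name invented here. It maps the fetch over the urls list and keeps results in input order, which avoids any need for per-instance result keys.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def fake_fetch(url):
    """Stand-in for an HTTP request; sleeps to simulate network latency."""
    time.sleep(0.1)
    return f"data_from_{url}"


def scrape_all(urls, max_workers=8):
    # executor.map yields results in the same order as `urls`,
    # regardless of which request finishes first
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        return dict(zip(urls, pool.map(fake_fetch, urls)))


shared = {"urls": ["site1.com", "site2.com", "site3.com"]}
shared["scraped"] = scrape_all(shared["urls"])
print(shared["scraped"]["site2.com"])  # data_from_site2.com
```

A real scraper would also keep max_workers small enough to stay polite to the sites being scraped.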

Why Choose broflow?

🎯 Readable & Intuitive

Workflows read like natural language:

start >> load_data >> clean_data >> ParallelAction(feature1, feature2, feature3) >> train_model >> save_model >> end

🔧 Flexible Architecture

  • Sequential processing: Chain actions with >>
  • Parallel execution: Use ParallelAction for concurrency
  • Conditional branching: Route based on results with action - 'label' >> next_action
  • State management: Share data across all actions
  • Mixed patterns: Combine all approaches seamlessly

🐛 Easy Debugging

  • Built-in debug logging
  • Visual workflow generation
  • State inspection at any point
  • Clear error propagation

📈 Scalable Design

  • From simple scripts to complex production pipelines
  • Lightweight with minimal dependencies
  • Extensible action system
  • Framework-agnostic approach

Extending Actions

Custom Action Base Classes

from broflow import Action

class DatabaseAction(Action):
    """Base class for database operations"""
    def __init__(self, connection_string):
        super().__init__()
        self.connection_string = connection_string
    
    def connect(self):
        # Database connection logic
        pass

class QueryAction(DatabaseAction):
    def __init__(self, query, connection_string):
        super().__init__(connection_string)
        self.query = query
    
    def run(self, shared):
        # Execute query
        shared['query_result'] = f"Results for: {self.query}"
        return shared
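
To make the connect placeholder concrete, here is a sketch using the standard-library sqlite3 module with an in-memory database. A minimal stand-in Action base class replaces broflow's so the example runs on its own; the users table, the query, and the params argument are illustrative additions, not part of the original example.

```python
import sqlite3


class Action:
    """Minimal stand-in for broflow's Action, only so this sketch runs alone."""

    def run(self, shared):
        return shared


class DatabaseAction(Action):
    """Base class that owns a lazily opened database connection."""

    def __init__(self, connection_string):
        super().__init__()
        self.connection_string = connection_string
        self._conn = None

    def connect(self):
        # Open the connection once and reuse it on later calls
        if self._conn is None:
            self._conn = sqlite3.connect(self.connection_string)
        return self._conn


class QueryAction(DatabaseAction):
    def __init__(self, query, connection_string, params=()):
        super().__init__(connection_string)
        self.query = query
        self.params = params

    def run(self, shared):
        # Parameterized queries keep values in `params` out of the SQL text
        rows = self.connect().execute(self.query, self.params).fetchall()
        shared["query_result"] = rows
        return shared


# Seed the in-memory database through the same connection the action will use
action = QueryAction("SELECT name FROM users WHERE id > ? ORDER BY id", ":memory:", params=(1,))
conn = action.connect()
conn.execute("CREATE TABLE users (id INTEGER, name TEXT)")
conn.executemany("INSERT INTO users VALUES (?, ?)", [(1, "alice"), (2, "bob"), (3, "charlie")])

result = action.run({})
print(result["query_result"])  # [('bob',), ('charlie',)]
```

Opening the connection lazily in the base class means every subclass shares one connection per action instance instead of reconnecting on each run.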

Error Handling & Retries

from broflow import Action

class RobustAction(Action):
    def __init__(self, max_retries=3):
        super().__init__()
        self.max_retries = max_retries
    
    def run(self, shared):
        for attempt in range(self.max_retries):
            try:
                # Your action logic here
                return self.execute_logic(shared)
            except Exception:
                if attempt == self.max_retries - 1:
                    raise  # re-raise the last error once all retries are used up
                print(f"Attempt {attempt + 1} failed, retrying...")
        return shared
    
    def execute_logic(self, shared):
        # Implement your logic
        return shared
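
A common refinement of the retry loop above is to wait between attempts, doubling the delay each time (exponential backoff), and to retry only on exceptions you expect to be transient. The with_retries decorator below is a hypothetical stdlib sketch, not part of broflow.

```python
import functools
import time


def with_retries(max_retries=3, base_delay=0.5, retry_on=(Exception,)):
    """Retry the wrapped function, sleeping base_delay * 2**attempt between tries."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(max_retries):
                try:
                    return func(*args, **kwargs)
                except retry_on:
                    if attempt == max_retries - 1:
                        raise  # out of attempts: re-raise with the original traceback
                    time.sleep(base_delay * 2 ** attempt)
        return wrapper
    return decorator


calls = {"count": 0}


@with_retries(max_retries=3, base_delay=0.01, retry_on=(ConnectionError,))
def flaky_fetch():
    # Fails twice, then succeeds, to exercise the retry path
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("temporary failure")
    return "ok"


print(flaky_fetch(), calls["count"])  # ok 3
```

Narrowing retry_on means a genuine bug, such as a KeyError, fails immediately instead of being retried.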

Contributing

We welcome contributions! Please see our Contributing Guidelines for details on:

  • Setting up the development environment
  • Running tests
  • Submitting pull requests
  • Code style guidelines

Development Setup

git clone https://github.com/yourusername/broflow.git
cd broflow
pip install -e .

License

MIT License - see LICENSE for details.


Download files


Source Distribution

broflow-0.1.5.tar.gz (15.2 kB)

Uploaded Source

Built Distribution


broflow-0.1.5-py3-none-any.whl (13.7 kB)

Uploaded Python 3

File details

Details for the file broflow-0.1.5.tar.gz.

File metadata

  • Download URL: broflow-0.1.5.tar.gz
  • Upload date:
  • Size: 15.2 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.8.13

File hashes

Hashes for broflow-0.1.5.tar.gz
Algorithm Hash digest
SHA256 102026bc0866b2ef82be3b969a030ce2f30458dbee7ba7461ef01087caaf5832
MD5 296029f457491ec22de3a8eafb8a7fec
BLAKE2b-256 f39d734e041f3f3f76245fcb491de769dd39c2a89d5a2fa3077d61faa76e61c7


File details

Details for the file broflow-0.1.5-py3-none-any.whl.

File metadata

  • Download URL: broflow-0.1.5-py3-none-any.whl
  • Upload date:
  • Size: 13.7 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.8.13

File hashes

Hashes for broflow-0.1.5-py3-none-any.whl
Algorithm Hash digest
SHA256 9c832a7173e3734323ed0f2e2e56ed865f82449168d1be579aa654c6c024128c
MD5 d39b6397817adc171542cbac42e53146
BLAKE2b-256 1f14d2a1bf6bc65135965b24e0b3099652451f111ddf730c08409813b9538ad2

