# 🚀 TaskFlow Pipeline

A Python library for orchestrating automated pipelines, including RPA (desktop/web automation), data processing, and AI tasks, using YAML or JSON configuration files.

## 🎯 What is TaskFlow?

TaskFlow is a declarative automation framework that lets you build complex workflows by writing simple configuration files. Perfect for:

- **🤖 RPA (Robotic Process Automation)**: Automate repetitive tasks, UI interactions, and document processing
- **📊 Data Processing**: ETL pipelines, data cleaning, transformation, and validation
- **🧠 AI/ML Workflows**: Text generation, classification, sentiment analysis, and more
- **📄 Business Process Automation**: Invoice processing, report generation, email automation

**Why TaskFlow?**

- ✅ **No-code workflow definition**: Define complex pipelines in YAML/JSON
- ✅ **Modular task system**: Built-in RPA, data processing, and AI tasks
- ✅ **Type-safe and tested**: Built with type hints and comprehensive test coverage
- ✅ **Extensible architecture**: Easily add custom tasks and integrations
- ✅ **Production-ready**: Detailed logging, clear error messages, and validation
- ✅ **Framework-agnostic**: Works with Selenium, Playwright, Pandas, OpenAI, and more

---

## 📦 Installation

### Basic Installation

```bash
pip install taskflow-pipeline
```

### Development Installation

```bash
git clone https://github.com/berkterekli/taskflow-pipeline.git
cd taskflow-pipeline
pip install -e ".[dev]"
```
---

## Requirements

- Python 3.8+
- PyYAML 6.0+
---

## 🚀 Quick Start

### 1. Create Your First Pipeline

Create a file named `my_workflow.yaml`:

```yaml
tasks:
  # Step 1: Simulate a button click
  - action: "rpa.click"
    params:
      target: "Submit Button"

  # Step 2: Extract data from a PDF
  - action: "rpa.extract_table_from_pdf"
    params:
      file_path: "invoices/invoice_001.pdf"

  # Step 3: Clean the extracted data
  - action: "data.clean_data"
    params:
      data: "extracted_table.csv"

  # Step 4: Generate a summary with AI
  - action: "ai.generate_text"
    params:
      prompt: "Summarize the invoice data"
      max_tokens: 200
```

### 2. Run Your Pipeline

```python
from taskflow import TaskFlow

# Initialize the pipeline
pipeline = TaskFlow("my_workflow.yaml")

# Execute all tasks
pipeline.run()
```
### 3. See the Results

```
2025-11-03 16:12:08 - INFO - TaskFlow initialized with config: my_workflow.yaml
2025-11-03 16:12:08 - INFO - Starting pipeline execution...
2025-11-03 16:12:08 - INFO - Task 1/4: Executing 'rpa.click'
[RPA] Simulating click on: Submit Button
2025-11-03 16:12:08 - INFO - Task 1: Completed successfully
2025-11-03 16:12:08 - INFO - Task 2/4: Executing 'rpa.extract_table_from_pdf'
[RPA] Extracting table from PDF: invoices/invoice_001.pdf
2025-11-03 16:12:08 - INFO - Task 2: Completed successfully
...
2025-11-03 16:12:08 - INFO - Pipeline execution completed successfully!
```

---

## ✨ Features

### 🏭 Configuration-Based Workflows
Define your entire automation pipeline in a simple, readable format:
```yaml
tasks:
  - action: "rpa.click"
    params:
      target: "Login Button"
  - action: "data.transform_data"
    params:
      input_path: "raw_data.csv"
      output_path: "processed_data.csv"
      operations: ["dedupe", "normalize", "validate"]
```
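Once loaded, a configuration like the one above is just plain Python data: a dict with a `tasks` list of `{action, params}` entries. As a sketch of that shape (hand-written here for illustration, not the output of TaskFlow's actual parser):

```python
from typing import List

# Parsed form of the YAML above, written out by hand for illustration
config = {
    "tasks": [
        {"action": "rpa.click",
         "params": {"target": "Login Button"}},
        {"action": "data.transform_data",
         "params": {"input_path": "raw_data.csv",
                    "output_path": "processed_data.csv",
                    "operations": ["dedupe", "normalize", "validate"]}},
    ]
}

def list_actions(config: dict) -> List[str]:
    """Return the action names in pipeline order."""
    return [task["action"] for task in config["tasks"]]
```

Thinking of the config as data like this is what makes dynamic task generation (shown later) possible.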
### 🧩 Modular Task System

TaskFlow comes with three built-in task categories:

#### 🤖 RPA Tasks - Robotic Process Automation

```
# Available RPA Tasks:
rpa.click                   # Click UI elements
rpa.type_text               # Type into input fields
rpa.take_screenshot         # Capture screenshots
rpa.extract_table_from_pdf  # Extract tables from PDFs
```
**Example Use Case: Invoice Processing**

```yaml
tasks:
  - action: "rpa.extract_table_from_pdf"
    params:
      file_path: "invoice.pdf"
  - action: "data.validate_data"
    params:
      data: "extracted_invoice.csv"
      schema:
        invoice_number: "string"
        amount: "float"
        date: "date"
```
#### 📊 Data Tasks - Data Processing & ETL

```
# Available Data Tasks:
data.clean_data       # Clean and preprocess data
data.transform_data   # Apply transformations
data.merge_datasets   # Merge multiple datasets
data.validate_data    # Validate against schemas
```
**Example Use Case: Data Pipeline**

```yaml
tasks:
  - action: "data.merge_datasets"
    params:
      datasets:
        - "sales_q1.csv"
        - "sales_q2.csv"
        - "sales_q3.csv"
      output_path: "annual_sales.csv"
  - action: "data.clean_data"
    params:
      data: "annual_sales.csv"
  - action: "data.transform_data"
    params:
      input_path: "annual_sales.csv"
      output_path: "sales_report.csv"
      operations: ["aggregate", "pivot", "format"]
```
#### 🧠 AI Tasks - Artificial Intelligence

```
# Available AI Tasks:
ai.generate_text      # Generate text with LLMs
ai.classify_text      # Classify text into categories
ai.analyze_sentiment  # Sentiment analysis
ai.extract_entities   # Named entity recognition
```
**Example Use Case: Content Analysis**

```yaml
tasks:
  - action: "ai.analyze_sentiment"
    params:
      text: "Customer feedback text here..."
  - action: "ai.extract_entities"
    params:
      text: "Extract companies, people, and locations from this text."
  - action: "ai.generate_text"
    params:
      prompt: "Write a summary of the customer feedback"
      max_tokens: 150
```
### 🚀 Easy Extensibility

Add your own custom tasks in minutes:

```python
from taskflow import TaskFlow

# Define a custom function
def send_email(to: str, subject: str, body: str) -> None:
    """Send an email notification."""
    print(f"Sending email to {to}: {subject}")
    # Your email logic here...

# Register it as a custom action
pipeline = TaskFlow("workflow.yaml")
pipeline.add_custom_action("email.send", send_email)
pipeline.run()
```
Then use it in your YAML:

```yaml
tasks:
  - action: "email.send"
    params:
      to: "team@company.com"
      subject: "Pipeline Completed"
      body: "The data processing pipeline has finished successfully."
```
### 🛡️ Type-Safe & Production Ready
- Type Hints: Full type annotations for IDE support and type checking
- Comprehensive Logging: Detailed logs for every step
- Error Handling: Clear error messages with actionable information
- Validation: Automatic validation of configuration files and parameters
### 🎨 Both YAML and JSON Support

Use whichever format you prefer:

**YAML** (recommended for readability):

```yaml
tasks:
  - action: "rpa.click"
    params:
      target: "Button"
```

**JSON** (better for programmatic generation):

```json
{
  "tasks": [
    {
      "action": "rpa.click",
      "params": {"target": "Button"}
    }
  ]
}
```
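Because a JSON config is just data, it is easy to build from code. A minimal standard-library sketch (the file name `tasks.json` is illustrative):

```python
import json

# Build the same pipeline shown above as a Python dict
config = {
    "tasks": [
        {"action": "rpa.click", "params": {"target": "Button"}}
    ]
}

# Serialize it; a file written this way could then be passed to
# TaskFlow("tasks.json") just like a hand-written config
text = json.dumps(config, indent=2)

# Round-trip check: parsing the JSON gives back the original structure
assert json.loads(text) == config
```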
## 📚 Complete Task Reference

### 🤖 RPA Tasks

| Action | Description | Parameters | Example |
|---|---|---|---|
| `rpa.click` | Click UI element | `target` (str) | Click login button |
| `rpa.type_text` | Type into input | `target` (str), `text` (str) | Fill form fields |
| `rpa.take_screenshot` | Capture screen | `output_path` (str) | Save evidence |
| `rpa.extract_table_from_pdf` | Extract PDF table | `file_path` (str) | Parse invoices |
### 📊 Data Tasks

| Action | Description | Parameters | Example |
|---|---|---|---|
| `data.clean_data` | Clean data | `data` (str/list) | Remove duplicates |
| `data.transform_data` | Transform data | `input_path`, `output_path`, `operations` | ETL pipeline |
| `data.merge_datasets` | Merge datasets | `datasets` (list), `output_path` | Combine data sources |
| `data.validate_data` | Validate schema | `data` (str), `schema` (dict) | Ensure data quality |
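The `schema` parameter pairs column names with expected type names. TaskFlow's own validation rules aren't reproduced here, but a minimal validator in the same spirit (assumed semantics, purely illustrative) might look like:

```python
from datetime import date

# Map schema type names to predicates. This convention is an assumption,
# not necessarily TaskFlow's exact rules.
CHECKS = {
    "string": lambda v: isinstance(v, str),
    "float": lambda v: isinstance(v, (int, float)) and not isinstance(v, bool),
    "date": lambda v: isinstance(v, date),
}

def validate_row(row: dict, schema: dict) -> list:
    """Return the column names that are missing or fail their type check."""
    return [col for col, typ in schema.items()
            if col not in row or not CHECKS[typ](row[col])]

schema = {"invoice_number": "string", "amount": "float", "date": "date"}
good = {"invoice_number": "INV-001", "amount": 99.5, "date": date(2024, 1, 15)}
bad = {"invoice_number": "INV-002", "amount": "99.5"}  # wrong type, missing date
```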
### 🧠 AI Tasks

| Action | Description | Parameters | Example |
|---|---|---|---|
| `ai.generate_text` | Generate text | `prompt` (str), `max_tokens` (int) | Create summaries |
| `ai.classify_text` | Classify text | `text` (str), `categories` (list) | Categorize content |
| `ai.analyze_sentiment` | Analyze sentiment | `text` (str) | Measure satisfaction |
| `ai.extract_entities` | Extract entities | `text` (str) | Find names, places |
## 💡 Real-World Examples

### Example 1: Invoice Processing Automation

```yaml
# invoice_workflow.yaml
tasks:
  # Step 1: Extract data from invoice PDF
  - action: "rpa.extract_table_from_pdf"
    params:
      file_path: "invoices/invoice_2024_001.pdf"

  # Step 2: Validate the extracted data
  - action: "data.validate_data"
    params:
      data: "extracted_invoice.csv"
      schema:
        invoice_id: "string"
        vendor: "string"
        amount: "float"
        date: "date"

  # Step 3: Clean and format the data
  - action: "data.clean_data"
    params:
      data: "extracted_invoice.csv"

  # Step 4: Generate a summary email
  - action: "ai.generate_text"
    params:
      prompt: "Create a professional email summary of this invoice"
      max_tokens: 200
```

Run it:

```python
from taskflow import TaskFlow

pipeline = TaskFlow("invoice_workflow.yaml")
pipeline.run()
```
### Example 2: Customer Feedback Analysis

```yaml
# feedback_analysis.yaml
tasks:
  # Step 1: Analyze sentiment of customer reviews
  - action: "ai.analyze_sentiment"
    params:
      text: "The product quality is excellent but shipping was slow."

  # Step 2: Extract key entities (products, issues)
  - action: "ai.extract_entities"
    params:
      text: "The product quality is excellent but shipping was slow."

  # Step 3: Classify feedback type
  - action: "ai.classify_text"
    params:
      text: "The product quality is excellent but shipping was slow."
      categories: ["product_quality", "shipping", "customer_service", "pricing"]

  # Step 4: Generate action items
  - action: "ai.generate_text"
    params:
      prompt: "Based on the feedback, suggest 3 action items for improvement"
      max_tokens: 150
```
### Example 3: Data ETL Pipeline

```yaml
# etl_pipeline.yaml
tasks:
  # Step 1: Merge quarterly sales data
  - action: "data.merge_datasets"
    params:
      datasets:
        - "data/q1_sales.csv"
        - "data/q2_sales.csv"
        - "data/q3_sales.csv"
        - "data/q4_sales.csv"
      output_path: "data/annual_sales.csv"

  # Step 2: Clean the merged data
  - action: "data.clean_data"
    params:
      data: "data/annual_sales.csv"

  # Step 3: Transform and aggregate
  - action: "data.transform_data"
    params:
      input_path: "data/annual_sales.csv"
      output_path: "data/sales_report.csv"
      operations: ["dedupe", "aggregate", "sort"]

  # Step 4: Validate final output
  - action: "data.validate_data"
    params:
      data: "data/sales_report.csv"
      schema:
        product_id: "string"
        total_sales: "float"
        region: "string"
```
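The `operations` list names transformations applied in order. To make the idea concrete, here is a sketch of what steps like `"dedupe"` and `"aggregate"` might do on rows of data (hypothetical semantics, not TaskFlow's implementation):

```python
from collections import defaultdict

def dedupe(rows):
    """Drop exact duplicate rows, keeping first-occurrence order."""
    seen, out = set(), []
    for row in rows:
        key = tuple(sorted(row.items()))
        if key not in seen:
            seen.add(key)
            out.append(row)
    return out

def aggregate(rows, by="product_id", value="total_sales"):
    """Sum the `value` column per `by` key."""
    totals = defaultdict(float)
    for row in rows:
        totals[row[by]] += row[value]
    return dict(totals)

rows = [
    {"product_id": "A", "total_sales": 10.0},
    {"product_id": "A", "total_sales": 10.0},  # exact duplicate
    {"product_id": "A", "total_sales": 5.0},
    {"product_id": "B", "total_sales": 3.0},
]
```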
### Example 4: Custom Task Integration

```python
# custom_workflow.py
from taskflow import TaskFlow
import requests

# Define custom tasks
def send_slack_notification(webhook_url: str, message: str) -> None:
    """Send a Slack notification."""
    requests.post(webhook_url, json={"text": message})
    print(f"Sent Slack message: {message}")

def query_database(connection_string: str, query: str) -> list:
    """Query a database and return results."""
    print(f"Executing query: {query}")
    # Your database logic here...
    return [{"id": 1, "name": "Result"}]

# Set up pipeline with custom actions
pipeline = TaskFlow("custom_workflow.yaml")
pipeline.add_custom_action("slack.send", send_slack_notification)
pipeline.add_custom_action("db.query", query_database)

# Run the pipeline
pipeline.run()
```

`custom_workflow.yaml`:

```yaml
tasks:
  - action: "db.query"
    params:
      connection_string: "postgresql://localhost/mydb"
      query: "SELECT * FROM users WHERE active = true"
  - action: "data.clean_data"
    params:
      data: "query_results.csv"
  - action: "slack.send"
    params:
      webhook_url: "https://hooks.slack.com/services/YOUR/WEBHOOK/URL"
      message: "Daily user report generated successfully!"
```
## 🏗️ Architecture

```
            +-------------------------------+
            |           TaskFlow            |
            |         (Orchestrator)        |
            +-------+---------------+-------+
                    |               |
                    v               v
           +---------------+ +---------------+
           |  YAML Parser  | |  JSON Parser  |
           +-------+-------+ +-------+-------+
                   |                 |
                   +--------+--------+
                            v
                  +------------------+
                  |   Action Mapper  |
                  +---------+--------+
                            |
            +---------------+----------------+
            v               v                v
   +---------------+ +--------------+ +--------------+
   |   RPA Tasks   | |  Data Tasks  | |   AI Tasks   |
   +---------------+ +--------------+ +--------------+
   | - click       | | - clean      | | - generate   |
   | - type        | | - transform  | | - classify   |
   | - screenshot  | | - merge      | | - sentiment  |
   | - extract     | | - validate   | | - entities   |
   +-------+-------+ +------+-------+ +------+-------+
           |                |                |
           +----------------+----------------+
                            v
                   +----------------+
                   |    Logging &   |
                   | Error Handling |
                   +----------------+
```
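The Action Mapper in the diagram resolves dotted action names to task functions. A stripped-down sketch of that dispatch step (illustrative only; `taskflow/core.py` holds the real engine, and `rpa_click` here is a stand-in):

```python
# Stand-in task function for illustration
def rpa_click(target: str) -> str:
    return f"clicked {target}"

# Registry mapping dotted action names to callables
ACTIONS = {"rpa.click": rpa_click}

def dispatch(task: dict):
    """Look up a task's action in the registry and call it with its params."""
    action = task["action"]
    if action not in ACTIONS:
        raise ValueError(f"Unknown action: {action}")
    return ACTIONS[action](**task.get("params", {}))
```

Registering a custom action, as `add_custom_action` does, then amounts to adding another entry to the registry.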
## 🔧 Advanced Usage

### Error Handling

```python
from taskflow import TaskFlow

try:
    pipeline = TaskFlow("workflow.yaml")
    pipeline.run()
except FileNotFoundError:
    print("❌ Configuration file not found")
except ValueError as e:
    print(f"❌ Invalid configuration: {e}")
except Exception as e:
    print(f"❌ Pipeline failed: {e}")
```
### Custom Logging

```python
import logging
from taskflow import TaskFlow

# Configure logging
logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('pipeline.log'),
        logging.StreamHandler()
    ]
)

pipeline = TaskFlow("workflow.yaml")
pipeline.run()
```
### Dynamic Task Generation

```python
from taskflow import TaskFlow
import yaml

# Generate tasks programmatically
tasks = {
    "tasks": [
        {
            "action": "rpa.click",
            "params": {"target": f"Button {i}"}
        }
        for i in range(5)
    ]
}

# Save to file
with open("dynamic_workflow.yaml", "w") as f:
    yaml.dump(tasks, f)

# Run the pipeline
pipeline = TaskFlow("dynamic_workflow.yaml")
pipeline.run()
```
## 📖 Documentation

### Project Structure

```
taskflow/
├── __init__.py        # Package initialization
├── core.py            # Main TaskFlow engine
├── parser.py          # YAML/JSON configuration parser
└── tasks/
    ├── __init__.py
    ├── rpa_tasks.py   # RPA automation functions
    ├── data_tasks.py  # Data processing functions
    └── ai_tasks.py    # AI/ML task functions
```
### Development

```bash
# Clone the repository
git clone https://github.com/berkterekli/taskflow-pipeline.git
cd taskflow-pipeline

# Install development dependencies
pip install -e ".[dev]"

# Run tests
pytest -v

# Run tests with coverage
pytest --cov=taskflow --cov-report=html

# Format code
black taskflow tests examples

# Type checking
mypy taskflow

# Lint code
flake8 taskflow
```
### Running Tests

```bash
# Run all tests
pytest

# Run specific test file
pytest tests/test_pipeline.py

# Run with verbose output
pytest -v

# Run with coverage report
pytest --cov=taskflow --cov-report=term-missing
```
## 🤝 Contributing

We welcome contributions! Here's how you can help:

1. Fork the repository
2. Create a feature branch: `git checkout -b feature/amazing-feature`
3. Make your changes
4. Run tests: `pytest`
5. Format code: `black taskflow tests`
6. Commit changes: `git commit -m 'Add amazing feature'`
7. Push to the branch: `git push origin feature/amazing-feature`
8. Open a Pull Request

See [CONTRIBUTING.md](CONTRIBUTING.md) for detailed guidelines, [PUBLISHING_GUIDE.md](PUBLISHING_GUIDE.md) for instructions on publishing to PyPI, and [CHANGELOG.md](CHANGELOG.md) for version history.
### Adding New Tasks

To add a new task module:

1. Create `taskflow/tasks/your_tasks.py`
2. Implement functions with type hints and docstrings
3. Register actions in `taskflow/core.py`
4. Add tests in `tests/test_your_tasks.py`
5. Update documentation
## 📝 License

This project is licensed under the MIT License - see the [LICENSE](LICENSE) file for details.

## 🙏 Acknowledgments

- Built with ❤️ by Berk Terekli
- Inspired by the need for simple, maintainable automation workflows
- Thanks to all contributors
## 📞 Support & Community

- 📖 Documentation: GitHub Wiki
- 🐛 Bug Reports: Issue Tracker
- 💬 Discussions: GitHub Discussions
- 📧 Email: berk.terekli@example.com
- 🐦 Twitter: @berkterekli
## ⭐ Star History
If you find TaskFlow useful, please consider giving it a star on GitHub! It helps others discover the project.
## 🗺️ Roadmap
- Web UI Dashboard - Visual pipeline editor and monitor
- Parallel Execution - Run tasks in parallel for better performance
- Conditional Logic - If/else conditions in workflows
- Loop Support - Iterate over datasets
- Error Retry - Automatic retry with exponential backoff
- Notifications - Email, Slack, Teams integrations
- Scheduling - Cron-like scheduling support
- Docker Support - Pre-built Docker images
- Cloud Integrations - AWS, Azure, GCP task modules
- Database Tasks - Built-in database operations
- API Tasks - REST API integration tasks
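Until the built-in "Error Retry" item lands, the same idea can be approximated by wrapping `pipeline.run()` yourself. A generic exponential-backoff helper (hypothetical, not part of TaskFlow):

```python
import time

def run_with_retry(fn, attempts=3, base_delay=0.1, sleep=time.sleep):
    """Call fn(), retrying failures with exponentially growing delays."""
    for attempt in range(attempts):
        try:
            return fn()
        except Exception:
            if attempt == attempts - 1:
                raise  # out of attempts: re-raise the last error
            sleep(base_delay * (2 ** attempt))  # 0.1s, 0.2s, 0.4s, ...

# Example: a flaky callable that succeeds on the third try
calls = {"n": 0}
def flaky():
    calls["n"] += 1
    if calls["n"] < 3:
        raise RuntimeError("transient failure")
    return "ok"
```

Usage against a pipeline would be `run_with_retry(pipeline.run)`; the `sleep` parameter is injectable so tests can skip the real delays.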
## 📈 Stats

Made with ❤️ by Berk Terekli