Agent workflow orchestration and execution platform
aipartnerupflow
Task Orchestration and Execution Framework
Core Positioning
The core of aipartnerupflow is a set of task orchestration and execution specifications: a unified framework for orchestrating and executing multiple task types. The core is pure orchestration with no LLM dependencies; CrewAI support is optional.
Core includes:
- Task orchestration specifications (TaskManager)
- Core interfaces (ExecutableTask, BaseTask, TaskStorage)
- Storage (DuckDB default, PostgreSQL optional)
- NO CrewAI dependency (available via [crewai] extra)
Optional features:
- CrewAI Support [crewai]: LLM-based agent crews via CrewManager (task executor implementation)
- HTTP/REST Executor [http]: Remote API calls via RestExecutor (task executor implementation)
- SSH Executor [ssh]: Remote command execution via SSH (task executor implementation)
- Docker Executor [docker]: Containerized command execution (task executor implementation)
- gRPC Executor [grpc]: gRPC service calls (task executor implementation)
- WebSocket Executor: Bidirectional WebSocket communication (task executor implementation)
- aipartnerupflow API Executor: Inter-instance API calls for distributed execution (task executor implementation)
- MCP Executor: Model Context Protocol executor for accessing external tools and data sources (task executor implementation)
- MCP Server [a2a]: MCP (Model Context Protocol) server exposing task orchestration as MCP tools and resources
- A2A Protocol Server [a2a]: Server implementing the A2A Protocol, the standard protocol for agent-to-agent communication
- CLI Tools [cli]: Command-line interface
Note: CrewManager and future executors are all implementations of the ExecutableTask interface. Each executor handles different types of task execution (LLM, HTTP, etc.).
Core Features
Task Orchestration Specifications (Core)
- TaskManager: Task tree orchestration, dependency management, priority scheduling
- Unified Execution Specification: All task types are unified through the ExecutableTask interface
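The scheduling idea behind TaskManager (run a task only after everything it depends on has finished) can be illustrated with the standard library alone. This sketch is not aipartnerupflow's internal algorithm; the task names and dependency graph are invented for the example.

```python
# Conceptual sketch of dependency-ordered scheduling using only the
# standard library -- an illustration of what "dependency management"
# means, not the framework's actual implementation.
from graphlib import TopologicalSorter

# Hypothetical task tree: parse depends on fetch; report depends on both.
dependencies = {
    "fetch": [],
    "parse": ["fetch"],
    "report": ["parse", "fetch"],
}

# static_order() yields each task only after all of its dependencies.
order = list(TopologicalSorter(dependencies).static_order())
print(order)  # ['fetch', 'parse', 'report']
```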
Task Execution Types
All task executors implement the ExecutableTask interface:
- Custom Tasks (core): Users implement ExecutableTask for their own task types
- CrewManager [crewai]: LLM-based task execution via CrewAI (built-in executor)
- RestExecutor [http]: HTTP/REST API calls with authentication and retry (built-in executor)
- SshExecutor [ssh]: Remote command execution via SSH (built-in executor)
- DockerExecutor [docker]: Containerized command execution (built-in executor)
- GrpcExecutor [grpc]: gRPC service calls (built-in executor)
- WebSocketExecutor: Bidirectional WebSocket communication (built-in executor)
- ApFlowApiExecutor: Inter-instance API calls for distributed execution (built-in executor)
- McpExecutor: Model Context Protocol executor for accessing external tools and data sources (built-in executor)
- GenerateExecutor: Generate task tree JSON arrays from natural language requirements using LLM (built-in executor)
- BatchManager [crewai]: Batch orchestration container (batches multiple crews)
Supporting Features
- Storage: Task state persistence (DuckDB default, PostgreSQL optional)
- Unified External API: A2A Protocol Server (HTTP, SSE, WebSocket) [a2a]
- Real-time Progress Streaming: Streaming support via A2A Protocol
- CLI Tools: Command-line interface [cli]
Protocol Standard
- A2A Protocol: The framework adopts the A2A (Agent-to-Agent) Protocol as its standard for agent communication. A2A is a mature, production-ready specification designed specifically for AI agent systems, providing:
- Agent-to-agent standardized communication interface
- Streaming task execution support
- Agent capability description mechanism (AgentCard, AgentSkill)
- Multiple transport methods (HTTP, SSE, WebSocket)
- Task management and status tracking
- JWT authentication support
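To illustrate the JWT authentication bullet, here is a minimal HS256 token sketch built from the standard library. This is not aipartnerupflow's actual auth code, and a real deployment would use a maintained JWT library; the secret and claims below are invented for the example.

```python
# From-scratch sketch of the JWT (HS256) structure: three base64url
# segments (header.payload.signature) signed with an HMAC secret.
import base64
import hashlib
import hmac
import json

def b64url(data: bytes) -> str:
    # base64url without padding, as JWT requires
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode()

def sign_jwt(claims: dict, secret: bytes) -> str:
    header = b64url(json.dumps({"alg": "HS256", "typ": "JWT"}).encode())
    payload = b64url(json.dumps(claims).encode())
    signing_input = f"{header}.{payload}".encode()
    sig = b64url(hmac.new(secret, signing_input, hashlib.sha256).digest())
    return f"{header}.{payload}.{sig}"

def verify_jwt(token: str, secret: bytes) -> bool:
    header, payload, sig = token.split(".")
    expected = b64url(
        hmac.new(secret, f"{header}.{payload}".encode(), hashlib.sha256).digest()
    )
    return hmac.compare_digest(sig, expected)

# Hypothetical claim identifying a calling agent
token = sign_jwt({"sub": "agent-1"}, b"demo-secret")
print(verify_jwt(token, b"demo-secret"))  # True
```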
Installation
Core Library (Minimum - Pure Orchestration Framework)
```shell
pip install aipartnerupflow
```
Includes: Task orchestration specifications, core interfaces, storage (DuckDB)
Excludes: CrewAI, batch execution, API server, CLI tools
With Optional Features
```shell
# CrewAI LLM task support (includes batch)
pip install aipartnerupflow[crewai]
# Includes: CrewManager for LLM-based agent crews
#           BatchManager for atomic batch execution of multiple crews

# A2A Protocol Server (Agent-to-Agent communication protocol)
pip install aipartnerupflow[a2a]
# Run A2A server: python -m aipartnerupflow.api.main
# Or: aipartnerupflow-server (CLI command)

# CLI tools
pip install aipartnerupflow[cli]
# Run CLI: aipartnerupflow or apflow

# PostgreSQL storage
pip install aipartnerupflow[postgres]

# SSH executor (remote command execution)
pip install aipartnerupflow[ssh]

# Docker executor (containerized execution)
pip install aipartnerupflow[docker]

# gRPC executor (gRPC service calls)
pip install aipartnerupflow[grpc]

# Everything (includes all extras)
pip install aipartnerupflow[all]
```
Quick Start
Get started with aipartnerupflow in minutes!
Installation
```shell
# Minimal installation (core only)
pip install aipartnerupflow

# With all features
pip install aipartnerupflow[all]
```
As a Library (Pure Core)
Using Task Orchestration Specifications:
```python
from aipartnerupflow import TaskManager, TaskTreeNode, create_session

# Create database session and task manager (core)
db = create_session()  # or: db = get_default_session()
task_manager = TaskManager(db)

# Create task tree (task orchestration)
# Use task_repository to create tasks
root_task = await task_manager.task_repository.create_task(
    name="root_task",
    user_id="user_123",
    priority=2
)
child_task = await task_manager.task_repository.create_task(
    name="custom_task",  # Task name corresponds to specific executor
    user_id="user_123",
    parent_id=root_task.id,
    dependencies=[],  # Dependency relationships
    inputs={"url": "https://example.com"}
)

# Build task tree and execute (task orchestration core)
task_tree = TaskTreeNode(root_task)
task_tree.add_child(TaskTreeNode(child_task))
result = await task_manager.distribute_task_tree(task_tree)
```
Creating Custom Tasks (Traditional External Service Calls):
```python
from aipartnerupflow import ExecutableTask
from typing import Dict, Any
import aiohttp

class APICallTask(ExecutableTask):
    """Traditional external API call task"""
    id = "api_call_task"
    name = "API Call Task"
    description = "Call external API service"

    async def execute(self, inputs: Dict[str, Any]) -> Dict[str, Any]:
        async with aiohttp.ClientSession() as session:
            async with session.post(inputs["url"], json=inputs.get("data")) as response:
                result = await response.json()
                return {"status": "completed", "result": result}

    def get_input_schema(self) -> Dict[str, Any]:
        return {
            "type": "object",
            "properties": {
                "url": {"type": "string", "description": "API endpoint"},
                "data": {"type": "object", "description": "Request data"}
            }
        }
```
With CrewAI Support [crewai]
Executing CrewAI (LLM) Tasks:
```python
# Requires: pip install aipartnerupflow[crewai]
from aipartnerupflow.extensions.crewai import CrewManager

# CrewAI task execution
crew = CrewManager(
    name="Analysis Crew",
    agents=[{"role": "Analyst", "goal": "Analyze data"}],
    tasks=[{"description": "Analyze input", "agent": "Analyst"}]
)
result = await crew.execute(inputs={...})
```
With Batch Support [crewai]
Using BatchManager to batch multiple crews (atomic operation):
```python
# Requires: pip install aipartnerupflow[crewai]
from aipartnerupflow.extensions.crewai import BatchManager, CrewManager

# BatchManager is a batch container - executes multiple crews as atomic operation
batch = BatchManager(
    id="my_batch",
    name="Batch Analysis",
    works={
        "data_collection": {
            "agents": [{"role": "Collector", "goal": "Collect data"}],
            "tasks": [{"description": "Collect data", "agent": "Collector"}]
        },
        "data_analysis": {
            "agents": [{"role": "Analyst", "goal": "Analyze data"}],
            "tasks": [{"description": "Analyze data", "agent": "Analyst"}]
        }
    }
)

# All crews execute sequentially, results are merged
# If any crew fails, entire batch fails (atomic)
result = await batch.execute(inputs={...})
```
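The atomic semantics described above can be sketched in plain asyncio, independent of BatchManager: units run sequentially, results are merged, and the first failure propagates so the caller never receives a partial result. The unit names and stub worker are invented for the example.

```python
# Sketch of "atomic batch" semantics using plain asyncio -- an
# illustration of the behavior, not BatchManager's implementation.
import asyncio

async def run_unit(name: str, fail: bool = False) -> dict:
    await asyncio.sleep(0)  # stand-in for real work (e.g. a crew run)
    if fail:
        raise RuntimeError(f"{name} failed")
    return {name: "ok"}

async def run_batch(units: dict) -> dict:
    merged = {}
    for name, fail in units.items():
        # Sequential execution; the first failure raises out of the
        # batch, so the caller sees either all results or an error.
        merged.update(await run_unit(name, fail))
    return merged

result = asyncio.run(run_batch({"data_collection": False, "data_analysis": False}))
print(result)  # {'data_collection': 'ok', 'data_analysis': 'ok'}
```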
CLI Usage
```shell
# Run tasks (standard mode - recommended)
aipartnerupflow run flow --tasks '[{"id": "task1", "name": "Task 1", "schemas": {"method": "executor_id"}, "inputs": {"key": "value"}}]'

# Or use the shorthand
apflow run flow --tasks '[{"id": "task1", "name": "Task 1", "schemas": {"method": "executor_id"}, "inputs": {"key": "value"}}]'

# Or legacy mode (executor ID + inputs)
apflow run flow executor_id --inputs '{"key": "value"}'

# Start API server
apflow serve --port 8000

# Start daemon mode
apflow daemon start

# Stop daemon mode
apflow daemon stop
```
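The --tasks argument takes a JSON array of task objects. Here is a small sketch of building that payload programmatically; it uses only field names that appear in the CLI examples above, and any semantics beyond those are assumptions.

```python
# Build the --tasks payload as a JSON array of task objects.
import json

tasks = [
    {
        "id": "task1",
        "name": "Task 1",
        "schemas": {"method": "executor_id"},  # which executor runs the task
        "inputs": {"key": "value"},
    }
]

payload = json.dumps(tasks)
print(payload)
# Assumed shell usage: apflow run flow --tasks "$payload"
```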
A2A Protocol Server
The [a2a] extra provides an A2A (Agent-to-Agent) Protocol server built on Starlette/FastAPI.
```python
from aipartnerupflow.api import create_app

# Create A2A protocol server app
app = create_app()

# Run with: uvicorn app:app --port 8000
# Or use the entry point: aipartnerupflow-server
```
Note: The current [a2a] extra focuses on A2A protocol support. Future versions may
include additional FastAPI REST API endpoints for direct HTTP access without the A2A protocol.
Architecture Design
```
┌──────────────────────────────────────────────────────────┐
│ Unified External API Interface Layer                     │
│  - A2A Protocol Server (HTTP/SSE/WebSocket) [a2a]        │
│  - CLI Tools [cli]                                       │
└──────────────────────────────────────────────────────────┘
                             │
┌──────────────────────────────────────────────────────────┐
│ Task Orchestration Specification Layer (CORE)            │
│  - TaskManager: Task tree orchestration, dependency      │
│    management, priority scheduling                       │
│  - ExecutableTask: Unified task interface                │
└──────────────────────────────────────────────────────────┘
                             │
┌──────────────────────────────────────────────────────────┐
│ Task Execution Layer                                     │
│  - Custom Tasks [core]: ExecutableTask implementations   │
│    • Traditional external service calls (API, DB, etc.)  │
│    • Automated task services (scheduled tasks, workflows)│
│  - CrewManager [crewai]: CrewAI (LLM) task execution     │
│  - BatchManager [crewai]: Batch task orchestration       │
└──────────────────────────────────────────────────────────┘
                             │
┌──────────────────────────────────────────────────────────┐
│ Supporting Features Layer                                │
│  - Storage: Task state persistence (DuckDB/PostgreSQL)   │
│  - Streaming: Real-time progress updates                 │
└──────────────────────────────────────────────────────────┘
```
Project Structure
See docs/architecture/DIRECTORY_STRUCTURE.md for detailed directory structure and module descriptions.
Installation Strategy:
- pip install aipartnerupflow: Core library only (execution, base, storage, utils) - NO CrewAI
- pip install aipartnerupflow[crewai]: Core + CrewAI support (includes BatchManager)
- pip install aipartnerupflow[a2a]: Core + A2A Protocol Server
- pip install aipartnerupflow[cli]: Core + CLI tools
- pip install aipartnerupflow[all]: Full installation (all features)
Note: For examples and learning templates, see the test cases in tests/integration/ and tests/extensions/.
Documentation
Quick Links:
- Full Documentation - Complete documentation index
- Quick Start - Get running in 10 minutes
- Tutorials - Step-by-step tutorials
- Guides - Comprehensive guides
- Examples - Practical examples
- API Reference - Complete API documentation
For New Users:
- Start with Getting Started
- Follow the Quick Start Guide
- Try the First Steps Tutorial
Full documentation is also available at docs.aipartnerup.com.
Contributing
Contributions are welcome! Please see our Contributing Guide for setup instructions and contribution guidelines.
License
Apache-2.0
Links
- Documentation: docs/index.md - Complete documentation
- Website: aipartnerup.com
- GitHub: aipartnerup/aipartnerupflow
- PyPI: aipartnerupflow
- Issues: GitHub Issues
- Discussions: GitHub Discussions
Download files
Source Distribution
Built Distribution
File details
Details for the file aipartnerupflow-0.7.2.tar.gz.
File metadata
- Download URL: aipartnerupflow-0.7.2.tar.gz
- Upload date:
- Size: 561.3 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.12.11
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 0b4482973acc6897227533c0cf3260f3828fb80a3eb0013901e32f8f30a00acf |
| MD5 | 422f4591ec8c9c66eb0d6d838d9f1cc1 |
| BLAKE2b-256 | c5ed65b69279a60679532b9a2302bb3859188763cf59a211bbd6f1292b023f36 |
File details
Details for the file aipartnerupflow-0.7.2-py3-none-any.whl.
File metadata
- Download URL: aipartnerupflow-0.7.2-py3-none-any.whl
- Upload date:
- Size: 310.0 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.12.11
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | a55f77cc2311944539b38c12cda0a3dc6a01906237ccfa8bce6145800ded883e |
| MD5 | d2418d993b704c1bb9a898de75822332 |
| BLAKE2b-256 | f3d7232ab4cdeb3fdaaea18ddd3678081f1c26db6c31bb07e5bf97fcab53d562 |