Skip to main content

Agent workflow orchestration and execution platform

Project description

apflow

Task Orchestration and Execution Framework

Core Positioning

The core of apflow is task orchestration and execution specifications. It provides a unified task orchestration framework that supports execution of multiple task types. The core is pure orchestration with no LLM dependencies - CrewAI support is optional.

Core includes:

  • Task orchestration specifications (TaskManager)
  • Core interfaces (ExecutableTask, BaseTask, TaskStorage)
  • Storage (DuckDB default, PostgreSQL optional)
  • NO CrewAI dependency (available via [crewai] extra)

Optional features:

  • CrewAI Support [crewai]: LLM-based agent crews via CrewaiExecutor (task executor implementation)
  • HTTP/REST Executor [http]: Remote API calls via RestExecutor (task executor implementation)
  • SSH Executor [ssh]: Remote command execution via SSH (task executor implementation)
  • Docker Executor [docker]: Containerized command execution (task executor implementation)
  • gRPC Executor [grpc]: gRPC service calls (task executor implementation)
  • WebSocket Executor: Bidirectional WebSocket communication (task executor implementation)
  • apflow API Executor: Inter-instance API calls for distributed execution (task executor implementation)
  • MCP Executor: Model Context Protocol executor for accessing external tools and data sources (task executor implementation)
  • MCP Server [a2a]: MCP (Model Context Protocol) server exposing task orchestration as MCP tools and resources
  • LLM Executor [llm]: Direct LLM interaction via LiteLLM (supports OpenAI, Anthropic, Gemini, etc.)
  • A2A Protocol Server [a2a]: A2A Protocol Server (A2A Protocol is the standard protocol for agent communication)
  • CLI Tools [cli]: Command-line interface

Note: CrewaiExecutor and future executors are all implementations of the ExecutableTask interface. Each executor handles different types of task execution (LLM, HTTP, etc.).

Core Features

Task Orchestration Specifications (Core)

  • TaskManager: Task tree orchestration, dependency management, priority scheduling
  • Unified Execution Specification: All task types unified through the ExecutableTask interface

Task Execution Types

All task executors implement the ExecutableTask interface:

  • Custom Tasks (core): Users implement ExecutableTask for their own task types
  • CrewaiExecutor [crewai]: LLM-based task execution via CrewAI (built-in executor)
  • RestExecutor [http]: HTTP/REST API calls with authentication and retry (built-in executor)
  • SshExecutor [ssh]: Remote command execution via SSH (built-in executor)
  • DockerExecutor [docker]: Containerized command execution (built-in executor)
  • GrpcExecutor [grpc]: gRPC service calls (built-in executor)
  • WebSocketExecutor: Bidirectional WebSocket communication (built-in executor)
  • ApFlowApiExecutor: Inter-instance API calls for distributed execution (built-in executor)
  • McpExecutor: Model Context Protocol executor for accessing external tools and data sources (built-in executor)
  • GenerateExecutor: Generate task tree JSON arrays from natural language requirements using LLM (built-in executor)
  • LLMExecutor [llm]: Direct LLM interaction via LiteLLM (supports 100+ providers)
  • BatchCrewaiExecutor [crewai]: Batch orchestration container (batches multiple crews)

Supporting Features

  • Storage: Task state persistence (DuckDB default, PostgreSQL optional)
  • Unified External API: A2A Protocol Server (HTTP, SSE, WebSocket) [a2a]
  • Real-time Progress Streaming: Streaming support via A2A Protocol
  • CLI Tools: Command-line interface [cli]

Protocol Standard

  • A2A Protocol: The framework adopts A2A (Agent-to-Agent) Protocol as the standard protocol for agent communication. A2A Protocol is a mature, production-ready specification designed specifically for AI Agent systems, providing:
    • Agent-to-agent standardized communication interface
    • Streaming task execution support
    • Agent capability description mechanism (AgentCard, AgentSkill)
    • Multiple transport methods (HTTP, SSE, WebSocket)
    • Task management and status tracking
    • JWT authentication support

Installation

Core Library (Minimum - Pure Orchestration Framework)

pip install apflow

Includes: Task orchestration specifications, core interfaces, storage (DuckDB) Excludes: CrewAI, batch execution, API server, CLI tools

With Optional Features

# Standard installation (recommended for most use cases)
# Includes A2A server, CLI tools, CrewAI, and LLM support
pip install apflow[standard]

# Individual features:
# CrewAI LLM task support (includes batch)
pip install apflow[crewai]
# Includes: CrewaiExecutor for LLM-based agent crews
#           BatchCrewaiExecutor for atomic batch execution of multiple crews

# A2A Protocol Server (Agent-to-Agent communication protocol)
pip install apflow[a2a]
# Run A2A server: python -m apflow.api.main
# Or: apflow-server (CLI command)

# CLI tools
pip install apflow[cli]
# Run CLI: apflow or apflow

# PostgreSQL storage
pip install apflow[postgres]

# SSH executor (remote command execution)
pip install apflow[ssh]

# Docker executor (containerized execution)
pip install apflow[docker]

# gRPC executor (gRPC service calls)
pip install apflow[grpc]

# LLM support (LiteLLM, supports 100+ providers)
pip install apflow[llm]

# Everything (includes all extras)
pip install apflow[all]

๐Ÿš€ Quick Start

Get started with apflow in minutes!

Installation

# Minimal installation (core only)
pip install apflow

# With all features
pip install apflow[all]

As a Library (Pure Core)

Using Task Orchestration Specifications:

from apflow import TaskManager, TaskTreeNode, create_session

# Create database session and task manager (core)
db = create_session()  # or: db = get_default_session()
task_manager = TaskManager(db)

# Create task tree (task orchestration)
# Use task_repository to create tasks
root_task = await task_manager.task_repository.create_task(
    name="root_task",
    user_id="user_123",
    priority=2
)

child_task = await task_manager.task_repository.create_task(
    name="custom_task",  # Task name corresponds to specific executor
    user_id="user_123",
    parent_id=root_task.id,
    dependencies=[],  # Dependency relationships
    inputs={"url": "https://example.com"}
)

# Build task tree and execute (task orchestration core)
task_tree = TaskTreeNode(root_task)
task_tree.add_child(TaskTreeNode(child_task))
result = await task_manager.distribute_task_tree(task_tree)

Creating Custom Tasks (Traditional External Service Calls):

from apflow import ExecutableTask
from typing import Dict, Any
import aiohttp

class APICallTask(ExecutableTask):
    """Traditional external API call task"""
    
    id = "api_call_task"
    name = "API Call Task"
    description = "Call external API service"
    
    async def execute(self, inputs: Dict[str, Any]) -> Dict[str, Any]:
        async with aiohttp.ClientSession() as session:
            async with session.post(inputs["url"], json=inputs.get("data")) as response:
                result = await response.json()
                return {"status": "completed", "result": result}
    
    def get_input_schema(self) -> Dict[str, Any]:
        return {
            "type": "object",
            "properties": {
                "url": {"type": "string", "description": "API endpoint"},
                "data": {"type": "object", "description": "Request data"}
            }
        }

With Fluent API (TaskBuilder)

Using TaskBuilder for fluent task creation:

from apflow import TaskManager, TaskBuilder, create_session

# Create database session and task manager
db = create_session()
task_manager = TaskManager(db)

# Use fluent API to create and execute tasks
result = await (
    TaskBuilder(task_manager, "rest_executor")
    .with_name("fetch_user_data")
    .with_user("user_123")
    .with_input("url", "https://api.example.com/users")
    .with_input("method", "GET")
    .execute()
)

With CrewAI Support [crewai]

Executing CrewAI (LLM) Tasks:

# Requires: pip install apflow[crewai]
from apflow.extensions.crewai import CrewaiExecutor

# CrewAI task execution
crew = CrewaiExecutor(
    name="Analysis Crew",
    agents=[{"role": "Analyst", "goal": "Analyze data"}],
    tasks=[{"description": "Analyze input", "agent": "Analyst"}]
)
result = await crew.execute(inputs={...})

With Batch Support [crewai]

Using BatchCrewaiExecutor to batch multiple crews (atomic operation):

# Requires: pip install apflow[crewai]
from apflow.extensions.crewai import BatchCrewaiExecutor, CrewaiExecutor

# BatchCrewaiExecutor is a batch container - executes multiple crews as atomic operation
batch = BatchCrewaiExecutor(
    id="my_batch",
    name="Batch Analysis",
    works={
        "data_collection": {
            "agents": [{"role": "Collector", "goal": "Collect data"}],
            "tasks": [{"description": "Collect data", "agent": "Collector"}]
        },
        "data_analysis": {
            "agents": [{"role": "Analyst", "goal": "Analyze data"}],
            "tasks": [{"description": "Analyze data", "agent": "Analyst"}]
        }
    }
)

# All crews execute sequentially, results are merged
# If any crew fails, entire batch fails (atomic)
result = await batch.execute(inputs={...})

CLI Usage

# Run tasks (standard mode - recommended)
apflow run flow --tasks '[{"id": "task1", "name": "Task 1", "schemas": {"method": "executor_id"}, "inputs": {"key": "value"}}]'

# Or use the shorthand
apflow run flow --tasks '[{"id": "task1", "name": "Task 1", "schemas": {"method": "executor_id"}, "inputs": {"key": "value"}}]'

# Or legacy mode (executor ID + inputs)
apflow run flow executor_id --inputs '{"key": "value"}'

# Start API server
apflow serve --port 8000

# Start daemon mode
apflow daemon start

# Stop daemon mode
apflow daemon stop

A2A Protocol Server

The [a2a] extra provides an A2A (Agent-to-Agent) Protocol server built on Starlette/FastAPI.

from apflow.api import create_app

# Create A2A protocol server app
app = create_app()

# Run with: uvicorn app:app --port 8000
# Or use the entry point: apflow-server

Note: The current [a2a] extra focuses on A2A protocol support. Future versions may include additional FastAPI REST API endpoints for direct HTTP access without the A2A protocol.

Architecture Design

โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”
โ”‚              Unified External API Interface Layer            โ”‚
โ”‚  - A2A Protocol Server (HTTP/SSE/WebSocket) [a2a]          โ”‚
โ”‚  - CLI Tools [cli]                                          โ”‚
โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜
                          โ†“
โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”
โ”‚        Task Orchestration Specification Layer (CORE)         โ”‚
โ”‚        - TaskManager: Task tree orchestration, dependency  โ”‚
โ”‚          management, priority scheduling                     โ”‚
โ”‚        - ExecutableTask: Unified task interface             โ”‚
โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜
                          โ†“
โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”
โ”‚                    Task Execution Layer                      โ”‚
โ”‚  - Custom Tasks [core]: ExecutableTask implementations      โ”‚
โ”‚    โ€ข Traditional external service calls (API, DB, etc.)     โ”‚
โ”‚    โ€ข Automated task services (scheduled tasks, workflows)  โ”‚
โ”‚  - CrewaiExecutor [crewai]: CrewAI (LLM) task execution        โ”‚
โ”‚  - BatchCrewaiExecutor [crewai]: Batch task orchestration           โ”‚
โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜
                          โ†“
โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”
โ”‚                    Supporting Features Layer                โ”‚
โ”‚  - Storage: Task state persistence (DuckDB/PostgreSQL)     โ”‚
โ”‚  - Streaming: Real-time progress updates                    โ”‚
โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜

Project Structure

See docs/architecture/directory-structure.md for detailed directory structure and module descriptions.

Installation Strategy:

  • pip install apflow: Core library only (execution, base, storage, utils) - NO CrewAI
  • pip install apflow[standard]: Core + A2A server + CLI tools + CrewAI + LLM support (recommended)
  • pip install apflow[crewai]: Core + CrewAI support (includes BatchCrewaiExecutor)
  • pip install apflow[a2a]: Core + A2A Protocol Server
  • pip install apflow[cli]: Core + CLI tools
  • pip install apflow[all]: Full installation (all features)

Note: For examples and learning templates, see the test cases in tests/integration/ and tests/extensions/.

๐Ÿ“š Documentation

Quick Links:

For New Users:

For Developers:

Architecture & Design:

For Contributors:

Examples & Templates:

Full documentation is also available at flow-docs.aipartnerup.com.

๐Ÿค Contributing

Contributions are welcome! Please see our Contributing Guide for setup instructions and contribution guidelines.

๐Ÿ“„ License

Apache-2.0

๐Ÿ”— Links

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

apflow-0.12.0.tar.gz (692.3 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

apflow-0.12.0-py3-none-any.whl (355.1 kB view details)

Uploaded Python 3

File details

Details for the file apflow-0.12.0.tar.gz.

File metadata

  • Download URL: apflow-0.12.0.tar.gz
  • Upload date:
  • Size: 692.3 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.12.11

File hashes

Hashes for apflow-0.12.0.tar.gz
Algorithm Hash digest
SHA256 874ccf9a181f143264387305958f8f43293055f6e02e7dbd44090518574d6739
MD5 80561512ebdf76af97301def7cb05677
BLAKE2b-256 08f31e47e2c3f4cd1d5ef28c7784b7f102194f5c49aaaf44c2e2d47c2f1ffe80

See more details on using hashes here.

File details

Details for the file apflow-0.12.0-py3-none-any.whl.

File metadata

  • Download URL: apflow-0.12.0-py3-none-any.whl
  • Upload date:
  • Size: 355.1 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.12.11

File hashes

Hashes for apflow-0.12.0-py3-none-any.whl
Algorithm Hash digest
SHA256 310edf4dbc09c932f58213e52ef3e2738bbd0516e2f17ce95636a72fe6bc17cb
MD5 f8d113d364dfa87b2ea5a41ea8930950
BLAKE2b-256 7a29473d1bec2905621cd6f2c09d3e8836a8b93f823a2e3e5821ff2a0388dbef

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page