Skip to main content

Python middleware bridging Google ADK agents with AGUI protocol via Server-Sent Events for real-time agent interactions

Project description

ADK AGUI Middleware

Ask DeepWiki CI CodeQL Semgrep Gitleaks License: MIT Code style: Ruff Security: Bandit Type Checker: mypy

Enterprise-grade Python 3.10+ middleware that seamlessly bridges Google's Agent Development Kit (ADK) with AGUI protocol, providing high-performance Server-Sent Events streaming and Human-in-the-Loop (HITL) workflow orchestration.

Overview

Enterprise-grade Python 3.10+ middleware that bridges Google's Agent Development Kit (ADK) with AGUI protocol, enabling real-time AI agent applications with Server-Sent Events streaming and Human-in-the-Loop workflows.

Key Features

  • โšก SSE Streaming: High-performance Server-Sent Events with real-time ADK โ†” AGUI translation
  • ๐Ÿ”’ Session Management: Thread-safe locking with configurable timeout and retry mechanisms
  • ๐Ÿค HITL Workflows: Complete Human-in-the-Loop orchestration with state persistence
  • ๐Ÿ—๏ธ Enterprise Architecture: Modular design with dependency injection and clean separation
  • ๐Ÿ›ก๏ธ Production-Ready: Comprehensive error handling, logging, and graceful shutdown
  • ๐ŸŽฏ Type Safety: Full Python 3.10 annotations with strict mypy validation

Installation

pip install adk-agui-middleware

Requirements

  • Python 3.10+ (recommended 3.10.3+)
  • Google ADK >= 1.9.0
  • AGUI Protocol >= 0.1.7
  • FastAPI >= 0.104.0

Examples

Jump in with hands-on, progressively richer examples under examples/.

  • 01_minimal_sse
    • Smallest working setup that streams Server-Sent Events (SSE) from an ADK LlmAgent.
    • Path: examples/01_minimal_sse/app.py
  • 02_context_history
    • Main SSE endpoint plus History and State endpoints, with simple context extraction.
    • Path: examples/02_context_history/app.py
  • 03_advanced_pipeline
    • Adds a custom input/output recorder and a safe preprocessor for RunAgentInput.
    • Path: examples/03_advanced_pipeline/app.py
  • 04_lifecycle_handlers
    • Walks through the full request lifecycle and HandlerContext hooks (session lock, ADK/AGUI handlers, translation, state snapshot, I/O recording).
    • Path: examples/04_lifecycle_handlers/app.py

Architecture Overview

High-Level System Architecture

graph TB
    %% Clients
    subgraph "Clients"
        WEB[Web Apps]
        MOBILE[Mobile Apps]
        API[API Clients]
    end

    %% Endpoints
    subgraph "FastAPI Endpoints"
        MAIN_EP[POST / - RunAgentInput]
        HIST_EP[History API]
        STATE_EP[State API]
    end

    %% Services
    subgraph "Services"
        SSE_SVC[SSEService]
        HIST_SVC[HistoryService]
        STATE_SVC[StateService]
    end

    %% Core pipeline
    subgraph "Core Pipeline"
        AGUI_USER[AGUIUserHandler]
        USER_MSG[UserMessageHandler]
        RUNNING[RunningHandler]
        TRANSLATOR[EventTranslator]
        SESSION_HDL[SessionHandler]
        QUEUE_HDL[QueueHandler]
        LOCK_HDL[SessionLockHandler]
    end

    %% Handler hooks (dependency injection)
    subgraph "HandlerContext Hooks"
        ADK_H[BaseADKEventHandler]
        ADK_TO[BaseADKEventTimeoutHandler]
        TR_H[BaseTranslateHandler]
        AGUI_H[BaseAGUIEventHandler]
        STATE_H[BaseAGUIStateSnapshotHandler]
        IO_H[BaseInOutHandler]
    end

    %% Managers & ADK
    subgraph "Managers & ADK"
        SESS_MGR[SessionManager]
        ADK_RUNNER[ADK Runner]
        BASE_AGENT[BaseAgent]
        RUN_CFG[RunConfig]
        RUNNER_CFG[RunnerConfig]
    end

    %% Utilities
    subgraph "Utilities"
        SSE_ENC[SSE Encoder/Formatter]
        FRONT_TOOLS[FrontendToolset]
        SHUTDOWN[ShutdownHandler]
    end

    %% Flow
    WEB --> MAIN_EP
    MOBILE --> MAIN_EP
    API --> MAIN_EP
    WEB --> HIST_EP
    WEB --> STATE_EP

    MAIN_EP --> SSE_SVC
    HIST_EP --> HIST_SVC
    STATE_EP --> STATE_SVC

    %% SSE service orchestration
    SSE_SVC --> LOCK_HDL
    SSE_SVC --> AGUI_USER
    SSE_SVC --> SHUTDOWN
    SSE_SVC --> SSE_ENC

    %% Pipeline
    AGUI_USER --> USER_MSG
    AGUI_USER --> SESSION_HDL
    AGUI_USER --> RUNNING
    AGUI_USER --> QUEUE_HDL
    RUNNING --> TRANSLATOR
    RUNNING --> ADK_RUNNER
    ADK_RUNNER --> BASE_AGENT
    ADK_RUNNER --> RUN_CFG
    RUNNER_CFG --> RUN_CFG

    %% HandlerContext wiring
    RUNNING -. pre/post hooks .-> ADK_H
    RUNNING -. timeout .-> ADK_TO
    RUNNING -. translate .-> TR_H
    RUNNING -. post-AGUI .-> AGUI_H
    RUNNING -. state snapshot .-> STATE_H
    SSE_SVC -. I/O record .-> IO_H

    %% Managers and tools
    SESSION_HDL --> SESS_MGR
    RUNNING --> FRONT_TOOLS
    QUEUE_HDL -->|ADK/AGUI queues| SSE_ENC

    %% Styling
    classDef box fill:#f7f7f7,stroke:#555,color:#111,stroke-width:1px
    class WEB,MOBILE,API,MAIN_EP,HIST_EP,STATE_EP,SSE_SVC,HIST_SVC,STATE_SVC,AGUI_USER,USER_MSG,RUNNING,TRANSLATOR,SESSION_HDL,QUEUE_HDL,LOCK_HDL,SESS_MGR,ADK_RUNNER,BASE_AGENT,RUN_CFG,RUNNER_CFG,SSE_ENC,FRONT_TOOLS,SHUTDOWN,ADK_H,ADK_TO,TR_H,AGUI_H,STATE_H,IO_H box

Concurrent Event Processing Architecture

graph TB
    subgraph "Request Initiation"
        CLIENT_REQ[๐Ÿ“ฅ Client Request<br/>POST RunAgentInput<br/>with Messages & Tools]
        INPUT_INFO[๐Ÿ“‹ Input Info Creation<br/>Extract Context<br/>Initialize Event Queues]
        QUEUE_INIT[๐ŸŽฏ Queue Initialization<br/>Create EventQueue Model<br/>ADK & AGUI Queues]
    end

    subgraph "Dual Queue Architecture"
        ADK_QUEUE[๐Ÿ“Š ADK Event Queue<br/>Queue Event or None<br/>Producer: Agent Runner]
        AGUI_QUEUE[๐Ÿ“ฆ AGUI Event Queue<br/>Queue BaseEvent or None<br/>Consumer: Client Stream]
    end

    subgraph "Queue Management Layer"
        QUEUE_HANDLER[๐ŸŽฏ Queue Handler<br/>Factory Pattern<br/>Create Managers]
        ADK_MGR[๐Ÿ“Š ADK Queue Manager<br/>Logging & Iteration<br/>Caller Tracking]
        AGUI_MGR[๐Ÿ“ฆ AGUI Queue Manager<br/>Logging & Iteration<br/>Caller Tracking]
    end

    subgraph "Concurrent Task Execution"
        TASK_GROUP[โšก Async TaskGroup<br/>Concurrent Execution<br/>Exception Aggregation]

        subgraph "ADK Producer Task"
            ADK_TASK[๐Ÿ”ต Task 1: ADK Event Producer<br/>async _run_async_with_adk]
            ADK_RUNNER[๐Ÿš€ ADK Agent Runner<br/>Execute Agent Logic<br/>Generate Events]
            ADK_PUT[โžก๏ธ Put Events to ADK Queue<br/>await adk_queue.put event<br/>Log with Caller Info]
            ADK_SENTINEL[๐Ÿ›‘ ADK Termination<br/>Put None to ADK Queue<br/>Signal Completion]
        end

        subgraph "AGUI Translator Task"
            AGUI_TASK[๐ŸŸข Task 2: AGUI Event Translator<br/>async _run_async_with_agui]
            AGUI_ITER[๐Ÿ”„ ADK Queue Iterator<br/>async for adk_event<br/>in adk_queue.get_iterator]
            TRANSLATOR[๐Ÿ”„ Event Translator<br/>ADK โ†’ AGUI Translation<br/>Streaming & Tool Detection]
            AGUI_PUT[โžก๏ธ Put Events to AGUI Queue<br/>await agui_queue.put event<br/>Generate HITL Events]
            AGUI_SENTINEL[๐Ÿ›‘ AGUI Termination<br/>Put None to AGUI Queue<br/>After Final State]
        end
    end

    subgraph "Client Stream Consumer"
        STREAM_CONSUMER[๐ŸŒŠ SSE Stream Consumer<br/>Main Workflow Loop<br/>async for agui_event]
        AGUI_OUTPUT[๐Ÿ“ค AGUI Queue Iterator<br/>async for in agui_queue<br/>Yield to Client]
        SSE_SWITCH{๐Ÿ”€ SSE Mode? event_source_response_mode}
        SSE_SSR[๐Ÿ”Œ StreamingResponse<br/>Fake SSE lines]
        SSE_ESR[๐Ÿ”Œ EventSourceResponse<br/>Spec-compliant SSE]
        CLIENT_STREAM[๐Ÿ“ก Client Stream<br/>Real-time Event Delivery]
    end

    subgraph "Exception Handling"
        ADK_EXCEPTION[โš ๏ธ ADK Exception Handler<br/>Context Manager<br/>Ensure Sentinel in Finally]
        AGUI_EXCEPTION[โš ๏ธ AGUI Exception Handler<br/>Context Manager<br/>Ensure Sentinel in Finally]
        TASK_EXCEPTION[๐Ÿšจ TaskGroup Exception<br/>Aggregate Exceptions<br/>ExceptionGroup Handler]
        ERROR_EVENT[โŒ Error Event Generation<br/>Convert to AGUI Error<br/>Send to Client]
    end

    subgraph "Synchronization & Termination"
        ITER_PROTOCOL[๐Ÿ”„ AsyncQueueIterator<br/>__aiter__ & __anext__<br/>task_done on get]
        NONE_SENTINEL[๐Ÿ›‘ None Sentinel Pattern<br/>Signals Queue Termination<br/>Raises StopAsyncIteration]
        GRACEFUL_STOP[โœ… Graceful Termination<br/>All Tasks Complete<br/>Queues Drained]
    end

    %% Request flow
    CLIENT_REQ --> INPUT_INFO
    INPUT_INFO --> QUEUE_INIT
    QUEUE_INIT --> ADK_QUEUE
    QUEUE_INIT --> AGUI_QUEUE

    %% Queue management setup
    QUEUE_INIT --> QUEUE_HANDLER
    QUEUE_HANDLER --> ADK_MGR
    QUEUE_HANDLER --> AGUI_MGR
    ADK_MGR --> ADK_QUEUE
    AGUI_MGR --> AGUI_QUEUE

    %% Concurrent task execution
    INPUT_INFO --> TASK_GROUP
    TASK_GROUP --> ADK_TASK
    TASK_GROUP --> AGUI_TASK

    %% ADK producer flow
    ADK_TASK --> ADK_EXCEPTION
    ADK_EXCEPTION --> ADK_RUNNER
    ADK_RUNNER --> ADK_PUT
    ADK_PUT --> ADK_MGR
    ADK_MGR --> ADK_QUEUE
    ADK_EXCEPTION -.->|Finally Block| ADK_SENTINEL
    ADK_SENTINEL --> ADK_QUEUE

    %% AGUI translator flow
    AGUI_TASK --> AGUI_EXCEPTION
    AGUI_EXCEPTION --> AGUI_ITER
    ADK_QUEUE --> AGUI_ITER
    ADK_QUEUE --> TRANSLATOR
    TRANSLATOR --> AGUI_PUT
    AGUI_PUT --> AGUI_MGR
    AGUI_MGR --> AGUI_QUEUE
    AGUI_EXCEPTION -.->|Finally Block| AGUI_SENTINEL
    AGUI_SENTINEL --> AGUI_QUEUE

    %% Stream consumer flow
    TASK_GROUP --> STREAM_CONSUMER
    STREAM_CONSUMER --> AGUI_OUTPUT
    AGUI_QUEUE --> AGUI_OUTPUT
    AGUI_OUTPUT --> SSE_SWITCH
    SSE_SWITCH -->|False| SSE_SSR
    SSE_SWITCH -->|True| SSE_ESR
    SSE_SSR --> CLIENT_STREAM
    SSE_ESR --> CLIENT_STREAM

    %% Exception handling
    ADK_TASK -.->|Exception| TASK_EXCEPTION
    AGUI_TASK -.->|Exception| TASK_EXCEPTION
    TASK_EXCEPTION --> ERROR_EVENT
    ERROR_EVENT --> AGUI_QUEUE

    %% Synchronization
    AGUI_ITER --> ITER_PROTOCOL
    AGUI_OUTPUT --> ITER_PROTOCOL
    ITER_PROTOCOL --> NONE_SENTINEL
    NONE_SENTINEL --> GRACEFUL_STOP

    %% Styling
    classDef request fill:#e3f2fd,color:#000,stroke:#1976d2,stroke-width:2px
    classDef queue fill:#ffebee,color:#000,stroke:#d32f2f,stroke-width:2px
    classDef manager fill:#fff3e0,color:#000,stroke:#f57c00,stroke-width:2px
    classDef task fill:#e8f5e9,color:#000,stroke:#43a047,stroke-width:2px
    classDef producer fill:#e1f5fe,color:#000,stroke:#0288d1,stroke-width:2px
    classDef translator fill:#f3e5f5,color:#000,stroke:#8e24aa,stroke-width:2px
    classDef consumer fill:#fff8e1,color:#000,stroke:#ffa000,stroke-width:2px
    classDef exception fill:#fbe9e7,color:#000,stroke:#ff6f00,stroke-width:2px
    classDef sync fill:#f1f8e9,color:#000,stroke:#689f38,stroke-width:2px

    class CLIENT_REQ,INPUT_INFO,QUEUE_INIT request
    class ADK_QUEUE,AGUI_QUEUE queue
    class QUEUE_HANDLER,ADK_MGR,AGUI_MGR manager
    class TASK_GROUP task
    class ADK_TASK,ADK_RUNNER,ADK_PUT,ADK_SENTINEL producer
    class AGUI_TASK,AGUI_ITER,TRANSLATOR,AGUI_PUT,AGUI_SENTINEL translator
    class STREAM_CONSUMER,AGUI_OUTPUT,SSE_SWITCH,SSE_SSR,SSE_ESR,CLIENT_STREAM consumer
    class ADK_EXCEPTION,AGUI_EXCEPTION,TASK_EXCEPTION,ERROR_EVENT exception
    class ITER_PROTOCOL,NONE_SENTINEL,GRACEFUL_STOP sync

Human-in-the-Loop (HITL) Workflow

graph TD
    subgraph "Client Request Processing"
        REQ[๐Ÿ“ฅ Client Request<br/>RunAgentInput<br/>POST /]
        AUTH[๐Ÿ” Authentication<br/>Extract User Context<br/>Session Validation]
        LOCK[๐Ÿ”’ Session Lock<br/>Acquire Exclusive Access<br/>Prevent Concurrency]
    end

    subgraph "Session & State Management"
        SESS_CHECK[๐Ÿ“‹ Session Check<br/>Get or Create Session<br/>Load Existing State]
        STATE_INIT[๐Ÿ—‚๏ธ State Initialization<br/>Apply Initial State<br/>Load Pending Tools]
        TOOL_RESUME[โฑ๏ธ Tool Resume Check<br/>Detect Pending LRO Tools<br/>Resume HITL Workflow]
        FRONTEND_TOOLS[๐Ÿงฐ Frontend Tools Setup<br/>Extract Client Tools<br/>Inject into Agent]
    end

    subgraph "Message Processing"
        MSG_TYPE{โ“ Message Type?}
        USER_MSG[๐Ÿ’ฌ User Message<br/>Extract Content<br/>Prepare for Agent]
        TOOL_RESULT[๐Ÿ› ๏ธ Tool Result<br/>Validate Tool Call ID<br/>Convert to ADK Format]
        MSG_ERROR[โŒ Message Error<br/>Invalid Tool ID or<br/>Missing Content]
    end

    subgraph "Agent Execution Pipeline"
        AGENT_START[โ–ถ๏ธ Agent Execution<br/>RUN_STARTED Event<br/>Begin Processing]
        QUEUE_SETUP[๐ŸŽฏ Queue Setup<br/>Initialize Event Queues<br/>ADK & AGUI Managers]
        CONCURRENT_EXEC[โšก Concurrent Execution<br/>TaskGroup with 2 Tasks<br/>Producer & Translator]
        ADK_RUN[๐Ÿš€ ADK Runner Task<br/>Agent Processing<br/>Stream to ADK Queue]
        EVENT_PROC[๐Ÿ”„ AGUI Translator Task<br/>ADK โ†’ AGUI Translation<br/>Stream to AGUI Queue]
        CLIENT_STREAM[๐ŸŒŠ Client Stream Consumer<br/>AGUI Queue Iterator<br/>Yield to SSE Response]
    end

    subgraph "Tool Call Detection & Processing"
        TOOL_CHECK{๐Ÿ” Long-Running Tool?}
        FRONTEND_CALL{๐Ÿงฐ Frontend Tool Call?}
        LRO_DETECT[โฑ๏ธ LRO Detection<br/>Mark as Long-Running<br/>Store Tool Call Info]
        FRONTEND_EVENT[๐ŸŽฏ Frontend Tool Event<br/>Generate Function Call<br/>Put to AGUI Queue]
        HITL_PAUSE[โธ๏ธ HITL Pause<br/>Early Return from Translator<br/>Wait for Human Input]
        NORMAL_FLOW[โžก๏ธ Normal Flow<br/>Continue Processing<br/>Standard Tools]
    end

    subgraph "State Persistence"
        TOOL_PERSIST[๐Ÿ’พ Tool State Persist<br/>Save Pending Tools<br/>Update Session State]
        STATE_SNAP[๐Ÿ“ธ State Snapshot<br/>Create Final State<br/>Send to Client]
        COMPLETION[โœ… Completion<br/>RUN_FINISHED Event<br/>Release Resources]
    end

    subgraph "Error Handling"
        ERROR_CATCH[๐Ÿšจ Error Handler<br/>Catch Exceptions<br/>Generate Error Events]
        ERROR_EVENT[โš ๏ธ Error Event<br/>AGUI Error Format<br/>Client Notification]
        CLEANUP[๐Ÿงน Cleanup<br/>Release Session Lock<br/>Resource Cleanup]
    end

    %% Request Processing Flow
    REQ --> AUTH
    AUTH --> LOCK
    LOCK --> SESS_CHECK

    %% Session Management Flow
    SESS_CHECK --> STATE_INIT
    STATE_INIT --> TOOL_RESUME
    TOOL_RESUME --> FRONTEND_TOOLS
    FRONTEND_TOOLS --> MSG_TYPE

    %% Message Processing Flow
    MSG_TYPE -->|User Message| USER_MSG
    MSG_TYPE -->|Tool Result| TOOL_RESULT
    MSG_TYPE -->|Error| MSG_ERROR
    USER_MSG --> AGENT_START
    TOOL_RESULT --> AGENT_START
    MSG_ERROR --> ERROR_EVENT

    %% Agent Execution Flow
    AGENT_START --> QUEUE_SETUP
    QUEUE_SETUP --> CONCURRENT_EXEC
    CONCURRENT_EXEC --> ADK_RUN
    CONCURRENT_EXEC --> EVENT_PROC
    CONCURRENT_EXEC --> CLIENT_STREAM
    ADK_RUN --> EVENT_PROC
    EVENT_PROC --> CLIENT_STREAM
    EVENT_PROC --> TOOL_CHECK

    %% Tool Call Handling
    TOOL_CHECK -->|Long-Running Tool| LRO_DETECT
    TOOL_CHECK -->|Standard Tool| NORMAL_FLOW
    LRO_DETECT --> FRONTEND_CALL
    FRONTEND_CALL -->|Yes| FRONTEND_EVENT
    FRONTEND_CALL -->|No| HITL_PAUSE
    FRONTEND_EVENT --> HITL_PAUSE
    NORMAL_FLOW --> STATE_SNAP

    %% HITL Flow
    HITL_PAUSE --> TOOL_PERSIST
    TOOL_PERSIST --> COMPLETION

    %% Normal Completion Flow
    STATE_SNAP --> COMPLETION

    %% Error Handling Flow
    ADK_RUN -.->|Exception| ERROR_CATCH
    EVENT_PROC -.->|Exception| ERROR_CATCH
    ERROR_CATCH --> ERROR_EVENT
    ERROR_EVENT --> CLEANUP

    %% Final Cleanup
    COMPLETION --> CLEANUP
    CLEANUP --> REQ

    %% Styling
    classDef request fill:#e3f2fd,color:#000,stroke:#1976d2,stroke-width:2px
    classDef session fill:#f1f8e9,color:#000,stroke:#689f38,stroke-width:2px
    classDef message fill:#fff3e0,color:#000,stroke:#f57c00,stroke-width:2px
    classDef agent fill:#fce4ec,color:#000,stroke:#c2185b,stroke-width:2px
    classDef tool fill:#fff8e1,color:#000,stroke:#ff8f00,stroke-width:2px
    classDef state fill:#f3e5f5,color:#000,stroke:#7b1fa2,stroke-width:2px
    classDef error fill:#ffebee,color:#000,stroke:#d32f2f,stroke-width:2px
    classDef decision fill:#e8f5e8,color:#000,stroke:#388e3c,stroke-width:3px

    class REQ,AUTH,LOCK request
    class SESS_CHECK,STATE_INIT,TOOL_RESUME session
    class USER_MSG,TOOL_RESULT,MSG_ERROR message
    class AGENT_START,ADK_RUN,EVENT_PROC agent
    class LRO_DETECT,HITL_PAUSE,NORMAL_FLOW tool
    class TOOL_PERSIST,STATE_SNAP,COMPLETION state
    class ERROR_CATCH,ERROR_EVENT,CLEANUP error
    class MSG_TYPE,TOOL_CHECK decision

Complete Request Lifecycle

sequenceDiagram
    participant CLIENT as "๐ŸŒ Client"
    participant ENDPOINT as "๐ŸŽฏ FastAPI Endpoint"
    participant SSE as "โšก SSE Service"
    participant LOCK as "๐Ÿ”’ Session Lock"
    participant AGUI_USER as "๐ŸŽญ AGUI User Handler"
    participant RUNNING as "๐Ÿƒ Running Handler"
    participant TRANSLATE as "๐Ÿ”„ Event Translator"
    participant ADK_RUNNER as "๐Ÿš€ ADK Runner"
    participant BASE_AGENT as "๐Ÿค– Base Agent"
    participant SESSION_MGR as "๐Ÿ“‹ Session Manager"
    participant SESSION_SVC as "๐Ÿ’พ Session Service"

    note over CLIENT,SESSION_SVC: Request Initiation & Context Setup
    CLIENT->>ENDPOINT: POST RunAgentInput
    ENDPOINT->>SSE: Extract context & create runner
    SSE->>SSE: Extract app_name, user_id, session_id
    SSE->>LOCK: Acquire session lock

    alt Session locked by another request
        LOCK-->>SSE: Lock failed
        SSE-->>CLIENT: SSE: RunErrorEvent (session busy)
    else Lock acquired successfully
        LOCK-->>SSE: Lock acquired

        note over SSE,SESSION_SVC: Handler Initialization & Session Setup
        SSE->>AGUI_USER: Initialize AGUI User Handler
        AGUI_USER->>SESSION_MGR: Check and create session
        SESSION_MGR->>SESSION_SVC: Get or create session with initial state
        SESSION_SVC-->>SESSION_MGR: Session object with state
        SESSION_MGR-->>AGUI_USER: Session ready

        AGUI_USER->>AGUI_USER: Load pending tool calls from state
        AGUI_USER->>RUNNING: Set long-running tool IDs

        note over AGUI_USER,BASE_AGENT: Message Processing & Agent Execution
        AGUI_USER->>AGUI_USER: Determine message type (user input or tool result)
        AGUI_USER->>SSE: Yield RUN_STARTED
        SSE-->>CLIENT: SSE: RUN_STARTED

        AGUI_USER->>RUNNING: Execute agent with user message
        RUNNING->>ADK_RUNNER: ADK Runner execution
        ADK_RUNNER->>BASE_AGENT: Process with custom agent logic

        note over BASE_AGENT,CLIENT: Event Streaming & Real-time Translation
        loop For each ADK event
            BASE_AGENT-->>ADK_RUNNER: Agent-generated ADK event
            ADK_RUNNER-->>RUNNING: Stream ADK event
            RUNNING->>TRANSLATE: Translate ADK to AGUI event
            TRANSLATE-->>RUNNING: AGUI event(s)
            RUNNING-->>AGUI_USER: AGUI event stream
            AGUI_USER-->>SSE: AGUI events
            SSE-->>CLIENT: SSE: Event data (TEXT_MESSAGE_*, TOOL_CALL, etc.)

            alt Long-running tool detected
            RUNNING->>AGUI_USER: Long-running tool call detected
            AGUI_USER-->>SSE: Early return (HITL pause)
            note over AGUI_USER: TaskGroup completes
            AGUI_USER->>SESSION_MGR: Persist pending tool call state
            SESSION_MGR->>SESSION_SVC: Update session state with tool info
            end
        end

        note over AGUI_USER,CLIENT: Workflow Completion & Cleanup
        alt Normal completion (no LRO tools)
            RUNNING->>TRANSLATE: Force close streaming messages
            TRANSLATE-->>RUNNING: Message end events
            RUNNING->>SESSION_MGR: Get final session state
            SESSION_MGR->>SESSION_SVC: Retrieve current state
            SESSION_SVC-->>SESSION_MGR: State snapshot
            SESSION_MGR-->>RUNNING: State data
            RUNNING-->>AGUI_USER: State snapshot event
            AGUI_USER-->>SSE: StateSnapshotEvent
            SSE-->>CLIENT: SSE: STATE_SNAPSHOT
        end

        AGUI_USER-->>SSE: RunFinishedEvent
        SSE-->>CLIENT: SSE: RUN_FINISHED

        note over SSE,LOCK: Resource Cleanup
        SSE->>LOCK: Release session lock
        LOCK-->>SSE: Lock released
    end

    note over CLIENT,SESSION_SVC: Subsequent HITL Tool Result Submission
    opt Tool result submission for HITL
        CLIENT->>ENDPOINT: POST RunAgentInput (with tool result)
        Note right of CLIENT: Tool result contains: tool_call_id, result data
        ENDPOINT->>SSE: Process tool result submission
        note over SSE,AGUI_USER: Same flow but with tool result processing
        AGUI_USER->>AGUI_USER: Validate tool_call_id against pending tools
        AGUI_USER->>AGUI_USER: Convert tool result to ADK format
        AGUI_USER->>SESSION_MGR: Remove completed tool from pending state
        note over AGUI_USER,CLIENT: Continue agent execution with tool result
    end

Session State Management Lifecycle

stateDiagram-v2
    [*] --> SessionCreate: New request with session_id

    SessionCreate --> StateInitialize: Session created/retrieved
    StateInitialize --> ActiveSession: Initial state applied

    state ActiveSession {
        [*] --> ProcessingMessage
        ProcessingMessage --> AgentExecution: User message validated

        state AgentExecution {
            [*] --> StreamingEvents
            StreamingEvents --> ToolCallDetected: Long-running tool found
            StreamingEvents --> NormalCompletion: Standard processing

            state ToolCallDetected {
                [*] --> PendingToolState
                PendingToolState --> HITLWaiting: Tool info persisted
            }
        }

        HITLWaiting --> ProcessingMessage: Tool result submitted
        NormalCompletion --> SessionComplete: Final state snapshot
    }

    SessionComplete --> [*]: Session ends

    state ErrorHandling {
        [*] --> ErrorState
        ErrorState --> SessionCleanup: Error event generated
        SessionCleanup --> [*]
    }

    ActiveSession --> ErrorHandling: Exception occurred
    AgentExecution --> ErrorHandling: Processing error
    HITLWaiting --> ErrorHandling: Invalid tool result

    note right of HITLWaiting
        Session state contains:
        - pending_tool_calls: tool_id to tool_name mapping
        - conversation_history
        - custom_state_data
        - hitl_workflow_status
    end note

    note left of PendingToolState
        Long-running tool state:
        - tool_call_id (UUID)
        - tool_name (function name)
        - call_timestamp
        - awaiting_result: true
    end note

โš ๏ธ Critical Configuration: SSE Response Mode

CopilotKit Frontend Compatibility Issue

IMPORTANT: Some legacy frontends (for example, CopilotKit) do not strictly follow the Server-Sent Events (SSE) specification and can fail to parse FastAPI's standard EventSourceResponse. Although they label their stream as "SSE", the wire format differs from the spec.

The Problem

  • Standard SSE Format (EventSourceResponse): Follows W3C SSE specification with proper event formatting
  • CopilotKit's Expectation: Requires StreamingResponse with non-standard formatting, breaking SSE compliance
  • Impact: If you use the standard-compliant EventSourceResponse, CopilotKit frontends cannot parse the events correctly

The Solution

We provide a configuration flag in ConfigContext to switch between standard-compliant SSE and CopilotKit-compatible streaming:

from adk_agui_middleware.data_model.context import ConfigContext

# For CopilotKit frontend (default, non-standard)
config_context = ConfigContext(
    app_name="my-app",
    user_id=extract_user_id,
    session_id=extract_session_id,
    event_source_response_mode=False  # Default: Uses StreamingResponse for CopilotKit
)

# For SSE-compliant frontends (recommended for custom implementations)
config_context = ConfigContext(
    app_name="my-app",
    user_id=extract_user_id,
    session_id=extract_session_id,
    event_source_response_mode=True  # Uses EventSourceResponse (SSE standard)
)

Configuration Guide

Configuration Response Type Use Case SSE Compliance
event_source_response_mode=False (default) StreamingResponse CopilotKit frontend โŒ Non-compliant
event_source_response_mode=True EventSourceResponse Custom/Standard frontends โœ… W3C compliant

Stream Completion Message Filtering

Configuration: retune_on_stream_complete

When using streaming responses, ADK may emit both incremental streaming chunks AND a final complete message. By default (retune_on_stream_complete=False), the final complete message is filtered to prevent duplicate content on the client side, since all content has already been sent via streaming chunks.

Why This Matters

  • Default Behavior (retune_on_stream_complete=False): Filters out the final complete message to avoid duplication

    • Streaming chunks: โœ… Sent to client
    • Final complete message: โŒ Filtered (prevents duplicate)
  • Alternative Behavior (retune_on_stream_complete=True): Sends both streaming chunks AND the final complete message

    • Streaming chunks: โœ… Sent to client
    • Final complete message: โœ… Sent to client (may cause duplication)

Configuration

Set this in both ConfigContext and HistoryConfig:

from adk_agui_middleware.data_model.context import ConfigContext
from adk_agui_middleware.data_model.config import HistoryConfig

# SSE Service Configuration
config_context = ConfigContext(
    app_name="my-app",
    user_id=extract_user_id,
    session_id=extract_session_id,
    retune_on_stream_complete=False  # Default: Filter final complete message
)

# History Service Configuration
history_config = HistoryConfig(
    app_name="my-app",
    user_id=extract_user_id,
    session_id=extract_session_id,
    retune_on_stream_complete=False  # Default: Filter final complete message
)

Recommendation: Keep the default False to prevent duplicate content unless your frontend specifically requires the final complete message.

Our Stance

Since our in-house frontend is a complete redesign that does not use CopilotKit, we require the backend to strictly comply with the SSE specification. However, to maintain backward compatibility with CopilotKit users, we've made this configurable with the default set to CopilotKit's non-standard mode.

For production systems with custom frontends, we strongly recommend:

config_context = ConfigContext(
    app_name="my-app",
    user_id=extract_user_id,
    session_id=extract_session_id,
    event_source_response_mode=True  # Use SSE standard
)

This ensures your implementation follows web standards and maintains long-term compatibility with standard-compliant SSE clients.


Quick Start

Basic Implementation

from fastapi import FastAPI, Request
from google.adk.agents import BaseAgent
from adk_agui_middleware import SSEService
from adk_agui_middleware.endpoint import register_agui_endpoint
from adk_agui_middleware.data_model.config import PathConfig, RunnerConfig
from adk_agui_middleware.data_model.context import ConfigContext

# Initialize FastAPI application
app = FastAPI(title="AI Agent Service", version="1.0.0")

# Define your custom ADK agent
class MyAgent(BaseAgent):
    def __init__(self) -> None:
        super().__init__()
        self.instructions = "You are a helpful AI assistant."

# Simple context extraction
async def extract_user_id(_: object, request: Request) -> str:
    return request.headers.get("X-User-Id", "default-user")

# Create SSE service
agent = MyAgent()
sse_service = SSEService(
    agent=agent,
    config_context=ConfigContext(
        app_name="my-app",
        user_id=extract_user_id,
        session_id=lambda content, req: content.thread_id,
        event_source_response_mode=True,  # Use spec-compliant SSE by default
    ),
    runner_config=RunnerConfig(),
)

# Register endpoint at /agui
register_agui_endpoint(
    app,
    sse_service,
    path_config=PathConfig(agui_main_path="/agui"),
)

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)

RunnerConfig Configuration

The RunnerConfig class manages ADK runner setup and service configuration. It provides flexible service configuration with automatic in-memory fallbacks for development and testing environments.

Default Configuration (In-Memory Services)

By default, RunnerConfig uses in-memory services, perfect for development and testing:

from adk_agui_middleware.data_model.config import RunnerConfig
from adk_agui_middleware import SSEService

# Default: Automatic in-memory services
runner_config = RunnerConfig()

sse_service = SSEService(
    agent=MyAgent(),
    config_context=config_context,
    runner_config=runner_config  # Optional: uses default if not provided
)

Custom Service Configuration

For production environments, configure custom services:

from google.adk.sessions import FirestoreSessionService
from google.adk.artifacts import GCSArtifactService
from google.adk.memory import RedisMemoryService
from google.adk.auth.credential_service import VaultCredentialService
from google.adk.agents.run_config import StreamingMode
from google.adk.agents import RunConfig

# Custom production configuration
runner_config = RunnerConfig(
    # Service configuration
    session_service=FirestoreSessionService(project_id="my-project"),
    artifact_service=GCSArtifactService(bucket_name="my-artifacts"),
    memory_service=RedisMemoryService(host="redis.example.com"),
    credential_service=VaultCredentialService(vault_url="https://vault.example.com"),

    # Disable automatic in-memory fallback for production
    use_in_memory_services=False,

    # Optional: Add ADK plugins to extend agent capabilities
    plugins=[MyCustomPlugin(), AnotherPlugin()],

    # Customize agent execution behavior
    run_config=RunConfig(
        streaming_mode=StreamingMode.SSE,
        max_iterations=50,
        timeout=300
    )
)

sse_service = SSEService(
    agent=MyAgent(),
    config_context=config_context,
    runner_config=runner_config
)

RunnerConfig Attributes

Attribute Type Default Description
use_in_memory_services bool True Automatically create in-memory services when services are None
run_config RunConfig RunConfig(streaming_mode=SSE) ADK run configuration for agent execution behavior
session_service BaseSessionService InMemorySessionService() Session service for conversation persistence
artifact_service BaseArtifactService None Artifact service for file and data management
memory_service BaseMemoryService None Memory service for agent memory management
credential_service BaseCredentialService None Credential service for authentication
plugins list[BasePlugin] None List of ADK plugins for extending agent capabilities

Configuration Examples

Development/Testing Setup:

# Uses all in-memory services automatically
runner_config = RunnerConfig()

Production Setup with Firestore:

from google.adk.sessions import FirestoreSessionService

runner_config = RunnerConfig(
    use_in_memory_services=False,
    session_service=FirestoreSessionService(
        project_id="my-project",
        database_id="my-database"
    )
)

Mixed Environment (Some Custom, Some In-Memory):

# Custom session service, auto-creates in-memory for others
runner_config = RunnerConfig(
    use_in_memory_services=True,  # Auto-create missing services
    session_service=FirestoreSessionService(project_id="my-project"),
    # artifact_service, memory_service, credential_service will be auto-created
)

Custom Agent Execution Configuration:

from google.adk.agents import RunConfig
from google.adk.agents.run_config import StreamingMode

runner_config = RunnerConfig(
    run_config=RunConfig(
        streaming_mode=StreamingMode.SSE,  # Server-Sent Events mode
        max_iterations=100,  # Maximum agent iterations
        timeout=600,  # Execution timeout in seconds
        enable_thinking=True,  # Enable thinking/reasoning mode
    )
)

Advanced Configuration with Config Class

from fastapi import FastAPI, Request
from google.adk.agents import BaseAgent
from adk_agui_middleware import SSEService
from adk_agui_middleware.endpoint import (
    register_agui_endpoint,
    register_agui_history_endpoint,
    register_state_endpoint
)
from adk_agui_middleware.data_model.config import HistoryConfig, RunnerConfig, StateConfig
from adk_agui_middleware.data_model.context import ConfigContext, HandlerContext
from adk_agui_middleware.service.history_service import HistoryService
from adk_agui_middleware.service.state_service import StateService
from ag_ui.core import RunAgentInput

class MyAgent(BaseAgent):
    def __init__(self):
        super().__init__()
        self.instructions = "You are a helpful AI assistant."

class AGUIConfig:
    @staticmethod
    async def extract_user_id(request: Request) -> str:
        return request.headers.get("x-user-id", "default-user")

    @staticmethod
    async def extract_session_id(request: Request) -> str:
        return request.path_params.get("thread_id", "default-session")

    @staticmethod
    async def extract_initial_state(content: RunAgentInput, request: Request) -> dict:
        return {"frontend_state": content.state or {}}

    def create_sse_service(self) -> SSEService:
        return SSEService(
            agent=MyAgent(),
            config_context=ConfigContext(
                app_name="my-app",
                user_id=lambda content, req: self.extract_user_id(req),
                session_id=lambda content, req: content.thread_id,
                extract_initial_state=self.extract_initial_state,
                event_source_response_mode=True,
            ),
            # Optional: Add custom handlers
            # handler_context=HandlerContext(
            #     translate_handler=MyTranslateHandler,
            #     adk_event_handler=MyADKEventHandler,
            #     in_out_record_handler=MyInOutHandler,
            # ),
        )

    def create_history_service(self) -> HistoryService:
        return HistoryService(
            HistoryConfig(
                app_name="my-app",
                user_id=self.extract_user_id,
                session_id=self.extract_session_id,
            )
        )

    def create_state_service(self) -> StateService:
        return StateService(
            StateConfig(
                app_name="my-app",
                user_id=self.extract_user_id,
                session_id=self.extract_session_id,
            )
        )

# Initialize FastAPI and services
app = FastAPI(title="AI Agent Service", version="1.0.0")
config = AGUIConfig()

# Register all endpoints
register_agui_endpoint(app, config.create_sse_service())
register_agui_history_endpoint(app, config.create_history_service())
register_state_endpoint(app, config.create_state_service())

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)

Custom Event Handlers

from collections.abc import AsyncGenerator
from adk_agui_middleware.base_abc.handler import (
    BaseADKEventHandler,
    BaseAGUIEventHandler,
    BaseInOutHandler,
    BaseTranslateHandler,
)
from adk_agui_middleware.data_model.common import InputInfo
from adk_agui_middleware.data_model.event import TranslateEvent
from ag_ui.core import BaseEvent
from google.adk.events import Event

class MyADKEventHandler(BaseADKEventHandler):
    def __init__(self, input_info: InputInfo | None):
        self.input_info = input_info

    async def process(self, event: Event) -> AsyncGenerator[Event | None]:
        # Optionally filter or transform ADK events before translation
        yield event

class MyTranslateHandler(BaseTranslateHandler):
    def __init__(self, input_info: InputInfo | None):
        self.input_info = input_info

    async def translate(self, adk_event: Event) -> AsyncGenerator[TranslateEvent]:
        # Optionally emit AGUI events or replace/retune ADK events
        # yield TranslateEvent(agui_event=SomeAGUIEvent())
        # yield TranslateEvent(is_retune=True)
        yield TranslateEvent()

class MyAGUIEventHandler(BaseAGUIEventHandler):
    def __init__(self, input_info: InputInfo | None):
        self.input_info = input_info

    async def process(self, event: BaseEvent) -> AsyncGenerator[BaseEvent | None]:
        # Optionally filter or transform AGUI events after translation
        yield event

class MyInOutHandler(BaseInOutHandler):
    async def input_record(self, input_info: InputInfo) -> None:
        # Record incoming context for audit/debugging
        pass

    async def output_record(self, agui_event: BaseEvent) -> None:
        # Record outgoing AGUI events (pre-encoding)
        pass

    async def output_catch_and_change(self, agui_event: BaseEvent) -> BaseEvent:
        # Optionally modify the event before encoding to SSE
        return agui_event

Examples

Explore ready-to-run usage patterns in the examples folder. Each example is self-contained with comments and can be launched via uvicorn.

  • Minimal SSE: uvicorn examples.01_minimal_sse.app:app --reload
  • Context + History + State: uvicorn examples.02_context_history.app:app --reload
  • Advanced pipeline (I/O recorder + input preprocessing): uvicorn examples.03_advanced_pipeline.app:app --reload
  • Lifecycle handlers (full hook set): uvicorn examples.04_lifecycle_handlers.app:app --reload

See examples/README.md for details.

HandlerContext Lifecycle

HandlerContext configures pluggable hooks for the request lifecycle. Instances are constructed per-request (except session lock, which is created with SSEService) and invoked at defined stages.

  • session_lock_handler (created at SSEService init)
    • When: Before running the request stream and in finally cleanup
    • Used by: SSEService.runner (lock/unlock, generate locked error event)
  • in_out_record_handler
    • When: Immediately after building InputInfo (input_record), then for every emitted SSE event (output_record, output_catch_and_change)
    • Used by: SSEService.get_runner and SSEService.event_generator
  • adk_event_handler
    • When: On each ADK event before translation
    • Used by: RunningHandler._process_events_with_handler for ADK streams
  • adk_event_timeout_handler
    • When: Surrounds ADK event processing with a timeout; on TimeoutError, yields fallback events
    • Used by: RunningHandler._process_events_with_handler(enable_timeout=True)
  • translate_handler
    • When: Before default translation; can yield AGUI events, request retune, or replace the ADK event
    • Used by: RunningHandler._translate_adk_to_agui_async
  • agui_event_handler
    • When: On each AGUI event after translation, before encoding
    • Used by: RunningHandler._process_events_with_handler for AGUI streams
  • agui_state_snapshot_handler
    • When: Once at the end to transform final state before creating a StateSnapshotEvent
    • Used by: RunningHandler.create_state_snapshot_event

API Reference

Main AGUI Endpoint

Register with register_agui_endpoint(app, sse_service)

Method Endpoint Description Request Body Response Type
POST / Execute agent with streaming response RunAgentInput EventSourceResponse

History Endpoints

Register with register_agui_history_endpoint(app, history_service)

Method Endpoint Description Request Body Response Type
GET /thread/list List user's conversation threads - List[Dict[str, str]]
DELETE /thread/{thread_id} Delete conversation thread - Dict[str, str]
GET /message_snapshot/{thread_id} Get conversation history - MessagesSnapshotEvent

State Management Endpoints

Register with register_state_endpoint(app, state_service)

Method Endpoint Description Request Body Response Type
GET /state_snapshot/{thread_id} Get session state snapshot - StateSnapshotEvent
PATCH /state/{thread_id} Update session state List[JSONPatch] Dict[str, str]

Event Types

The middleware supports comprehensive event translation between ADK and AGUI formats:

AGUI Event Types

  • TEXT_MESSAGE_START - Begin streaming text response
  • TEXT_MESSAGE_CONTENT - Streaming text content chunk
  • TEXT_MESSAGE_END - Complete streaming text response
  • TOOL_CALL - Agent tool/function invocation
  • TOOL_RESULT - Tool execution result
  • STATE_DELTA - Incremental state update
  • STATE_SNAPSHOT - Complete state snapshot
  • RUN_STARTED - Agent execution began
  • RUN_FINISHED - Agent execution completed
  • ERROR - Error event with details

License

This project is licensed under the MIT License - see the LICENSE file for details.

Contributing

Please read CONTRIBUTING.md for details on our code of conduct and the process for submitting pull requests.

Security

See SECURITY.md for our security policy and vulnerability reporting process.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

adk_agui_middleware-1.5.1.tar.gz (81.9 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

adk_agui_middleware-1.5.1-py3-none-any.whl (96.1 kB view details)

Uploaded Python 3

File details

Details for the file adk_agui_middleware-1.5.1.tar.gz.

File metadata

  • Download URL: adk_agui_middleware-1.5.1.tar.gz
  • Upload date:
  • Size: 81.9 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for adk_agui_middleware-1.5.1.tar.gz
Algorithm Hash digest
SHA256 5ab6dee0923594e30eea1d14afa88da128a852b14952049387e4c3e3699a73df
MD5 dc465ac9965a017f487787f9a392b4b4
BLAKE2b-256 9f1bb3d1da5935652f7b04189132af43790acd784007fd18b9618c1e40caedb8

See more details on using hashes here.

Provenance

The following attestation bundles were made for adk_agui_middleware-1.5.1.tar.gz:

Publisher: publish.yml on trendmicro/adk-agui-middleware

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file adk_agui_middleware-1.5.1-py3-none-any.whl.

File metadata

File hashes

Hashes for adk_agui_middleware-1.5.1-py3-none-any.whl
Algorithm Hash digest
SHA256 0be8026c0648005933a4d6543ee35a60deae529c1bf6f57ef7e8f1fb74816355
MD5 15140cc5acce7713364692e3fecad783
BLAKE2b-256 0e2a91b97c28f391d69b8a73d79dbd6f23db447e96ad4cc500d5fd0be3dd74bc

See more details on using hashes here.

Provenance

The following attestation bundles were made for adk_agui_middleware-1.5.1-py3-none-any.whl:

Publisher: publish.yml on trendmicro/adk-agui-middleware

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page