Supabase 기반 이벤트/작업 폴링으로 A2A AgentExecutor를 실행하는 SDK
Project description
ProcessGPT Agent Framework (A2A SDK 연동 가이드)
이 저장소는 Supabase 기반의 프로세스 작업(Todolist)을 폴링하고, A2A 규격 이벤트를 통해 작업 상태/결과를 기록하는 경량 에이전트 서버 프레임워크입니다. 최소 구현으로 빠르게 통합하고, 필요하면 커스터마이즈할 수 있습니다.
- 런타임: Python 3.10+
- 저장소 의존: Supabase(Postgres) + 제공된 RPC/테이블
- 이벤트 규격: A2A
TaskStatusUpdateEvent/TaskArtifactUpdateEvent
아키텍처 한눈에 보기
flowchart LR
subgraph Supabase
A[Todolist] --- B[Events]
A -.RPC.-> C[(save_task_result)]
D[(fetch_pending_task)] --> A
end
subgraph Agent Server
E[ProcessGPTAgentServer] -->|polls| D
E --> F[ProcessGPTRequestContext]
E --> G[ProcessGPTEventQueue]
H[Your AgentExecutor]
F --> H
H -->|A2A Events| G
end
G -->|TaskStatusUpdateEvent| B
G -->|TaskArtifactUpdateEvent| A
- 서버는 주기적으로 Todolist를 폴링하여 새 작업을 가져옵니다.
- 사용자 구현
AgentExecutor가 요청을 처리하고, A2A 이벤트를 큐에 전달합니다. - 이벤트 큐는 상태 이벤트를
events테이블에, 아티팩트 이벤트를todolist.output에 저장합니다.
엔드-투-엔드 시퀀스(정상 흐름)
sequenceDiagram
participant SB as Supabase
participant SRV as ProcessGPTAgentServer
participant CTX as RequestContext
participant EXE as Your AgentExecutor
participant EQ as ProcessGPTEventQueue
SRV->>SB: RPC fetch_pending_task
SB-->>SRV: todolist row
SRV->>CTX: prepare_context()
SRV->>EXE: execute(context, event_queue)
EXE->>EQ: TaskStatusUpdateEvent (state=working)
EQ->>SB: INSERT events (data=payload)
EXE->>EQ: TaskArtifactUpdateEvent (lastChunk=true, artifact)
EQ->>SB: RPC save_task_result (output=payload, p_final=true)
SRV->>EQ: task_done()
EQ->>SB: INSERT events (crew_completed)
Human-in-the-loop(HITL) 시퀀스
sequenceDiagram
participant EXE as Your AgentExecutor
participant EQ as ProcessGPTEventQueue
participant SB as Supabase
participant UI as Operator UI
EXE->>EQ: TaskStatusUpdateEvent (state=input_required)
Note right of EXE: event_type 전송 생략 가능
EQ->>SB: INSERT events (event_type=human_asked, data=질문 payload)
UI->>SB: INSERT events (event_type=human_response, data=사용자 응답)
EXE-->>SB: 선택: fetch_human_response_sync(job_id)
친절한 시작 가이드(5분 컷)
- 가상환경 + 설치
uv venv --python 3.11.9
uv pip install -r requirements.txt
source .venv/Scripts/activate
- .env 준비
- SUPABASE_URL, SUPABASE_KEY 필수
- ENV=dev (개발 환경에서 권장)
- 샘플 서버 실행
python sample_server/minimal_server.py | cat
- 이벤트 전송 패턴 이해
- 진행 상태:
TaskStatusUpdateEvent(state=working)+new_agent_text_message(text, contextId, taskId) - 사용자 입력 요청(HITL):
TaskState.input_required만 보내면 event_type은 자동human_asked - 결과물:
TaskArtifactUpdateEvent(lastChunk=True)+new_text_artifact(name, desc, text)
- 저장물 확인 포인트
events테이블: data에는 래퍼 제거된 순수 payload 저장todolist.output: 순수 payload 저장, 최종 청크면p_final=true
샘플 서버 실행 코드 (친절 버전)
A. 가장 간단한 서버(minimal)
# sample_server/minimal_server.py
import os
import sys
import asyncio
from dotenv import load_dotenv
# 패키지 루트 경로 추가 (샘플에서만)
sys.path.append(os.path.dirname(os.path.dirname(__file__)))
from processgpt_agent_sdk.processgpt_agent_framework import ProcessGPTAgentServer
from sample_server.minimal_executor import MinimalExecutor
async def main():
load_dotenv()
# agent_type은 Supabase의 todolist.agent_orch와 매칭되어야 함
server = ProcessGPTAgentServer(agent_executor=MinimalExecutor(), agent_type="crewai-action")
server.polling_interval = 3 # 초
await server.run()
if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt:
pass
- Windows
python sample_server/minimal_server.py
- macOS/Linux
python3 sample_server/minimal_server.py
B. CLI 옵션이 있는 서버 예시
# sample_server/crew_ai_dr_agent_server.py
import os
import sys
import asyncio
import click
from dotenv import load_dotenv
# 패키지 루트 경로 추가 (샘플에서만)
sys.path.append(os.path.dirname(os.path.dirname(__file__)))
from processgpt_agent_sdk.processgpt_agent_framework import ProcessGPTAgentServer
from sample_server.crew_ai_dr_agent_executor import CrewAIDeepResearchAgentExecutor
load_dotenv()
@click.command()
@click.option('--agent-type', default='crew-ai-dr', help='Agent type identifier')
@click.option('--polling-interval', default=5, help='Polling interval in seconds')
def cli_main(agent_type: str, polling_interval: int):
"""ProcessGPT Agent Server for CrewAI Deep Research Agent"""
agent_executor = CrewAIDeepResearchAgentExecutor()
server = ProcessGPTAgentServer(agent_executor=agent_executor, agent_type=agent_type)
server.polling_interval = polling_interval
print(f"Starting ProcessGPT Agent Server...")
print(f"Agent Type: {agent_type}")
print(f"Polling Interval: {polling_interval} seconds")
print("Press Ctrl+C to stop")
try:
asyncio.run(server.run())
except KeyboardInterrupt:
print("\nShutting down server...")
server.stop()
except Exception as e:
print(f"Server error: {e}")
sys.exit(1)
if __name__ == "__main__":
cli_main()
- 실행
- Windows
python sample_server/crew_ai_dr_agent_server.py --agent-type crew-ai-dr --polling-interval 3
- macOS/Linux
python3 sample_server/crew_ai_dr_agent_server.py --agent-type crew-ai-dr --polling-interval 3
- Windows
최소 예시(익스큐터)
# sample_server/minimal_executor.py (요약)
import asyncio
import json
from typing_extensions import override
from a2a.server.agent_execution import AgentExecutor, RequestContext
from a2a.server.events import EventQueue
from a2a.types import TaskStatusUpdateEvent, TaskState, TaskArtifactUpdateEvent
from a2a.utils import new_agent_text_message, new_text_artifact
class MinimalExecutor(AgentExecutor):
@override
async def execute(self, context: RequestContext, event_queue: EventQueue) -> None:
row = context.get_context_data()["row"]
context_id = row.get("root_proc_inst_id") or row.get("proc_inst_id")
task_id = row.get("id")
payload = {"order_process_activity_order_request_form": {"orderer_name": "안치윤","product_name": "금형세트","order_quantity": "50"}}
# 1) 진행 상태 이벤트
event_queue.enqueue_event(
TaskStatusUpdateEvent(
status={
"state": TaskState.working,
"message": new_agent_text_message(
json.dumps(payload, ensure_ascii=False),
context_id,
task_id,
),
},
final=False,
contextId=context_id,
taskId=task_id,
metadata={"crew_type": "action", "event_type": "task_started", "job_id": "job-demo-0001"},
)
)
await asyncio.sleep(0.1)
# 2) HITL: 사용자 입력 요청 (event_type 생략해도 자동 human_asked)
event_queue.enqueue_event(
TaskStatusUpdateEvent(
status={
"state": TaskState.input_required,
"message": new_agent_text_message(
json.dumps(payload, ensure_ascii=False),
context_id,
task_id,
),
},
final=True,
contextId=context_id,
taskId=task_id,
metadata={"crew_type": "action", "job_id": "job-demo-0001"},
)
)
await asyncio.sleep(0.1)
# 3) 최종 아티팩트
artifact = new_text_artifact(
name="current_result",
description="Result of request to agent.",
text=json.dumps(payload, ensure_ascii=False),
)
event_queue.enqueue_event(
TaskArtifactUpdateEvent(
artifact=artifact,
lastChunk=True,
contextId=context_id,
taskId=task_id,
)
)
서버가 해주는 일(정확한 규칙)
- 메시지/아티팩트 래퍼 제거 →
parts[0].text|content|data→root.*→top-level순서로 텍스트만 추출 후 JSON 파싱하여 저장 TaskStatusUpdateEvent수신 시status.state == input_required면event_type=human_asked로 저장(명시값보다 우선)- 그 외 상태는
metadata.event_type저장(없으면 NULL)
TaskArtifactUpdateEvent수신 시final또는lastChunk가 참이면 최종 저장(p_final=true)
체크리스트(실패 없는 통합을 위한)
- .env에
SUPABASE_URL,SUPABASE_KEY설정했는가? -
requirements.txt설치 완료했는가? - Supabase에서 제공 SQL(
database_schema.sql,function.sql) 적용했는가? - 익스큐터에서
contextId,taskId를 todolist의proc_inst_id,id로 매핑했는가? - 상태 이벤트는
new_agent_text_message로 만들고 있는가? - 최종 아티팩트는
new_text_artifact+lastChunk=True로 보내고 있는가? - HITL 요청은
TaskState.input_required만 보내고 있는가?(event_type 생략 가능)
트러블슈팅
- 이벤트 미기록
- Supabase URL/Key 재확인, 테이블/권한 확인
- 최종 아티팩트가 최종으로 저장되지 않음
- 익스큐터에서
lastChunk=True또는final=True로 보냈는지 확인
- 익스큐터에서
- payload가 래퍼와 같이 저장됨
- 메시지에
parts[0].text또는parts[0].root.text에 JSON 문자열이 들어있는지 확인
- 메시지에
- 휴먼인더루프 이벤트 타입 미지정
input_required상태면 자동human_asked로 저장됨
레퍼런스
- 이벤트 유틸:
new_agent_text_message,new_text_artifact - 서버 진입점:
ProcessGPTAgentServer.run() - 컨텍스트 확장:
ProcessGPTRequestContext.prepare_context() - 이벤트 저장:
ProcessGPTEventQueue.enqueue_event(event)→database.record_event/save_task_result - 휴먼 응답 조회:
database.fetch_human_response_sync(job_id)
라이선스
해당 저장소의 라이선스 정책을 따릅니다.
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file process_gpt_agent_sdk-0.3.10.tar.gz.
File metadata
- Download URL: process_gpt_agent_sdk-0.3.10.tar.gz
- Upload date:
- Size: 9.7 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.11.9
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
36781e2a89c1402abd6f1085aff1f5651bec0e403ea6fe13b6216d4fb1a5c96d
|
|
| MD5 |
81132abf268d602b7200c923345f84e1
|
|
| BLAKE2b-256 |
3409445fb422c17e57bdd02b2925b56105f83436b43a5ab8b6c560c7c1a0cfc1
|
File details
Details for the file process_gpt_agent_sdk-0.3.10-py3-none-any.whl.
File metadata
- Download URL: process_gpt_agent_sdk-0.3.10-py3-none-any.whl
- Upload date:
- Size: 9.8 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.11.9
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
dfc1435c7be8f2792e7d372c4057037c0ae1787cfba2f0bec2aa0742b8577297
|
|
| MD5 |
f85e6027209aac11034aeb86c0ee7328
|
|
| BLAKE2b-256 |
35e6362d41e7dd4b12b861c3c5a40938113b89d2bf36c3e38f3018c16d3b77ab
|