Skip to main content

Supabase 기반 이벤트/작업 폴링으로 A2A AgentExecutor를 실행하는 SDK

Project description

📘 ProcessGPT Agent SDK – README

1. 이게 뭐하는 건가요?

이 SDK는 ProcessGPT 에이전트 서버를 만들 때 필요한 공통 기능을 제공합니다.

  • DB에서 작업(todo) 폴링 → 처리할 일감 가져오기
  • 컨텍스트 준비 (사용자 정보, 폼 정의, MCP 설정 등 자동으로 조회)
  • 다양한 에이전트 오케스트레이션(A2A) 과 호환
  • 이벤트(Event) 전송 규격 통일화 → 결과를 DB에 안전하게 저장

👉 쉽게 말하면: 여러 종류의 AI 에이전트를 같은 규칙으로 실행/저장/호출할 수 있게 해주는 통합 SDK 입니다.


2. 아키텍처 다이어그램

flowchart TD
    subgraph DB[Postgres/Supabase]
        T[todolist]:::db
        E[events]:::db
    end

    subgraph SDK
        P[Polling\n(fetch_pending_task)] --> C[Context 준비\n(fetch_context_bundle 등)]
        C --> X[Executor\n(MinimalExecutor)]
        X -->|TaskStatusUpdateEvent| E
        X -->|TaskArtifactUpdateEvent| T
    end

    classDef db fill=#f2f2f2,stroke=#333,stroke-width=1px;
  • todolist: 각 작업(Task)의 진행 상태, 결과물 저장
  • events: 실행 중간에 발생한 이벤트 로그 저장
  • SDK는 두 테이블을 자동으로 연결해 줍니다.

3. A2A 타입과 이벤트 종류

A2A 타입 (2가지)

A2A 타입 설명 매칭 테이블
TaskStatusUpdateEvent 작업 상태 업데이트 events 테이블
TaskArtifactUpdateEvent 작업 결과물 업데이트 todolist 테이블

(v1.0) Enum 변경사항: snake_caseSCREAMING_SNAKE_CASE

a2a-sdk v1.0부터 A2A 스펙(ProtoJSON) 정합성을 위해 모든 enum 값이 대문자 스네이크 케이스로 표준화되었습니다.

  • TaskState

    • TaskState.submittedTaskState.TASK_STATE_SUBMITTED
    • TaskState.workingTaskState.TASK_STATE_WORKING
    • TaskState.completedTaskState.TASK_STATE_COMPLETED
    • TaskState.failedTaskState.TASK_STATE_FAILED
    • TaskState.canceledTaskState.TASK_STATE_CANCELED
    • TaskState.input_requiredTaskState.TASK_STATE_INPUT_REQUIRED
    • TaskState.auth_requiredTaskState.TASK_STATE_AUTH_REQUIRED
    • TaskState.rejectedTaskState.TASK_STATE_REJECTED
    • (추가) TaskState.TASK_STATE_UNSPECIFIED
  • Role

    • Role.userRole.ROLE_USER
    • Role.agentRole.ROLE_AGENT
    • (추가) Role.ROLE_UNSPECIFIED

Event Type (4가지)

Event Type Python 클래스 저장 테이블 설명
task_started TaskStatusUpdateEvent events 작업 시작 상태
task_working TaskStatusUpdateEvent events 작업 진행 중 상태
task_completed TaskArtifactUpdateEvent todolist 작업 완료 및 결과물 저장
task_error TaskStatusUpdateEvent events 작업 오류 발생

👉 A2A 타입 2가지가 핵심이며, 각각 eventstodolist 테이블에 매칭됩니다. Event Type 4가지로 세부 상태를 구분합니다.


4. 사용 예시

이 SDK는 “하나의 완제품 서비스”가 아니라, 내 서비스에 붙여서 사용하는 프레임워크/라이브러리입니다.

아래 예시는 한 프로세스에서 다음을 동시에 제공합니다.

  • 프로세스(폴링): await server.run()로 DB에서 todo를 가져와 처리
  • 채팅(SSE): /chat/stream 엔드포인트로 요청을 받아 Message-only로 응답 + chats에 저장

4.1 서버 구성 예시 (폴링 + SSE 함께)

import asyncio

import uvicorn
from starlette.applications import Starlette

from processgpt_agent_sdk import ProcessGPTAgentServer
from my_service.my_executor import MyExecutor


async def main():
    server = ProcessGPTAgentServer(
        agent_executor=MyExecutor(),
        agent_type="langchain-react",
    )

    app = Starlette()
    server.mount_chat_sse(app, path="/chat/stream")

    uvicorn_server = uvicorn.Server(
        uvicorn.Config(app, host="127.0.0.1", port=8010, log_level="info")
    )

    await asyncio.gather(
        server.run(),
        uvicorn_server.serve(),
    )


if __name__ == "__main__":
    asyncio.run(main())

4.2 Executor 구현 예시 (채팅 저장 payload 커스텀)

import os

from a2a.helpers import new_text_message
from a2a.helpers import new_text_status_update_event
from a2a.types import Role
from a2a.types import TaskState
import litellm


class MyExecutor(...):
    async def execute(self, context, event_queue):
        # (선택) 중간 스트리밍(토큰/툴 이벤트)을 SSE로 흘리고 싶다면,
        # A2A 표준 `TaskStatusUpdateEvent.metadata`에 "JSON 객체"를 담아 enqueue 하세요.
        # 프레임워크가 이를 SSE `event: message`의 `data`로 변환해 발행합니다.
        #
        # 예: LiteLLM 프록시(OpenAI 호환) 스트리밍 → token 이벤트로 변환
        model = os.environ.get("LLM_MODEL")
        proxy_url = (os.environ.get("LLM_PROXY_URL") or "").rstrip("/")
        api_key = os.environ.get("LLM_PROXY_API_KEY")
        api_base = proxy_url if proxy_url.endswith("/v1") else f"{proxy_url}/v1"

        # 채팅(SSE) 요청이면 Message-only로 응답하고,
        # Message.metadata.chat_payload를 chats.messages에 그대로 저장합니다.
        if (context.metadata or {}).get("request_kind") == "chat":
            stream = await litellm.acompletion(
                model=model,
                messages=[
                    {"role": "system", "content": "You are a helpful assistant. Reply in Korean."},
                    {"role": "user", "content": context.get_user_input()},
                ],
                temperature=0,
                stream=True,
                api_base=api_base,
                api_key=api_key,
            )

            full = ""
            async for chunk in stream:
                try:
                    token = chunk.choices[0].delta.content
                except Exception:
                    token = None
                if not token:
                    continue

                full += token
                tok_evt = new_text_status_update_event(
                    task_id=context.task_id,
                    context_id=context.context_id,
                    state=TaskState.TASK_STATE_WORKING,
                    text="",
                )
                tok_evt.metadata.update({"type": "token", "content": token})
                await event_queue.enqueue_event(tok_evt)

            msg = new_text_message(text=full, role=Role.ROLE_AGENT)
            msg.metadata.update(
                {
                    "chat_payload": {
                        # 이 payload는 외부 서비스가 원하는 형태로 자유롭게 구성하세요.
                        "role": "assistant",
                        "content": full,
                        # (선택) conversation_id/tenant_id/user_uid 등은
                        # 외부 서비스가 표준 필드로 실어 보내거나, 별도 저장 로직에서 주입하세요.
                    }
                }
            )
            await event_queue.enqueue_event(msg)
            return

        # 그 외(폴링)는 Task lifecycle 패턴으로 처리 (Task → status/artifact)
        ...

4.3 채팅(SSE) 요청 예시

요청 바디 예시:

curl -N -X POST http://127.0.0.1:8010/chat/stream \
  -H 'content-type: application/json' \
  -d '{"message":"hello","conversation_id":"conv-1","tenant_id":"","user_uid":"u1"}'

4.4 설치(옵션: SSE)

채팅(SSE)을 포함해 사용하려면 extras가 필요합니다.

pip install "process-gpt-agent-sdk[sse]"

참고로, 레포에는 빠르게 확인할 수 있는 샘플(sample_server/minimal_server.py, sample_server/minimal_executor.py)도 포함되어 있습니다.


5. ⚠️ JSON 직렬화 주의 (str() 절대 금지)

반드시 json.dumps()로 직렬화해야 합니다.

  • ❌ 이렇게 하면 안됨:

    text = str({"key": "value"})  # Python dict string → JSON 아님
    

    DB에 "'{key: value}'" 꼴로 문자열 저장됨 → 파싱 실패

  • ✅ 이렇게 해야 함:

    text = json.dumps({"key": "value"}, ensure_ascii=False)
    

    DB에 {"key": "value"} JSON 저장됨 → 파싱 성공

👉 SDK는 내부에서 json.loads로 재파싱하기 때문에, 표준 JSON 문자열이 아니면 무조건 문자열로만 남습니다.


6. 사용법 (내 코드에 붙이기)

핵심은 사용자 AgentExecutor.execute()요청 경로에 따라 아래 둘 중 하나를 선택하는 것입니다.

  • 프로세스(폴링) 경로: Task lifecycle (Task → status/artifact)
    • 첫 이벤트는 반드시 Task
    • 이후 TaskStatusUpdateEvent / TaskArtifactUpdateEvent
  • 채팅(SSE) 경로: Message-only (Message 1개)
    • 정확히 1개의 Message
    • status/artifact/Task를 섞지 않음

6.1 프로세스(폴링)만 실행

from processgpt_agent_sdk import ProcessGPTAgentServer

server = ProcessGPTAgentServer(agent_executor=MyExecutor(), agent_type="crewai-action")
await server.run()

6.2 채팅(SSE) 엔드포인트 추가

SSE를 쓰려면 extras 설치가 필요합니다.

pip install "process-gpt-agent-sdk[sse]"
from starlette.applications import Starlette

from processgpt_agent_sdk import ProcessGPTAgentServer

server = ProcessGPTAgentServer(agent_executor=MyExecutor(), agent_type="crewai-action")
app = Starlette()
server.mount_chat_sse(app, path="/chat/stream")  # POST /chat/stream

요청 바디 예시:

{
  "message": "안녕",
  "tenant_id": "t1",
  "user_uid": "u1",
  "user_email": "user@example.com",
  "user_name": "홍길동",
  "user_jwt": "",
  "conversation_id": "conv-1",
  "file": null,
  "files": [],
  "file_count": 0,
  "stream": true,
  "metadata": {}
}

6.3 폴링 + SSE를 한 프로세스에서 함께 실행 (권장 예시)

import asyncio

import uvicorn
from starlette.applications import Starlette

from processgpt_agent_sdk import ProcessGPTAgentServer


async def main():
    server = ProcessGPTAgentServer(agent_executor=MyExecutor(), agent_type="crewai-action")

    app = Starlette()
    server.mount_chat_sse(app, path="/chat/stream")

    uvicorn_server = uvicorn.Server(
        uvicorn.Config(app, host="127.0.0.1", port=8010, log_level="info")
    )

    await asyncio.gather(
        server.run(),
        uvicorn_server.serve(),
    )


if __name__ == "__main__":
    asyncio.run(main())

운영 환경에서는 폴링 프로세스HTTP API 프로세스를 분리 운영하는 경우도 많습니다.

7. 버전업

  • ./release.sh 버전
  • 오류 발생시 : python -m ensurepip --upgrade

8. integrations 모듈 안내

  • 스토리지 업로드 유틸은 processgpt_agent_sdk.integrations.storage 로 분리되었습니다.
  • 기존 processgpt_agent_sdk.utils.upload_file_to_bucket, upload_files_to_bucket 는 하위호환용으로 유지되지만 deprecated 입니다.
  • 신규 코드는 아래 경로를 사용하세요:
    • from processgpt_agent_sdk.integrations.storage import upload_file_to_bucket, upload_files_to_bucket

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

process_gpt_agent_sdk-0.4.18.tar.gz (30.4 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

process_gpt_agent_sdk-0.4.18-py3-none-any.whl (34.8 kB view details)

Uploaded Python 3

File details

Details for the file process_gpt_agent_sdk-0.4.18.tar.gz.

File metadata

  • Download URL: process_gpt_agent_sdk-0.4.18.tar.gz
  • Upload date:
  • Size: 30.4 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.13.12

File hashes

Hashes for process_gpt_agent_sdk-0.4.18.tar.gz
Algorithm Hash digest
SHA256 71aace9c50b9f7ea520d8d96272efb50371febdac7e312377cc51c70147f17f4
MD5 55b00e8783069c90519a69174dcbc85b
BLAKE2b-256 0dc78d3e41abea51f4ceff17c0d5c9b156e1b0036c34079248d7bcf525be94ab

See more details on using hashes here.

File details

Details for the file process_gpt_agent_sdk-0.4.18-py3-none-any.whl.

File metadata

File hashes

Hashes for process_gpt_agent_sdk-0.4.18-py3-none-any.whl
Algorithm Hash digest
SHA256 eb780ac4d44234375e2b0d9b3c59b5870f9eefd4040b550a6b229fef77b71a9c
MD5 b6fadc347836e0f3b5c29ccb14788c78
BLAKE2b-256 4e1973895ebba601331ae00d747a1c3768721cfeb9113d4ab82c91a89597ae4c

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page