Skip to main content

Supabase 기반 이벤트/작업 폴링으로 A2A AgentExecutor를 실행하는 SDK

Project description

📘 ProcessGPT Agent SDK – README

1. 이게 뭐하는 건가요?

이 SDK는 ProcessGPT 에이전트 서버를 만들 때 필요한 공통 기능을 제공합니다.

  • DB에서 작업(todo) 폴링 → 처리할 일감 가져오기
  • 컨텍스트 준비 (사용자 정보, 폼 정의, MCP 설정 등 자동으로 조회)
  • 다양한 에이전트 오케스트레이션(A2A) 과 호환
  • 이벤트(Event) 전송 규격 통일화 → 결과를 DB에 안전하게 저장

👉 쉽게 말하면: 여러 종류의 AI 에이전트를 같은 규칙으로 실행/저장/호출할 수 있게 해주는 통합 SDK 입니다.


2. 아키텍처 다이어그램

flowchart TD
    subgraph DB[Postgres/Supabase]
        T[todolist]:::db
        E[events]:::db
    end

    subgraph SDK
        P[Polling\n(fetch_pending_task)] --> C[Context 준비\n(fetch_context_bundle 등)]
        C --> X[Executor\n(MinimalExecutor)]
        X -->|TaskStatusUpdateEvent| E
        X -->|TaskArtifactUpdateEvent| T
    end

    classDef db fill=#f2f2f2,stroke=#333,stroke-width=1px;
  • todolist: 각 작업(Task)의 진행 상태, 결과물 저장
  • events: 실행 중간에 발생한 이벤트 로그 저장
  • SDK는 두 테이블을 자동으로 연결해 줍니다.

3. A2A 타입과 이벤트 종류

A2A 타입 (2가지)

A2A 타입 설명 매칭 테이블
TaskStatusUpdateEvent 작업 상태 업데이트 events 테이블
TaskArtifactUpdateEvent 작업 결과물 업데이트 todolist 테이블

Event Type (4가지)

Event Type Python 클래스 저장 테이블 설명
task_started TaskStatusUpdateEvent events 작업 시작 상태
task_working TaskStatusUpdateEvent events 작업 진행 중 상태
task_completed TaskArtifactUpdateEvent todolist 작업 완료 및 결과물 저장
task_error TaskStatusUpdateEvent events 작업 오류 발생

👉 A2A 타입 2가지가 핵심이며, 각각 eventstodolist 테이블에 매칭됩니다. Event Type 4가지로 세부 상태를 구분합니다.


4. 미니멀 예제 (기본 사용법)

minimal_executor.py

class MinimalExecutor(AgentExecutor):
    async def execute(self, context: RequestContext, event_queue: EventQueue):
        # 1) 입력 가져오기
        query = context.get_user_input()
        print("User Query:", query)

        # 2) 상태 이벤트 (events 테이블 저장)
        payload = {"demo": "hello world"}
        event_queue.enqueue_event(
            TaskStatusUpdateEvent(
                status={
                    "state": TaskState.working,
                    "message": new_agent_text_message(
                        json.dumps(payload, ensure_ascii=False),  # ⚠️ str() 쓰지말고 반드시 json.dumps!
                        context.get_context_data()["row"]["proc_inst_id"],
                        context.get_context_data()["row"]["id"],
                    ),
                },
                contextId=context.get_context_data()["row"]["proc_inst_id"],
                taskId=context.get_context_data()["row"]["id"],
                metadata={"crew_type": "action", "event_type": "task_started"},
            )
        )

        # 3) 최종 아티팩트 이벤트 (todolist 테이블 저장)
        artifact = new_text_artifact(
            name="result",
            description="Demo Result",
            text=json.dumps(payload, ensure_ascii=False),  # ⚠️ 여기서도 str() 금지!
        )
        event_queue.enqueue_event(
            TaskArtifactUpdateEvent(
                artifact=artifact,
                lastChunk=True,
                contextId=context.get_context_data()["row"]["proc_inst_id"],
                taskId=context.get_context_data()["row"]["id"],
            )
        )

minimal_server.py

async def main():
    load_dotenv()
    server = ProcessGPTAgentServer(
        agent_executor=MinimalExecutor(),
        agent_type="crewai-action"  # 오케스트레이터 타입
    )
    await server.run()

👉 실행하면 SDK가 자동으로:

  1. DB에서 작업 하나 가져오기 (fetch_pending_task)
  2. 컨텍스트 준비 (폼/유저/MCP 조회)
  3. Executor 실행 → 이벤트/결과 DB에 저장

5. ⚠️ JSON 직렬화 주의 (str() 절대 금지)

반드시 json.dumps()로 직렬화해야 합니다.

  • ❌ 이렇게 하면 안됨:

    text = str({"key": "value"})  # Python dict string → JSON 아님
    

    DB에 "'{key: value}'" 꼴로 문자열 저장됨 → 파싱 실패

  • ✅ 이렇게 해야 함:

    text = json.dumps({"key": "value"}, ensure_ascii=False)
    

    DB에 {"key": "value"} JSON 저장됨 → 파싱 성공

👉 SDK는 내부에서 json.loads로 재파싱하기 때문에, 표준 JSON 문자열이 아니면 무조건 문자열로만 남습니다.


6. 요약

  • 이 SDK는 ProcessGPT Agent를 표준 규격으로 실행/저장/호출하는 공통 레이어
  • 작업 → 컨텍스트 준비 → Executor 실행 → 이벤트 저장 전체를 자동화
  • A2A 타입 2가지: TaskStatusUpdateEvent, TaskArtifactUpdateEvent
  • Event Type 4가지: task_started, task_working, task_completed, task_error
  • DB 매핑:
    • TaskStatusUpdateEventevents 테이블
    • TaskArtifactUpdateEventtodolist 테이블
  • ⚠️ str() 대신 무조건 json.dumps 사용!

7. 버전업

  • ./release.sh 버전
  • 오류 발생시 : python -m ensurepip --upgrade

8. integrations 모듈 안내

  • 스토리지 업로드 유틸은 processgpt_agent_sdk.integrations.storage 로 분리되었습니다.
  • 기존 processgpt_agent_sdk.utils.upload_file_to_bucket, upload_files_to_bucket 는 하위호환용으로 유지되지만 deprecated 입니다.
  • 신규 코드는 아래 경로를 사용하세요:
    • from processgpt_agent_sdk.integrations.storage import upload_file_to_bucket, upload_files_to_bucket

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

process_gpt_agent_sdk-0.4.15.tar.gz (21.5 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

process_gpt_agent_sdk-0.4.15-py3-none-any.whl (25.0 kB view details)

Uploaded Python 3

File details

Details for the file process_gpt_agent_sdk-0.4.15.tar.gz.

File metadata

  • Download URL: process_gpt_agent_sdk-0.4.15.tar.gz
  • Upload date:
  • Size: 21.5 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.13.12

File hashes

Hashes for process_gpt_agent_sdk-0.4.15.tar.gz
Algorithm Hash digest
SHA256 384cc1486e27fd7fbb5806cbd24a19b8a1c1370adb259f087d4faf5bcd5f7e3d
MD5 6cc9679d6b6a9c499adea21d19618b3b
BLAKE2b-256 ac9b70dab1500ed53acf63f220156f9b7eb8f141cecc354bf955f2ee882f0b4b

See more details on using hashes here.

File details

Details for the file process_gpt_agent_sdk-0.4.15-py3-none-any.whl.

File metadata

File hashes

Hashes for process_gpt_agent_sdk-0.4.15-py3-none-any.whl
Algorithm Hash digest
SHA256 f40343b4cc93ea10803adcdaca17ae4a8808c6f40c0a83e633b0b54fa0c9f143
MD5 c9f2e101976462a2fd26a5aebaceaa76
BLAKE2b-256 fd735a025f8a762add05b5eceba8a7e6e82580bd08e9f15885faf24360c8dd31

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page