Skip to main content

Python SDK for Subnet Agent Development

Project description

Subnet SDK for Python

Python SDK for building agents that interact with the Subnet protocol.

Bids and execution reports automatically include metadata["chain_address"] when a chain address is configured or derived from the private key, enabling on-chain verification by matchers and validators.

Installation

Install from source (recommended until published to PyPI):

# Clone the repository
git clone https://github.com/PIN-AI/subnet-sdk.git
cd subnet-sdk/python

# Install in development mode
pip install -e .

# Or install directly from GitHub
pip install git+https://github.com/PIN-AI/subnet-sdk.git#subdirectory=python

Quick Start

import asyncio
from subnet_sdk import SDK, ConfigBuilder, Handler, Task, Result

class MyHandler(Handler):
    async def execute(self, task: Task) -> Result:
        # Process task
        print(f"Processing task: {task.id}")

        # Return result
        return Result(
            data=b"task completed",
            success=True
        )

async def main():
    # Configure SDK (NO default IDs - must be explicit)
    config = (
        ConfigBuilder()
        .with_subnet_id("my-subnet-1")      # REQUIRED
        .with_agent_id("my-agent-1")        # REQUIRED
        .with_private_key("" + "a" * 64)    # 64 hex chars, no 0x prefix
        .with_chain_address("0xYourAgentAddress")  # Optional when signer is external
        .with_matcher_addr("localhost:8090") # REQUIRED
        .with_capabilities("compute", "ml")  # REQUIRED
        .build()
    )

    # Create and start SDK
    sdk = SDK(config)
    sdk.register_handler(MyHandler())
    await sdk.start()

    print(f"Agent started: {sdk.get_agent_id()}")

    # Keep running
    await asyncio.Event().wait()

if __name__ == "__main__":
    asyncio.run(main())

Configuration

Using ConfigBuilder (Recommended)

config = (
    ConfigBuilder()
    # Identity (REQUIRED - no defaults)
    .with_subnet_id("subnet-1")
    .with_agent_id("agent-1")

    # Authentication
    .with_private_key("...")    # 64 hex chars, no 0x prefix
    .with_chain_address("0x...") # Optional when key is held elsewhere

    # Network (REQUIRED)
    .with_matcher_addr("localhost:8090")
    .with_validator_addr("localhost:9090")

    # Capabilities (REQUIRED)
    .with_capabilities("compute", "storage")
    .with_intent_types("compute")

    # Performance
    .with_task_timeout(60)  # seconds
    .with_max_concurrent_tasks(10)

    # Economics
    .with_bidding_strategy("dynamic", 50, 500)

    .build()
)

Direct Configuration

from subnet_sdk import Config, IdentityConfig

config = Config(
    identity=IdentityConfig(
        subnet_id="subnet-1",  # REQUIRED
        agent_id="agent-1"     # REQUIRED
    ),
    private_key="...",         # 64 hex chars, no 0x prefix
    chain_address="0x...",      # Optional when private key not local
    matcher_addr="localhost:8090",  # REQUIRED
    capabilities=["compute"],  # REQUIRED
    task_timeout=30,
    max_concurrent_tasks=5
)

Important Configuration Rules

  1. No Default IDs: SubnetID and AgentID MUST be configured explicitly
  2. Private Key Format: Exactly 64 hexadecimal characters (no 0x prefix)
  3. Required Fields: subnet_id, agent_id, matcher_addr, capabilities
  4. Chain Address: The SDK derives the on-chain address from the private key. If you sign elsewhere, set chain_address/with_chain_address so bids and reports carry metadata["chain_address"] for on-chain checks.

API Reference

SDK Class

# Create SDK
sdk = SDK(config)

# Register handler
sdk.register_handler(handler)
sdk.register_bidding_strategy(strategy)
sdk.register_callbacks(callbacks)

# Start/stop
await sdk.start()
await sdk.stop()

# Get configuration
sdk.get_agent_id()
sdk.get_subnet_id()
sdk.get_chain_address()  # Ethereum address used in metadata
sdk.get_capabilities()

# Execute task
result = await sdk.execute_task(task)

# Sign data
signature = sdk.sign(data)

# Get metrics
metrics = sdk.get_metrics()

Handler Interface

class Handler(ABC):
    @abstractmethod
    async def execute(self, task: Task) -> Result:
        pass

Data Types

@dataclass
class Task:
    id: str
    intent_id: str
    type: str
    data: bytes
    metadata: Dict[str, Any]
    deadline: datetime
    created_at: datetime

@dataclass
class Result:
    data: bytes
    success: bool
    error: Optional[str] = None
    metadata: Optional[Dict[str, Any]] = None

Batch Operations

The SDK supports batch operations for improved performance when submitting multiple bids or execution reports:

Batch Bid Submission

from subnet_sdk import MatcherClient, SigningConfig
from subnet_sdk.proto.subnet import matcher_pb2, bid_pb2

# Create matcher client
signing_config = SigningConfig(
    private_key_hex="your_64_hex_char_key",  # 64 hex chars, no 0x prefix
    chain_id="subnet-1"
)

matcher_client = MatcherClient(
    target="localhost:8090",
    signing_config=signing_config
)

# Prepare multiple bids
bids = [
    bid_pb2.Bid(
        bid_id="bid-1",
        intent_id="intent-123",
        agent_id="agent-1",
        price=100,
    ),
    bid_pb2.Bid(
        bid_id="bid-2",
        intent_id="intent-123",
        agent_id="agent-1",
        price=150,
    ),
]

# Submit batch
batch_req = matcher_pb2.SubmitBidBatchRequest(
    bids=bids,
    batch_id="batch-123",
    partial_ok=True,  # Continue on partial failures
)

try:
    response = await matcher_client.submit_bid_batch(batch_req)
    print(f"Batch results: {response.success} succeeded, {response.failed} failed")

    for i, ack in enumerate(response.acks):
        print(f"Bid {i}: accepted={ack.accepted}, reason={ack.reason}")
finally:
    await matcher_client.close()

Batch Execution Report Submission

from subnet_sdk import ValidatorClient, SigningConfig
from subnet_sdk.proto.subnet import service_pb2, execution_report_pb2
import time

# Create validator client
signing_config = SigningConfig(
    private_key_hex="your_64_hex_char_key",  # 64 hex chars, no 0x prefix
    chain_id="subnet-1"
)

validator_client = ValidatorClient(
    target="localhost:9090",
    signing_config=signing_config
)

# Prepare multiple reports
reports = [
    execution_report_pb2.ExecutionReport(
        report_id="report-1",
        assignment_id="assignment-1",
        intent_id="intent-123",
        agent_id="agent-1",
        status=execution_report_pb2.ExecutionReport.SUCCESS,
        timestamp=int(time.time()),
    ),
    execution_report_pb2.ExecutionReport(
        report_id="report-2",
        assignment_id="assignment-2",
        intent_id="intent-456",
        agent_id="agent-1",
        status=execution_report_pb2.ExecutionReport.SUCCESS,
        timestamp=int(time.time()),
    ),
]

# Submit batch
batch_req = service_pb2.ExecutionReportBatchRequest(
    reports=reports,
    batch_id="batch-456",
    partial_ok=False,  # Stop on first failure
)

try:
    response = await validator_client.submit_execution_report_batch(batch_req)
    print(f"Batch results: {response.success} succeeded, {response.failed} failed")

    for i, receipt in enumerate(response.receipts):
        print(f"Report {i}: status={receipt.status}, phase={receipt.phase}")
finally:
    await validator_client.close()

Batch Operation Benefits

  • Performance: Reduced network overhead and connection management
  • Atomicity: Optional partial success handling with partial_ok flag
  • Efficiency: Single RPC call for multiple operations
  • Idempotency: Use batch_id to prevent duplicate submissions

Batch Error Handling

# Stop on first failure (partial_ok = False)
batch_req = matcher_pb2.SubmitBidBatchRequest(
    bids=bids,
    batch_id="batch-123",
    partial_ok=False,
)
# If any bid fails, remaining bids are rejected

# Continue on failures (partial_ok = True)
batch_req = matcher_pb2.SubmitBidBatchRequest(
    bids=bids,
    batch_id="batch-123",
    partial_ok=True,
)
# All bids are processed, check individual acks for results

Complete Example

See example.py for a complete working example.

Testing

# Install dev dependencies
pip install -e .[dev]

# Run tests
pytest

# With coverage
pytest --cov=subnet_sdk

注册与执行报告

  1. 注册与心跳

    • 启用注册: 在配置中同时设置 registry_addragent_endpoint
    • 启动后 SDK 会调用 POST /agents 注册自身,并按 registry_heartbeat_interval (默认 30 秒) 调用 POST /agents/{id}/heartbeat
    • 停止时会调用 DELETE /agents/{id} 注销;建议在调试脚本中等待 sdk.stop() 完成。
  2. 发现验证节点

    • SDK.discover_validators() 会请求 GET /validators 并返回 ValidatorEndpoint 列表。
    • 若配置了 validator_addr 则会在注册发现失败时用作兜底。
  3. 提交执行报告

    • 使用 ExecutionReport 填写 report_idassignment_idintent_idstatus (默认 success)、result_data (bytes) 与 metadata
    • 调用 await sdk.submit_execution_report(report);SDK 会生成 /api/v1/execution-report URL 并广播到所有验证节点。
    • 返回值是 ExecutionReceipt 列表,包含 validator_idstatusmessage 及时间戳;可结合 metrics.report_counters() 统计成功/失败次数。

Development

# Format code
black subnet_sdk/

# Lint
flake8 subnet_sdk/

# Type checking
mypy subnet_sdk/

Bidding Strategy & Callbacks

from subnet_sdk import BiddingStrategy, Callbacks, Bid, Intent

class FixedPriceStrategy(BiddingStrategy):
    def should_bid(self, intent: Intent) -> bool:
        return intent.type == "compute"

    def calculate_bid(self, intent: Intent) -> Bid:
        return Bid(price=100, currency="PIN")

class LifecycleCallbacks(Callbacks):
    async def on_bid_submitted(self, intent_id: str, bid_id: str) -> None:
        print(f"bid {bid_id} submitted for intent {intent_id}")

    async def on_task_completed(self, task: Task, result: Result) -> None:
        print(f"task {task.id} finished: {result.success}")

sdk.register_bidding_strategy(FixedPriceStrategy())
sdk.register_callbacks(LifecycleCallbacks())

启用投标策略后,SDK 会自动订阅 matcher 的 StreamIntents,依据策略提交 SubmitBid,并通过回调上报竞标结果、任务处理与执行报告状态。

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

subnet_sdk-0.1.0.tar.gz (44.0 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

subnet_sdk-0.1.0-py3-none-any.whl (57.8 kB view details)

Uploaded Python 3

File details

Details for the file subnet_sdk-0.1.0.tar.gz.

File metadata

  • Download URL: subnet_sdk-0.1.0.tar.gz
  • Upload date:
  • Size: 44.0 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.11.0

File hashes

Hashes for subnet_sdk-0.1.0.tar.gz
Algorithm Hash digest
SHA256 b123684c040dad350ccf9e1f7874a894e3931979c28ab5360b4118dba0065ce0
MD5 ee733badb4ed7681ede6cb87bf14dbf3
BLAKE2b-256 3fd306959372c327db73fadf8a9886fd7dc298b051d45106b8b734b6716f44fa

See more details on using hashes here.

File details

Details for the file subnet_sdk-0.1.0-py3-none-any.whl.

File metadata

  • Download URL: subnet_sdk-0.1.0-py3-none-any.whl
  • Upload date:
  • Size: 57.8 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.11.0

File hashes

Hashes for subnet_sdk-0.1.0-py3-none-any.whl
Algorithm Hash digest
SHA256 5456c1779a44cba1cac6072035e56f7214a132583e980f0d7e2b5c0be3f7b63b
MD5 b70ac8e5739d2555b40a593ec9218c04
BLAKE2b-256 93ab69b261bf26366597bc206d7efe3bb988cab81838408c8ee20c551598b316

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page