Build AI workflows and agents as fully-distributed and event-driven microservices.

🐮 Calfkit SDK

The SDK to build AI agents as composable, orchestratable microservices.

Calfkit lets you compose agents from independent services (agents, tools, workflows) that communicate asynchronously. Add agent capabilities without coordinating changes across existing services. Scale each component independently. Stream agent outputs to any downstream listener.

Agents should work like real teams, with independent, distinct roles, async communication, and the ability to onboard new teammates or tools without restructuring the whole org. Build AI employees that integrate.

pip install calfkit

Why Calfkit?

The problem

Building agents like traditional web applications—tight coupling and synchronous API calls—creates the same scalability problems that plagued early microservices:

  • Tight coupling: Changing one tool or agent breaks dependent agents and tools
  • Scaling bottlenecks: All agents and tools live on one runtime, so everything must scale together
  • Siloed: Agent communication models are difficult to wire into existing upstream and downstream systems
  • Non-streaming: Agents do not naturally follow a livestreaming pattern, making data stream consumption difficult to manage

What Calfkit provides

Calfkit is a Python SDK that builds event-stream agents out of the box. You get the benefits of an asynchronous, distributed system without managing the infrastructure yourself.

  • Distributed to the core: Agents aren't monoliths that just sit on top of the transport layer. Each agent is decomposed into independent services, so the agent itself is a deeply distributed system.

  • Independent scaling: Each service can scale on its own based on demand.

  • Livestream agents by default: Agents already listen on event streams, so consuming data streams — realtime market feeds, IoT sensors, user activity event streams — is the native pattern, not a bolted-on integration.

  • Compose agents without coupling: Compose multi-agent teams and workflows by deploying agents on communication channels that are already tapped into the messaging stream. No extra wiring, and no editing existing code — agents don't even need to know about each other.

  • Universal data flow: Agents plug into any stream — integrate and consume from any upstream data sources and publish to downstream systems like CRMs, warehouses, or even other agents.


Quick Start

Prerequisites

  • Python 3.10 or later
  • Docker installed and running (for testing with a local Calfkit broker)
  • LLM Provider API key

1. Install

pip install calfkit

2. Start a Calfkit Broker

Option A: Local Broker (Requires Docker)

Calfkit uses Kafka as the event broker. Run the following command to clone the calfkit-broker repo and start a local Kafka broker container:

git clone https://github.com/calf-ai/calfkit-broker && cd calfkit-broker && make dev-up

Once the broker is ready, open a new terminal tab to continue with the quickstart.
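
If you want to script the "broker is ready" check rather than watch the container logs, a minimal sketch is to poll the broker port until a TCP connection succeeds. The helper name wait_for_broker is hypothetical and not part of Calfkit; the address localhost:9092 is the one used throughout this quickstart. An open port only shows that something is listening, not that Kafka has finished starting, so treat this as a rough check.

```python
import socket
import time


def wait_for_broker(host: str = "localhost", port: int = 9092,
                    timeout: float = 30.0, interval: float = 0.5) -> bool:
    """Return True once a TCP connection to host:port succeeds, False on timeout."""
    deadline = time.monotonic() + timeout
    while True:
        try:
            # create_connection raises OSError if nothing accepts the connection
            with socket.create_connection((host, port), timeout=interval):
                return True
        except OSError:
            if time.monotonic() >= deadline:
                return False
            time.sleep(interval)
```

Calling wait_for_broker() with no arguments polls localhost:9092 for up to 30 seconds.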

Option B: ☁️ Calfkit Cloud (In Beta)

Skip the infrastructure. Calfkit Cloud is a fully-managed broker service built for Calfkit AI agents and multi-agent teams. No server infrastructure to self-host or maintain, with built-in observability and agent-event tracing.

You will be provided with a Calfkit broker API for deploying your agents, so there is no broker to set up or maintain locally.

Sign up for access →


3. Define and Deploy the Tool Node

Define and deploy a tool as an independent service. Tools are not owned by or coupled to any specific agent—once deployed, any agent in your system can discover and invoke the tool. Deploy once, use everywhere.

# weather_tool.py
import asyncio
from calfkit.nodes import agent_tool
from calfkit.client import Client
from calfkit.worker import Worker

# Define a tool — the @agent_tool decorator turns any function into a deployable tool node
@agent_tool
def get_weather(location: str) -> str:
    """Get the current weather at a location"""
    return f"It's sunny in {location}"

async def main():
    client = Client.connect("localhost:9092")  # Connect to Kafka broker
    worker = Worker(client, nodes=[get_weather])  # Initialize a worker with the tool node
    await worker.run()  # (Blocking call) Deploy the service to start serving traffic

if __name__ == "__main__":
    asyncio.run(main())

Run the file to deploy the tool service:

python weather_tool.py

4. Deploy the Agent Node

Deploy the agent as its own service. The Agent handles LLM chat, tool orchestration, and conversation management in a single node. Import the tool definition to register it with the agent—the tool definition is reusable and does not couple the agent to the tool's deployment.

# agent_service.py
import asyncio
from calfkit.nodes import Agent
from calfkit.providers import OpenAIResponsesModelClient
from calfkit.client import Client
from calfkit.worker import Worker
from weather_tool import get_weather  # Import the tool definition (reusable)

agent = Agent(
    "weather_agent",
    system_prompt="You are a helpful assistant.",
    subscribe_topics="weather_agent.input",
    model_client=OpenAIResponsesModelClient(model_name="gpt-5.4-nano"),
    tools=[get_weather],  # Register tool definitions with the agent
)

async def main():
    client = Client.connect("localhost:9092")  # Connect to Kafka broker
    worker = Worker(client, nodes=[agent])  # Initialize a worker with the agent node
    await worker.run()  # (Blocking call) Deploy the service to start serving traffic

if __name__ == "__main__":
    asyncio.run(main())

Set your OpenAI API key:

export OPENAI_API_KEY=sk-...
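
A missing key is easier to diagnose at startup than on the first LLM call. The sketch below is a plain-Python fail-fast check you could call at the top of main() in agent_service.py. require_env is a hypothetical helper, not a Calfkit function, and it assumes the model client reads OPENAI_API_KEY from the environment, as the export step implies.

```python
import os


def require_env(name: str) -> str:
    """Return the value of an environment variable, or raise if it is unset or blank."""
    value = os.environ.get(name, "").strip()
    if not value:
        raise RuntimeError(f"{name} is not set; export it before starting the agent service")
    return value
```

For example, require_env("OPENAI_API_KEY") raises a RuntimeError with a readable message when the variable has not been exported.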

Run the file to deploy the agent service:

python agent_service.py

5. Invoke the Agent

Send a request and receive the response. The Client handles broker communication and request correlation automatically.

# invoke.py
import asyncio
from calfkit.client import Client

async def main():
    client = Client.connect("localhost:9092")  # Connect to Kafka broker

    # Send a request and await the response
    result = await client.execute_node(
        "What's the weather in Tokyo?",
        "weather_agent.input",  # The topic the agent subscribes to
    )
    print(f"Assistant: {result.output}")

if __name__ == "__main__":
    asyncio.run(main())

Run the file to invoke the agent:

python invoke.py
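
This README does not say whether execute_node applies a timeout of its own. If the agent service is not running or the topic name is wrong, the request may wait for a reply that never arrives. A minimal sketch, using only the standard library and the execute_node call shown above, bounds the wait with asyncio.wait_for. The helper name ask is hypothetical.

```python
import asyncio
from typing import Any


async def ask(client: Any, prompt: str, topic: str, timeout: float = 30.0) -> Any:
    """Send one request and return result.output; raise asyncio.TimeoutError if no reply in time."""
    # client.execute_node(prompt, topic) is the same call used in invoke.py
    result = await asyncio.wait_for(client.execute_node(prompt, topic), timeout=timeout)
    return result.output
```

Catch asyncio.TimeoutError at the call site to report a missing or misconfigured agent instead of hanging.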

Structured Outputs (Optional)

Agents can be deployed with a final_output_type to enforce structured output from the LLM. The output is type-safe and deserialized automatically on the client side.

from dataclasses import dataclass
from calfkit.nodes import Agent
from calfkit.providers import OpenAIResponsesModelClient

@dataclass
class WeatherReport:
    location: str
    summary: str

agent = Agent(
    "weather_agent",
    system_prompt="You are a helpful assistant.",
    subscribe_topics="weather_agent.input",
    model_client=OpenAIResponsesModelClient(model_name="gpt-5.4-nano"),
    final_output_type=WeatherReport,  # Enforce structured output
)

When invoking, pass the matching output_type to deserialize the response:

result = await client.execute_node(
    "What's the weather in Tokyo?",
    "weather_agent.input",
    output_type=WeatherReport,
)
print(result.output.location)  # "Tokyo"
print(result.output.summary)   # "It's sunny in Tokyo"

Client-Side Features (Optional)

The Client supports multi-turn conversations, runtime dependency injection, and temporary instruction overrides—all without redeploying the agent.

Multi-turn conversations — pass the message history from a previous result to maintain context:

result = await client.execute_node("What's the weather in Tokyo?", "weather_agent.input")

# Continue the conversation with full context
result = await client.execute_node(
    "How about in Osaka?",
    "weather_agent.input",
    message_history=result.message_history,
)
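
For more than two turns, the same pattern can be wrapped in a loop that threads message_history from each result into the next call. This is a sketch built only on the execute_node parameters shown above; run_conversation is a hypothetical helper, not part of the Client.

```python
from typing import Any, Iterable


async def run_conversation(client: Any, topic: str, prompts: Iterable[str]) -> list[Any]:
    """Send prompts in order, passing each result's message_history to the next call."""
    outputs: list[Any] = []
    history = None
    for prompt in prompts:
        if history is None:
            # First turn: there is no prior context to pass
            result = await client.execute_node(prompt, topic)
        else:
            result = await client.execute_node(prompt, topic, message_history=history)
        history = result.message_history
        outputs.append(result.output)
    return outputs
```

Because the history travels with each request, the conversation state lives on the caller's side in this sketch and nothing has to be redeployed between turns.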

Runtime dependency injection — pass runtime data to tools via the deps parameter:

result = await client.execute_node(
    "What's my phone number?",
    "weather_agent.input",
    deps={"user_id": "usr_123"},  # Available to tools via ctx.deps.provided_deps
)
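
Since deps are supplied per request, a tool cannot assume a key is present. The sketch below shows one defensive way to read an injected value. It operates on the mapping that ctx.deps.provided_deps exposes; how Calfkit hands ctx to a tool function is not covered in this README, so the helper takes the mapping directly. resolve_user_id is a hypothetical name.

```python
from typing import Any, Mapping


def resolve_user_id(provided_deps: Mapping[str, Any]) -> str:
    """Return deps['user_id'], or raise if the caller did not inject a usable value."""
    user_id = provided_deps.get("user_id")
    if not isinstance(user_id, str) or not user_id:
        raise ValueError("request is missing deps['user_id']")
    return user_id
```

Failing loudly here keeps a missing dependency from turning into a vague or made-up answer further down the tool call.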

Temporary instructions — temporarily add system-level instructions scoped per request:

result = await client.execute_node(
    "What's the weather in Tokyo?",
    "weather_agent.input",
    temp_instructions="Always respond in Japanese.",
)

Gating Node Invocations (Optional)

When multiple agents share an input topic (each with its own consumer group), every agent receives every message published to that topic. A gate stack lets a node decide whether to handle an inbound event before run() executes, which avoids spending LLM tokens on messages addressed elsewhere.

Gates are predicates: Callable[[SessionRunContext], bool | Awaitable[bool]]. They stack with AND semantics in registration order and short-circuit on the first False, exception, or non-bool return. When any gate rejects, run() is skipped and the envelope is returned unchanged — the Kafka offset still commits.
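
The evaluation rules above can be modeled in a few lines of plain Python. This is an illustrative sketch of the described semantics, not Calfkit's actual implementation; passes_gates and the Gate alias are hypothetical names, and ctx is treated as an opaque value.

```python
import inspect
import logging
from typing import Any, Awaitable, Callable, Sequence, Union

Gate = Callable[[Any], Union[bool, Awaitable[bool]]]
logger = logging.getLogger("gate_sketch")


async def passes_gates(ctx: Any, gates: Sequence[Gate]) -> bool:
    """Return True only if every gate, checked in order, returns exactly True."""
    for gate in gates:
        try:
            verdict = gate(ctx)
            if inspect.isawaitable(verdict):
                verdict = await verdict  # async gates are awaited
        except Exception:
            logger.exception("gate %r raised; rejecting", gate)
            return False
        if not isinstance(verdict, bool):
            logger.warning("gate %r returned non-bool %r; rejecting", gate, verdict)
            return False
        if not verdict:
            return False  # short-circuit: later gates are never called
    return True
```

In this model a truthy non-bool such as 1 rejects the message, and a gate that comes after a rejecting gate is never invoked.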

Constructor form — good for shared, cross-cutting predicates passed in as values:

def is_scheduler_target(ctx) -> bool:
    discord = ctx.deps.provided_deps.get("discord", {})
    return discord.get("slash_target") == "scheduler"

scheduler = Agent(
    "scheduler",
    subscribe_topics="discord.thread.123",
    model_client=OpenAIResponsesModelClient(model_name="gpt-5.4-nano"),
    gates=[is_scheduler_target],
)

Decorator form — good for node-specific gates defined inline:

scheduler = Agent("scheduler", subscribe_topics="discord.thread.123", model_client=...)

@scheduler.gate
def is_scheduler_target(ctx) -> bool:
    discord = ctx.deps.provided_deps.get("discord", {})
    return discord.get("slash_target") == "scheduler"

Constructor and decorator forms can be combined; constructor gates run first.

Idempotency requirement: Kafka may redeliver an event before its offset commits, so gates may run more than once for the same logical message. Keep gate functions deterministic and side-effect-free.
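
To make the requirement concrete, the sketch below contrasts a gate that mutates shared state with one that depends only on the event's own data. make_ctx is a hypothetical stand-in for SessionRunContext that exposes only ctx.deps.provided_deps, and the message_id and slash_target keys are assumed for illustration.

```python
from types import SimpleNamespace


def make_ctx(provided_deps: dict) -> SimpleNamespace:
    """Stand-in context exposing only ctx.deps.provided_deps."""
    return SimpleNamespace(deps=SimpleNamespace(provided_deps=provided_deps))


seen_ids: set = set()


def dedupe_gate(ctx) -> bool:
    """Anti-pattern: mutates shared state, so a redelivered event gets a different verdict."""
    msg_id = ctx.deps.provided_deps.get("message_id")
    if msg_id in seen_ids:
        return False
    seen_ids.add(msg_id)
    return True


def target_gate(ctx) -> bool:
    """Deterministic: the verdict depends only on the event's own data."""
    return ctx.deps.provided_deps.get("slash_target") == "scheduler"
```

If the same event is delivered twice, target_gate gives the same answer both times, while dedupe_gate accepts the first delivery and rejects the second.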

Failure behavior: If a gate raises or returns a non-bool, the framework logs the failure and rejects the message (fail-safe). Place cheap fast-reject gates first to maximize short-circuit efficiency.

For tool-node gating, pass gates=[...] to ToolNodeDef.create_tool_node(...) directly; the @agent_tool decorator doesn't expose gates= because tool topics are typically 1:1.


Documentation

Full documentation is coming soon. In the meantime, this README serves as the primary reference for getting started with Calfkit.


Contact

X LinkedIn


Support

If you found this project interesting or useful, please consider:

  • ⭐ Starring the repository — it helps others discover it!
  • 🐛 Reporting issues
  • 🔀 Submitting PRs

License

This project is licensed under the Apache License 2.0. See the LICENSE file for details.
