Skip to main content

Rill - AI Agent Framework

Project description

Rill Logo

PyPI version Python Version License Downloads GitHub Stars

A zero-dependency flow orchestration kernel for building AI workflows your way.

A minimal orchestration layer that lets you use any LLM client, any tools, any storage to build your AI agent applications.

Inspired by CrewAI and LangGraph, designed to be lighter and simpler.


Why Rill?

Building AI agents shouldn't require heavy frameworks. Sometimes you just need a simple orchestration piece:

  • Want to use your own LLM client (chak / OpenAI SDK / Anthropic SDK)? ✅
  • Want to use your own tools (functions / MCP servers / custom implementations)? ✅
  • Want to keep your codebase lightweight and dependencies minimal? ✅
  • Prefer code over YAML/JSON configs? Code is the orchestration. ✅

Rill is just an orchestration component - bring your own pieces, Rill handles the flow.


Core Philosophy

Rill embraces the "code is flow" design philosophy pioneered by CrewAI - use decorators to define nodes, use Python functions to express logic, no YAML or JSON configs needed.

Built on this foundation, Rill adds:

  1. Forward routing: Declare next steps with goto right where you are, not reverse subscription
  2. Zero binding: Framework handles orchestration only, everything else is your call

Special thanks to CrewAI and LangGraph for inspiring Rill's design.


Quick Start

Installation

# From PyPI (coming soon)
pip install rillpy

# From GitHub
pip install git+https://github.com/zhixiangxue/rill-ai.git@main

# Local development
git clone https://github.com/zhixiangxue/rill-ai.git
cd rill-ai
pip install -e .

Build a RAG workflow in 30 seconds

from rill import Flow, node, DYNAMIC, goto

class MyRAGFlow(Flow):
    @node(start=True, goto=DYNAMIC)
    async def query(self, user_input):
        # Use your own LLM client (e.g., chak)
        from chak import Conversation
        conv = Conversation("openai/gpt-4o-mini", api_key="YOUR_KEY")
        result = await conv.asend(user_input)
        
        # Decide routing based on LLM response
        if "search" in result.content.lower():
            # List means parallel: trigger vector search and web search simultaneously
            return goto([self.vector_search, self.web_search], user_input)
        return goto(self.answer, result.content)
    
    @node(goto="answer")
    async def vector_search(self, query):
        # Use your favorite vector database
        return await my_chromadb.search(query)
    
    @node(goto="answer")
    async def web_search(self, query):
        # Use your own search tool
        return await my_searxng.search(query)
    
    @node()
    async def answer(self, sources):
        # Multiple predecessors auto-merge: sources = {"vector_search": [...], "web_search": [...]}
        from chak import Conversation
        conv = Conversation("openai/gpt-4o-mini", api_key="YOUR_KEY")
        return await conv.asend(str(sources))

# Run
await MyRAGFlow().run("What is quantum entanglement?")

Notice:

  • ✅ LLM client: chak (or OpenAI SDK / Anthropic SDK, your choice)
  • ✅ Vector database: chromadb (or Pinecone / Qdrant, your choice)
  • ✅ Search tool: your own implementation (or MCP / LangChain Tools, your choice)
  • ✅ Rill only handles: which node first, which ones parallel, how to merge inputs
  • ✅ Data flows through return values (node → node) and shared state (self.state)

Core Features

🌱 Zero Binding

Framework doesn't care what LLM, tools, or databases you use. Only provides orchestration.

🪴 Code is Flow

Define nodes with @node decorator, declare routing with goto, no DSL needed.

🌻 Forward Declaration

Use goto(next, data) to directly specify next step in current node. Matches how humans think.

🌾 List Means Parallel

goto([A, B], data) automatically triggers asyncio.gather for parallel execution.

🌿 Auto Input Merge

When multiple predecessors point to same node, framework auto-merges outputs as {pred_name: output} dict.

🍀 Safety Guards

Graph validation (start point / cycles / reachability) + max_loop to prevent infinite loops.

🌲 Observable

Flow.stats() tracks node execution time, logger traces execution flow.

🪴 Shared State Management

Rill provides FlowState for sharing data across nodes. Simpler than LangGraph's in-node state updates.

TODO: Parallel nodes updating state simultaneously may have thread-safety issues. Community contributions welcome.

🌾 Return Value as Input

Predecessor node's return value becomes successor node's input parameter. No need to put everything in state.


Common Patterns

Conditional Branching + Parallel Execution

class ResearchFlow(Flow):
    @node(start=True, goto=DYNAMIC)
    async def decide(self, topic):
        complexity = await self.analyze_complexity(topic)
        
        if complexity > 0.8:
            # High complexity: parallel deep research
            return goto([self.academic_search, self.expert_interview], topic)
        else:
            # Low complexity: quick search
            return goto(self.web_search, topic)
    
    @node(goto="synthesize")
    async def academic_search(self, topic):
        return await search_papers(topic)
    
    @node(goto="synthesize")
    async def expert_interview(self, topic):
        return await interview_experts(topic)
    
    @node(goto="synthesize")
    async def web_search(self, topic):
        return await search_web(topic)
    
    @node()
    async def synthesize(self, sources):
        # Auto-merge: sources could be {"academic_search": ..., "expert_interview": ...}
        # or just web_search output (single predecessor)
        return await generate_report(sources)

Loop + Exit Condition

class IterativeFlow(Flow):
    @node(start=True, goto=DYNAMIC, max_loop=5)
    async def generate(self, prompt):
        result = await llm_generate(prompt)
        quality = await self.evaluate(result)
        
        if quality > 0.9:
            return goto(self.finalize, result)
        else:
            # Loop back with feedback
            return goto(self.generate, {"prompt": prompt, "feedback": quality})
    
    @node()
    async def finalize(self, result):
        return result

Using Shared State

class MyWorkflow(Flow):
    @node(start=True, goto=["fetch_data", "process_config"])
    async def begin(self, inputs):
        # Store inputs in state for other nodes to access
        self.state.user_id = inputs["user_id"]
        self.state.query = inputs["query"]
        self.state.results = []  # Initialize shared collection
    
    @node(goto="merge")
    async def fetch_data(self, previous_result):
        # Access state from parallel node
        data = await api_call(self.state.user_id, self.state.query)
        
        # Accumulate results in state
        self.state.results.append({"source": "api", "data": data})
        return data
    
    @node(goto="merge")
    async def process_config(self, previous_result):
        # Another parallel node accessing same state
        config = load_config(self.state.user_id)
        
        # Also update shared state
        self.state.config = config
        return config
    
    @node()
    async def merge(self, inputs):
        # inputs = {"fetch_data": ..., "process_config": ...}
        # state contains accumulated data from all nodes
        final_result = combine(
            inputs["fetch_data"],
            inputs["process_config"],
            self.state.results  # Access shared state
        )
        
        self.state.final_output = final_result
        return final_result

# Run the workflow
flow = MyWorkflow()
final_state = await flow.run({
    "user_id": 123,
    "query": "hello"
})  # 🎯 Rill auto-converts your dict to a Pydantic FlowState object!

# Access final state (Pydantic model)
print(final_state.final_output)  # 🎉 Flow.run() returns the final FlowState
print(final_state.user_id)        # Access any field stored during execution
print(final_state.results)         # All accumulated data persists here

State vs Return Value:

  • Return value: Direct data passing from predecessor to successor (the main data pipeline)
  • State: Shared context accessible from any node (for metadata, counters, cross-branch data)
  • Key difference: Return values flow through edges, state persists across the entire workflow
  • Use return values for primary data flow, use state for auxiliary data that multiple nodes need

Known Issue:

  • ⚠️ Parallel nodes updating state simultaneously may cause race conditions
  • 🔧 TODO: Need thread-safe state update mechanism (community contributions welcome)

When to Use Rill?

Your Situation Recommendation
Quick GPT app, don't want to manage anything 👉 LangChain / LangGraph (all-in-one convenience)
Want to use my own LLM client (chak / OpenAI SDK) + custom tools 👉 Rill (orchestration freedom)
Just want pure orchestration layer, pick other components myself 👉 Rill

FAQ

Q: What's the difference between Rill and LangGraph?
A: LangGraph is an all-in-one suite (orchestration + LLM + tools + memory), Rill only handles orchestration layer, other components are your choice.

Q: I'm already using LangChain Tools, can I use Rill?
A: Yes! Rill doesn't care where your tools come from, just call them directly in nodes.

Q: Does Rill support state persistence?
A: Current FlowState is in-memory state (Pydantic model), persistence is your choice (Redis / PostgreSQL / files), no binding to any storage solution.

Q: I want to use my own LLM client (e.g., chak), how to integrate?
A: Just import chak in nodes and call it, Rill doesn't care which LLM you use. Example:

@node(start=True, goto="process")
async def query(self, user_input):
    from chak import Conversation  # Your LLM client
    conv = Conversation("openai/gpt-4o-mini", api_key="YOUR_KEY")
    return await conv.asend(user_input)

Q: When do I need max_loop?
A: When your flow has cycles (e.g., "generate → evaluate → regenerate"), use max_loop to limit loop iterations and prevent infinite loops.

Q: How does input merging work?
A: When multiple predecessor nodes point to the same target node, and all predecessors complete, the framework merges their outputs as a dict {pred_name: output} and passes it to the target node.

Q: What's the difference between state and return value?
A: They serve different purposes:

  • Node return value: Passes data to the next node(s) through the flow edge. This is the main data pipeline.
  • State (self.state): A shared Pydantic object accessible from all nodes throughout the workflow. Use it for metadata, counters, configuration, or data that multiple branches need to access.
  • Example: Return the processed result to next node, but store statistics/metadata in state.

Q: Is state update thread-safe in parallel nodes?
A: Not yet. Parallel nodes updating state simultaneously may cause race conditions. This is a known TODO. For now, avoid state updates in parallel nodes or use return values instead.


API Reference

Flow

Orchestration engine, inherit to define your workflow.

class MyFlow(Flow):
    def __init__(self, initial_state=None, max_steps=1000, validate=True):
        super().__init__(initial_state, max_steps, validate)
  • initial_state: Initial state dict or Pydantic model
  • max_steps: Max execution steps (prevent infinite loops)
  • validate: Whether to validate graph before execution

@node

Decorator to mark methods as executable flow nodes.

@node(start=False, goto=None, max_loop=None)
def my_node(self, inputs):
    pass
  • start: Whether this is the start node
  • goto: Next node(s), can be:
    • None: No successors (end node)
    • "node_name": Single next node
    • ["node1", "node2"]: Multiple nodes (parallel execution)
    • DYNAMIC: Runtime-determined routing (must return goto(...) in node)
  • max_loop: Max loop count for this node (for cycle detection)

goto(target, data)

Construct routing decision for DYNAMIC nodes.

@node(goto=DYNAMIC)
async def decide(self, inputs):
    if condition:
        return goto(self.next_node, data)
    else:
        return goto([self.task_a, self.task_b], data)  # Parallel
  • target: Single node or list of nodes (list triggers parallel execution)
  • data: Payload passed to target node(s)

DYNAMIC

Constant for runtime-determined routing. Use with goto().

FlowState

Shared mutable state container (Pydantic model).

# Access state from any node
self.state.custom_field = "value"  # Runtime field injection
self.state.user_id = 123
self.state.results = []  # Shared collection

# Two independent data channels:
# 1. Return value: flows through edges (node → successor)
# 2. State: shared context persists across entire workflow

Known Issue: Parallel nodes updating state simultaneously may cause race conditions (TODO).

Flow.run(initial_input)

Execute the flow.

result_state = await flow.run({"user_input": "Hello"})

Flow.stats()

Get execution statistics.

stats = flow.stats()
# {
#     "timing": {
#         "total_duration": 2.35,
#         "nodes": {
#             "query": {"duration": 1.2, "percentage": 51.06},
#             "search": {"duration": 0.8, "percentage": 34.04}
#         }
#     }
# }

Architecture

┌─────────────────────────────────────────┐
│      Your Application Layer             │
│  LLM: chak / OpenAI / Anthropic / ...   │
│  Tools: MCP / Functions / LangChain     │
│  Storage: ChromaDB / PostgreSQL / Redis │
└──────────────┬──────────────────────────┘
               │ Only depends on Rill for orchestration
┌──────────────▼──────────────────────────┐
│         Rill Orchestration Layer        │
│  @node + goto + parallel + State        │
└─────────────────────────────────────────┘

Dependencies

  • Python >= 3.8
  • pydantic >= 2.0.0
  • loguru >= 0.7.0

License

MIT License - see LICENSE file for details.

Demo Video

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

rillpy-0.1.0.tar.gz (19.9 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

rillpy-0.1.0-py3-none-any.whl (14.7 kB view details)

Uploaded Python 3

File details

Details for the file rillpy-0.1.0.tar.gz.

File metadata

  • Download URL: rillpy-0.1.0.tar.gz
  • Upload date:
  • Size: 19.9 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for rillpy-0.1.0.tar.gz
Algorithm Hash digest
SHA256 0868bd8840fcc2404e89cb9ad65c3eaa5f85b9e2742a0f2f35934c80aa551a9d
MD5 aac80dca584b235ae2a4f87c5a74b9e5
BLAKE2b-256 a69ed25a320dc46d35a058bfa023b625c2aab3c0446b9e0410eb93b759a507b9

See more details on using hashes here.

Provenance

The following attestation bundles were made for rillpy-0.1.0.tar.gz:

Publisher: publish.yml on zhixiangxue/rill-ai

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file rillpy-0.1.0-py3-none-any.whl.

File metadata

  • Download URL: rillpy-0.1.0-py3-none-any.whl
  • Upload date:
  • Size: 14.7 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for rillpy-0.1.0-py3-none-any.whl
Algorithm Hash digest
SHA256 8e1ba274784b21e4568436bfcf408d7ad84e5bcce7d476875706a20fcb554705
MD5 305ab10f9602154453c5033824a96a91
BLAKE2b-256 aeac7e63bb53c6800f0f3ff5e05c3b9bba56f9b90bc7d075ab5e77285af13fe4

See more details on using hashes here.

Provenance

The following attestation bundles were made for rillpy-0.1.0-py3-none-any.whl:

Publisher: publish.yml on zhixiangxue/rill-ai

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page