Rill - AI Agent Framework
A zero-dependency flow orchestration kernel for building AI workflows your way.
A minimal orchestration layer that lets you use any LLM client, any tools, and any storage to build your AI agent applications.
Inspired by CrewAI and LangGraph, designed to be lighter and simpler.
Why Rill?
Building AI agents shouldn't require heavy frameworks. Sometimes you just need a simple orchestration piece:
- Want to use your own LLM client (chak / OpenAI SDK / Anthropic SDK)? ✅
- Want to use your own tools (functions / MCP servers / custom implementations)? ✅
- Want to keep your codebase lightweight and dependencies minimal? ✅
- Prefer code over YAML/JSON configs? Code is the orchestration. ✅
Rill is just an orchestration component: bring your own pieces, and Rill handles the flow.
Core Philosophy
Rill embraces the "code is flow" design philosophy pioneered by CrewAI: use decorators to define nodes and Python functions to express logic, with no YAML or JSON configs needed.
Built on this foundation, Rill adds:
- Forward routing: Declare next steps with goto right where you are, not through reverse subscription
- Zero binding: The framework handles orchestration only; everything else is your call
Special thanks to CrewAI and LangGraph for inspiring Rill's design.
Quick Start
Installation
# From PyPI (coming soon)
pip install rillpy
# From GitHub
pip install git+https://github.com/zhixiangxue/rill-ai.git@main
# Local development
git clone https://github.com/zhixiangxue/rill-ai.git
cd rill-ai
pip install -e .
Build a RAG workflow in 30 seconds
import asyncio
from rill import Flow, node, DYNAMIC, goto
class MyRAGFlow(Flow):
    @node(start=True, goto=DYNAMIC)
    async def query(self, user_input):
        # Use your own LLM client (e.g., chak)
        from chak import Conversation
        conv = Conversation("openai/gpt-4o-mini", api_key="YOUR_KEY")
        result = await conv.asend(user_input)
        # Decide routing based on LLM response
        if "search" in result.content.lower():
            # List means parallel: trigger vector search and web search simultaneously
            return goto([self.vector_search, self.web_search], user_input)
        return goto(self.answer, result.content)
    @node(goto="answer")
    async def vector_search(self, query):
        # Use your favorite vector database (my_chromadb is a placeholder for your client)
        return await my_chromadb.search(query)
    @node(goto="answer")
    async def web_search(self, query):
        # Use your own search tool (my_searxng is a placeholder for your client)
        return await my_searxng.search(query)
    @node()
    async def answer(self, sources):
        # Multiple predecessors auto-merge: sources = {"vector_search": [...], "web_search": [...]}
        from chak import Conversation
        conv = Conversation("openai/gpt-4o-mini", api_key="YOUR_KEY")
        return await conv.asend(str(sources))
# Run (inside an already-running event loop such as Jupyter, use: await MyRAGFlow().run(...))
asyncio.run(MyRAGFlow().run("What is quantum entanglement?"))
Notice:
- ✅ LLM client: chak (or OpenAI SDK / Anthropic SDK, your choice)
- ✅ Vector database: chromadb (or Pinecone / Qdrant, your choice)
- ✅ Search tool: your own implementation (or MCP / LangChain Tools, your choice)
- ✅ Rill only handles: which node runs first, which ones run in parallel, how to merge inputs
- ✅ Data flows through return values (node → node) and shared state (self.state)
Core Features
🌱 Zero Binding
The framework doesn't care which LLM, tools, or databases you use. It only provides orchestration.
🪴 Code is Flow
Define nodes with the @node decorator and declare routing with goto. No DSL needed.
🌻 Forward Declaration
Use goto(next, data) to specify the next step directly inside the current node. This matches how humans think.
🌾 List Means Parallel
goto([A, B], data) automatically triggers asyncio.gather for parallel execution.
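A plausible reading of "list means parallel" is that every target receives the same payload and all of them are awaited together with asyncio.gather. The sketch below reproduces that behaviour in plain asyncio; it is a hypothetical illustration, not Rill's source, and fan_out, vector_search and web_search are made-up stand-ins. Note that asyncio.gather gives concurrency on one event loop (overlapping I/O waits), not multi-core parallelism.

```python
import asyncio


async def fan_out(targets, data):
    """Run every target coroutine function with the same payload, concurrently.

    A single callable is treated like a one-element list, mirroring
    goto(A, data) versus goto([A, B], data).
    """
    if callable(targets):
        targets = [targets]
    # gather starts all coroutines at once and returns results in target order
    return await asyncio.gather(*(target(data) for target in targets))


async def vector_search(query):
    await asyncio.sleep(0.1)  # simulated I/O latency
    return f"vector hits for {query!r}"


async def web_search(query):
    await asyncio.sleep(0.1)  # simulated I/O latency
    return f"web hits for {query!r}"
```

Calling asyncio.run(fan_out([vector_search, web_search], "entanglement")) should take roughly 0.1 s rather than 0.2 s, because the two simulated waits overlap; gather returns results in target order regardless of which finishes first.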
🌿 Auto Input Merge
When multiple predecessors point to the same node, the framework auto-merges their outputs into a {pred_name: output} dict.
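The merge rule can be pictured as a small join buffer: outputs are collected per predecessor name, and the successor runs once, with the full {pred_name: output} dict, after the last expected branch reports. The Join class below is a hypothetical sketch, not Rill's implementation. It assumes the set of predecessors is fixed up front; the ResearchFlow example in Common Patterns suggests Rill only waits for the branches that were actually triggered, which this sketch does not model.

```python
import asyncio


class Join:
    """Collects outputs from a fixed set of predecessors, then runs the successor once."""

    def __init__(self, predecessors, successor):
        self.expected = set(predecessors)
        self.successor = successor
        self.outputs = {}

    async def deliver(self, pred_name, output):
        self.outputs[pred_name] = output
        if set(self.outputs) == self.expected:
            # Last branch arrived: pass the merged {pred_name: output} dict downstream
            return await self.successor(dict(self.outputs))
        return None  # still waiting for other branches


async def answer(sources):
    # Stand-in successor: report which predecessors it heard from
    return sorted(sources)


async def demo():
    join = Join(["vector_search", "web_search"], answer)
    first = await join.deliver("web_search", ["web hit"])
    second = await join.deliver("vector_search", ["vector hit"])
    return first, second, join.outputs
```

The first delivery returns None because one branch is still outstanding; the second triggers answer with both outputs.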
🍀 Safety Guards
Graph validation (start point / cycles / reachability) + max_loop to prevent infinite loops.
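The three validation checks map onto standard graph algorithms: count start nodes, breadth-first search for reachability, and depth-first search for cycles. Below is a minimal sketch, assuming the flow is described as an adjacency dict of node names built from the static goto= targets. It is not Rill's validator: the exactly-one-start rule and the message wording are assumptions, and DYNAMIC edges, which are only known at runtime, cannot be checked this way.

```python
from collections import deque


def reachable_from(graph, start):
    """Breadth-first search: every node reachable from the start node."""
    seen = {start}
    queue = deque([start])
    while queue:
        node = queue.popleft()
        for nxt in graph.get(node, []):
            if nxt not in seen:
                seen.add(nxt)
                queue.append(nxt)
    return seen


def has_cycle(graph):
    """Depth-first search with three colours; an edge back to a GRAY node is a cycle."""
    WHITE, GRAY, BLACK = 0, 1, 2
    color = {}

    def visit(node):
        color[node] = GRAY
        for nxt in graph.get(node, []):
            state = color.get(nxt, WHITE)
            if state == GRAY:
                return True
            if state == WHITE and visit(nxt):
                return True
        color[node] = BLACK
        return False

    return any(color.get(n, WHITE) == WHITE and visit(n) for n in graph)


def validate(graph, start_nodes):
    """Return a list of problems; an empty list means the graph passed."""
    if len(start_nodes) != 1:
        return [f"expected exactly one start node, found {len(start_nodes)}"]
    problems = []
    all_nodes = set(graph) | {n for targets in graph.values() for n in targets}
    unreachable = all_nodes - reachable_from(graph, start_nodes[0])
    if unreachable:
        problems.append(f"unreachable nodes: {sorted(unreachable)}")
    if has_cycle(graph):
        problems.append("cycle detected; make sure a node on it sets max_loop")
    return problems
```

For the RAG flow in Quick Start, the graph would be {"query": ["vector_search", "web_search", "answer"], "vector_search": ["answer"], "web_search": ["answer"], "answer": []}, which passes; a generate node that points back to itself is reported as a cycle.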
🌲 Observable
Flow.stats() reports per-node execution time, and the logger traces the execution flow.
🪴 Shared State Management
Rill provides FlowState for sharing data across nodes, which is simpler than LangGraph's in-node state updates.
TODO: Parallel nodes updating state simultaneously may have thread-safety issues. Community contributions welcome.
🌾 Return Value as Input
A predecessor node's return value becomes the successor node's input parameter. No need to put everything in state.
Common Patterns
Conditional Branching + Parallel Execution
class ResearchFlow(Flow):
    @node(start=True, goto=DYNAMIC)
    async def decide(self, topic):
        # analyze_complexity is your own helper method on this class
        complexity = await self.analyze_complexity(topic)
        if complexity > 0.8:
            # High complexity: parallel deep research
            return goto([self.academic_search, self.expert_interview], topic)
        else:
            # Low complexity: quick search
            return goto(self.web_search, topic)
    @node(goto="synthesize")
    async def academic_search(self, topic):
        return await search_papers(topic)  # placeholder for your own tool
    @node(goto="synthesize")
    async def expert_interview(self, topic):
        return await interview_experts(topic)  # placeholder for your own tool
    @node(goto="synthesize")
    async def web_search(self, topic):
        return await search_web(topic)  # placeholder for your own tool
    @node()
    async def synthesize(self, sources):
        # Auto-merge: sources could be {"academic_search": ..., "expert_interview": ...}
        # or just web_search output (single predecessor)
        return await generate_report(sources)
Loop + Exit Condition
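class IterativeFlow(Flow):
    @node(start=True, goto=DYNAMIC, max_loop=5)
    async def generate(self, inputs):
        # First call receives the raw prompt; loop-backs receive {"prompt": ..., "feedback": ...}
        if isinstance(inputs, dict):
            prompt, feedback = inputs["prompt"], inputs["feedback"]
        else:
            prompt, feedback = inputs, None
        result = await llm_generate(prompt, feedback)  # placeholder for your own LLM call
        quality = await self.evaluate(result)  # your own scoring helper
        if quality > 0.9:
            return goto(self.finalize, result)
        else:
            # Loop back with feedback
            return goto(self.generate, {"prompt": prompt, "feedback": quality})
    @node()
    async def finalize(self, result):
        return result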
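Conceptually, max_loop acts as a per-node visit counter. The hypothetical LoopGuard below counts how often each node is entered and raises once a cap is exceeded; this document does not say what Rill does when the limit is hit, so raising an exception here is an assumption. The iterate helper replays the generate → evaluate → regenerate cycle with canned quality scores instead of real LLM calls.

```python
import asyncio
from collections import Counter


class LoopLimitExceeded(RuntimeError):
    """Raised when a node is entered more times than its max_loop allows."""


class LoopGuard:
    """Counts visits per node and enforces optional per-node caps."""

    def __init__(self, max_loop):
        self.max_loop = max_loop  # e.g. {"generate": 5}; nodes without an entry are uncapped
        self.visits = Counter()

    def enter(self, node):
        self.visits[node] += 1
        limit = self.max_loop.get(node)
        if limit is not None and self.visits[node] > limit:
            raise LoopLimitExceeded(f"{node!r} exceeded max_loop={limit}")


async def iterate(scores, max_loop=5, threshold=0.9):
    """Replays generate -> evaluate -> regenerate using canned quality scores."""
    guard = LoopGuard({"generate": max_loop})
    scores = iter(scores)
    while True:
        guard.enter("generate")
        await asyncio.sleep(0)  # stand-in for the LLM call inside generate()
        quality = next(scores)  # stand-in for await self.evaluate(result)
        if quality > threshold:
            return guard.visits["generate"]  # here the real flow would goto(self.finalize, ...)
```

With scores [0.5, 0.7, 0.95] the loop exits on the third visit; with ten scores of 0.1 and max_loop=5, the sixth entry into generate raises.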
Using Shared State
import asyncio
from rill import Flow, node
class MyWorkflow(Flow):
    @node(start=True, goto=["fetch_data", "process_config"])
    async def begin(self, inputs):
        # Store inputs in state for other nodes to access
        self.state.user_id = inputs["user_id"]
        self.state.query = inputs["query"]
        self.state.results = []  # Initialize shared collection
    @node(goto="merge")
    async def fetch_data(self, previous_result):
        # Access state from parallel node (api_call is a placeholder for your own client)
        data = await api_call(self.state.user_id, self.state.query)
        # Accumulate results in state
        self.state.results.append({"source": "api", "data": data})
        return data
    @node(goto="merge")
    async def process_config(self, previous_result):
        # Another parallel node accessing same state (load_config is a placeholder)
        config = load_config(self.state.user_id)
        # Also update shared state
        self.state.config = config
        return config
    @node()
    async def merge(self, inputs):
        # inputs = {"fetch_data": ..., "process_config": ...}
        # state contains accumulated data from all nodes
        final_result = combine(  # combine is a placeholder for your own merge logic
            inputs["fetch_data"],
            inputs["process_config"],
            self.state.results  # Access shared state
        )
        self.state.final_output = final_result
        return final_result
# Run the workflow
flow = MyWorkflow()
final_state = asyncio.run(flow.run({
    "user_id": 123,
    "query": "hello"
}))  # 🎯 Rill auto-converts your dict to a Pydantic FlowState object!
# Access final state (Pydantic model)
print(final_state.final_output)  # 🎉 Flow.run() returns the final FlowState
print(final_state.user_id)  # Access any field stored during execution
print(final_state.results)  # All accumulated data persists here
State vs Return Value:
- Return value: Direct data passing from predecessor to successor (the main data pipeline)
- State: Shared context accessible from any node (for metadata, counters, cross-branch data)
- Key difference: Return values flow through edges, state persists across the entire workflow
- Use return values for primary data flow, use state for auxiliary data that multiple nodes need
Known Issue:
- ⚠️ Parallel nodes updating state simultaneously may cause race conditions
- 🔧 TODO: Need thread-safe state update mechanism (community contributions welcome)
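Until a built-in mechanism exists, one workaround you could try, assuming parallel nodes run as coroutines on a single asyncio event loop, is to guard read-modify-write updates with an asyncio.Lock. In that model a lost update needs an await between the read and the write; a plain self.state.results.append(...) with no await in between cannot be interleaved by another coroutine. The sketch uses SimpleNamespace as a hypothetical stand-in for FlowState; where you keep the lock (for example, as an attribute on your Flow subclass) is up to you and is not a Rill API.

```python
import asyncio
from types import SimpleNamespace


async def unsafe_increment(state):
    current = state.counter       # read
    await asyncio.sleep(0)        # another branch can run here...
    state.counter = current + 1   # ...so this write may overwrite its update


async def safe_increment(state, lock):
    async with lock:              # only one branch at a time runs the read-modify-write
        current = state.counter
        await asyncio.sleep(0)
        state.counter = current + 1


async def demo(branches=10):
    unsafe = SimpleNamespace(counter=0)  # stand-in for self.state
    await asyncio.gather(*(unsafe_increment(unsafe) for _ in range(branches)))

    safe = SimpleNamespace(counter=0)
    lock = asyncio.Lock()
    await asyncio.gather(*(safe_increment(safe, lock) for _ in range(branches)))
    return unsafe.counter, safe.counter
```

Running asyncio.run(demo()) leaves the unguarded counter well below 10, because the branches read the old value before any of them writes, while the locked counter reaches 10.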
When to Use Rill?
| Your Situation | Recommendation |
|---|---|
| Quick GPT app, don't want to manage anything | 👉 LangChain / LangGraph (all-in-one convenience) |
| Want to use my own LLM client (chak / OpenAI SDK) + custom tools | 👉 Rill (orchestration freedom) |
| Just want pure orchestration layer, pick other components myself | 👉 Rill |
FAQ
Q: What's the difference between Rill and LangGraph?
A: LangGraph is an all-in-one suite (orchestration + LLM + tools + memory). Rill only handles the orchestration layer; the other components are your choice.
Q: I'm already using LangChain Tools, can I use Rill?
A: Yes! Rill doesn't care where your tools come from, just call them directly in nodes.
Q: Does Rill support state persistence?
A: FlowState is currently in-memory (a Pydantic model). Persistence is your choice (Redis / PostgreSQL / files), with no binding to any storage solution.
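As one example of bringing your own persistence, a state snapshot can be written to a JSON file once you have it as a plain dict; with a Pydantic v2 model, model_dump() produces that dict. The sketch starts from a dict to stay dependency-free, and save_state / load_state are hypothetical helpers, not part of Rill. Since the Flow constructor's initial_state is documented to accept a dict, passing a loaded snapshot back in that way seems plausible but is untested here.

```python
import json
import os
import tempfile


def save_state(state_dict, path):
    """Write a JSON snapshot via a temp file, then rename it over the target."""
    directory = os.path.dirname(os.path.abspath(path))
    fd, tmp_path = tempfile.mkstemp(dir=directory, suffix=".tmp")
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as fh:
            json.dump(state_dict, fh)
        # The rename is atomic on POSIX when source and target share a filesystem
        os.replace(tmp_path, path)
    except BaseException:
        os.remove(tmp_path)  # don't leave a half-written temp file behind
        raise


def load_state(path):
    """Read a snapshot written by save_state back into a dict."""
    with open(path, encoding="utf-8") as fh:
        return json.load(fh)
```

Writing to a temp file first means a crash mid-write leaves the previous snapshot intact rather than a truncated file.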
Q: I want to use my own LLM client (e.g., chak). How do I integrate it?
A: Just import chak inside your nodes and call it; Rill doesn't care which LLM you use. Example:
@node(start=True, goto="process")
async def query(self, user_input):
    from chak import Conversation  # Your LLM client
    conv = Conversation("openai/gpt-4o-mini", api_key="YOUR_KEY")
    return await conv.asend(user_input)
Q: When do I need max_loop?
A: When your flow has cycles (e.g., "generate → evaluate → regenerate"), use max_loop to limit loop iterations and prevent infinite loops.
Q: How does input merging work?
A: When multiple predecessor nodes point to the same target node, the framework waits until all of them complete, then merges their outputs into a dict {pred_name: output} and passes it to the target node.
Q: What's the difference between state and return value?
A: They serve different purposes:
- Node return value: Passes data to the next node(s) through the flow edge. This is the main data pipeline.
- State (self.state): A shared Pydantic object accessible from all nodes throughout the workflow. Use it for metadata, counters, configuration, or data that multiple branches need to access.
- Example: Return the processed result to the next node, but store statistics/metadata in state.
Q: Is state update thread-safe in parallel nodes?
A: Not yet. Parallel nodes updating state simultaneously may cause race conditions. This is a known TODO. For now, avoid state updates in parallel nodes or use return values instead.
API Reference
Flow
The orchestration engine. Subclass it to define your workflow.
class MyFlow(Flow):
    def __init__(self, initial_state=None, max_steps=1000, validate=True):
        super().__init__(initial_state, max_steps, validate)
- initial_state: Initial state dict or Pydantic model
- max_steps: Max execution steps (prevents infinite loops)
- validate: Whether to validate the graph before execution
@node
Decorator to mark methods as executable flow nodes.
@node(start=False, goto=None, max_loop=None)
def my_node(self, inputs):
    pass
- start: Whether this is the start node
- goto: Next node(s), can be:
  - None: No successors (end node)
  - "node_name": Single next node
  - ["node1", "node2"]: Multiple nodes (parallel execution)
  - DYNAMIC: Runtime-determined routing (the node must return goto(...))
- max_loop: Max loop count for this node (for cycle detection)
goto(target, data)
Constructs a routing decision for DYNAMIC nodes.
@node(goto=DYNAMIC)
async def decide(self, inputs):
    if condition:
        return goto(self.next_node, data)
    else:
        return goto([self.task_a, self.task_b], data)  # Parallel
- target: Single node or list of nodes (a list triggers parallel execution)
- data: Payload passed to the target node(s)
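To make the shape of a routing decision concrete, here is a hypothetical stand-in for what goto(...) might return: it records the target name(s), the payload, and whether the targets fan out. It accepts either bound methods such as self.next_node (as in the example above) or plain name strings (as the decorator's goto= argument uses). Goto and route are illustrative names only; Rill's actual return type may differ.

```python
from dataclasses import dataclass
from typing import Any, List


@dataclass(frozen=True)
class Goto:
    """A routing decision: which node(s) run next and what payload they receive."""
    targets: List[str]
    data: Any
    parallel: bool


def _name(target):
    # Accept "answer" as well as a bound method such as self.answer
    return target if isinstance(target, str) else target.__name__


def route(target, data):
    """Normalize a single target or a list of targets into a Goto."""
    if isinstance(target, (list, tuple)):
        return Goto([_name(t) for t in target], data, parallel=True)
    return Goto([_name(target)], data, parallel=False)
```

Storing names rather than method objects keeps the decision easy to log and compare against the statically declared graph.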
DYNAMIC
Constant for runtime-determined routing. Use with goto().
FlowState
Shared mutable state container (Pydantic model).
# Access state from any node
self.state.custom_field = "value" # Runtime field injection
self.state.user_id = 123
self.state.results = [] # Shared collection
# Two independent data channels:
# 1. Return value: flows through edges (node → successor)
# 2. State: shared context persists across entire workflow
Known Issue: Parallel nodes updating state simultaneously may cause race conditions (TODO).
Flow.run(initial_input)
Execute the flow.
result_state = await flow.run({"user_input": "Hello"})
Flow.stats()
Get execution statistics.
stats = flow.stats()
# {
# "timing": {
# "total_duration": 2.35,
# "nodes": {
# "query": {"duration": 1.2, "percentage": 51.06},
# "search": {"duration": 0.8, "percentage": 34.04}
# }
# }
# }
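The percentages in this sample are consistent with duration / total_duration × 100, rounded to two decimals (1.2 / 2.35 → 51.06, 0.8 / 2.35 → 34.04). The two listed nodes cover about 85% of the run, so the remainder presumably goes to other nodes or framework overhead. Below is a hypothetical timer that produces the same report shape; timing_report and timed are illustrative helpers, not Rill's API.

```python
import time
from contextlib import contextmanager


def timing_report(total, durations):
    """Shape per-node durations like the Flow.stats() sample above."""
    return {
        "timing": {
            "total_duration": round(total, 2),
            "nodes": {
                name: {
                    "duration": round(seconds, 2),
                    # Share of the whole run; listed nodes need not sum to 100%
                    "percentage": round(seconds / total * 100, 2),
                }
                for name, seconds in durations.items()
            },
        }
    }


@contextmanager
def timed(durations, name):
    """Add the wall-clock time of the with-block to durations[name]."""
    start = time.perf_counter()
    try:
        yield
    finally:
        # Accumulate, so a node that runs several times (loops) is summed
        durations[name] = durations.get(name, 0.0) + time.perf_counter() - start
```

Feeding timing_report(2.35, {"query": 1.2, "search": 0.8}) reproduces the numbers in the sample output.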
Architecture
┌─────────────────────────────────────────┐
│ Your Application Layer │
│ LLM: chak / OpenAI / Anthropic / ... │
│ Tools: MCP / Functions / LangChain │
│ Storage: ChromaDB / PostgreSQL / Redis │
└──────────────┬──────────────────────────┘
│ Only depends on Rill for orchestration
┌──────────────▼──────────────────────────┐
│ Rill Orchestration Layer │
│ @node + goto + parallel + State │
└─────────────────────────────────────────┘
Dependencies
- Python >= 3.8
- pydantic >= 2.0.0
- loguru >= 0.7.0
License
MIT License - see LICENSE file for details.
File details
Details for the file rillpy-0.1.0.tar.gz.
File metadata
- Download URL: rillpy-0.1.0.tar.gz
- Upload date:
- Size: 19.9 kB
- Tags: Source
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.7
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 0868bd8840fcc2404e89cb9ad65c3eaa5f85b9e2742a0f2f35934c80aa551a9d |
| MD5 | aac80dca584b235ae2a4f87c5a74b9e5 |
| BLAKE2b-256 | a69ed25a320dc46d35a058bfa023b625c2aab3c0446b9e0410eb93b759a507b9 |
Provenance
The following attestation bundles were made for rillpy-0.1.0.tar.gz:
Publisher: publish.yml on zhixiangxue/rill-ai
Statement:
- Statement type: https://in-toto.io/Statement/v1
- Predicate type: https://docs.pypi.org/attestations/publish/v1
- Subject name: rillpy-0.1.0.tar.gz
- Subject digest: 0868bd8840fcc2404e89cb9ad65c3eaa5f85b9e2742a0f2f35934c80aa551a9d
- Sigstore transparency entry: 725045369
- Sigstore integration time:
- Permalink: zhixiangxue/rill-ai@111f2d7df463e47b5f58d1b763997b9b01ce11d5
- Branch / Tag: refs/tags/v0.1.0
- Owner: https://github.com/zhixiangxue
- Access: public
- Token Issuer: https://token.actions.githubusercontent.com
- Runner Environment: github-hosted
- Publication workflow: publish.yml@111f2d7df463e47b5f58d1b763997b9b01ce11d5
- Trigger Event: release
File details
Details for the file rillpy-0.1.0-py3-none-any.whl.
File metadata
- Download URL: rillpy-0.1.0-py3-none-any.whl
- Upload date:
- Size: 14.7 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.7
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 8e1ba274784b21e4568436bfcf408d7ad84e5bcce7d476875706a20fcb554705 |
| MD5 | 305ab10f9602154453c5033824a96a91 |
| BLAKE2b-256 | aeac7e63bb53c6800f0f3ff5e05c3b9bba56f9b90bc7d075ab5e77285af13fe4 |
Provenance
The following attestation bundles were made for rillpy-0.1.0-py3-none-any.whl:
Publisher: publish.yml on zhixiangxue/rill-ai
Statement:
- Statement type: https://in-toto.io/Statement/v1
- Predicate type: https://docs.pypi.org/attestations/publish/v1
- Subject name: rillpy-0.1.0-py3-none-any.whl
- Subject digest: 8e1ba274784b21e4568436bfcf408d7ad84e5bcce7d476875706a20fcb554705
- Sigstore transparency entry: 725045371
- Sigstore integration time:
- Permalink: zhixiangxue/rill-ai@111f2d7df463e47b5f58d1b763997b9b01ce11d5
- Branch / Tag: refs/tags/v0.1.0
- Owner: https://github.com/zhixiangxue
- Access: public
- Token Issuer: https://token.actions.githubusercontent.com
- Runner Environment: github-hosted
- Publication workflow: publish.yml@111f2d7df463e47b5f58d1b763997b9b01ce11d5
- Trigger Event: release