Rill - AI Agent Framework
A zero-dependency flow orchestration library for building AI workflows your way.
Minimal orchestration that lets you use any LLM client, any tools, any storage to build your AI agent applications.
Why Rill?
You don't need heavy frameworks to build AI agents. Sometimes you just need simple orchestration:
- Want to use your own LLM client (chak / OpenAI SDK / Anthropic SDK)? ✅
- Want to use your own tools (functions / MCP servers / custom implementations)? ✅
- Want to keep your codebase lightweight and dependencies minimal? ✅
- Prefer code over YAML/JSON configs? Code is the orchestration. ✅
Rill just handles orchestration - bring your own pieces, Rill manages the flow.
Core Concepts
Rill draws inspiration from CrewAI's "code is flow" approach - use decorators to define nodes, use Python functions to express logic, no YAML or JSON configs needed.
Built on this foundation, Rill adds:
- Direct routing: Declare next steps with goto right where you are, not reverse subscription
- No dependencies: Framework handles orchestration only, everything else is your call
Thanks to CrewAI and LangGraph for the inspiration.
Quick Start
Installation
# From PyPI
pip install rillpy
# From GitHub
pip install git+https://github.com/zhixiangxue/rill-ai.git@main
# Local development
git clone https://github.com/zhixiangxue/rill-ai.git
cd rill-ai
pip install -e .
Core Features
🌱 No Lock-in
Framework doesn't care what LLM, tools, or databases you use. Only provides orchestration.
🪴 Code is Flow
Define nodes with @node decorator, declare routing with goto, no DSL needed.
🌻 Direct Routing
Use goto(next, data) to directly specify next step in current node. Matches how humans think.
🌾 List Means Parallel
goto([A, B], data) automatically triggers asyncio.gather for parallel execution.
🌿 Auto Input Merge
When multiple predecessors point to same node, framework auto-merges outputs as {pred_name: output} dict.
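The two behaviors above (a list target fanning out in parallel, then outputs merging by predecessor name) can be pictured with plain asyncio. This is a minimal sketch under stated assumptions, not Rill's implementation: the node functions and the fan_out helper are hypothetical names chosen for illustration.

```python
import asyncio


async def academic_search(topic):
    # Stand-in for a real node body; the sleep simulates I/O.
    await asyncio.sleep(0.01)
    return f"papers about {topic}"


async def expert_interview(topic):
    await asyncio.sleep(0.01)
    return f"interviews about {topic}"


async def fan_out(nodes, data):
    """Run every node concurrently on the same payload and merge the results.

    Returns a dict keyed by node name, mirroring the {pred_name: output}
    shape described above.
    """
    outputs = await asyncio.gather(*(n(data) for n in nodes))
    return {n.__name__: out for n, out in zip(nodes, outputs)}


merged = asyncio.run(fan_out([academic_search, expert_interview], "rivers"))
print(merged)
```

asyncio.gather returns results in the order the awaitables were passed, which is why zipping against the node list is safe here.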
🍀 Safety Guards
Graph validation (start point / cycles / reachability) + max_loop to prevent infinite loops.
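To make the idea of graph validation concrete, here is a small sketch that checks the start point and reachability on a plain adjacency dict. It is an illustration only: the validate_graph function and the dict-based graph format are assumptions, not Rill's internal representation, and cycle handling is left out.

```python
from collections import deque


def validate_graph(edges, start):
    """Check a {node: [successors]} mapping for structural problems.

    Returns a list of human-readable error strings (empty when valid).
    """
    errors = []
    if start not in edges:
        errors.append(f"start node {start!r} is not defined")
        return errors

    # Every successor must itself be a defined node.
    for node, successors in edges.items():
        for nxt in successors:
            if nxt not in edges:
                errors.append(f"{node!r} routes to undefined node {nxt!r}")

    # Breadth-first search from the start node to find reachable nodes.
    seen = {start}
    queue = deque([start])
    while queue:
        for nxt in edges.get(queue.popleft(), []):
            if nxt not in seen:
                seen.add(nxt)
                queue.append(nxt)

    for node in edges:
        if node not in seen:
            errors.append(f"{node!r} is unreachable from {start!r}")
    return errors


graph = {
    "decide": ["academic_search", "web_search"],
    "academic_search": ["synthesize"],
    "web_search": ["synthesize"],
    "synthesize": [],
    "orphan": ["synthesize"],
}
print(validate_graph(graph, "decide"))
```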
🌲 Observable
Flow.stats() tracks node execution time, logger traces execution flow.
🪴 Shared State Management
Rill provides FlowState for sharing data across nodes. Simpler than LangGraph's in-node state updates.
TODO: Parallel nodes updating state simultaneously may have thread-safety issues. Community contributions welcome.
🌾 Return Value as Input
Predecessor node's return value becomes successor node's input parameter. No need to put everything in state.
Common Patterns
Conditional Branching + Parallel Execution
class ResearchFlow(Flow):
    @node(start=True, goto=DYNAMIC)
    async def decide(self, topic):
        complexity = await self.analyze_complexity(topic)
        if complexity > 0.8:
            # High complexity: parallel deep research
            return goto([self.academic_search, self.expert_interview], topic)
        else:
            # Low complexity: quick search
            return goto(self.web_search, topic)

    @node(goto="synthesize")
    async def academic_search(self, topic):
        return await search_papers(topic)

    @node(goto="synthesize")
    async def expert_interview(self, topic):
        return await interview_experts(topic)

    @node(goto="synthesize")
    async def web_search(self, topic):
        return await search_web(topic)

    @node()
    async def synthesize(self, sources):
        # Auto-merge: sources could be {"academic_search": ..., "expert_interview": ...}
        # or just web_search output (single predecessor)
        return await generate_report(sources)
Loop + Exit Condition
class IterativeFlow(Flow):
    @node(start=True, goto=DYNAMIC, max_loop=5)
    async def generate(self, prompt):
        result = await llm_generate(prompt)
        quality = await self.evaluate(result)
        if quality > 0.9:
            return goto(self.finalize, result)
        else:
            # Loop back with feedback.
            # Note: the payload becomes the next call's input, so on the
            # next iteration `prompt` is this dict rather than a string.
            return goto(self.generate, {"prompt": prompt, "feedback": quality})

    @node()
    async def finalize(self, result):
        return result
Using Shared State
class MyWorkflow(Flow):
    @node(start=True, goto=["fetch_data", "process_config"])
    async def begin(self, inputs):
        # Store inputs in state for other nodes to access
        self.state.user_id = inputs["user_id"]
        self.state.query = inputs["query"]
        self.state.results = []  # Initialize shared collection

    @node(goto="merge")
    async def fetch_data(self, previous_result):
        # Access state from parallel node
        data = await api_call(self.state.user_id, self.state.query)
        # Accumulate results in state
        self.state.results.append({"source": "api", "data": data})
        return data

    @node(goto="merge")
    async def process_config(self, previous_result):
        # Another parallel node accessing same state
        config = load_config(self.state.user_id)
        # Also update shared state
        self.state.config = config
        return config

    @node()
    async def merge(self, inputs):
        # inputs = {"fetch_data": ..., "process_config": ...}
        # state contains accumulated data from all nodes
        final_result = combine(
            inputs["fetch_data"],
            inputs["process_config"],
            self.state.results  # Access shared state
        )
        self.state.final_output = final_result
        return final_result

# Run the workflow
flow = MyWorkflow()
final_state = await flow.run({
    "user_id": 123,
    "query": "hello"
})  # 🎯 Rill auto-converts your dict to a Pydantic FlowState object!

# Access final state (Pydantic model)
print(final_state.final_output)  # 🎉 Flow.run() returns the final FlowState
print(final_state.user_id)  # Access any field stored during execution
print(final_state.results)  # All accumulated data persists here
State vs Return Value:
- Return value: Direct data passing from predecessor to successor (the main data pipeline)
- State: Shared context accessible from any node (for metadata, counters, cross-branch data)
- Key difference: Return values flow through edges, state persists across the entire workflow
- Use return values for primary data flow, use state for auxiliary data that multiple nodes need
Known Issue:
- ⚠️ Parallel nodes updating state simultaneously may cause race conditions
- 🔧 TODO: Need thread-safe state update mechanism (community contributions welcome)
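Until a built-in mechanism exists, one way to picture the problem and a possible workaround is with plain asyncio. The sketch below assumes parallel nodes run as asyncio tasks on one event loop (consistent with the asyncio.gather behavior described earlier); SimpleNamespace is a stand-in for the shared state object, and the lock is something you would add yourself, not a Rill feature.

```python
import asyncio
from types import SimpleNamespace


async def unsafe_increment(state):
    # Read, yield to the event loop, then write: another task can run in between.
    current = state.counter
    await asyncio.sleep(0)
    state.counter = current + 1


async def safe_increment(state, lock):
    # Holding the lock keeps the read-modify-write sequence from interleaving.
    async with lock:
        current = state.counter
        await asyncio.sleep(0)
        state.counter = current + 1


async def main():
    unsafe = SimpleNamespace(counter=0)
    await asyncio.gather(*(unsafe_increment(unsafe) for _ in range(10)))

    safe = SimpleNamespace(counter=0)
    lock = asyncio.Lock()
    await asyncio.gather(*(safe_increment(safe, lock) for _ in range(10)))
    return unsafe.counter, safe.counter


unsafe_total, safe_total = asyncio.run(main())
print(unsafe_total, safe_total)
```

The unguarded version loses updates because every task reads the counter before any task writes it back. The guarded version counts all ten increments.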
When to Use Rill?
| Your Situation | Recommendation |
|---|---|
| Quick GPT app, don't want to manage anything | 👉 LangChain / LangGraph (all-in-one convenience) |
| Want to use my own LLM client (chak / OpenAI SDK) + custom tools | 👉 Rill (orchestration flexibility) |
| Just want orchestration only, pick other components myself | 👉 Rill |
FAQ
Q: What's the difference between Rill and LangGraph?
A: LangGraph is a complete package (orchestration + LLM + tools + memory). Rill handles only orchestration and leaves the other components up to you.
Q: I'm already using LangChain Tools, can I use Rill?
A: Yes! Rill doesn't care where your tools come from, just call them directly in nodes.
Q: Does Rill support state persistence?
A: FlowState is currently in-memory only (a Pydantic model). Persistence is your choice (Redis / PostgreSQL / files); Rill is not tied to any storage.
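As a rough illustration of file-based persistence, the sketch below saves and restores a state snapshot as JSON. The save_state and load_state helpers are hypothetical, and the example works on a plain dict; with a Pydantic v2 model, such a dict could be obtained from model_dump().

```python
import json
import tempfile
from pathlib import Path


def save_state(state_dict, path):
    # With a Pydantic v2 model, state_dict could come from model.model_dump().
    Path(path).write_text(json.dumps(state_dict, indent=2), encoding="utf-8")


def load_state(path):
    return json.loads(Path(path).read_text(encoding="utf-8"))


with tempfile.TemporaryDirectory() as tmp:
    snapshot = Path(tmp) / "flow_state.json"
    save_state({"user_id": 123, "query": "hello", "results": []}, snapshot)
    restored = load_state(snapshot)

print(restored)
```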
Q: I want to use my own LLM client (e.g., chak), how to integrate?
A: Just import chak in nodes and call it, Rill doesn't care which LLM you use. Example:
@node(start=True, goto="process")
async def query(self, user_input):
    from chak import Conversation  # Your LLM client
    conv = Conversation("openai/gpt-4o-mini", api_key="YOUR_KEY")
    return await conv.asend(user_input)
Q: When do I need max_loop?
A: When your flow has cycles (e.g., "generate → evaluate → regenerate"), use max_loop to limit loop iterations and prevent infinite loops.
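The idea behind a loop limit can be sketched with a simple visit counter. This is an illustration under assumptions: LoopGuard and LoopLimitExceeded are hypothetical names, and whether Rill raises an error or stops quietly when the limit is reached is not stated here.

```python
class LoopLimitExceeded(RuntimeError):
    """Raised when a node is entered more often than its limit allows."""


class LoopGuard:
    def __init__(self, max_loop):
        self.max_loop = max_loop
        self.visits = {}

    def enter(self, node_name):
        # Count the visit and fail fast once the limit is passed.
        count = self.visits.get(node_name, 0) + 1
        if count > self.max_loop:
            raise LoopLimitExceeded(
                f"{node_name} entered {count} times (max_loop={self.max_loop})"
            )
        self.visits[node_name] = count


guard = LoopGuard(max_loop=5)
quality = 0.0
attempts = 0
try:
    while quality <= 0.9:  # the exit condition is never met in this demo
        guard.enter("generate")
        attempts += 1
except LoopLimitExceeded as exc:
    print(f"stopped after {attempts} attempts: {exc}")
```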
Q: How does input merging work?
A: When multiple predecessor nodes point to the same target node, and all predecessors complete, the framework merges their outputs as a dict {pred_name: output} and passes it to the target node.
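The "wait until all predecessors complete, then merge" step can be pictured as a small buffer. The JoinBuffer class below is a hypothetical sketch for illustration, not Rill's internal mechanism.

```python
class JoinBuffer:
    """Collect outputs from named predecessors until all have reported."""

    def __init__(self, predecessors):
        self.expected = set(predecessors)
        self.outputs = {}

    def add(self, pred_name, output):
        """Record one output; return the merged dict once complete, else None."""
        if pred_name not in self.expected:
            raise KeyError(f"unexpected predecessor: {pred_name}")
        self.outputs[pred_name] = output
        if set(self.outputs) == self.expected:
            return dict(self.outputs)
        return None


join = JoinBuffer(["fetch_data", "process_config"])
first = join.add("fetch_data", [1, 2, 3])
second = join.add("process_config", {"debug": True})
print(first)
print(second)
```

The first call returns None because one predecessor is still pending. The second call completes the set and returns the merged dict that the target node would receive.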
Q: What's the difference between state and return value?
A: They serve different purposes:
- Node return value: Passes data to the next node(s) through the flow edge. This is the main data pipeline.
- State (self.state): A shared Pydantic object accessible from all nodes throughout the workflow. Use it for metadata, counters, configuration, or data that multiple branches need to access.
- Example: Return the processed result to next node, but store statistics/metadata in state.
Q: Is state update thread-safe in parallel nodes?
A: Not yet. Parallel nodes updating state simultaneously may cause race conditions. This is a known TODO. For now, avoid state updates in parallel nodes or use return values instead.
API Reference
Flow
Main workflow class, inherit to define your workflow.
class MyFlow(Flow):
    def __init__(self, initial_state=None, max_steps=1000, validate=True):
        super().__init__(initial_state, max_steps, validate)
- initial_state: Initial state dict or Pydantic model
- max_steps: Max execution steps (prevent infinite loops)
- validate: Whether to validate graph before execution
@node
Decorator to mark methods as workflow nodes.
@node(start=False, goto=None, max_loop=None)
def my_node(self, inputs):
    pass
- start: Whether this is the start node
- goto: Next node(s), can be:
  - None: No successors (end node)
  - "node_name": Single next node
  - ["node1", "node2"]: Multiple nodes (parallel execution)
  - DYNAMIC: Runtime-determined routing (must return goto(...) in node)
- max_loop: Max loop count for this node (for cycle detection)
goto(target, data)
Choose next node for DYNAMIC routing.
@node(goto=DYNAMIC)
async def decide(self, inputs):
    if condition:
        return goto(self.next_node, data)
    else:
        return goto([self.task_a, self.task_b], data)  # Parallel
- target: Single node or list of nodes (list triggers parallel execution)
- data: Payload passed to target node(s)
DYNAMIC
Pass goto=DYNAMIC to @node when the next step is decided at runtime. A node declared this way must return goto(...).
FlowState
Shared state object (Pydantic model).
# Access state from any node
self.state.custom_field = "value" # Runtime field injection
self.state.user_id = 123
self.state.results = [] # Shared collection
# Two independent data channels:
# 1. Return value: flows through edges (node → successor)
# 2. State: shared context persists across entire workflow
Known Issue: Parallel nodes updating state simultaneously may cause race conditions (TODO).
Flow.run(initial_input)
Execute the flow.
result_state = await flow.run({"user_input": "Hello"})
Flow.stats()
Get execution statistics.
stats = flow.stats()
# {
#     "timing": {
#         "total_duration": 2.35,
#         "nodes": {
#             "query": {"duration": 1.2, "percentage": 51.06},
#             "search": {"duration": 0.8, "percentage": 34.04}
#         }
#     }
# }
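The percentages in the sample output are consistent with each node's duration divided by total_duration, rounded to two decimals. The sketch below reproduces that arithmetic; the summarize helper is hypothetical and the formula is an inference from the numbers shown, not a statement about Rill's internals.

```python
def summarize(total_duration, node_durations):
    """Build the nested stats shape from a total and {node: seconds}."""
    if total_duration <= 0:
        raise ValueError("total_duration must be positive")
    return {
        "timing": {
            "total_duration": total_duration,
            "nodes": {
                name: {
                    "duration": seconds,
                    # Share of total run time, as a percentage.
                    "percentage": round(seconds / total_duration * 100, 2),
                }
                for name, seconds in node_durations.items()
            },
        }
    }


example = summarize(2.35, {"query": 1.2, "search": 0.8})
print(example["timing"]["nodes"])
```

Note that the node durations (2.0 s) do not add up to the total (2.35 s), so the percentages need not sum to 100.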
Architecture
┌─────────────────────────────────────────┐
│            Your Application             │
│  LLM: chak / OpenAI / Anthropic / ...   │
│  Tools: MCP / Functions / LangChain     │
│  Storage: ChromaDB / PostgreSQL / Redis │
└──────────────┬──────────────────────────┘
               │ Use Rill for orchestration
┌──────────────▼──────────────────────────┐
│           Rill Orchestration            │
│     @node + goto + parallel + State     │
└─────────────────────────────────────────┘
Dependencies
- Python >= 3.8
- pydantic >= 2.0.0
- loguru >= 0.7.0
License
MIT License - see LICENSE file for details.