The World's First Neuro-Semantic Event Bus for Cognitive AI Systems
Project description
๐ง NeuroBUS
The World's First Neuro-Semantic Event Bus
"Don't send events. Send understanding."
๐ What is NeuroBUS?
NeuroBUS is a revolutionary event bus that transforms message passing into meaning passing for cognitive AI systems. Unlike traditional event buses (Redis, RabbitMQ, Kafka), NeuroBUS understands the semantic meaning of events and intelligently routes them based on context, not just pattern matching.
Why NeuroBUS?
Traditional event buses are dumb pipes - they match strings and forward messages. NeuroBUS is intelligent - it understands what events mean, maintains context across conversations, remembers past interactions, and can even reason about events using LLMs.
Perfect for:
- ๐ค AI Agent Systems - Multi-agent coordination with semantic understanding
- ๐ Microservices - Intelligent service-to-service communication
- ๐ Data Pipelines - Context-aware stream processing
- ๐ฎ Real-time Systems - Low-latency semantic routing
- ๐งช Event Sourcing - Time-travel debugging with causality tracking
โจ Key Features
๐ฏ Semantic Routing
Events are matched by meaning, not just exact strings. Using transformer embeddings, NeuroBUS understands that "user_logged_in" and "authentication_successful" are semantically similar.
@bus.subscribe("user authentication", semantic=True, threshold=0.8)
async def handle_auth(event: Event):
# Matches: user_login, auth_success, sign_in_complete, etc.
pass
๐ง Context-Aware Processing
Maintain state across 4 hierarchical scopes (global/session/user/event) with automatic context merging and DSL-based filtering.
# Set context at different scopes
bus.context.set_global("app_version", "1.0.0")
bus.context.set_session("user_id", "alice", session_id="sess_123")
# Filter events based on context
@bus.subscribe("alert", filter="priority >= 5 AND user.role == 'admin'")
async def handle_critical_alert(event: Event):
pass
โฐ Temporal Capabilities
Time-travel debugging with event replay, causality tracking, and temporal queries.
# Replay events from the past
async for event in bus.temporal.replay(
from_time=yesterday,
to_time=now,
speed=10.0 # 10x faster
):
# Re-process historical events
pass
# Track event causality
chain = bus.temporal.causality.get_causal_chain(event_id)
root = bus.temporal.causality.get_root(event_id)
๐พ Vector Memory Integration
Native support for Qdrant and LanceDB for semantic event search and long-term memory.
from neurobus.memory import QdrantAdapter
# Store events in vector database
adapter = QdrantAdapter(url="http://localhost:6333")
await adapter.store_event(event, embedding)
# Search semantically similar past events
results = await adapter.search_similar(query_embedding, k=5)
๐ค LLM Reasoning Hooks
Automatically trigger LLM reasoning when events match patterns - no manual integration needed.
from neurobus.llm import LLMBridge
bridge = LLMBridge(provider="openai", api_key="sk-...")
@bridge.hook("error.*", "Analyze this error: {topic}\nData: {data}")
async def analyze_error(event, reasoning):
print(f"LLM Analysis: {reasoning}")
# Automatically invoked on any error.* event
๐ Distributed Clustering
Horizontal scaling with Redis-based multi-node clustering, leader election, and distributed locking.
config = NeuroBusConfig(
distributed={
"enabled": True,
"redis_url": "redis://localhost:6379",
}
)
bus = NeuroBus(config=config)
# Events automatically broadcast across all nodes
๐ Observable & Production-Ready
Built-in metrics, comprehensive logging, and high test coverage (173 tests, 95% coverage).
from neurobus.monitoring.metrics import get_metrics
metrics = get_metrics()
stats = metrics.get_histogram_stats("dispatch_latency_seconds")
# Returns: min, max, mean, p50, p95, p99
๐ Quick Start
Installation
pip install neurobus
Basic Example
import asyncio
from neurobus import NeuroBus, Event
async def main():
# Create bus
bus = NeuroBus()
# Subscribe to events
@bus.subscribe("user.login")
async def handle_login(event: Event):
print(f"User {event.data['username']} logged in")
# Start bus
async with bus:
# Publish event
await bus.publish(Event(
topic="user.login",
data={"username": "alice"}
))
await asyncio.sleep(0.1)
asyncio.run(main())
Semantic Routing Example
from neurobus import NeuroBus, Event, NeuroBusConfig
async def main():
# Enable semantic routing
config = NeuroBusConfig(semantic={"enabled": True})
bus = NeuroBus(config=config)
# Subscribe with semantic matching
@bus.subscribe("greeting", semantic=True, threshold=0.75)
async def handle_greeting(event: Event):
print(f"Got greeting: {event.topic}")
async with bus:
# All these will match semantically!
await bus.publish(Event(topic="hello", data={}))
await bus.publish(Event(topic="hi_there", data={}))
await bus.publish(Event(topic="good_morning", data={}))
await asyncio.sleep(0.5)
asyncio.run(main())
LLM Integration Example
from neurobus import NeuroBus, Event
from neurobus.llm import LLMBridge
async def main():
bus = NeuroBus()
# Setup LLM bridge
llm = LLMBridge(provider="openai", api_key="sk-...")
await llm.initialize()
# Hook LLM to error events
@llm.hook("error.*", "Diagnose: {topic}\nDetails: {data}")
async def diagnose_error(event, reasoning):
print(f"LLM says: {reasoning}")
async with bus:
# This will automatically trigger LLM analysis
await bus.publish(Event(
topic="error.database",
data={"error": "Connection timeout"}
))
await asyncio.sleep(1)
asyncio.run(main())
๐ฆ Installation Options
Basic Installation
pip install neurobus
With Semantic Routing
pip install neurobus[semantic]
# Includes: sentence-transformers, torch
With Vector Databases
pip install neurobus[qdrant] # Qdrant support
pip install neurobus[lancedb] # LanceDB support
pip install neurobus[memory] # Both
With LLM Providers
pip install neurobus[openai] # OpenAI GPT
pip install neurobus[anthropic] # Anthropic Claude
pip install neurobus[ollama] # Local LLMs via Ollama
pip install neurobus[llm] # All LLM providers
With Distributed Support
pip install neurobus[distributed] # Redis clustering
Everything
pip install neurobus[all]
๐ Documentation
- API Documentation - Complete API reference
- Examples - 15+ working examples
- Architecture - System design and internals
- Contributing - How to contribute
- Changelog - Version history
๐ฏ Core Concepts
Events
Events are the fundamental unit of communication in NeuroBUS. Each event has a topic, data, optional context, and metadata.
event = Event(
topic="user.action.completed",
data={"action": "purchase", "amount": 99.99},
context={"user_id": "alice", "session": "xyz"},
metadata={"source": "web", "version": "2.0"}
)
Subscriptions
Subscribe to events using exact patterns, wildcards, or semantic similarity.
# Exact match
@bus.subscribe("user.login")
# Wildcard
@bus.subscribe("user.*")
# Semantic (requires sentence-transformers)
@bus.subscribe("user authentication", semantic=True)
# With context filtering
@bus.subscribe("alert", filter="priority > 5")
Context
Hierarchical state management across 4 scopes with automatic merging.
# Global context (shared across all)
bus.context.set_global("app_name", "MyApp")
# Session context (per-session)
bus.context.set_session("user_id", "alice", session_id="sess_1")
# User context (per-user)
bus.context.set_user("preferences", {"theme": "dark"}, user_id="alice")
# Event context (per-event)
event = Event(topic="action", data={}, context={"trace_id": "abc"})
Temporal
Time-travel debugging with event persistence, replay, and causality tracking.
# Store events (automatic with config)
config = NeuroBusConfig(temporal={"enabled": True})
# Query past events
events = await bus.temporal.query_events(
topic="user.*",
from_time=yesterday,
to_time=now
)
# Replay events
await bus.temporal.replay_events(from_time, to_time, speed=5.0)
# Track causality
chain = bus.temporal.causality.get_causal_chain(event_id)
Memory
Long-term event storage with vector similarity search.
# Enable memory
config = NeuroBusConfig(memory={"enabled": True})
# Search similar events
results = await bus.memory.search("user authentication issues", k=5)
# Get recent memories
recent = bus.memory.get_recent(limit=10)
๐๏ธ Architecture
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
โ NeuroBUS โ
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโค
โ โโโโโโโโโโโโ โโโโโโโโโโโโ โโโโโโโโโโโโ โโโโโโโโโโโโ โ
โ โ Semantic โ โ Context โ โ Temporal โ โ LLM โ โ
โ โ Router โ โ Engine โ โ Store โ โ Bridge โ โ
โ โโโโโโโโโโโโ โโโโโโโโโโโโ โโโโโโโโโโโโ โโโโโโโโโโโโ โ
โ โโโโโโโโโโโโ โโโโโโโโโโโโ โโโโโโโโโโโโ โโโโโโโโโโโโ โ
โ โ Memory โ โ Metrics โ โ Cluster โ โ Core โ โ
โ โ Engine โ โCollector โ โ Manager โ โ Bus โ โ
โ โโโโโโโโโโโโ โโโโโโโโโโโโ โโโโโโโโโโโโ โโโโโโโโโโโโ โ
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
โ โ โ โ
โผ โผ โผ โผ
โโโโโโโโโโโ โโโโโโโโโโโ โโโโโโโโโโโ โโโโโโโโโโโ
โ Qdrant โ โ Redis โ โ SQLite โ โ LLM โ
โLanceDB โ โCluster โ โ WAL โ โProvider โ
โโโโโโโโโโโ โโโโโโโโโโโ โโโโโโโโโโโ โโโโโโโโโโโ
๐ Performance
- Latency: <2ms P95 for event dispatch
- Throughput: 10,000+ events/second
- Memory: <100MB base footprint
- Scalability: Horizontal scaling via Redis clustering
- Semantic: <5ms embedding generation (cached)
๐งช Testing
NeuroBUS has comprehensive test coverage:
# Run all tests
pytest
# Run with coverage
pytest --cov=neurobus --cov-report=html
# Run specific test suite
pytest tests/unit/
pytest tests/integration/
Test Statistics:
- 173 tests (100% passing)
- 95% code coverage
- 100% type coverage (mypy strict)
๐ค Contributing
We welcome contributions! Please see CONTRIBUTING.md for guidelines.
Quick Start for Contributors:
# Clone repository
git clone https://github.com/eshanized/neurobus.git
cd neurobus
# Install in development mode
pip install -e ".[dev,all]"
# Run tests
pytest
# Format code
black neurobus/ tests/
ruff check neurobus/ tests/
# Type check
mypy neurobus/
๐ License
NeuroBUS is released under the MIT License.
๐ฅ Authors
- Eshan Roy (@eshanized) - Creator & Lead Developer
- TIVerse Labs - Cognitive Infrastructure Division
๐ Acknowledgments
Special thanks to:
- The sentence-transformers team for semantic embeddings
- Qdrant and LanceDB teams for vector database support
- OpenAI and Anthropic for LLM capabilities
- The Python async community
๐ง Contact & Support
- Email: eshanized@proton.me
- Issues: GitHub Issues
- Discussions: GitHub Discussions
- Discord: Join our community (coming soon)
๐บ๏ธ Roadmap
v1.1 (Q1 2025)
- GraphQL API
- Admin Dashboard
- Enhanced monitoring (Grafana dashboards)
- Performance benchmarks suite
v1.2 (Q2 2025)
- Multi-tenancy support
- Rate limiting per subscription
- Schema evolution
- Additional vector DB adapters (Pinecone, Weaviate)
v2.0 (Q3 2025)
- Streaming support
- Plugin architecture
- Cloud-native deployment templates
- Enterprise features
โญ Star Us!
If you find NeuroBUS useful, please star the repository on GitHub!
๐ก Use Cases
AI Agent Coordination
# Multiple agents communicating semantically
@agent1.subscribe("help needed", semantic=True)
async def assist(event):
# Responds to "need help", "assistance required", etc.
pass
Microservices Communication
# Service-to-service with context
bus.context.set_session("request_id", req_id)
await bus.publish(Event("order.created", data=order_data))
IoT & Sensor Networks
# Semantic sensor fusion
@bus.subscribe("temperature reading", semantic=True)
async def process_temp(event):
# Matches various sensor formats
pass
Event Sourcing & CQRS
# Time-travel for debugging
events = await bus.temporal.query_events(
topic="order.*",
from_time=incident_time - 1hour,
to_time=incident_time + 1hour
)
Built with โค๏ธ by TIVerse Labs - Building Cognitive Infrastructure for AI
NeuroBUS: Where Events Meet Intelligence ๐ง โจ
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file neurobus-1.0.0.tar.gz.
File metadata
- Download URL: neurobus-1.0.0.tar.gz
- Upload date:
- Size: 85.3 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.13.7
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
f061620e25c8f78098aca48f88f8b902159d868d78c8cd1f869de343e518a4ad
|
|
| MD5 |
5ab37bf52e108204abdb0250d8dc78d0
|
|
| BLAKE2b-256 |
427bbb065f4ff88a124a2e6a81c3aee0411147a8a46ed57d55abc901a038b6d4
|
File details
Details for the file neurobus-1.0.0-py3-none-any.whl.
File metadata
- Download URL: neurobus-1.0.0-py3-none-any.whl
- Upload date:
- Size: 103.5 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.13.7
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
f0ef1126678a9cebc0308ef9604d6422bc0cf4c65f4d3d0e92de3239ca60dbb7
|
|
| MD5 |
8f4954bc4080fac800f5ceaddeb2c8ea
|
|
| BLAKE2b-256 |
f1320965bd5ae6b9d599009e5a946a23ab3ed0d2a8ea97edbeb242f3f4cc2c54
|