# Surreal Commands

A distributed task queue system similar to Celery, built with Python, SurrealDB, and LangChain. This system allows you to define, submit, and execute asynchronous commands/tasks with real-time processing capabilities.
## Features

- Real-time Processing: Uses SurrealDB LIVE queries for instant command pickup
- Concurrent Execution: Configurable concurrent task execution with semaphore controls
- Type Safety: Pydantic models for input/output validation
- LangChain Integration: Commands are LangChain Runnables for maximum flexibility
- Dynamic CLI: Auto-generates CLI from registered commands
- Status Tracking: Track command status through its lifecycle (new → running → completed/failed)
- Persistent Queue: Commands persist in SurrealDB across worker restarts
- Comprehensive Logging: Built-in logging with loguru
## Architecture Overview

```mermaid
graph TD
    CLI[CLI Interface] --> SurrealDB[(SurrealDB Queue)]
    SurrealDB --> Worker[Worker Process]
    Worker --> Registry[Command Registry]
    Registry --> Commands[Registered Commands]
    Worker --> |Execute| Commands
    Commands --> |Results| SurrealDB
```
## Installation

Install the package using pip:

```bash
pip install surreal-commands
```

Set up environment variables in `.env`:

```bash
# SurrealDB Configuration
SURREAL_URL=ws://localhost:8000/rpc
SURREAL_USER=root
SURREAL_PASSWORD=root
SURREAL_NAMESPACE=test
SURREAL_DATABASE=test
```

Ensure SurrealDB is running:

```bash
# Using Docker
docker run --rm -p 8000:8000 surrealdb/surrealdb:latest start --user root --pass root

# Or install locally
# See: https://surrealdb.com/install
```
## Quick Start

### 1. Define Commands

Create your commands using the `@command` decorator:

```python
# my_app/tasks.py
from surreal_commands import command, submit_command
from pydantic import BaseModel

class ProcessInput(BaseModel):
    message: str
    uppercase: bool = False

class ProcessOutput(BaseModel):
    result: str
    length: int

@command("process_text")  # Auto-detects app name as "my_app"
def process_text(input_data: ProcessInput) -> ProcessOutput:
    result = input_data.message.upper() if input_data.uppercase else input_data.message
    return ProcessOutput(result=result, length=len(result))

# Alternative: explicit app name
@command("analyze", app="analytics")
def analyze_data(input_data: ProcessInput) -> ProcessOutput:
    return ProcessOutput(result=f"Analyzed: {input_data.message}", length=len(input_data.message))
```
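Because inputs are plain Pydantic models, argument payloads can be checked before a command ever reaches a worker. A minimal sketch of that validation step, using only Pydantic itself (assuming Pydantic v2's `model_validate`; this is illustrative, not the library's internal code):

```python
from pydantic import BaseModel, ValidationError

class ProcessInput(BaseModel):
    message: str
    uppercase: bool = False

# Valid payloads parse into typed objects; defaults are applied
ok = ProcessInput.model_validate({"message": "hello world"})
print(ok.uppercase)  # False

# Invalid payloads raise ValidationError before any execution starts
try:
    ProcessInput.model_validate({"uppercase": True})  # missing "message"
except ValidationError as e:
    print(f"rejected: {e.error_count()} error(s)")
```

The same schema therefore serves double duty: it types the command's signature and rejects malformed submissions early.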
### 2. Submit and Monitor Commands

```python
from surreal_commands import submit_command, wait_for_command_sync

# Submit a command
cmd_id = submit_command("my_app", "process_text", {
    "message": "hello world",
    "uppercase": True
})
print(f"Submitted command: {cmd_id}")

# Wait for completion
result = wait_for_command_sync(cmd_id, timeout=30)
if result.is_success():
    print(f"Result: {result.result}")
```
### 3. Start the Worker

```bash
# Start the worker process (import modules from environment variable)
export SURREAL_COMMANDS_MODULES="tasks"
surreal-commands-worker

# Or specify modules directly via CLI
surreal-commands-worker --import-modules "tasks"

# With debug logging
surreal-commands-worker --debug --import-modules "tasks"

# With custom concurrent task limit
surreal-commands-worker --max-tasks 10 --import-modules "tasks"

# Import multiple modules
surreal-commands-worker --import-modules "tasks,my_app.commands"
```
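Both `--import-modules` and `SURREAL_COMMANDS_MODULES` name Python modules the worker imports at startup, so that their `@command` decorators run and register commands. A rough sketch of how such a comma-separated spec can be resolved (the `import_command_modules` helper and the stdlib module names are illustrative stand-ins, not part of the library):

```python
import importlib
import os
from typing import Optional

def import_command_modules(spec: Optional[str]) -> list:
    """Import each module named in a comma-separated spec string."""
    if not spec:
        return []
    modules = []
    for name in spec.split(","):
        name = name.strip()  # tolerate spaces after commas
        if name:
            modules.append(importlib.import_module(name))
    return modules

# e.g. SURREAL_COMMANDS_MODULES="json, math" (stdlib modules used as stand-ins here)
loaded = import_command_modules(os.environ.get("SURREAL_COMMANDS_MODULES", "json, math"))
print([m.__name__ for m in loaded])
```

Importing is the whole trick: registration is a side effect of the decorator, so simply loading the module is enough to make its commands visible to the worker.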
### 4. Monitor with CLI Tools

```bash
# View command dashboard
surreal-commands-dashboard

# View real-time logs
surreal-commands-logs
```
## Library Structure

```text
surreal-commands/
├── apps/                    # Your command applications
│   └── text_utils/          # Example app
│       ├── __init__.py
│       └── commands.py      # Command definitions
├── cli/                     # CLI components
│   ├── __init__.py
│   ├── launcher.py          # Dynamic CLI generator
│   ├── dashboard.py         # (Future) Dashboard UI
│   └── logs.py              # (Future) Log viewer
├── commands/                # Core command system
│   ├── __init__.py
│   ├── command_service.py   # Command lifecycle management
│   ├── executor.py          # Command execution engine
│   ├── loader.py            # Command discovery
│   ├── registry.py          # Command registry (singleton)
│   ├── registry_types.py    # Type definitions
│   └── worker.py            # Worker process
├── repository/              # Database layer
│   └── __init__.py          # SurrealDB helpers
├── cli.py                   # CLI entry point
├── run_worker.py            # Worker entry point
└── .env                     # Environment configuration
```
## Core Components

### Command Registry

- Singleton pattern for global command management
- Stores commands as LangChain Runnables
- Organizes commands by app namespace

### Command Service

- Manages the command lifecycle
- Validates arguments against schemas
- Updates command status in real-time
### Worker

- Long-running process polling SurrealDB
- Processes existing commands on startup
- Listens for new commands via LIVE queries
- Configurable concurrency limits
### Executor

- Handles sync/async command execution
- Type conversion and validation
- Streaming support for long-running tasks
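Supporting both sync and async commands usually comes down to checking whether the registered callable is a coroutine function. A hedged sketch of that dispatch decision (not the library's actual executor code):

```python
import asyncio
import inspect

def execute(fn, *args):
    """Run a command function, awaiting it if it is async."""
    if inspect.iscoroutinefunction(fn):
        return asyncio.run(fn(*args))  # async command: drive it on an event loop
    return fn(*args)                   # sync command: call directly

def sync_cmd(x: int) -> int:
    return x * 2

async def async_cmd(x: int) -> int:
    await asyncio.sleep(0)
    return x + 1

print(execute(sync_cmd, 21))   # 42
print(execute(async_cmd, 41))  # 42
```

In a real worker the executor would schedule coroutines on its existing event loop rather than call `asyncio.run` per task, but the `iscoroutinefunction` branch is the essential distinction.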
## Advanced Usage

### Custom Command with Complex Types

```python
from typing import List, Optional
from datetime import datetime
from pydantic import BaseModel, Field

class AnalysisInput(BaseModel):
    data: List[float]
    method: str = Field(default="mean", description="Analysis method")
    threshold: Optional[float] = None

class AnalysisOutput(BaseModel):
    result: float
    method_used: str
    items_processed: int
    warnings: List[str] = []

def analyze_data(input_data: AnalysisInput) -> AnalysisOutput:
    # Your analysis logic here
    pass
```
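As a concrete illustration, the stub above might be filled in like this (the mean/threshold logic is invented for the example, not prescribed by the library):

```python
from statistics import mean
from typing import List, Optional
from pydantic import BaseModel, Field

class AnalysisInput(BaseModel):
    data: List[float]
    method: str = Field(default="mean", description="Analysis method")
    threshold: Optional[float] = None

class AnalysisOutput(BaseModel):
    result: float
    method_used: str
    items_processed: int
    warnings: List[str] = []

def analyze_data(input_data: AnalysisInput) -> AnalysisOutput:
    warnings: List[str] = []
    # Guard against empty input rather than raising from statistics.mean
    result = mean(input_data.data) if input_data.data else 0.0
    if not input_data.data:
        warnings.append("empty input; result defaulted to 0.0")
    if input_data.threshold is not None and result > input_data.threshold:
        warnings.append(f"result {result} exceeds threshold {input_data.threshold}")
    return AnalysisOutput(
        result=result,
        method_used=input_data.method,
        items_processed=len(input_data.data),
        warnings=warnings,
    )

out = analyze_data(AnalysisInput(data=[1.0, 2.0, 3.0], threshold=1.5))
print(out.result, out.warnings)  # 2.0, plus one threshold warning
```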
### Async Commands

```python
from langchain_core.runnables import RunnableLambda

async def async_process(input_data: MyInput) -> MyOutput:
    # Async processing
    await some_async_operation()
    return MyOutput(...)

# LangChain handles both sync and async
command = RunnableLambda(async_process)
```
### Working with Execution Context

Commands can access execution metadata (command_id, execution time, etc.) using the CommandInput and CommandOutput base classes. This is the recommended approach and works with all registration methods.
#### Using CommandInput and CommandOutput

```python
from surreal_commands import command, CommandInput, CommandOutput, ExecutionContext

# Input that can access execution context
class ProcessInput(CommandInput):
    message: str
    uppercase: bool = False

# Output that includes execution metadata
class ProcessOutput(CommandOutput):
    result: str
    # command_id, execution_time, and execution_metadata are inherited

@command("process_with_context")
def process_with_context(input_data: ProcessInput) -> ProcessOutput:
    # Access execution context from input
    ctx = input_data.execution_context
    if ctx:
        command_id = ctx.command_id
        app_name = ctx.app_name
        user_context = ctx.user_context or {}
        user_id = user_context.get("user_id", "anonymous")
    else:
        command_id = "unknown"
        user_id = "anonymous"

    # Process the message
    result = input_data.message.upper() if input_data.uppercase else input_data.message
    result = f"Processed by {user_id}: {result}"

    # Return output - the framework automatically populates:
    # - command_id (from execution context)
    # - execution_time (measured by the framework)
    # - execution_metadata (additional context info)
    return ProcessOutput(result=result)
```
#### Benefits of the New Pattern

- Works with all registration methods (decorator and direct registry.register())
- Type-safe with full IDE support
- Automatic metadata population in outputs
- Backward compatible - existing commands continue to work
- Clean API that aligns with LangChain's expectations
#### Base Class Usage Options

- CommandInput only: Access execution context in your command
- CommandOutput only: Get automatic execution metadata in outputs
- Both: Full execution context support with automatic metadata
- Neither: Regular commands without execution context (backward compatible)

```python
# Option 1: Access context only
class MyInput(CommandInput):
    data: str

@command("context_input_only")
def my_command(input_data: MyInput) -> RegularOutput:
    ctx = input_data.execution_context
    # ... use context ...
    return RegularOutput(result="done")

# Option 2: Output metadata only
class MyOutput(CommandOutput):
    result: str

@command("context_output_only")
def my_command(input_data: RegularInput) -> MyOutput:
    # ... process ...
    return MyOutput(result="done")  # Gets auto-populated metadata

# Option 3: Full context support
class MyInput(CommandInput):
    data: str

class MyOutput(CommandOutput):
    result: str

@command("full_context")
def my_command(input_data: MyInput) -> MyOutput:
    # ... access context and get metadata ...
    return MyOutput(result="done")
```
#### ExecutionContext Properties

- command_id: Database ID of the command
- execution_started_at: When execution began
- app_name: Application name
- command_name: Command name
- user_context: Optional CLI context (user_id, scope, etc.)
#### CommandOutput Auto-populated Fields

- command_id: Automatically set from the execution context
- execution_time: Measured execution time in seconds
- execution_metadata: Additional execution information
#### Migration from Old Pattern

If you have commands using the old pattern, with execution_context as a parameter:

```python
# OLD (causes errors):
def old_command(input_data: MyInput, execution_context: ExecutionContext):
    command_id = execution_context.command_id
    # ...

# NEW (recommended):
class MyInput(CommandInput):
    # your fields here
    ...

def new_command(input_data: MyInput):
    command_id = input_data.execution_context.command_id if input_data.execution_context else None
    # ...
```

See examples/migration_example.py for detailed migration examples.
## Monitoring

### Check Command Status

```python
# check_results.py
import asyncio
from repository import db_connection

async def check_status():
    async with db_connection() as db:
        commands = await db.query(
            "SELECT * FROM command ORDER BY created DESC LIMIT 10"
        )
        for cmd in commands:
            print(f"{cmd['id']}: {cmd['status']}")

asyncio.run(check_status())
```
### View Logs

```bash
# Worker logs with debug info
uv run python run_worker.py --debug

# Filter logs by level
LOGURU_LEVEL=INFO uv run python run_worker.py
```
## Database Schema

Commands are stored in SurrealDB with the following structure:

```
{
  id: "command:unique_id",
  app: "app_name",
  name: "command_name",
  args: { /* command arguments */ },
  context: { /* optional context */ },
  status: "new" | "running" | "completed" | "failed",
  result: { /* command output */ },
  error_message: "error details if failed",
  created: "2024-01-07T10:30:00Z",
  updated: "2024-01-07T10:30:05Z"
}
```
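The status field only moves forward through the lifecycle (new → running → completed/failed). That state machine can be sketched as a small transition table; the states come from the schema above, while the helper itself is illustrative:

```python
# Allowed status transitions, per the command lifecycle
TRANSITIONS = {
    "new": {"running"},
    "running": {"completed", "failed"},
    "completed": set(),  # terminal state
    "failed": set(),     # terminal state
}

def can_transition(current: str, nxt: str) -> bool:
    """Return True if a command may move from `current` to `nxt`."""
    return nxt in TRANSITIONS.get(current, set())

print(can_transition("new", "running"))        # True
print(can_transition("running", "failed"))     # True
print(can_transition("completed", "running"))  # False
```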
## Development

### Adding New Commands

1. Create a new app directory under apps/
2. Define your command models and logic
3. Register with the command registry
4. Restart the worker to pick up new commands
### Running Tests

```bash
# Run tests (when implemented)
uv run pytest

# With coverage
uv run pytest --cov=commands
```
### Debugging

Use debug mode to see detailed logs:

```bash
# Debug CLI
LOGURU_LEVEL=DEBUG uv run python cli.py text_utils uppercase --text "test"

# Debug Worker
uv run python run_worker.py --debug
```
## Contributing

1. Fork the repository
2. Create a feature branch
3. Make your changes
4. Add tests if applicable
5. Submit a pull request
## Retry Logic

Surreal Commands includes built-in retry logic using the tenacity library. Commands can be configured with automatic retry behavior for handling transient failures.

### Basic Retry Configuration

```python
from surreal_commands import command

# Simple retry with 3 attempts
@command("process_data", retry={"max_attempts": 3, "wait_strategy": "fixed", "wait_time": 1})
def process_data(input_data: MyInput) -> MyOutput:
    # This command will retry up to 3 times with a 1 second wait between attempts
    return MyOutput(result="processed")
```
### Wait Strategies

Choose the right wait strategy for your use case:

```python
# Fixed wait time (good for rate-limited APIs)
@command("api_call", retry={"max_attempts": 3, "wait_strategy": "fixed", "wait_time": 2})
def api_call(input_data: MyInput) -> MyOutput:
    pass

# Exponential backoff (recommended for network calls)
@command("network_call", retry={
    "max_attempts": 5,
    "wait_strategy": "exponential",
    "wait_min": 1,
    "wait_max": 60,
    "wait_multiplier": 2
})
def network_call(input_data: MyInput) -> MyOutput:
    # Wait times: 1s, 2s, 4s, 8s, 16s (capped at 60s)
    pass

# Exponential with jitter (prevents thundering herd)
@command("distributed_call", retry={"wait_strategy": "exponential_jitter"})
def distributed_call(input_data: MyInput) -> MyOutput:
    pass
```
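To see how the exponential settings combine, here is the usual backoff formula, assuming wait = min(wait_max, wait_min * wait_multiplier**(attempt - 1)); the exact formula tenacity applies internally may differ slightly, so treat this as a mental model:

```python
def backoff_schedule(max_attempts, wait_min, wait_max, multiplier):
    """Wait time before each retry: wait_min * multiplier**(n-1), capped at wait_max."""
    return [min(wait_max, wait_min * multiplier ** (n - 1)) for n in range(1, max_attempts + 1)]

# The network_call example: max_attempts=5, wait_min=1, wait_max=60, wait_multiplier=2
print(backoff_schedule(5, 1, 60, 2))   # [1, 2, 4, 8, 16]

# With more attempts the wait_max cap kicks in
print(backoff_schedule(8, 1, 60, 2))   # [1, 2, 4, 8, 16, 32, 60, 60]
```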
### Exception Filtering

Control which exceptions trigger retries:

```python
# Retry only on specific exceptions
@command("selective_retry", retry={
    "max_attempts": 3,
    "retry_on": [ConnectionError, TimeoutError]
})
def selective_retry(input_data: MyInput) -> MyOutput:
    pass

# Don't retry on permanent errors
@command("skip_permanent", retry={
    "max_attempts": 3,
    "stop_on": [ValueError, TypeError, KeyError]
})
def skip_permanent(input_data: MyInput) -> MyOutput:
    pass
```
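The retry_on / stop_on lists map naturally onto an isinstance check inside the retry loop. A plain-Python sketch of that filtering decision (independent of tenacity; the `should_retry` helper is illustrative, not a library API):

```python
def should_retry(exc, retry_on=None, stop_on=None):
    """Decide whether an exception warrants another attempt."""
    if stop_on and isinstance(exc, tuple(stop_on)):
        return False  # permanent error: fail immediately
    if retry_on:
        return isinstance(exc, tuple(retry_on))  # only listed types retry
    return True  # no filter configured: retry everything

print(should_retry(ConnectionError(), retry_on=[ConnectionError, TimeoutError]))  # True
print(should_retry(ValueError(), retry_on=[ConnectionError, TimeoutError]))       # False
print(should_retry(KeyError(), stop_on=[ValueError, TypeError, KeyError]))        # False
```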
### Controlling Retry Log Verbosity

Control the log level for retry attempt messages to reduce noise in high-concurrency scenarios:

```python
# Reduce logging for bulk operations with expected transient failures
@command("bulk_embed", retry={
    "max_attempts": 5,
    "wait_strategy": "exponential",
    "retry_log_level": "debug"  # Only show retries in debug mode
})
def bulk_embed(input_data: MyInput) -> MyOutput:
    pass

# Suppress all retry logs for high-volume background tasks
@command("background_sync", retry={
    "max_attempts": 3,
    "retry_log_level": "none"  # No retry logging
})
def background_sync(input_data: MyInput) -> MyOutput:
    pass

# Use warning level for critical operations
@command("critical_api_call", retry={
    "max_attempts": 3,
    "retry_log_level": "warning"  # Make retries more visible
})
def critical_api_call(input_data: MyInput) -> MyOutput:
    pass
```
Available log levels: "debug", "info" (default), "warning", "error", "none"
### Global Retry Configuration

Set default retry behavior via environment variables:

```bash
# Enable retries globally
SURREAL_COMMANDS_RETRY_ENABLED=true
SURREAL_COMMANDS_RETRY_MAX_ATTEMPTS=3
SURREAL_COMMANDS_RETRY_WAIT_STRATEGY=exponential
SURREAL_COMMANDS_RETRY_WAIT_MIN=1
SURREAL_COMMANDS_RETRY_WAIT_MAX=60
SURREAL_COMMANDS_RETRY_LOG_LEVEL=info
```
Per-command configuration always overrides global defaults.
### Disabling Retry

```python
# Explicitly disable retry for a command
@command("no_retry", retry=None)
def no_retry(input_data: MyInput) -> MyOutput:
    # Fails immediately without retry
    pass

# Or use the enabled flag
@command("also_no_retry", retry={"enabled": False})
def also_no_retry(input_data: MyInput) -> MyOutput:
    pass
```
### Type-Safe Configuration

Use RetryConfig for IDE autocomplete and validation:

```python
from surreal_commands import command, RetryConfig, RetryStrategy

config = RetryConfig(
    enabled=True,
    max_attempts=5,
    wait_strategy=RetryStrategy.EXPONENTIAL,
    wait_min=1,
    wait_max=60,
    retry_log_level="debug"  # Control log verbosity
)

@command("typed_retry", retry=config)
def typed_retry(input_data: MyInput) -> MyOutput:
    pass
```
See examples/retry_examples.py for more examples.
## Future Enhancements

- Web dashboard for monitoring
- Command scheduling (cron-like)
- Priority queues
- Result callbacks
- Retry mechanisms (completed!)
- Command chaining/workflows
- Metrics and monitoring
- REST API endpoint
- Command result TTL
- Dead letter queue
## License

This project is licensed under the MIT License - see the LICENSE file for details.

## Acknowledgments

- Inspired by Celery's design patterns
- Built with SurrealDB for real-time capabilities
- Leverages LangChain for flexible command definitions