A simple way to build Python services running on a Raspberry Pi.
Daebus
A simple framework for building systemd daemons and background services with a Flask-like interface, designed for inter-service communication in Linux environments. Daebus makes it easy to build, connect, and manage services for embedded devices like Raspberry Pi kiosks.
Overview
Daebus was created to solve the challenge of managing multiple systemd services on Linux devices such as Raspberry Pi kiosks. It provides a Flask-inspired API that makes it simple to:
- Create systemd daemons that communicate with each other
- Expose HTTP endpoints for frontend applications (e.g., kiosk browsers)
- Enable quick bootstrapping of new services with minimal boilerplate
Services communicate via Redis pub/sub channels, providing a lightweight and reliable messaging system ideal for local service orchestration.
Note: Daebus requires a Redis instance running on the system.
Features
- Flask-inspired API: Simple, decorator-based API for defining service functionality
- Redis Pub/Sub: Direct channel-based communication between services
- Action Routing: Route messages to specific handlers based on the action field
- Background tasks: Easy scheduling of periodic background tasks
- Client library: DaebusCaller allows for easy communication with Daebus services
- HTTP endpoints: Expose lightweight HTTP APIs from your daemon without additional dependencies
- Specialized APIs: Distinct and purpose-built APIs for HTTP and pub/sub contexts
Installation
pip install daebus
Quick Start
Creating a Service
from Daebus import Daebus, request, response, broadcast, cache, logger

app = Daebus(__name__)

# Define an action handler (like a Flask route)
@app.action("get_status")
def handle_status_request():
    try:
        payload = request.payload
        logger.info(f"Received status request: {payload}")
        # Process the request
        result = {"status": "healthy", "uptime": 3600}
        # Send response back to the requester
        return response.success(result)
    except Exception as e:
        return response.error(e)

# Define another action handler
@app.action("restart")
def handle_restart():
    try:
        logger.info("Handling restart request")
        # Restart logic here
        return response.success({"restarted": True})
    except Exception as e:
        return response.error(e)

# Define a channel listener for broadcasts
@app.listen("notifications")
def handle_notification(payload):
    logger.info(f"Received notification: {payload}")
    # Process the notification

# Define a background task that runs every 30 seconds
@app.background("health_check", 30)
def health_check():
    logger.info("Running health check")
    # Perform health check
    # Broadcast status to any interested services
    broadcast.send("system_status", {"status": "healthy"})

# Run the service
if __name__ == "__main__":
    app.run(service="my_service", debug=True)
Communicating with Services using DaebusCaller
from Daebus import DaebusCaller

# Create a caller for the target service
caller = DaebusCaller("my_service")

try:
    # Send a request to a specific action and wait for a response
    response = caller.send_request("get_status", {
        "detail_level": "full"
    })
    print(f"Service status: {response}")

    # Send a restart request
    restart_response = caller.send_request("restart", {
        "mode": "graceful"
    })
    print(f"Restart result: {restart_response}")

    # Send a message to a broadcast channel
    caller.send_message("notifications", {
        "type": "alert",
        "message": "System temperature high"
    })

    # Send a message directly to the service with a specific action
    caller.send_to_service({
        "timestamp": 1625176230,
        "info": "Direct message with action"
    }, action="log_info")
finally:
    # Clean up resources when done
    caller.close()
Adding HTTP Endpoints to Your Service
You can easily add HTTP endpoints to your daemon using the built-in HTTP server:
import time  # needed for time.time() in the /status route

from Daebus import Daebus, DaebusHttp, response

app = Daebus(__name__)

# Create and attach an HTTP endpoint
http = DaebusHttp(port=8080)
app.attach(http)

# Define HTTP routes (similar to Flask)
@app.route("/status")
def get_status(req):
    # Use the request object to access request data
    # req.payload contains JSON, form data, or query params
    # For HTTP responses, use response.send with data and status code
    return response.send({
        "service": "my_service",
        "status": "running",
        "timestamp": time.time()
    }, 200)  # 200 OK status code

# Routes with parameters
@app.route("/devices/<device_id>")
def get_device(req, device_id):
    # The device_id parameter is extracted from the URL
    if device_id == "123":
        return response.send({
            "device_id": device_id,
            "status": "online"
        }, 200)
    else:
        # Error responses with appropriate status codes
        return response.send({
            "error": "Device not found"
        }, 404)  # 404 Not Found

# POST requests
@app.route("/control", methods=["POST"])
def control_action(req):
    # Access JSON data from the request payload
    data = req.payload
    # Process the request
    return response.send({
        "success": True,
        "message": "Command received"
    }, 201)  # 201 Created

# Run the service with HTTP enabled
app.run(service="my_service")
Configuring CORS for HTTP Endpoints
If your HTTP API needs to be accessed from web browsers, you can configure Cross-Origin Resource Sharing (CORS):
from Daebus import Daebus, DaebusHttp, response

app = Daebus(__name__)

# Configure CORS when creating the HTTP endpoint
cors_config = {
    'allowed_origins': ['http://localhost:3000', 'https://example.com'],  # Specific origins
    'allowed_methods': ['GET', 'POST', 'OPTIONS'],  # Allowed HTTP methods
    'allowed_headers': ['Content-Type', 'Authorization'],  # Allowed headers
    'expose_headers': ['X-Custom-Header'],  # Headers exposed to client
    'allow_credentials': True,  # Allow cookies
    'max_age': 3600  # Cache preflight for 1 hour
}

# Create HTTP endpoint with CORS support
http = DaebusHttp(port=8080, cors_config=cors_config)
app.attach(http)

# Alternatively, configure CORS after initialization
# http = DaebusHttp(port=8080)
# http.configure_cors({
#     'allowed_origins': '*',  # Allow all origins
#     'allowed_methods': ['GET', 'POST'],
#     'allowed_headers': '*',  # Allow all headers
# })

@app.route("/api/data")
def get_data(req):
    # This endpoint will now include CORS headers in the response
    return response.send({"data": "example"}, 200)
You can use these CORS configuration options:
- allowed_origins: List of allowed origins, or '*' for any origin
- allowed_methods: List of allowed HTTP methods, or '*' for any method
- allowed_headers: List of allowed headers, or '*' for any header
- expose_headers: List of headers to expose to the client
- allow_credentials: Boolean for allowing credentials (cookies)
- max_age: Cache time (seconds) for preflight requests
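As a rough illustration of how options like these could map onto response headers, here is a minimal sketch. The build_cors_headers helper is hypothetical and is not part of Daebus; it only assumes a config dictionary shaped like the one above and emits the standard CORS response headers. How Daebus itself handles these options may differ.

```python
from typing import Dict, Optional


def _join(value) -> str:
    """Accept either '*' (a plain string) or a list of values."""
    return value if isinstance(value, str) else ", ".join(value)


def build_cors_headers(config: dict, origin: Optional[str]) -> Dict[str, str]:
    """Hypothetical helper: derive CORS response headers for one request origin."""
    if not origin:
        return {}  # not a cross-origin browser request

    allowed = config.get("allowed_origins", [])
    credentials = bool(config.get("allow_credentials", False))

    if allowed == "*":
        # Browsers reject a wildcard origin on credentialed requests,
        # so echo the caller's origin in that case.
        allow_origin = origin if credentials else "*"
    elif origin in allowed:
        allow_origin = origin
    else:
        return {}  # origin not allowed: send no CORS headers

    headers = {"Access-Control-Allow-Origin": allow_origin}
    if allow_origin != "*":
        headers["Vary"] = "Origin"
    if "allowed_methods" in config:
        headers["Access-Control-Allow-Methods"] = _join(config["allowed_methods"])
    if "allowed_headers" in config:
        headers["Access-Control-Allow-Headers"] = _join(config["allowed_headers"])
    if config.get("expose_headers"):
        headers["Access-Control-Expose-Headers"] = _join(config["expose_headers"])
    if credentials:
        headers["Access-Control-Allow-Credentials"] = "true"
    if "max_age" in config:
        headers["Access-Control-Max-Age"] = str(config["max_age"])
    return headers


example_config = {
    "allowed_origins": ["http://localhost:3000", "https://example.com"],
    "allowed_methods": ["GET", "POST", "OPTIONS"],
    "allowed_headers": ["Content-Type", "Authorization"],
    "expose_headers": ["X-Custom-Header"],
    "allow_credentials": True,
    "max_age": 3600,
}

allowed_headers = build_cors_headers(example_config, "http://localhost:3000")
blocked_headers = build_cors_headers(example_config, "https://other.example")
```

An origin that is not in allowed_origins gets no CORS headers at all, which is what causes the browser to block the response.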
Protocol-Specific Response Methods
Daebus provides distinct response methods optimized for each protocol:
from Daebus import Daebus, DaebusHttp, request, response

app = Daebus(__name__)
http = DaebusHttp(port=8080)
app.attach(http)

# Shared function for business logic
def get_status_data():
    return {
        "service": "my_service",
        "status": "healthy",
        "uptime": 3600
    }

# HTTP route with HTTP-specific response using response.send
@app.route("/status")
def http_status_handler(req):
    try:
        data = get_status_data()
        # Use response.send() for HTTP responses, with status code
        return response.send(data, 200)
    except Exception as e:
        # HTTP errors with appropriate status codes
        return response.send({"error": str(e)}, 500)

# Redis action handler with pub/sub specific response methods
@app.action("get_status")
def redis_status_handler():
    try:
        data = get_status_data()
        # Use response.success() for Redis responses
        return response.success(data)
    except Exception as e:
        # Redis-specific error handling
        return response.error(e)
Advanced: Direct Access to Protocol-Specific Classes
For advanced usage, you can access the protocol-specific classes directly:
from Daebus import (
    Daebus, DaebusHttp,
    HttpRequest, HttpResponse,  # HTTP-specific classes
    PubSubRequest, PubSubResponse  # Pub/Sub-specific classes
)

app = Daebus(__name__)
http = DaebusHttp(port=8080)
app.attach(http)

# Use HTTP-specific classes directly for advanced customization
@app.route("/advanced")
def advanced_handler(req):
    # Check if we have the right request type
    if not isinstance(req, HttpRequest):
        raise TypeError("Expected an HttpRequest object")
    # Create a custom HTTP response
    custom_response = HttpResponse(None)
    return custom_response.send({
        "message": "Custom HTTP response",
        "path": req.path
    }, 200)
Benefits of protocol-specific methods:
- response.send(data, status_code) - HTTP focused, with explicit status codes
- response.success(data) - Pub/sub focused, adds necessary pub/sub metadata
- response.error(exception) - Pub/sub focused, formats exceptions for pub/sub
- Clear distinction between HTTP and pub/sub handling
- Direct access to protocol-specific classes for advanced customization
How It Works
Action Routing
Daebus uses a simple action routing system:
- Each service automatically listens on its main channel (named after the service)
- Messages sent to this channel can include an action field to route to specific handlers
- Handlers are registered using @app.action("action_name")
For example:
- When a message with action: "get_status" is received, it's routed to the function decorated with @app.action("get_status")
- When a message with action: "restart" is received, it's routed to the function decorated with @app.action("restart")
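The routing idea can be sketched without Redis at all. The ActionRouter class below is a hypothetical stand-in, not Daebus's actual internals, and the message shape (an action field plus a payload field) and the reply dictionaries are assumptions made for illustration.

```python
from typing import Any, Callable, Dict


class ActionRouter:
    """Hypothetical registry mapping action names to handler functions."""

    def __init__(self) -> None:
        self._handlers: Dict[str, Callable[[Dict[str, Any]], Any]] = {}

    def action(self, name: str):
        """Decorator that registers the function as the handler for `name`."""
        def decorator(func):
            self._handlers[name] = func
            return func
        return decorator

    def dispatch(self, message: Dict[str, Any]) -> Any:
        """Call the handler named by the message's `action` field."""
        name = message.get("action")
        handler = self._handlers.get(name)
        if handler is None:
            return {"status": "error", "error": f"unknown action: {name}"}
        return handler(message.get("payload", {}))


router = ActionRouter()


@router.action("get_status")
def get_status(payload):
    return {"status": "success", "payload": {"status": "healthy"}}


@router.action("restart")
def restart(payload):
    return {"status": "success",
            "payload": {"restarted": True, "mode": payload.get("mode")}}


# Messages as they might arrive on the service's main channel
status_reply = router.dispatch({"action": "get_status", "payload": {}})
restart_reply = router.dispatch({"action": "restart", "payload": {"mode": "graceful"}})
unknown_reply = router.dispatch({"action": "shutdown"})
```

Keeping the lookup in a plain dictionary means an unregistered action can be answered with an error instead of being silently dropped.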
Channel Types
Daebus uses two types of Redis pub/sub channels:
- Service Channels: Named after the service (e.g., my_service) and used for direct communication with action routing
- Custom Channels: Any additional channels that services can publish to or subscribe to (e.g., notifications, system_status)
Message Flow
- Request/Response:
  - Client calls send_request("action_name", payload)
  - DaebusCaller sends message to service's main channel with the action field
  - Service routes to appropriate handler based on action
  - Handler processes request and sends response back to caller's response channel
- Broadcasts:
  - Any service can broadcast to any channel with broadcast.send(channel, payload)
  - Interested services subscribe to those channels with @app.listen(channel)
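To make the two flows concrete, the sketch below replays them against a toy in-memory broker instead of Redis. Everything here is an assumption for illustration: InMemoryBroker, the request_id and reply_to field names, and the reply channel naming are hypothetical and do not describe Daebus's wire format. A real caller would also wait for the reply with a timeout, because Redis delivers messages asynchronously.

```python
import uuid
from collections import defaultdict
from typing import Callable, Dict, List, Optional


class InMemoryBroker:
    """Toy synchronous pub/sub broker standing in for Redis."""

    def __init__(self) -> None:
        self._subscribers: Dict[str, List[Callable[[dict], None]]] = defaultdict(list)

    def subscribe(self, channel: str, callback: Callable[[dict], None]) -> None:
        self._subscribers[channel].append(callback)

    def publish(self, channel: str, message: dict) -> int:
        callbacks = list(self._subscribers[channel])
        for callback in callbacks:
            callback(message)
        return len(callbacks)  # number of subscribers reached


broker = InMemoryBroker()


# Service side: listen on the main channel and reply on the caller's channel.
def service_listener(message: dict) -> None:
    if message.get("action") == "get_status":
        broker.publish(message["reply_to"], {
            "request_id": message["request_id"],
            "status": "success",
            "payload": {"status": "healthy"},
        })


broker.subscribe("my_service", service_listener)


# Client side: publish a request and collect the reply.
def send_request(service: str, action: str, payload: dict) -> Optional[dict]:
    request_id = str(uuid.uuid4())
    reply_channel = f"{service}.reply.{request_id}"  # hypothetical naming
    replies: List[dict] = []
    broker.subscribe(reply_channel, replies.append)
    broker.publish(service, {
        "action": action,
        "payload": payload,
        "request_id": request_id,
        "reply_to": reply_channel,
    })
    return replies[0] if replies else None


reply = send_request("my_service", "get_status", {"detail_level": "full"})

# Broadcast: one-way delivery to whoever subscribed, no reply expected.
received: List[dict] = []
broker.subscribe("system_status", received.append)
broker.publish("system_status", {"status": "healthy"})
```

The request ID lets the caller match a reply to the request that produced it, which matters once several requests are in flight at the same time.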
HTTP Endpoints
Daebus provides a lightweight HTTP server that:
- Uses Python's built-in http.server module (no external dependencies)
- Automatically converts Python dictionaries to JSON responses
- Parses JSON and form data from requests
- Supports URL parameters with the same syntax as Flask (/path/<param>)
- Runs in a background thread, allowing your service to handle both Redis and HTTP requests
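One common way to support Flask-style URL parameters is to compile each route pattern into a regular expression with named groups. The sketch below shows that technique under stated assumptions: compile_route and match_route are hypothetical helpers, and this is not necessarily how Daebus matches routes.

```python
import re
from typing import Dict, Optional


def compile_route(pattern: str) -> "re.Pattern[str]":
    """Turn '/devices/<device_id>' into a regex with one named group per parameter."""
    # With one capture group, re.split alternates literal text and parameter names.
    parts = re.split(r"<([A-Za-z_][A-Za-z0-9_]*)>", pattern)
    regex = ""
    for index, part in enumerate(parts):
        if index % 2 == 0:
            regex += re.escape(part)          # literal path text
        else:
            regex += f"(?P<{part}>[^/]+)"     # parameter: one path segment
    return re.compile(f"^{regex}$")


def match_route(pattern: str, path: str) -> Optional[Dict[str, str]]:
    """Return the extracted parameters, or None if the path does not match."""
    match = compile_route(pattern).match(path)
    return match.groupdict() if match else None


device_params = match_route("/devices/<device_id>", "/devices/123")
no_match = match_route("/devices/<device_id>", "/devices/123/extra")
```

Extracted values are always strings, which is why the route example compares device_id against "123" rather than the integer 123.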
Protocol-Specific Classes
Daebus provides specialized classes for each protocol:
- For Pub/Sub (Redis):
  - PubSubRequest: Handles pub/sub message payloads, reply channels, and request IDs
  - PubSubResponse: Provides success(), error(), and progress() methods for pub/sub responses
- For HTTP:
  - HttpRequest: Adapts HTTP requests to a similar interface as pub/sub requests
  - HttpResponse: Provides send() method for HTTP responses with status codes
These specialized classes are normally accessed through the request and response proxies, which automatically select the appropriate implementation based on the current context.
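A context-selecting proxy of this kind can be sketched with the standard library's contextvars module. This is only an illustration of the pattern: ContextProxy, run_in_context, and the two response classes are hypothetical, and the dictionaries they return are invented for the example rather than taken from Daebus.

```python
import contextvars

# Holds whichever response implementation belongs to the handler currently running.
_current_response = contextvars.ContextVar("current_response")


class HttpResponseSketch:
    def send(self, data, status_code=200):
        return {"protocol": "http", "status_code": status_code, "body": data}


class PubSubResponseSketch:
    def success(self, data):
        return {"protocol": "pubsub", "status": "success", "payload": data}

    def error(self, exc):
        return {"protocol": "pubsub", "status": "error", "error": str(exc)}


class ContextProxy:
    """Forwards attribute access to the object stored in a context variable."""

    def __init__(self, var):
        self._var = var

    def __getattr__(self, name):
        # Only called for attributes not found on the proxy itself.
        return getattr(self._var.get(), name)


response = ContextProxy(_current_response)


def run_in_context(implementation, handler):
    """Bind an implementation for the duration of one handler call."""
    token = _current_response.set(implementation)
    try:
        return handler()
    finally:
        _current_response.reset(token)


# The same `response` name resolves differently depending on the active context.
http_result = run_in_context(
    HttpResponseSketch(), lambda: response.send({"ok": True}, 200))
pubsub_result = run_in_context(
    PubSubResponseSketch(), lambda: response.success({"ok": True}))
```

Handlers can then import a single response name while the framework decides which implementation is active for each call.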
Key Concepts
Action Handlers vs Channel Listeners
- Action Handlers (@app.action):
  - Receive messages from the service's main channel
  - Automatically routed based on the action field
  - Support request/response pattern with response.success() and response.error()
- Channel Listeners (@app.listen):
  - Receive broadcast messages from any channel
  - Used for one-way communications (no responses)
  - Useful for event notifications and broadcasts
Lifecycle Hooks
Daebus provides lifecycle hooks to run code at specific points during the service's lifecycle:
- On Start (@app.on_start()):
  - Runs once after the service has fully started
  - All Redis connections, threads, and components are initialized
  - Ideal for initialization tasks, broadcasting startup events, or setting initial cache values
Example:
import time

# Assumes app, logger, cache, and broadcast are set up as in the Quick Start
@app.on_start()
def initialize_service():
    logger.info("Service is starting up...")
    # Set initial cache values
    cache.set("service.start_time", str(time.time()))
    cache.set("service.status", "running")
    # Broadcast a startup message
    broadcast.send("system.events", {
        "event": "startup",
        "service": app.name,
        "timestamp": time.time()
    })
Context Objects
Within handlers, you have access to:
- request: Contains the incoming request data (adapts to HTTP or Redis context)
- response: Used to send responses back to the requester
  - For Redis: response.success(), response.error()
  - For HTTP: response.send(data, status_code)
- broadcast: Used to publish messages to channels
- cache: Direct access to Redis for storage
- logger: Configured logger for the service
Advanced Usage
Custom Service Extensions
You can extend DaebusCaller to create custom service-specific clients:
from Daebus import DaebusCaller

class CameraServiceCaller(DaebusCaller):
    def __init__(self, redis_host='localhost', redis_port=6379):
        super().__init__("camera_service", redis_host, redis_port)

    def take_picture(self, resolution="high"):
        return self.send_request("take_picture", {
            "resolution": resolution
        })

    def start_recording(self, duration=60):
        return self.send_request("start_recording", {
            "duration": duration
        })
Error Handling
Daebus provides built-in error handling for actions:
@app.action("risky_operation")
def risky_operation():
    try:
        # Perform risky operation
        result = perform_operation()
        return response.success({"result": result})
    except Exception as e:
        logger.error(f"Operation failed: {e}")
        return response.error(e)
Background
Daebus was born from the need to manage multiple systemd services on Raspberry Pi kiosks. The project required several background daemons that could:
- Communicate with each other efficiently
- Expose HTTP endpoints for frontend applications running in Chromium
- Be easily bootstrapped with minimal boilerplate
Traditional approaches like Unix sockets proved limiting, so Redis pub/sub was chosen as the communication backbone due to its reliability and simplicity. The Flask-like interface was inspired by the ease of setting up web services with Flask, bringing that same developer experience to systemd daemon development.
For proper operation, Daebus requires a Redis instance running on the system to handle the pub/sub messaging between services.
Version Management
To increment the version number, use bump2version:
# For patch version (0.0.1 -> 0.0.2)
bump2version patch
# For minor version (0.1.0 -> 0.2.0)
bump2version minor
# For major version (1.0.0 -> 2.0.0)
bump2version major
# Don't forget to push both the commit and the tags
git push && git push --tags
You can also use the GitHub Actions workflow "Bump Version" to automatically bump versions and push tags.