Skip to main content

A lightweight framework for building Python background services with a Flask-like API, enabling seamless inter-service communication via Redis pub/sub.

Project description

Daebus

A lightweight framework for building Python background services with a Flask-like API, enabling seamless inter-service communication via Redis pub/sub.

Overview

Daebus provides a simple way to create and connect distributed background services. It offers a Flask-inspired API that makes it easy to:

  • Create systemd daemons that communicate with each other
  • Expose HTTP endpoints for frontend applications (e.g., kiosk browsers)
  • Enable quick bootstrapping of new services with minimal boilerplate

Services communicate via Redis pub/sub channels, providing a lightweight and reliable messaging system ideal for local service orchestration.

Note: Daebus requires a Redis instance running on the system.

Features

  • Flask-inspired API: Simple, decorator-based API for defining service functionality
  • Redis Pub/Sub: Direct channel-based communication between services
  • Action Routing: Route messages to specific handlers based on action field
  • Background tasks: Easy scheduling of periodic background tasks
  • Client library: DaebusCaller allows for easy communication with Daebus services
  • HTTP endpoints: Expose lightweight HTTP APIs from your daemon without additional dependencies
  • Specialized APIs: Distinct and purpose-built APIs for HTTP and pub/sub contexts

Installation

pip install daebus

Quick Start

Creating a Service

from Daebus import Daebus, request, response, broadcast, cache, logger

app = Daebus(__name__)

# Define an action handler (like a Flask route)
@app.action("get_status")
def handle_status_request():
    try:
        payload = request.payload
        logger.info(f"Received status request: {payload}")

        # Process the request
        result = {"status": "healthy", "uptime": 3600}

        # Send response back to the requester
        return response.success(result)
    except Exception as e:
        return response.error(e)

# Define another action handler
@app.action("restart")
def handle_restart():
    try:
        logger.info("Handling restart request")
        # Restart logic here
        return response.success({"restarted": True})
    except Exception as e:
        return response.error(e)

# Define a channel listener for broadcasts
@app.listen("notifications")
def handle_notification(payload):
    logger.info(f"Received notification: {payload}")
    # Process the notification

# Define a background task that runs every 30 seconds
@app.background("health_check", 30)
def health_check():
    logger.info("Running health check")
    # Perform health check

    # Broadcast status to any interested services
    broadcast.send("system_status", {"status": "healthy"})

# Run the service
if __name__ == "__main__":
    app.run(service="my_service", debug=True)

Communicating with Services using DaebusCaller

from Daebus import DaebusCaller

# Create a caller for the target service
caller = DaebusCaller("my_service")

try:
    # Send a request to a specific action and wait for a response
    response = caller.send_request("get_status", {
        "detail_level": "full"
    })
    print(f"Service status: {response}")

    # Send a restart request
    restart_response = caller.send_request("restart", {
        "mode": "graceful"
    })
    print(f"Restart result: {restart_response}")

    # Send a message to a broadcast channel
    caller.send_message("notifications", {
        "type": "alert",
        "message": "System temperature high"
    })

    # Send a message directly to the service with a specific action
    caller.send_to_service({
        "timestamp": 1625176230,
        "info": "Direct message with action"
    }, action="log_info")
finally:
    # Clean up resources when done
    caller.close()

Adding HTTP Endpoints to Your Service

You can easily add HTTP endpoints to your daemon using the built-in HTTP server:

from Daebus import Daebus, DaebusHttp

app = Daebus(__name__)

# Create and attach an HTTP endpoint
http = DaebusHttp(port=8080)
app.attach(http)

# Define HTTP routes (similar to Flask)
@app.route("/status")
def get_status(req):
    # Use the request object to access request data
    # req.payload contains JSON, form data, or query params

    # For HTTP responses, use response.send with data and status code
    return response.send({
        "service": "my_service",
        "status": "running",
        "timestamp": time.time()
    }, 200)  # 200 OK status code

# Routes with parameters
@app.route("/devices/<device_id>")
def get_device(req, device_id):
    # The device_id parameter is extracted from the URL
    if device_id == "123":
        return response.send({
            "device_id": device_id,
            "status": "online"
        }, 200)
    else:
        # Error responses with appropriate status codes
        return response.send({
            "error": "Device not found"
        }, 404)  # 404 Not Found

# POST requests
@app.route("/control", methods=["POST"])
def control_action(req):
    # Access JSON data from the request payload
    data = req.payload

    # Process the request
    return response.send({
        "success": True,
        "message": "Command received"
    }, 201)  # 201 Created

# Run the service with HTTP enabled
app.run(service="my_service")

Configuring CORS for HTTP Endpoints

If your HTTP API needs to be accessed from web browsers, you can configure Cross-Origin Resource Sharing (CORS):

from Daebus import Daebus, DaebusHttp

app = Daebus(__name__)

# Configure CORS when creating the HTTP endpoint
cors_config = {
    'allowed_origins': ['http://localhost:3000', 'https://example.com'],  # Specific origins
    'allowed_methods': ['GET', 'POST', 'OPTIONS'],                        # Allowed HTTP methods
    'allowed_headers': ['Content-Type', 'Authorization'],                 # Allowed headers
    'expose_headers': ['X-Custom-Header'],                                # Headers exposed to client
    'allow_credentials': True,                                            # Allow cookies
    'max_age': 3600                                                       # Cache preflight for 1 hour
}

# Create HTTP endpoint with CORS support
http = DaebusHttp(port=8080, cors_config=cors_config)
app.attach(http)

# Alternatively, configure CORS after initialization
# http = DaebusHttp(port=8080)
# http.configure_cors({
#     'allowed_origins': '*',  # Allow all origins
#     'allowed_methods': ['GET', 'POST'],
#     'allowed_headers': '*',  # Allow all headers
# })

@app.route("/api/data")
def get_data(req):
    # This endpoint will now include CORS headers in the response
    return response.send({"data": "example"}, 200)

You can use these CORS configuration options:

  • allowed_origins: List of allowed origins or '*' for any origin
  • allowed_methods: List of allowed HTTP methods or '*' for any method
  • allowed_headers: List of allowed headers or '*' for any header
  • expose_headers: List of headers to expose to the client
  • allow_credentials: Boolean for allowing credentials (cookies)
  • max_age: Cache time (seconds) for preflight requests

Protocol-Specific Response Methods

Daebus provides distinct response methods optimized for each protocol:

from Daebus import Daebus, DaebusHttp, request, response

app = Daebus(__name__)
http = DaebusHttp(port=8080)
app.attach(http)

# Shared function for business logic
def get_status_data():
    return {
        "service": "my_service",
        "status": "healthy",
        "uptime": 3600
    }

# HTTP route with HTTP-specific response using response.send
@app.route("/status")
def http_status_handler(req):
    try:
        data = get_status_data()

        # Use response.send() for HTTP responses, with status code
        return response.send(data, 200)
    except Exception as e:
        # HTTP errors with appropriate status codes
        return response.send({"error": str(e)}, 500)

# Redis action handler with pub/sub specific response methods
@app.action("get_status")
def redis_status_handler():
    try:
        data = get_status_data()

        # Use response.success() for Redis responses
        return response.success(data)
    except Exception as e:
        # Redis-specific error handling
        return response.error(e)

Advanced: Direct Access to Protocol-Specific Classes

For advanced usage, you can access the protocol-specific classes directly:

from Daebus import (
    Daebus, DaebusHttp,
    HttpRequest, HttpResponse,  # HTTP-specific classes
    PubSubRequest, PubSubResponse  # Pub/Sub-specific classes
)

app = Daebus(__name__)
http = DaebusHttp(port=8080)
app.attach(http)

# Use HTTP-specific classes directly for advanced customization
@app.route("/advanced")
def advanced_handler(req):
    # Check if we have the right request type
    if not isinstance(req, HttpRequest):
        raise TypeError("Expected an HttpRequest object")

    # Create a custom HTTP response
    custom_response = HttpResponse(None)
    return custom_response.send({
        "message": "Custom HTTP response",
        "path": req.path
    }, 200)

Benefits of protocol-specific methods:

  • response.send(data, status_code) - HTTP focused, with explicit status codes
  • response.success(data) - Pub/sub focused, adds necessary pub/sub metadata
  • response.error(exception) - Pub/sub focused, formats exceptions for pub/sub
  • Clear distinction between HTTP and pub/sub handling
  • Direct access to protocol-specific classes for advanced customization

How It Works

Action Routing

Daebus uses a simple action routing system:

  1. Each service automatically listens on its main channel (named after the service)
  2. Messages sent to this channel can include an action field to route to specific handlers
  3. Handlers are registered using @app.action("action_name")

For example:

  • When a message with action: "get_status" is received, it's routed to the function decorated with @app.action("get_status")
  • When a message with action: "restart" is received, it's routed to the function decorated with @app.action("restart")

Channel Types

Daebus uses two types of Redis pub/sub channels:

  1. Service Channels: Named after the service (e.g., my_service) and used for direct communication with action routing
  2. Custom Channels: Any additional channels that services can publish to or subscribe to (e.g., notifications, system_status)

Message Flow

  1. Request/Response:

    • Client calls send_request("action_name", payload)
    • DaebusCaller sends message to service's main channel with the action field
    • Service routes to appropriate handler based on action
    • Handler processes request and sends response back to caller's response channel
  2. Broadcasts:

    • Any service can broadcast to any channel with broadcast.send(channel, payload)
    • Interested services subscribe to those channels with @app.listen(channel)

HTTP Endpoints

Daebus provides a lightweight HTTP server that:

  1. Uses Python's built-in http.server module (no external dependencies)
  2. Automatically converts Python dictionaries to JSON responses
  3. Parses JSON and form data from requests
  4. Supports URL parameters with the same syntax as Flask (/path/<param>)
  5. Runs in a background thread, allowing your service to handle both Redis and HTTP requests

Protocol-Specific Classes

Daebus provides specialized classes for each protocol:

  1. For Pub/Sub (Redis):

    • PubSubRequest: Handles pub/sub message payloads, reply channels, and request IDs
    • PubSubResponse: Handles pub/sub responses and error handling

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

daebus-0.0.15.tar.gz (30.1 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

daebus-0.0.15-py3-none-any.whl (25.6 kB view details)

Uploaded Python 3

File details

Details for the file daebus-0.0.15.tar.gz.

File metadata

  • Download URL: daebus-0.0.15.tar.gz
  • Upload date:
  • Size: 30.1 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.12.9

File hashes

Hashes for daebus-0.0.15.tar.gz
Algorithm Hash digest
SHA256 e9928967445f9828aa74fe16e30f86b8c31ae7d08db383604f36b9fb654e4b78
MD5 52f5cba8a49f9cb8c5d8c834aac65f8b
BLAKE2b-256 07d08fc9112d171afd21ab76c2c1b481b6a4ccf22643f6b2587ad4c0d6d3621d

See more details on using hashes here.

Provenance

The following attestation bundles were made for daebus-0.0.15.tar.gz:

Publisher: python-publish.yml on twestos/Daebus

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file daebus-0.0.15-py3-none-any.whl.

File metadata

  • Download URL: daebus-0.0.15-py3-none-any.whl
  • Upload date:
  • Size: 25.6 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.12.9

File hashes

Hashes for daebus-0.0.15-py3-none-any.whl
Algorithm Hash digest
SHA256 145e31c7c72e06fa0179b9e4538c3fb0fe245d28a4f2d4d576d04660afbb7b6c
MD5 df1e271ed0936b3b68da4eb62a6f0bc6
BLAKE2b-256 8df63f773df5dcd50947cd8994c4ec414b77bdb3c7311ccfccaaa9d8205698f7

See more details on using hashes here.

Provenance

The following attestation bundles were made for daebus-0.0.15-py3-none-any.whl:

Publisher: python-publish.yml on twestos/Daebus

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page