Superbus 🚏

A Redis-based bus for communication between microservices that handle long-running tasks


📦 Installation

To install via pip:

pip install superbus

To install dependencies and use the package with Poetry:

poetry install

To build the package:

poetry build

🚀 Getting Started

  1. Run Redis (locally or in Docker):

    docker run --name superbus-redis -p 6379:6379 -d redis:7
    
  2. Start a worker:

    python examples/dummy/worker.py
    
  3. Start a client and create a task:

    python examples/dummy/client.py
    
  4. Clean up completed and orphaned tasks (in a Python console):

    from superbus.client import Client
    c = Client(redis_host="localhost")
    c.clearCompleted()
    

For more advanced usage examples, see the examples directory.


🧠 Concept

Superbus is a lightweight library (bus) designed for asynchronous communication between microservices using Redis.
It enables you to:

  • distribute tasks between workers,
  • track task execution state,
  • manage data processing pipelines,
  • use webhooks for result notifications.

🧩 Core Classes

Client

Used by client microservices to enqueue tasks.
Methods:

  • pushTask(task_data, workflow, wait_result=False, webhook=None) — creates and enqueues a task.
  • getTask(task_id) — retrieves task state.
  • getQueue(workflow=None) — returns queue lengths for workflow operators.
  • listTasks() — returns a list of all active task IDs.
  • clearCompleted() — removes completed and orphaned tasks from Redis.
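When a task is enqueued without wait_result=True, its state can be checked with getTask. The following is a minimal polling sketch, not part of Superbus: wait_for_task and read_status are hypothetical helpers, and the shape of the value returned by getTask (a dict or an object with a status field) is an assumption. The terminal statuses are taken from the StatusUpdater list in this README.

```python
import time

# Terminal statuses, taken from the StatusUpdater section of this README.
TERMINAL_STATUSES = {"SUCCESS", "ERROR", "TIMEOUT"}


def read_status(task):
    """Return the status of a task given as a dict or an object (assumed shapes)."""
    if isinstance(task, dict):
        return task.get("status")
    return getattr(task, "status", None)


def wait_for_task(client, task_id, timeout=30.0, interval=0.5):
    """Poll client.getTask(task_id) until a terminal status or until timeout.

    Hypothetical helper; it only relies on the client having a getTask method.
    """
    deadline = time.monotonic() + timeout
    while True:
        task = client.getTask(task_id)
        if read_status(task) in TERMINAL_STATUSES:
            return task
        if time.monotonic() >= deadline:
            raise TimeoutError(f"task {task_id} not finished after {timeout}s")
        time.sleep(interval)
```

With a real client this might be called as wait_for_task(c, task_id), where task_id identifies a previously enqueued task.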

Worker

Runs inside operator microservices (the pipeline workers).
Methods:

  • run(operators: dict) — registers operator functions and continuously processes tasks from Redis.

Example:

from superbus.worker import Worker

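def add(data):
    # An operator receives the task data and returns the updated data.
    data["x"] += 1
    return data

worker = Worker(redis_host="localhost")
# Keys are the operator names that workflows refer to, e.g. ["adder"].
# run() keeps processing tasks from Redis.
worker.run({"adder": add})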
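Because operators are plain functions keyed by name, a workflow can be exercised locally before any worker is started. The sketch below is a hypothetical test helper and does not use Redis or Superbus. It assumes that the stages of a workflow run in list order and that each operator receives the data returned by the previous one; dry_run and the doubler operator are illustrative names.

```python
def dry_run(operators, workflow, data):
    """Apply each operator named in `workflow` to `data`, in order.

    Hypothetical helper for local testing; no Redis involved.
    """
    for name in workflow:
        if name not in operators:
            raise KeyError(f"no operator registered for stage {name!r}")
        data = operators[name](data)
    return data


def add(data):
    data["x"] += 1
    return data


def double(data):
    data["x"] *= 2
    return data


operators = {"adder": add, "doubler": double}
result = dry_run(operators, ["adder", "doubler"], {"x": 1})
print(result)  # {'x': 4}
```

The same operators dict can then be passed to worker.run once the functions behave as expected.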

StatusUpdater

Service component for managing task metadata.

  • supports statuses: CREATED, IN PROGRESS, SUCCESS, ERROR, TIMEOUT
  • updates and serializes tasks in Redis

TaskModel

Pydantic model representing a task:

from typing import List, Optional

from pydantic import BaseModel

class TaskModel(BaseModel):
    id: str
    workflow: List[str]
    timestamp: Optional[str]
    status: Optional[str]
    stage: Optional[str]
    error: Optional[str]
    webhook: Optional[str]

🧰 Utilities

  • keydb_expiremember — safely works with both KeyDB and Redis.
    • In KeyDB, uses EXPIREMEMBER.
    • In Redis, which has no EXPIREMEMBER command, the per-member TTL is skipped.
  • get_logger() — structured JSON logger.
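The KeyDB/Redis fallback described above could look roughly like the sketch below. This is not the source of keydb_expiremember, whose signature is not shown here. expire_member is a hypothetical function. It assumes a client object with an execute_command method (as redis-py clients have) and a server that rejects unsupported commands with an "unknown command" error.

```python
def expire_member(client, key, member, ttl_seconds):
    """Set a TTL on one member of a key via KeyDB's EXPIREMEMBER.

    Returns True if the command was accepted, False if the server does not
    know it (plain Redis), in which case the per-member TTL is skipped.
    Hypothetical helper; `client` only needs an execute_command(*args) method.
    """
    try:
        client.execute_command("EXPIREMEMBER", key, member, ttl_seconds)
        return True
    except Exception as exc:
        # Assumption: an unsupported command surfaces as an error whose
        # message contains "unknown command".
        if "unknown command" in str(exc).lower():
            return False
        raise  # any other failure is a real error
```

Matching on the error text keeps the sketch free of a hard dependency on a particular client library; real code would more likely catch the client's own response-error type.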

🧪 Example end-to-end execution

from superbus.client import Client

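# Assumes a worker that has registered the "adder" operator is running.
c = Client(redis_host="localhost")
# The "adder" operator increments data["x"], so the payload must contain "x".
task = c.pushTask({"x": 1}, ["adder"], wait_result=True)
print(task)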
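As an alternative to blocking with wait_result=True, pushTask accepts a webhook argument. This README does not document the request that Superbus sends to that URL, so the receiver below is only a sketch. It assumes an HTTP POST whose body is a JSON object with id and status fields. parse_notification, WebhookHandler and serve are hypothetical names, and only the Python standard library is used.

```python
import json
from http.server import BaseHTTPRequestHandler, HTTPServer


def parse_notification(body: bytes) -> dict:
    """Decode a notification body, assumed to be a JSON object."""
    payload = json.loads(body.decode("utf-8"))
    if not isinstance(payload, dict):
        raise ValueError("expected a JSON object")
    return payload


class WebhookHandler(BaseHTTPRequestHandler):
    def do_POST(self):
        length = int(self.headers.get("Content-Length", 0))
        try:
            payload = parse_notification(self.rfile.read(length))
        except ValueError:
            # Malformed JSON or bad encoding: reject the request.
            self.send_response(400)
            self.end_headers()
            return
        # Assumed fields; adjust to the payload that is actually sent.
        print("task update:", payload.get("id"), payload.get("status"))
        self.send_response(204)
        self.end_headers()


def serve(host="127.0.0.1", port=8000):
    """Run the receiver until interrupted."""
    HTTPServer((host, port), WebhookHandler).serve_forever()
```

Calling serve() starts the receiver; its URL (for example http://127.0.0.1:8000/) would then be the value passed as webhook.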

📜 License

MIT © starrabb1t
