Superbus 🚏
Redis-based bus for microservice communication handling long-running tasks
📦 Installation
To install via pip:
pip install superbus
To install dependencies and use the package with Poetry:
poetry install
To build the package:
poetry build
🚀 Getting Started
- Run Redis (locally or in Docker):
  docker run --name superbus-redis -p 6379:6379 -d redis:7
- Start a worker:
  python examples/dummy/worker.py
- Start a client and create a task:
  python examples/dummy/client.py
- Clean up completed and orphaned tasks (in a Python console):
  from superbus.client import Client
  c = Client(redis_host="localhost")
  c.clearCompleted()
For more advanced usage examples, see the examples directory.
🧠 Concept
Superbus is a lightweight library (bus) designed for asynchronous communication between microservices using Redis.
It enables you to:
- distribute tasks between workers,
- track task execution state,
- manage data processing pipelines,
- use webhooks for result notifications.
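To experiment with webhook notifications locally, a small receiver can help. The sketch below uses only the Python standard library. It assumes the notification arrives as an HTTP POST with a JSON body; the README does not document the HTTP method or payload, so both are assumptions. `WebhookHandler`, `received`, and `start_receiver` are hypothetical names, not part of Superbus.

```python
import json
import threading
from http.server import BaseHTTPRequestHandler, HTTPServer

# Hypothetical in-memory store for notifications received so far.
received = []


class WebhookHandler(BaseHTTPRequestHandler):
    """Accepts POST requests whose body is assumed to be JSON."""

    def do_POST(self):
        length = int(self.headers.get("Content-Length", 0))
        body = self.rfile.read(length)
        try:
            payload = json.loads(body)
        except ValueError:
            # Body was not valid JSON: reject it.
            self.send_response(400)
            self.end_headers()
            return
        received.append(payload)
        self.send_response(204)  # accepted, no response body
        self.end_headers()

    def log_message(self, format, *args):
        pass  # silence the default per-request stderr logging


def start_receiver(host="127.0.0.1", port=0):
    """Start the receiver in a background thread; port 0 picks a free port."""
    server = HTTPServer((host, port), WebhookHandler)
    threading.Thread(target=server.serve_forever, daemon=True).start()
    return server
```

The URL to pass as `webhook` would then be built from `server.server_address`, for example `http://127.0.0.1:<port>/hook`.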
🧩 Core Classes
Client
Used by client microservices to enqueue tasks.
Methods:
- pushTask(task_data, workflow, wait_result=False, webhook=None): creates and enqueues a task.
- getTask(task_id): retrieves task state.
- getQueue(workflow=None): returns queue lengths for workflow operators.
- listTasks(): returns a list of all active task IDs.
- clearCompleted(): removes completed and orphaned tasks from Redis.
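When a task is pushed without wait_result=True, the caller typically has to poll for its state. The helper below is a minimal sketch of such a loop. Because the return shape of getTask is not documented here, it takes a hypothetical `get_status` callable that the caller supplies (for example, a wrapper around `Client.getTask` that extracts the status string). The default terminal statuses are taken from the StatusUpdater list; treating them as final is an assumption.

```python
import time


def wait_for_status(get_status, task_id,
                    done=("SUCCESS", "ERROR", "TIMEOUT"),
                    timeout=30.0, interval=0.5):
    """Poll get_status(task_id) until it returns a status listed in `done`.

    get_status is a caller-supplied callable (hypothetical, not a Superbus
    API) that returns the current status string for the given task ID.
    """
    deadline = time.monotonic() + timeout
    while True:
        status = get_status(task_id)
        if status in done:
            return status
        if time.monotonic() >= deadline:
            raise TimeoutError(
                f"task {task_id} still {status!r} after {timeout}s"
            )
        time.sleep(interval)
```

Passing the status lookup as a callable keeps the loop independent of Redis, so it can be exercised with a stub in tests.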
Worker
Runs on operators (pipeline workers).
Methods:
- run(operators: dict): registers operator functions and continuously processes tasks from Redis.
Example:
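from superbus.worker import Worker

def add(data):
    # Operator: takes the task data, modifies it, and returns it.
    data["x"] += 1
    return data

worker = Worker(redis_host="localhost")
# Register the operator under the name "adder" and start processing tasks.
worker.run({"adder": add})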
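Operators are plain functions, so they can be checked without Redis or a running worker. The sketch below chains operators in workflow order on a local dict. It assumes each operator's return value is passed to the next stage, which is how a pipeline would usually behave but is not confirmed by this README. `run_locally` and the `double` operator are hypothetical and not part of Superbus.

```python
def run_locally(data, workflow, operators):
    """Apply each named operator in order, feeding the result to the next."""
    for name in workflow:
        data = operators[name](data)
    return data


def add(data):
    data["x"] += 1
    return data


def double(data):
    data["x"] *= 2
    return data


operators = {"adder": add, "doubler": double}
result = run_locally({"x": 1}, ["adder", "doubler"], operators)
print(result)  # {'x': 4}
```

The same `operators` dict could then be handed to `worker.run(...)` once the functions behave as expected.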
StatusUpdater
Service component for managing task metadata.
- supports statuses: CREATED, IN PROGRESS, SUCCESS, ERROR, TIMEOUT
- updates and serializes tasks in Redis
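Client code that compares status strings may find it convenient to mirror them in an enum. The sketch below is a hypothetical helper, not the library's own definition. The status values come from the list above; classifying SUCCESS, ERROR, and TIMEOUT as terminal is an assumption.

```python
from enum import Enum


class TaskStatus(str, Enum):
    """Hypothetical mirror of the status strings listed in this README."""
    CREATED = "CREATED"
    IN_PROGRESS = "IN PROGRESS"
    SUCCESS = "SUCCESS"
    ERROR = "ERROR"
    TIMEOUT = "TIMEOUT"


# Assumption: these three statuses mean no further processing will happen.
TERMINAL = frozenset({TaskStatus.SUCCESS, TaskStatus.ERROR, TaskStatus.TIMEOUT})


def is_finished(status: str) -> bool:
    """Return True if the status string names a terminal state.

    Raises ValueError for a string that is not a known status.
    """
    return TaskStatus(status) in TERMINAL
```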
TaskModel
Pydantic model representing a task:
from typing import List, Optional
from pydantic import BaseModel

class TaskModel(BaseModel):
    id: str
    workflow: List[str]
    timestamp: Optional[str]
    status: Optional[str]
    stage: Optional[str]
    error: Optional[str]
    webhook: Optional[str]
🧰 Utilities
- keydb_expiremember: safely works with both KeyDB and Redis.
  - In KeyDB, uses EXPIREMEMBER.
  - In Redis, TTL for subkeys is ignored (skipped).
- get_logger(): structured JSON logger.
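For readers unfamiliar with structured JSON logging, the idea can be sketched with the standard library alone. This is not the implementation behind get_logger(); `JsonFormatter`, `make_json_logger`, and the chosen field names are assumptions made for illustration.

```python
import json
import logging


class JsonFormatter(logging.Formatter):
    """Render each log record as a single JSON object."""

    def format(self, record):
        entry = {
            "time": self.formatTime(record),
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
        }
        if record.exc_info:
            entry["exception"] = self.formatException(record.exc_info)
        return json.dumps(entry)


def make_json_logger(name, stream=None, level=logging.INFO):
    """Return a logger that writes JSON lines to `stream` (stderr if None)."""
    logger = logging.getLogger(name)
    logger.setLevel(level)
    if not logger.handlers:  # avoid adding a second handler on repeat calls
        handler = logging.StreamHandler(stream)
        handler.setFormatter(JsonFormatter())
        logger.addHandler(handler)
    logger.propagate = False  # keep records out of the root logger
    return logger
```

Emitting one JSON object per line makes the output easy to ship to log aggregators that parse structured records.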
🧪 Example end-to-end execution
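from superbus.client import Client

# Requires Redis and a worker that registers the "adder" operator.
c = Client(redis_host="localhost")
# The "adder" operator increments data["x"], so the task data must contain "x".
task = c.pushTask({"x": 1}, ["adder"], wait_result=True)
print(task)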
📜 License
MIT © starrabb1t
Download files
File details
Details for the file superbus-1.0.1.tar.gz.
File metadata
- Download URL: superbus-1.0.1.tar.gz
- Upload date:
- Size: 6.7 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: poetry/2.2.1 CPython/3.11.13 Linux/6.11.0-1018-azure
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 2e04d6960d7de8ef6e6e13d061a824eb8eeb02660b3445f8fedef7e89fde637c |
| MD5 | 5315f71770ca9118bbe1b541a0300d7f |
| BLAKE2b-256 | 283c89c802ebbc784fa18346680dd5d92b10d26e50f21c2b4bf20d425a865670 |
File details
Details for the file superbus-1.0.1-py3-none-any.whl.
File metadata
- Download URL: superbus-1.0.1-py3-none-any.whl
- Upload date:
- Size: 9.0 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: poetry/2.2.1 CPython/3.11.13 Linux/6.11.0-1018-azure
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 31482367b1f7f86690e6faf34bef2f508b803671ac72af0e0fc787e01180269d |
| MD5 | 69ebeb526a58f843f9322c60cf39285a |
| BLAKE2b-256 | 0605126bde12ff1b071656071e95711c48a13d86a9c8991786970f82c16e002b |