Skip to main content

A serverless Python publish-subscribe interprocess messaging library

Project description

formix-pubsub

PyPI version Python License: MIT

A serverless, zero-dependency publish-subscribe library for Python interprocess communication.

Why Serverless?

Unlike traditional pub/sub systems (Redis, RabbitMQ, ZeroMQ), formix-pubsub requires no broker, no server process, and no external service. Messages are routed through kernel FIFO named pipes and stored on the shared-memory filesystem (/dev/shm on Linux), making communication fast and entirely local.

  • No infrastructure — install and use, nothing to start or configure
  • Zero dependencies — pure Python, standard library only
  • Automatic cleanup — channels are tied to process IDs; stale channels from crashed processes are detected and cleaned up automatically
  • Messages persist until consumed — published messages are stored as files and delivered to all matching subscribers

Installation

pip install formix-pubsub

Requires Python 3.11 or later.

Quick Start: Publish & Subscribe Across Processes

The core use case is communication between separate processes. Create two files:

subscriber.py

from pubsub import Channel, subscribe

channel = Channel(topic="greetings")

with channel:
    def on_message(msg):
        print(f"Received: {msg.content.decode()}")

    # Blocks and listens until terminated with Ctrl+C (SIGINT) or SIGTERM
    subscribe(channel, on_message)

publisher.py

from pubsub import publish

count = publish("greetings", b"Hello from another process!")
print(f"Published to {count} subscriber(s)")

Run the subscriber first, then the publisher in a second terminal:

# Terminal 1
python subscriber.py

# Terminal 2
python publisher.py

The subscriber prints Received: Hello from another process! and keeps listening. Press Ctrl+C to stop it gracefully.

Non-Blocking Fetch

Use fetch() to poll for messages without blocking. It returns None immediately when the queue is empty.

from pubsub import Channel, publish, fetch

channel = Channel(topic="tasks")

with channel:
    # Publish a few messages
    publish("tasks", b"task-1")
    publish("tasks", b"task-2")
    publish("tasks", b"task-3")

    # Poll for messages
    msg = fetch(channel)
    while msg is not None:
        print(f"Processing: {msg.content.decode()}")
        msg = fetch(channel)

    print("Queue empty, moving on.")

This is useful when your application needs to check for messages as part of a larger loop without surrendering control flow to subscribe().

RPC Pattern (Request-Response)

For request-response workflows, use message headers to route replies back to the caller. This example implements a multiply service.

rpc_server.py

from pubsub import Channel, subscribe, publish

channel = Channel(topic="rpc.math.multiply")

with channel:
    def handle_request(request):
        response_topic = request.headers.get("response-topic")
        correlation_id = request.headers.get("correlation-id")

        if not response_topic or not correlation_id:
            print(f"Malformed request (missing headers), skipping.")
            return

        # Parse operands from the payload
        a, b = request.content.decode().split(",")
        result = float(a) * float(b)

        # Publish the response back to the caller's private channel
        publish(
            response_topic,
            str(result).encode(),
            headers={"correlation-id": correlation_id},
        )
        print(f"[{correlation_id}] {a} * {b} = {result}")

    print("RPC server listening on 'rpc.math.multiply'...")
    subscribe(channel, handle_request)

rpc_client.py

import os
import uuid

from pubsub import Channel, publish, subscribe

def rpc_multiply(a: float, b: float, timeout: float = 2.0) -> float:
    """Send a multiply request and wait for the response."""

    correlation_id = str(uuid.uuid4())
    result = {correlation_id: None}

    # Create a private response channel unique to this process
    response_topic = f"rpc.response.{os.getpid()}"
    response_channel = Channel(topic=response_topic)

    with response_channel:
        # Send the request with routing headers
        publish(
            "rpc.math.multiply",
            f"{a},{b}".encode(),
            headers={
                "response-topic": response_topic,
                "correlation-id": correlation_id,
            },
        )

        # Wait for the response using subscribe with a timeout
        def on_response(msg):
            response_correlation_id = msg.headers.get("correlation-id")
            result[response_correlation_id] = float(msg.content.decode())

        subscribe(response_channel, on_response, timeout_seconds=timeout)

    if result[correlation_id] is None:
        raise TimeoutError(
            f"No response received for correlation-id {correlation_id} "
            f"within {timeout}s"
        )

    return result[correlation_id]


if __name__ == "__main__":
    result = rpc_multiply(6, 7)
    print(f"6 * 7 = {result}")

Run the server first, then the client:

# Terminal 1
python rpc_server.py

# Terminal 2
python rpc_client.py
# Output:
#   6 * 7 = 42.0

How it works:

  1. The client generates a unique correlation-id and creates a private response channel using its PID.
  2. It publishes a request to rpc.math.multiply with response-topic and correlation-id headers.
  3. The server processes the request, computes the result, and publishes it back to the client's response-topic.
  4. The client calls subscribe() with a timeout. The callback captures the response when the correlation-id matches.
  5. If no response arrives before the timeout, a TimeoutError is raised.

Wildcard Topics

Wildcards let a single channel receive messages from multiple topics:

Wildcard Matches Example
= Exactly one word logs.= matches logs.info, logs.error but not logs.app.info
+ One or more words logs.+ matches logs.info, logs.error, and logs.app.info
from pubsub import Channel, subscribe, publish

# Subscribe to all log topics
channel = Channel(topic="logs.+")

with channel:
    def on_log(msg):
        print(f"[{msg.topic}] {msg.content.decode()}")

    # In another process:
    # publish("logs.info", b"Started")
    # publish("logs.app.debug", b"Cache hit")

    subscribe(channel, on_log)

Note: Wildcards are only valid when creating channels for subscribing. You must publish to concrete topics (e.g. logs.info, not logs.+).

Configuration

Environment Variable Description Default
PUBSUB_HOME Directory for channel and message storage /dev/shm/pubsub on Linux, system temp dir elsewhere

API Summary

publish(topic, data, headers=None) -> int

Publish a message to all channels matching topic. Returns the number of channels that received the message.

fetch(channel) -> Message | None

Fetch the next message from a channel without blocking. Returns None if the queue is empty.

subscribe(channel, callback, timeout_seconds=0) -> int

Block and deliver messages to callback as they arrive. Set timeout_seconds to limit listening duration (0 = indefinite). Returns the number of messages processed, or -1 if interrupted by a signal.

Channel(topic)

Represents a subscription endpoint. Use as a context manager to ensure cleanup. Topics may include = and + wildcards for pattern matching.

Message

A received message with the following attributes:

  • id — unique message identifier
  • timestamp — microsecond-precision timestamp
  • topic — the concrete topic the message was published to
  • content — payload as bytes
  • headers — optional dict[str, str | int | float | bool | None]

Best Practices

  1. Use separate processes — designed for true parallelism without GIL limitations
  2. Always use context managerswith Channel(...) as ch: ensures FIFO cleanup
  3. Publish to concrete topics — wildcards are for subscribing only
  4. Structure topic hierarchies — e.g. app.service.event for flexible wildcard matching
  5. Stop subscribers with signalsSIGTERM or SIGINT triggers graceful shutdown

License

MIT

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

formix_pubsub-1.0.0.tar.gz (21.6 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

formix_pubsub-1.0.0-py3-none-any.whl (14.7 kB view details)

Uploaded Python 3

File details

Details for the file formix_pubsub-1.0.0.tar.gz.

File metadata

  • Download URL: formix_pubsub-1.0.0.tar.gz
  • Upload date:
  • Size: 21.6 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.14.4

File hashes

Hashes for formix_pubsub-1.0.0.tar.gz
Algorithm Hash digest
SHA256 2ae6d1a2bc5ac44a8ed24f246bb5f38ae29f8f7866a7382c6d030c29737b43a0
MD5 329def3226b2b22e8f422fb60fa83a76
BLAKE2b-256 2f182e9ace371e0bd28720da74b76f89343cfc2afbc2e6ae94a6803da6c61972

See more details on using hashes here.

File details

Details for the file formix_pubsub-1.0.0-py3-none-any.whl.

File metadata

  • Download URL: formix_pubsub-1.0.0-py3-none-any.whl
  • Upload date:
  • Size: 14.7 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.14.4

File hashes

Hashes for formix_pubsub-1.0.0-py3-none-any.whl
Algorithm Hash digest
SHA256 5d184b63e84333d891b2d6128362415303f97640b8157fccfaf58a59e2ae03a4
MD5 cca3cd4d779a8af04fba8530fbb30f56
BLAKE2b-256 e665e24cf7e24733ae8f1034b92a6bcfedd43ba231543caf91804254bd6bafd8

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page