Skip to main content

Python SDK for Pulse Broker

Project description

Pulse Python SDK

Official Python client for Pulse Broker.

Installation

pip install pulse-broker

Configuration

The SDK looks for a pulse.yaml (or pulse.yml) file in your project root. If not found, it defaults to localhost:5555 (HTTP) and localhost:5556 (gRPC).

Example pulse.yaml

# Connection Settings
broker:
  host: "localhost"
  http_port: 5555
  grpc_port: 5556
  timeout_ms: 5000

# Client Defaults
client:
  id: "my-python-app"
  auto_commit: true       # Automatically commit offsets after successful processing
  max_retries: 3

# Topic Configuration
topics:
  - name: "events"
    create_if_missing: true
    config:
      fifo: false
      retention_bytes: 1073741824  # 1GB
    consume:
      auto_commit: true

  - name: "transactions"
    create_if_missing: true
    config:
      fifo: true
    consume:
      auto_commit: false  # Manual commit required

  - name: "logs"
    create_if_missing: true
    config:
      fifo: false
    consume:
      auto_commit: true

Usage

Producer

You can send dictionaries (automatically serialized to JSON) or raw bytes.

from pulse import Producer

# Initialize (uses pulse.yaml or defaults)
# You can override settings: Producer(host="10.0.0.1", port=9090)
producer = Producer()

# Send JSON
producer.send("events", {"type": "user_created", "id": 123})

# Send Bytes
producer.send("logs", b"raw log line")

producer.close()

Consumer

Use the @consumer decorator to register message handlers.

from pulse import consumer, commit, run

# Simple Consumer (uses auto_commit from config)
@consumer("events")
def handle_event(msg):
    print(f"Received event: {msg.payload}")
    # msg.payload is a dict if JSON, else bytes

# Manual Commit Consumer
# Override config params directly in the decorator if needed
@consumer("transactions", auto_commit=False)
def handle_transaction(msg):
    try:
        process_payment(msg.payload)
        commit()  # Manually commit offset
        print(f"Processed transaction {msg.offset}")
    except Exception as e:
        print(f"Failed to process: {e}")
        # Do not commit, message will be redelivered on restart/rebalance

if __name__ == "__main__":
    print("Starting consumers...")
    run()  # Blocks and runs all registered consumers

Development

  1. Clone the repository.
  2. Navigate to sdk/python.
  3. Create a virtual environment:
    python3 -m venv .venv
    source .venv/bin/activate
    
  4. Install dependencies:
    pip install -r requirements.txt
    
  5. Run tests:
    pytest
    

Publishing to PyPI

  1. Build the package:
    python3 setup.py sdist bdist_wheel
    
  2. Upload to PyPI:
    twine upload dist/*
    

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

pulse_broker-0.0.3.tar.gz (10.1 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

pulse_broker-0.0.3-py3-none-any.whl (10.3 kB view details)

Uploaded Python 3

File details

Details for the file pulse_broker-0.0.3.tar.gz.

File metadata

  • Download URL: pulse_broker-0.0.3.tar.gz
  • Upload date:
  • Size: 10.1 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.14.2

File hashes

Hashes for pulse_broker-0.0.3.tar.gz
Algorithm Hash digest
SHA256 d7197a5990a7326c281ce6b15ca4f8d82ca372d843e257b80a4ea4fa8fa7b669
MD5 f0dcb32be4ed89b6a2b4e2c14e5d654d
BLAKE2b-256 bd5fb5069789926cbac192a1fdace64397f39398a7a269354d31be948bfd83f8

See more details on using hashes here.

File details

Details for the file pulse_broker-0.0.3-py3-none-any.whl.

File metadata

  • Download URL: pulse_broker-0.0.3-py3-none-any.whl
  • Upload date:
  • Size: 10.3 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.14.2

File hashes

Hashes for pulse_broker-0.0.3-py3-none-any.whl
Algorithm Hash digest
SHA256 f36d140cce36674ab4da9f47369e331ac42d2e07d428bccf1b4f5a0ef7dbc20c
MD5 6ce30a5ab498cd49d5a9cfba78c6bd5a
BLAKE2b-256 8e69bb852a671f8aba744d7d644591e1c9bc8315d38c4156a6b1f02e8d2b1b8e

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page