Python SDK for Pulse Broker
Pulse Python SDK
Official Python client for Pulse Broker.
Installation
pip install pulse-broker
Configuration
The SDK looks for a pulse.yaml (or pulse.yml) file in your project root. If neither file is found, it falls back to localhost:5555 (HTTP) and localhost:5556 (gRPC).
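The lookup described above could be sketched roughly as follows. This is an illustration only, not the SDK's actual implementation: `find_config_file`, `CONFIG_FILENAMES`, and `DEFAULT_ENDPOINTS` are hypothetical names, and checking pulse.yaml before pulse.yml is an assumption. Only the file names and the default host and ports come from the text above.

```python
from pathlib import Path
from typing import Optional

# Defaults stated in this README; the dict name itself is hypothetical.
DEFAULT_ENDPOINTS = {"host": "localhost", "http_port": 5555, "grpc_port": 5556}

# File names the SDK is documented to look for (search order is assumed).
CONFIG_FILENAMES = ("pulse.yaml", "pulse.yml")


def find_config_file(project_root: str = ".") -> Optional[Path]:
    """Return the first existing config file in project_root, or None."""
    root = Path(project_root)
    for name in CONFIG_FILENAMES:
        candidate = root / name
        if candidate.is_file():
            return candidate
    return None
```

When `find_config_file` returns None, a caller would use `DEFAULT_ENDPOINTS` instead of reading a file.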
Example pulse.yaml
# Connection Settings
broker:
  host: "localhost"
  http_port: 5555
  grpc_port: 5556
  timeout_ms: 5000

# Client Defaults
client:
  id: "my-python-app"
  auto_commit: true # Automatically commit offsets after successful processing
  max_retries: 3

# Topic Configuration
topics:
  - name: "events"
    create_if_missing: true
    config:
      fifo: false
      retention_bytes: 1073741824 # 1GB
    consume:
      auto_commit: true
  - name: "transactions"
    create_if_missing: true
    config:
      fifo: true
    consume:
      auto_commit: false # Manual commit required
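The example sets auto_commit in two places: once as a client default and once per topic. One plausible way to resolve the effective value is sketched below. The precedence (explicit override, then topic setting, then client default) and the final fallback of True are assumptions, and `resolve_auto_commit` is a hypothetical helper, not part of the SDK. The dict mirrors part of the example file after parsing; YAML parsing itself is not shown.

```python
from typing import Any, Dict, Optional

# A dict shaped like part of the example pulse.yaml after parsing.
CONFIG: Dict[str, Any] = {
    "client": {"id": "my-python-app", "auto_commit": True, "max_retries": 3},
    "topics": [
        {"name": "events", "consume": {"auto_commit": True}},
        {"name": "transactions", "consume": {"auto_commit": False}},
    ],
}


def resolve_auto_commit(
    config: Dict[str, Any], topic: str, override: Optional[bool] = None
) -> bool:
    """Assumed precedence: explicit override, topic setting, client default."""
    if override is not None:
        return override
    for entry in config.get("topics", []):
        if entry.get("name") == topic:
            value = entry.get("consume", {}).get("auto_commit")
            if value is not None:
                return value
    # Fall back to the client-wide default (True if unset is an assumption).
    return config.get("client", {}).get("auto_commit", True)
```

Under these assumptions, "transactions" resolves to manual commit, while a topic that is not listed inherits the client default.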
Usage
Producer
You can send dictionaries (automatically serialized to JSON) or raw bytes.
from pulse import Producer
# Initialize (uses pulse.yaml or defaults)
# You can override settings: Producer(host="10.0.0.1", port=9090)
producer = Producer()
# Send JSON
producer.send("events", {"type": "user_created", "id": 123})
# Send Bytes
producer.send("logs", b"raw log line")
producer.close()
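To make the "dictionaries or raw bytes" behaviour concrete, here is a minimal sketch of how a payload might be turned into bytes before sending. `encode_payload` is a hypothetical helper written for illustration; the SDK's real serialization code may differ, for example in JSON formatting or in which types it accepts.

```python
import json
from typing import Union


def encode_payload(payload: Union[dict, bytes, bytearray]) -> bytes:
    """Return the bytes to send for a dict or raw-bytes payload."""
    if isinstance(payload, (bytes, bytearray)):
        return bytes(payload)  # raw bytes pass through unchanged
    if isinstance(payload, dict):
        return json.dumps(payload).encode("utf-8")  # dicts become UTF-8 JSON
    raise TypeError(f"unsupported payload type: {type(payload).__name__}")
```

Rejecting other types with a TypeError is a design choice of this sketch, made so that mistakes surface at the call site rather than on the consumer side.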
Consumer
Use the @consumer decorator to register message handlers.
from pulse import consumer, commit, run

# Simple Consumer (uses auto_commit from config)
@consumer("events")
def handle_event(msg):
    print(f"Received event: {msg.payload}")
    # msg.payload is a dict if JSON, else bytes

# Manual Commit Consumer
# Override config params directly in the decorator if needed
@consumer("transactions", auto_commit=False)
def handle_transaction(msg):
    try:
        # process_payment stands for your application's own function
        process_payment(msg.payload)
        commit()  # Manually commit offset
        print(f"Processed transaction {msg.offset}")
    except Exception as e:
        print(f"Failed to process: {e}")
        # Do not commit, message will be redelivered on restart/rebalance

if __name__ == "__main__":
    print("Starting consumers...")
    run()  # Blocks and runs all registered consumers
Development
- Clone the repository.
- Navigate to sdk/python.
- Create a virtual environment:
python3 -m venv .venv
source .venv/bin/activate
- Install dependencies:
pip install -r requirements.txt
- Run tests:
pytest
Publishing to PyPI
- Build the package:
python3 setup.py sdist bdist_wheel
- Upload to PyPI:
twine upload dist/*
File details
Details for the file pulse_broker-0.0.1.tar.gz.
File metadata
- Download URL: pulse_broker-0.0.1.tar.gz
- Upload date:
- Size: 9.5 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.14.2
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 3f2600bf00f9827dcae112b562ce65a8cb43e4ec552cd87405bf1a70fe92b621 |
| MD5 | 7372c40d983ddffb31d5457f8b6e1394 |
| BLAKE2b-256 | 229dd9f3e7c7ff23c6cb7766286ed6c7d466eab40f248b6aac1f274b1032b162 |
File details
Details for the file pulse_broker-0.0.1-py3-none-any.whl.
File metadata
- Download URL: pulse_broker-0.0.1-py3-none-any.whl
- Upload date:
- Size: 9.7 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.14.2
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | ee15f3977d8392831994e61699464c76291293ea1ddc44671dba71e46c734e1e |
| MD5 | a1fe772fb1ff062aae2ca0f9e3a559d8 |
| BLAKE2b-256 | c8fcd41d85c9bbc07c50928d2389bd1df274c21a360f81d594cd0aaa9797d180 |
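If you download one of these files manually, you can check it against the SHA256 digest listed for it. The sketch below uses only Python's standard hashlib module; `sha256_of_file` is a helper name chosen here for illustration, and the local file path is whatever you saved the download as.

```python
import hashlib


def sha256_of_file(path: str, chunk_size: int = 65536) -> str:
    """Return the hex SHA256 digest of the file at `path`."""
    digest = hashlib.sha256()
    with open(path, "rb") as fh:
        # Read in chunks so large files are not loaded into memory at once.
        for chunk in iter(lambda: fh.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()
```

Compare the returned string with the SHA256 row for the matching file; any difference means the download is not the file described here.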