A Pyramid plugin for Apache Kafka integration (producer/consumer support)
Project description
pyramid-kafka
A Pyramid plugin for Apache Kafka integration using confluent-kafka. Provides producer/consumer support with lazy initialization, Pyramid event integration, and a CLI consumer runner.
Installation
uv add pyramid-kafka
Or with pip:
pip install pyramid-kafka
Quick Start
1. Include the plugin
from pyramid.config import Configurator
def main(global_config, **settings):
config = Configurator(settings=settings)
config.include("pyramid_kafka")
return config.make_wsgi_app()
2. Configure your .ini file
# Required
kafka.bootstrap_servers = localhost:9092
# Consumer settings (required for consumer only)
kafka.group_id = my-service
kafka.auto_offset_reset = earliest
# Optional
kafka.client_id = my-service
kafka.topics = my.topic.v1 another.topic.v1
kafka.handler = myapp.stream:process_message
# Pass-through to confluent-kafka (any librdkafka setting)
kafka.extra.security.protocol = SASL_SSL
kafka.extra.sasl.mechanisms = PLAIN
kafka.extra.sasl.username = my-key
kafka.extra.sasl.password = my-secret
3. Produce events
Option A: Subclass KafkaEvent
from pyramid_kafka import KafkaEvent
class OrderCreated(KafkaEvent):
def __init__(self, request, order):
super().__init__(
request,
topic="orders.created.v1",
key=str(order.id),
order_id=str(order.id),
amount=float(order.amount),
)
# Fire the event from your view or service:
request.registry.notify(OrderCreated(request, order))
Option B: Register any Pyramid event
from dataclasses import dataclass
@dataclass
class PaymentReceived:
request: object
payment_id: str
amount: float
# In your app's startup/includeme:
config.register_kafka_event(
PaymentReceived,
topic="payments.received.v1",
key=lambda e: e.payment_id,
value=lambda e: {"payment_id": e.payment_id, "amount": e.amount},
)
# Then just fire the standard Pyramid event:
request.registry.notify(PaymentReceived(request=request, payment_id="p-1", amount=50.0))
If value is omitted, the library auto-extracts all public fields (uses dataclasses.fields() for dataclasses, vars() for others, excluding request and private attributes).
4. Consume messages
Run the built-in CLI consumer:
kafka-consumer development.ini
The consumer reads kafka.topics (space or comma-separated) and kafka.handler from settings. The handler is a dotted Python path to a callable with signature (request, message) -> None:
# myapp/stream.py
import json
def process_message(request, message):
value = json.loads(message.value().decode("utf-8"))
topic = message.topic()
# Route to your business logic...
CLI options override settings:
kafka-consumer development.ini --handler myapp.stream:process_message --topics "topic-a,topic-b" --timeout 2.0
Configuration Reference
| Setting | Required | Default | Description |
|---|---|---|---|
kafka.bootstrap_servers |
Yes | — | Comma-separated Kafka broker list |
kafka.group_id |
Consumer only | — | Consumer group ID |
kafka.auto_offset_reset |
No | earliest |
Where to start consuming (earliest / latest) |
kafka.client_id |
No | — | Client identifier |
kafka.topics |
Consumer only | — | Space or comma-separated topic list |
kafka.handler |
Consumer only | — | Dotted path to handler callable (module:function) |
kafka.extra.* |
No | — | Pass-through to confluent-kafka/librdkafka config |
API
KafkaManager
Attached to config.registry.kafka after config.include("pyramid_kafka").
manager.producer— Lazily-initializedconfluent_kafka.Producermanager.consumer— Lazily-initializedconfluent_kafka.Consumermanager.produce(topic, value, key=None)— JSON-serialize and produce a messagemanager.close()— Flush producer and close consumer
KafkaEvent
Base class for Kafka-bound Pyramid events. Subclass it and fire via request.registry.notify().
config.register_kafka_event(event_type, topic, key=None, value=None)
Pyramid config directive to wire any event class to Kafka. topic can be a string or callable (event) -> str. key and value are optional callables (event) -> str|dict.
Development
Prerequisites
- uv for dependency management
Setup
git clone https://github.com/cartaorobbin/pyramid-kafka.git
cd pyramid-kafka
uv sync --dev
Running Tests
uv run pytest
Linting and Formatting
uv run ruff check .
uv run black .
Building Documentation
uv run mkdocs serve
License
MIT
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file pyramid_kafka-0.1.0.tar.gz.
File metadata
- Download URL: pyramid_kafka-0.1.0.tar.gz
- Upload date:
- Size: 57.7 kB
- Tags: Source
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.7
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
eeefc2ea34e462ce6ddb92e755445933459ef5832cbdbe138506e6eb4548df5f
|
|
| MD5 |
1157e40a1f71d7330048d5a3866f81c8
|
|
| BLAKE2b-256 |
df4409d0276156e7112b898f99d27d6af2e785382b00cf28d85fdd07089e4e69
|
Provenance
The following attestation bundles were made for pyramid_kafka-0.1.0.tar.gz:
Publisher:
ci.yml on cartaorobbin/pyramid-kafka
-
Statement:
-
Statement type:
https://in-toto.io/Statement/v1 -
Predicate type:
https://docs.pypi.org/attestations/publish/v1 -
Subject name:
pyramid_kafka-0.1.0.tar.gz -
Subject digest:
eeefc2ea34e462ce6ddb92e755445933459ef5832cbdbe138506e6eb4548df5f - Sigstore transparency entry: 1092382942
- Sigstore integration time:
-
Permalink:
cartaorobbin/pyramid-kafka@951ec703b697e2e6af33192a3c3a51a3d1ddad5c -
Branch / Tag:
refs/tags/v0.0.1 - Owner: https://github.com/cartaorobbin
-
Access:
public
-
Token Issuer:
https://token.actions.githubusercontent.com -
Runner Environment:
github-hosted -
Publication workflow:
ci.yml@951ec703b697e2e6af33192a3c3a51a3d1ddad5c -
Trigger Event:
release
-
Statement type:
File details
Details for the file pyramid_kafka-0.1.0-py3-none-any.whl.
File metadata
- Download URL: pyramid_kafka-0.1.0-py3-none-any.whl
- Upload date:
- Size: 8.5 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.7
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
9ddc8e40fbab826302303e2e04b51386a6517edde3e514c4bd40de83055779b9
|
|
| MD5 |
eed3b80b8008214fc863c5f6ac16ae1b
|
|
| BLAKE2b-256 |
b2efdda6cd424ea830dea574e6e6338d0603edd428fca973df4a30f4e3604e76
|
Provenance
The following attestation bundles were made for pyramid_kafka-0.1.0-py3-none-any.whl:
Publisher:
ci.yml on cartaorobbin/pyramid-kafka
-
Statement:
-
Statement type:
https://in-toto.io/Statement/v1 -
Predicate type:
https://docs.pypi.org/attestations/publish/v1 -
Subject name:
pyramid_kafka-0.1.0-py3-none-any.whl -
Subject digest:
9ddc8e40fbab826302303e2e04b51386a6517edde3e514c4bd40de83055779b9 - Sigstore transparency entry: 1092382943
- Sigstore integration time:
-
Permalink:
cartaorobbin/pyramid-kafka@951ec703b697e2e6af33192a3c3a51a3d1ddad5c -
Branch / Tag:
refs/tags/v0.0.1 - Owner: https://github.com/cartaorobbin
-
Access:
public
-
Token Issuer:
https://token.actions.githubusercontent.com -
Runner Environment:
github-hosted -
Publication workflow:
ci.yml@951ec703b697e2e6af33192a3c3a51a3d1ddad5c -
Trigger Event:
release
-
Statement type: