Skip to main content

A Pyramid plugin for Apache Kafka integration (producer/consumer support)

Project description

pyramid-kafka

A Pyramid plugin for Apache Kafka integration using confluent-kafka. Provides producer/consumer support with lazy initialization, Pyramid event integration, and a CLI consumer runner.

Installation

uv add pyramid-kafka

Or with pip:

pip install pyramid-kafka

Quick Start

1. Include the plugin

from pyramid.config import Configurator

def main(global_config, **settings):
    config = Configurator(settings=settings)
    config.include("pyramid_kafka")
    return config.make_wsgi_app()

2. Configure your .ini file

# Required
kafka.bootstrap_servers = localhost:9092

# Consumer settings (required for consumer only)
kafka.group_id = my-service
kafka.auto_offset_reset = earliest

# Optional
kafka.client_id = my-service
kafka.topics = my.topic.v1 another.topic.v1
kafka.handler = myapp.stream:process_message

# Pass-through to confluent-kafka (any librdkafka setting)
kafka.extra.security.protocol = SASL_SSL
kafka.extra.sasl.mechanisms = PLAIN
kafka.extra.sasl.username = my-key
kafka.extra.sasl.password = my-secret

3. Produce events

Option A: Subclass KafkaEvent

from pyramid_kafka import KafkaEvent

class OrderCreated(KafkaEvent):
    def __init__(self, request, order):
        super().__init__(
            request,
            topic="orders.created.v1",
            key=str(order.id),
            order_id=str(order.id),
            amount=float(order.amount),
        )

# Fire the event from your view or service:
request.registry.notify(OrderCreated(request, order))

Option B: Register any Pyramid event

from dataclasses import dataclass

@dataclass
class PaymentReceived:
    request: object
    payment_id: str
    amount: float

# In your app's startup/includeme:
config.register_kafka_event(
    PaymentReceived,
    topic="payments.received.v1",
    key=lambda e: e.payment_id,
    value=lambda e: {"payment_id": e.payment_id, "amount": e.amount},
)

# Then just fire the standard Pyramid event:
request.registry.notify(PaymentReceived(request=request, payment_id="p-1", amount=50.0))

If value is omitted, the library auto-extracts all public fields (uses dataclasses.fields() for dataclasses, vars() for others, excluding request and private attributes).

4. Consume messages

Run the built-in CLI consumer:

kafka-consumer development.ini

The consumer reads kafka.topics (space or comma-separated) and kafka.handler from settings. The handler is a dotted Python path to a callable with signature (request, message) -> None:

# myapp/stream.py
import json

def process_message(request, message):
    value = json.loads(message.value().decode("utf-8"))
    topic = message.topic()
    # Route to your business logic...

CLI options override settings:

kafka-consumer development.ini --handler myapp.stream:process_message --topics "topic-a,topic-b" --timeout 2.0

Configuration Reference

Setting Required Default Description
kafka.bootstrap_servers Yes Comma-separated Kafka broker list
kafka.group_id Consumer only Consumer group ID
kafka.auto_offset_reset No earliest Where to start consuming (earliest / latest)
kafka.client_id No Client identifier
kafka.topics Consumer only Space or comma-separated topic list
kafka.handler Consumer only Dotted path to handler callable (module:function)
kafka.extra.* No Pass-through to confluent-kafka/librdkafka config

API

KafkaManager

Attached to config.registry.kafka after config.include("pyramid_kafka").

  • manager.producer — Lazily-initialized confluent_kafka.Producer
  • manager.consumer — Lazily-initialized confluent_kafka.Consumer
  • manager.produce(topic, value, key=None) — JSON-serialize and produce a message
  • manager.close() — Flush producer and close consumer

KafkaEvent

Base class for Kafka-bound Pyramid events. Subclass it and fire via request.registry.notify().

config.register_kafka_event(event_type, topic, key=None, value=None)

Pyramid config directive to wire any event class to Kafka. topic can be a string or callable (event) -> str. key and value are optional callables (event) -> str|dict.

Development

Prerequisites

  • uv for dependency management

Setup

git clone https://github.com/cartaorobbin/pyramid-kafka.git
cd pyramid-kafka
uv sync --dev

Running Tests

uv run pytest

Linting and Formatting

uv run ruff check .
uv run black .

Building Documentation

uv run mkdocs serve

License

MIT

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

pyramid_kafka-0.2.0.tar.gz (60.0 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

pyramid_kafka-0.2.0-py3-none-any.whl (12.0 kB view details)

Uploaded Python 3

File details

Details for the file pyramid_kafka-0.2.0.tar.gz.

File metadata

  • Download URL: pyramid_kafka-0.2.0.tar.gz
  • Upload date:
  • Size: 60.0 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for pyramid_kafka-0.2.0.tar.gz
Algorithm Hash digest
SHA256 d5c19a122e367154b9377d3c2630e26aaf4fddb60c59f6175fb6f6a236006afa
MD5 621e866a43e2b81e2b2beb4869d85f5d
BLAKE2b-256 a5dd7802eec0df99b1e14a43c019a1573ed2dc80edc6640c3110668d4a6d7c38

See more details on using hashes here.

Provenance

The following attestation bundles were made for pyramid_kafka-0.2.0.tar.gz:

Publisher: ci.yml on cartaorobbin/pyramid-kafka

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file pyramid_kafka-0.2.0-py3-none-any.whl.

File metadata

  • Download URL: pyramid_kafka-0.2.0-py3-none-any.whl
  • Upload date:
  • Size: 12.0 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for pyramid_kafka-0.2.0-py3-none-any.whl
Algorithm Hash digest
SHA256 12de61d7f6031af71ba6a2404ed7eec7c957a2819aff9c3c93af0b9febc24d7b
MD5 1309c5f6d5c72fca9324367480969f27
BLAKE2b-256 1e3fb5849099576704f325d6adc14989f6b90704d37030837e05fafc78edb318

See more details on using hashes here.

Provenance

The following attestation bundles were made for pyramid_kafka-0.2.0-py3-none-any.whl:

Publisher: ci.yml on cartaorobbin/pyramid-kafka

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page