A Python package built on top of the confluent-kafka package that manages producing and consuming.

Kafka Broker

The package includes built-in support for FastAPI.

About this package

The package makes it easy to set up and connect to the Apache Kafka message broker.

It can run a consumer in a second process, so consuming does not block your application.

To read the received data from the broker, poll the consumer storage. To wait for a specific item (based on correlation ID), await the consume function.

The BrokerManager

It has three simple functions:

produce - produce an event to the message broker.

init_consumer_app - initialize the consumer together with a FastAPI app.

init_consumer - initialize a regular consumer.

It has an attribute called consumer_storage, which is where everything the consumer finds is stored. To actually get data out of it, call one of its functions, such as receive().

receive() waits until data is available; poll() does not wait. consume() waits for the event object with a given correlation_id.

These consumer_storage functions are all asynchronous, so awaiting them does not block the rest of your process, for example when you are using FastAPI.

There is also consume_all(), which is basically a poll() that also returns everything found in the storage.
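
To make the difference between these calls concrete, here is a minimal in-memory sketch of a storage with the same four operations. It is not the package's implementation: the class name SketchStorage, the put() method, the dict keyed by correlation ID, and the choice that poll() hands back a single item are all assumptions made for illustration.

```python
import asyncio


class SketchStorage:
    """Hypothetical stand-in for consumer_storage; not the package's code."""

    def __init__(self):
        self._items = {}  # correlation_id -> event data
        self._changed = asyncio.Condition()

    async def put(self, correlation_id, data):
        # Assumed hook: what a consumer would call when a message arrives.
        async with self._changed:
            self._items[correlation_id] = data
            self._changed.notify_all()

    async def poll(self):
        # Returns immediately: one stored item, or None if the storage is empty.
        if not self._items:
            return None
        key = next(iter(self._items))
        return self._items.pop(key)

    async def receive(self):
        # Waits (without blocking the event loop) until any item is stored.
        async with self._changed:
            await self._changed.wait_for(lambda: bool(self._items))
            key = next(iter(self._items))
            return self._items.pop(key)

    async def consume(self, correlation_id):
        # Waits until the item with this correlation ID is stored.
        async with self._changed:
            await self._changed.wait_for(lambda: correlation_id in self._items)
            return self._items.pop(correlation_id)

    async def consume_all(self):
        # Returns immediately with everything stored, emptying the storage.
        items, self._items = self._items, {}
        return items


async def demo():
    storage = SketchStorage()
    empty = await storage.poll()  # nothing stored yet
    waiter = asyncio.create_task(storage.consume("abc"))
    await storage.put("other", {"n": 1})
    await storage.put("abc", {"n": 2})
    matched = await waiter  # only the item with the matching ID
    rest = await storage.consume_all()  # whatever is left
    return empty, matched, rest


result = asyncio.run(demo())
```

In this sketch, consume() skips the unrelated item and returns only the one whose correlation ID matches, while consume_all() returns the remainder without waiting.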

The EventRouter

Our event router is based on RabbitMQ's routing keys and on FastAPI's router.

Example implementation

app/__init__.py

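from kafka_broker import EventRouter  # assumed top-level export, like EventObject

from app.test import test_router


# The top-level router is named after the module.
event_router = EventRouter("module")

# Attach the "test" router so events starting with "test." reach it.
event_router.include_binder(test_router)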

Side note: the top-level router is given the name of the module.

app/test/__init__.py

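from kafka_broker import EventObject, EventRouter, broker_manager  # EventRouter import path is assumed

test_router = EventRouter("test")


@test_router.bind_event("return")
def return_event(event_object: EventObject):
    # Fill in the response and change the event so the receiver routes it to "respond".
    event_object.data = some_function()  # placeholder for your own logic
    event_object.event = "respond"

    broker_manager.produce("SomeModule", event_object)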

With the event string "test.return", the event router first looks up the bind called 'test'. If 'test' is found, what happens next depends on its type: if it is another EventRouter, the search continues in that router with the rest of the string ('return'); if it is a function, that function is executed.
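
The lookup chain described above can be sketched in a few lines. This is an illustration only, not the package's EventRouter: the class name SketchRouter is hypothetical, and unlike the real execute_event (which takes an EventObject) this version takes the event string and a payload as separate arguments to stay self-contained.

```python
class SketchRouter:
    """Hypothetical dotted-key router; illustrates the lookup, not the package's code."""

    def __init__(self, name):
        self.name = name
        self.binds = {}  # key segment -> handler function or nested SketchRouter

    def include_binder(self, router):
        # Register a child router under its own name.
        self.binds[router.name] = router

    def bind_event(self, key):
        # Decorator that registers a handler under one key segment.
        def decorator(func):
            self.binds[key] = func
            return func

        return decorator

    def execute_event(self, event, payload):
        # Split off the first segment and look it up in this router.
        head, _, rest = event.partition(".")
        target = self.binds.get(head)
        if target is None:
            raise KeyError(f"no bind for {head!r} in router {self.name!r}")
        if isinstance(target, SketchRouter):
            # Another router: continue the search with the remaining segments.
            return target.execute_event(rest, payload)
        # A function: run it.
        return target(payload)


root = SketchRouter("module")
test_router = SketchRouter("test")
root.include_binder(test_router)


@test_router.bind_event("return")
def return_event(payload):
    return {"echo": payload}


result = root.execute_event("test.return", "hello")
```

Here "test" resolves to the nested router and "return" resolves to the bound function, so the call ends up in return_event. An unknown segment raises KeyError in this sketch; how the real router handles that case is not stated in this description.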

The EventObject

A simple class intended for communicating with other microservices. It has four fields:

A correlation_id to check whether an object you receive corresponds to the one you sent off.

An event-string to route the EventObject to the right place.

A data field containing any JSON serializable information.

An audit_log to track where this EventObject has been.
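
As a rough picture of how such an object travels between services, the sketch below models the four fields as a dataclass and round-trips it through JSON. The class name SketchEvent, the to_json/from_json helpers, the default values, and the audit log entry format are assumptions for illustration; the real EventObject may differ.

```python
import json
import uuid
from dataclasses import asdict, dataclass, field
from typing import Any


@dataclass
class SketchEvent:
    """Hypothetical stand-in with the four fields described above."""

    event: str  # routing string, e.g. "test.return"
    correlation_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    data: Any = None  # any JSON-serializable value
    audit_log: list = field(default_factory=list)

    def to_json(self) -> str:
        # Serialize all fields so the object can be sent as a message value.
        return json.dumps(asdict(self))

    @classmethod
    def from_json(cls, raw: str) -> "SketchEvent":
        # Rebuild the object on the receiving side.
        return cls(**json.loads(raw))


sent = SketchEvent(event="test.return", data={"user": 1})
sent.audit_log.append("module-a: produced")  # assumed log entry format

received = SketchEvent.from_json(sent.to_json())
same_request = received.correlation_id == sent.correlation_id
```

Because the correlation ID survives serialization, the sender can match a response to its original request by comparing IDs.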

Examples

Send event and wait for response:

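import uuid

from kafka_broker import broker_manager, EventObject


async def send_and_wait():
    # Unique ID used to match the response to this request.
    correlation_id = str(uuid.uuid4())
    event = "test.return"

    event_object = EventObject(
        correlation_id=correlation_id,
        event=event,
    )

    broker_manager.produce("SomeModule", event_object)

    # Wait until the event object with the same correlation ID arrives.
    data = await broker_manager.consumer_storage.consume(correlation_id)
    return data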

Run a process based on all events:

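import asyncio

from kafka_broker import broker_manager

from app import event_router  # the top-level router from app/__init__.py


async def main():
    broker_manager.init_consumer()

    while True:
        await asyncio.sleep(1)

        # Collect everything the consumer has stored since the last pass.
        data = await broker_manager.consumer_storage.consume_all()

        if data:
            for _, event_object in data.items():
                event_router.execute_event(event_object)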
