Skip to main content

kafka application endpoint

Project description

A lightweight application for consistent handling of kafka events

This application was developed to be a universal api endpoint for microservices that process kafka events.

Installation

pip install kafka-python-app

Usage

Configure and create application instance.

Configuration parameters:

  • app_name [OPTIONAL]: application name.
  • bootstrap_servers: list of kafka bootstrap servers addresses 'host:port'.
  • producer_config [OPTIONAL]: kafka producer configuration ( see kafka-python documentation).
# Default producer config
producer_config = {
    'key_serializer': lambda x: x.encode('utf-8') if x else x,
    "value_serializer": lambda x: json.dumps(x).encode('utf-8')
}
# Default consumer config
conf = {
    'group_id': str(uuid.uuid4()),
    'auto_offset_reset': 'earliest',
    'enable_auto_commit': False,
    'key_deserializer': lambda x: x.decode('utf-8') if x else x,
    'value_deserializer': lambda x: json.loads(x.decode('utf-8')),
    'session_timeout_ms': 25000
}
  • listen_topics: list of subscribed topics.
  • message_key_as_event [OPTIONAL]: set to True if using kafka message key as event name.

NOTE When setting message_key_as_event to True, make sure to specify valid key_deserializer in consumer_config.

  • message_value_cls [OPTIONAL]: dictionary {"topic_name": dataclass_type} that maps message dataclass type to specific topic. The application uses this mapping to create specified dataclass from kafka message.value.
  • middleware_message_cb [OPTIONAL]: if provided, this callback will be executed with raw kafka message argument before calling any handler.
  • emit_with_response_options [OPTIONAL]: configuration that supports use of the emit_with_response method
  • pipelines_map [OPTIONAL]: Dict[str, MessagePipeline] - provides mapping of a pipelint to event name or "topic.event" combination.
  • max_concurrent_tasks [OPTIONAL]: int - number of individual event handlers executed concurrently
  • max_concurrent_pipelines [OPTIONAL]: int - number of pipelines executed concurrently
  • logger [OPTIONAL]: any logger with standard log methods. If not provided, the standard python logger is used.
from kafka_python_app.app import AppConfig, KafkaApp


# Setup kafka message payload:
class MyMessage(pydantic.BaseModel):
    event: str
    prop1: str
    prop2: int


# This message type is specific for "test_topic3" only. 
class MyMessageSpecific(pydantic.BaseModel):
    event: str
    prop3: bool
    prop4: float


# Create application config
config = AppConfig(
    app_name='Test application',
    bootstrap_servers=['localhost:9092'],
    consumer_config={
        'group_id': 'test_app_group'
    },
    listen_topics=['test_topic1', 'test_topic2', 'test_topic3'],
    message_value_cls={
        'test_topic1': MyMessage,
        'test_topic2': MyMessage,
        'test_topic3': MyMessageSpecific
    }
)

# Create application
app = KafkaApp(config)

Create event handlers using @app.on decorator:

# This handler will handle 'some_event' no matter which topic it comes from
@app.on(event='some_event')
def handle_some_event(message: MyMessage, **kwargs):
    print('Handling event: {}..'.format(message.event))
    print('prop1: {}\n'.format(message.prop1))
    print('prop2: {}\n'.format(message.prop2))


# This handler will handle 'another_event' from 'test_topic2' only.
# Events with this name but coming from another topic will be ignored.
@app.on(event='another_event', topic='test_topic2')
def handle_some_event(message: MyMessage, **kwargs):
    print('Handling event: {}..'.format(message.event))
    print('prop1: {}\n'.format(message.prop1))
    print('prop2: {}\n'.format(message.prop2))


# This handler will handle 'another_event' from 'test_topic3' only.
@app.on(event='another_event', topic='test_topic3')
def handle_some_event(message: MyMessageSpecific, **kwargs):
    print('Handling event: {}..'.format(message.event))
    print('prop3: {}\n'.format(message.prop3))
    print('prop4: {}\n'.format(message.prop4))

NOTE: If topic argument is provided to @app.on decorator, the decorated function is mapped to particular topic.event key that means an event that comes from a topic other than the specified will not be processed. Otherwise, event will be processed no matter which topic it comes from.

Use message pipelines

The @app.on decorator is used to register a single handler for a message. However, now it is possible to pass a message through a sequence of handlers by providing a pipelines_map option into the application config. The pipelines_map is a dictionary of type Dict[str, MessagePipeline].

MessagePipeline class has following properties:

  • transactions: List[MessageTransaction]
  • logger (optional): any object that has standard logging methods (debug, info, error, etc.)

MessageTransaction has properties:

  • fnc: Callable - transaction function that takes message, somehow transforms it and returns it to next transaction in the pipeline. Every transaction function must have a signature: fnc(message, logger, **kwargs) and has to return message of the same type.

NOTE: Transaction function can be sync or async.

  • args: Dict - a dictionary of keyword arguments for transaction function.
  • pipe_result_options (optional): TransactionPipeWithReturnOptions - configuration to pipe message to another service (kafka app).

TransactionPipeResultOptions class properties:

  • pipe_event_name: str - event name that will be sent pipe_to_topic: str - destination topic with_response_options (optional): TransactionPipeWithReturnOptions - this config is used when pipeline needs to wait response from another service before proceeding to next transaction.

TransactionPipeWithReturnOptions class properties:

  • response_event_name: str - event name of the response
  • response_from_topic: str - topic name that the response should come from
  • cache_client: Any - client of any caching service that has methods get and set

get method signature: get(key: str) -> bytes

set method signature: set(name: str, value: bytes, ex: int (record expiration in sec))

  • return_event_timeout: int - timeout in seconds for returned event

Example:

Suppose we receive a string with each message and that string needs to be processed. Let's say we need replace all commas with a custom symbol and then add a number to the end of the string that correspond to the number of some substring occurrences.

from kafka_python_app.app import AppConfig, KafkaApp, MessagePipeline, MessageTransaction


# Define transaction functions
def txn_replace(message: str, symbol: str, logger=None):
  return message.replace(',', symbol)

def txn_add_count(message: str, substr: str, logger=None):
  return ' '.join([message, message.count(substr)])

# Define a pipeline
pipeline = MessagePipeline(
  transactions=[
    MessageTransaction(fnc=txn_replace, args={'symbol': '|'}),
    MessageTransaction(fnc=txn_add_count, args={'substr': 'foo'})
  ]
)

# Define a pipelines map
pipelines_map = {'some_event': pipeline}

# Create application config
config = AppConfig(
    app_name='Test application',
    bootstrap_servers=['localhost:9092'],
    consumer_config={
        'group_id': 'test_app_group'
    },
    listen_topics=['test_topic1'],
    pipelines_map=pipelines_map
)

# Create application
app = KafkaApp(config)

NOTE: Define pipelines_map keys in a format 'topic.event' in order to restrict pipeline execution to events that come from a certain topic.

Use app.emit() method to send messages to kafka:

from kafka_python_app import ProducerRecord

msg = ProducerRecord(
  key='my_message_key',
  value='my_payload'
)

app.emit(topic='some_topic', message=msg)

Use app.emit_with_response() method to send messages to kafka and wait for response event:

Specify emit_with_response_options in the application config.

EmitWithResponseOptions class properties:

  • topic_event_list: list of tuples (topic_name: str, event_name: str) for returned events. This defines what events and from what topics should be cached as "response" events.
  • cache_client: Any - client of any caching service that has methods get and set

get method signature: get(key: str) -> bytes

set method signature: set(name: str, value: bytes, ex: int (record expiration in sec))

  • return_event_timeout: int - timeout in seconds for returned event
from kafka_python_app import KafkaApp, AppConfig, \
  EmitWithResponseOptions, ProducerRecord, 

# Create application config
config = AppConfig(
    app_name='Test application',
    bootstrap_servers=['localhost:9092'],
    consumer_config={
        'group_id': 'test_app_group'
    },
    listen_topics=['test_topic1'],
    emit_with_response_options=EmitWithReturnOptions(
        event_topic_list=[
            ("test_topic_2", "response_event_name")
        ],
        cache_client=redis_client,
        return_event_timeout=30
    ),
)

# Create application
app = KafkaApp(config)
msg = ProducerRecord(
  value={"event": "test_event_name": "payload": "Hello?"}
)

response = app.emit_with_response(topic='some_topic', message=msg)

Start application:

if __name__ == "__main__":
    asyncio.run(app.run())

Use standalone kafka connector

The KafkaConnector class has two factory methods:

  • get_producer method returns kafka-python producer.
  • get_listener method returns instance of the KafkaListener class that wraps up kafka-python consumer and ** listen()** method that starts consumption loop.

Use ListenerConfig class to create listener configuration:

from kafka_python_app.connector import ListenerConfig

kafka_listener_config = ListenerConfig(
    bootstrap_servers=['ip1:port1', 'ip2:port2', ...],
    process_message_cb=my_process_message_func,
    consumer_config={'group_id': 'test_group'},
    topics=['topic1', 'topic2', ...],
    logger=my_logger
)
  • bootstrap_servers: list of kafka bootstrap servers addresses 'host:port'.
  • process_message_cb: a function that will be called on each message.
from kafka.consumer.fetcher import ConsumerRecord

def my_process_message_func(message: ConsumerRecord):
    print('Received kafka message: key: {}, value: {}'.format(
      message.key,
      message.value
    ))

NOTE: Use async function for process_message_cb in order to process messages concurrently.

  • consumer_config [OPTIONAL]: kafka consumer configuration ( see kafka-python documentation).
  • topics: list of topics to listen from.
  • logger [OPTIONAL]: any logger with standard log methods. If not provided, the standard python logger is used.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

kafka_python_app-0.2.2.tar.gz (20.2 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

kafka_python_app-0.2.2-py3-none-any.whl (11.8 kB view details)

Uploaded Python 3

File details

Details for the file kafka_python_app-0.2.2.tar.gz.

File metadata

  • Download URL: kafka_python_app-0.2.2.tar.gz
  • Upload date:
  • Size: 20.2 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/4.0.0 CPython/3.8.10

File hashes

Hashes for kafka_python_app-0.2.2.tar.gz
Algorithm Hash digest
SHA256 badf15d95c9c3b80c1efc92945e64758c68053e54ecda178c53dbb57f992930e
MD5 5133b58448ee7122674caf38034bdc80
BLAKE2b-256 e67a6f06918c4cb2b3ec82bc62e2a65b6995df8eb50585ff3603a9ff853d6809

See more details on using hashes here.

File details

Details for the file kafka_python_app-0.2.2-py3-none-any.whl.

File metadata

File hashes

Hashes for kafka_python_app-0.2.2-py3-none-any.whl
Algorithm Hash digest
SHA256 9f58fc2443217afb9cca3254dd9a366f7d79c6ad67f77ee9e6a0fdb3f8a2ea46
MD5 da888dab6d8960b44f9cd820ef0c894a
BLAKE2b-256 1a553ba15ddadea2c9f1ac7c44d0f77e161ca269928935ebfbe22946bf656356

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page