kafka application endpoint
A lightweight application for consistent handling of Kafka events.
This application was developed to be a universal API endpoint for microservices that process Kafka events.
Installation
pip install kafka-python-app
Usage
Configure and create an application instance.
Configuration parameters:
- app_name [OPTIONAL]: application name.
- bootstrap_servers: list of Kafka bootstrap server addresses in 'host:port' format.
- producer_config [OPTIONAL]: kafka producer configuration (see kafka-python documentation).
# Default producer config
producer_config = {
    'key_serializer': lambda x: x.encode('utf-8') if x else x,
    'value_serializer': lambda x: json.dumps(x).encode('utf-8')
}
- consumer_config [OPTIONAL]: kafka consumer configuration (see kafka-python documentation).
# Default consumer config
consumer_config = {
    'group_id': str(uuid.uuid4()),
    'auto_offset_reset': 'earliest',
    'enable_auto_commit': False,
    'key_deserializer': lambda x: x.decode('utf-8') if x else x,
    'value_deserializer': lambda x: json.loads(x.decode('utf-8')),
    'session_timeout_ms': 25000
}
- listen_topics: list of subscribed topics.
- message_key_as_event [OPTIONAL]: set to True to use the Kafka message key as the event name.
NOTE: When setting message_key_as_event to True, make sure to specify a valid key_deserializer in consumer_config.
- message_value_cls: dictionary {"topic_name": dataclass_type} that maps a message dataclass type to a specific topic. The application uses this mapping to construct the specified dataclass from kafka message.value.
- middleware_message_cb [OPTIONAL]: if provided, this callback is executed with the raw Kafka message as its argument before any handler is called (see the sketch after this list).
- logger [OPTIONAL]: any logger with standard log methods. If not provided, the standard Python logger is used.
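For example, a middleware callback can log every raw message before it is dispatched to handlers. A minimal sketch (the callback receives the raw kafka-python ConsumerRecord; the function name here is illustrative):

from kafka.consumer.fetcher import ConsumerRecord

def log_raw_message(message: ConsumerRecord):
    # Log topic, key and value of every record before handler dispatch
    print('Raw message from topic {}: key={}, value={}'.format(
        message.topic, message.key, message.value
    ))

# Passed to the application config as middleware_message_cb=log_raw_message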
import pydantic

from kafka_python_app.app import AppConfig, KafkaApp

# Setup kafka message payload:
class MyMessage(pydantic.BaseModel):
    event: str
    prop1: str
    prop2: int

# This message type is specific to "test_topic3" only.
class MyMessageSpecific(pydantic.BaseModel):
    event: str
    prop3: bool
    prop4: float
# Create application config
config = AppConfig(
    app_name='Test application',
    bootstrap_servers=['localhost:9092'],
    consumer_config={
        'group_id': 'test_app_group'
    },
    listen_topics=['test_topic1', 'test_topic2', 'test_topic3'],
    message_value_cls={
        'test_topic1': MyMessage,
        'test_topic2': MyMessage,
        'test_topic3': MyMessageSpecific
    }
)
# Create application
app = KafkaApp(config)
Create event handlers using the @app.on decorator:
# This handler will handle 'some_event' no matter which topic it comes from
@app.on(event='some_event')
def handle_some_event(message: MyMessage, **kwargs):
    print('Handling event: {}..'.format(message.event))
    print('prop1: {}\n'.format(message.prop1))
    print('prop2: {}\n'.format(message.prop2))

# This handler will handle 'another_event' from 'test_topic2' only.
# Events with this name but coming from another topic will be ignored.
@app.on(event='another_event', topic='test_topic2')
def handle_another_event(message: MyMessage, **kwargs):
    print('Handling event: {}..'.format(message.event))
    print('prop1: {}\n'.format(message.prop1))
    print('prop2: {}\n'.format(message.prop2))

# This handler will handle 'another_event' from 'test_topic3' only.
@app.on(event='another_event', topic='test_topic3')
def handle_another_event_specific(message: MyMessageSpecific, **kwargs):
    print('Handling event: {}..'.format(message.event))
    print('prop3: {}\n'.format(message.prop3))
    print('prop4: {}\n'.format(message.prop4))
NOTE: If the topic argument is provided to the @app.on decorator, the decorated function is mapped to a particular topic.event key, which means an event with that name coming from any other topic will not be processed by this handler. Otherwise, the event is processed no matter which topic it comes from.
Use message pipelines
The @app.on decorator registers a single handler for a message. It is also possible to pass a message through a sequence of handlers by providing a pipelines_map option in the application config. The pipelines_map is a dictionary of type Dict[str, MessagePipeline].
The MessagePipeline class has the following properties:
- transactions: List[MessageTransaction]
- logger (optional): any object that has standard logging methods (debug, info, error, etc.)
The MessageTransaction has two properties, fnc and args. fnc is the function called to perform the transaction. It must accept a mandatory message argument of the same type as the handled Kafka message, an optional logger argument, and **kwargs, and it must return a message of the same type. args is a dictionary of keyword arguments passed to the transaction function.
NOTE: Transaction functions can be sync or async.
Example:
Suppose we receive a string with each message and that string needs to be processed. Say we need to replace all commas with a custom symbol and then append to the string the number of occurrences of some substring.
from kafka_python_app.app import AppConfig, KafkaApp, MessagePipeline, MessageTransaction

# Define transaction functions
def txn_replace(message: str, symbol: str, logger=None, **kwargs):
    return message.replace(',', symbol)

def txn_add_count(message: str, substr: str, logger=None, **kwargs):
    return ' '.join([message, str(message.count(substr))])

# Define a pipeline
pipeline = MessagePipeline(
    transactions=[
        MessageTransaction(fnc=txn_replace, args={'symbol': '|'}),
        MessageTransaction(fnc=txn_add_count, args={'substr': 'foo'})
    ]
)
# Define a pipelines map
pipelines_map = {'some_event': pipeline}
# Create application config
config = AppConfig(
    app_name='Test application',
    bootstrap_servers=['localhost:9092'],
    consumer_config={
        'group_id': 'test_app_group'
    },
    listen_topics=['test_topic1'],
    pipelines_map=pipelines_map
)
# Create application
app = KafkaApp(config)
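Because transaction functions can also be async (see the note above), a pipeline step may perform non-blocking I/O. A minimal sketch, with asyncio.sleep standing in for real async work such as a database write:

import asyncio

async def txn_store(message: str, delay: float, logger=None, **kwargs):
    # Simulated async I/O; the message is passed through unchanged
    await asyncio.sleep(delay)
    return message

pipeline_async = MessagePipeline(
    transactions=[MessageTransaction(fnc=txn_store, args={'delay': 0.1})]
)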
NOTE: Define pipelines_map keys in the format 'topic.event' to restrict pipeline execution to events coming from a certain topic.
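For example, to run the pipeline above only for 'some_event' messages that arrive on 'test_topic1':

pipelines_map = {'test_topic1.some_event': pipeline}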
Use the app.emit() method to send messages to Kafka:
from kafka_python_app import ProducerRecord
msg = ProducerRecord(
    key='my_message_key',
    value='my_payload'
)
app.emit(topic='some_topic', message=msg)
Start the application:
import asyncio

if __name__ == "__main__":
    asyncio.run(app.run())
Use standalone kafka connector
The KafkaConnector class has two factory methods:
- get_producer method returns a kafka-python producer (see the sketch after this list).
- get_listener method returns an instance of the KafkaListener class, which wraps the kafka-python consumer and exposes a listen() method that starts the consumption loop.
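For instance, the producer factory can be used on its own. The exact get_producer parameters are not documented above, so the ones shown here are assumptions; treat this as an illustrative sketch:

from kafka_python_app.connector import KafkaConnector

# NOTE: parameter names below are assumed for illustration
producer = KafkaConnector.get_producer(
    bootstrap_servers=['localhost:9092']
)

# The returned object is a regular kafka-python producer
producer.send('some_topic', key='my_key', value={'prop1': 'a', 'prop2': 1})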
Use the ListenerConfig class to create the listener configuration:
from kafka_python_app.connector import ListenerConfig
kafka_listener_config = ListenerConfig(
    bootstrap_servers=['ip1:port1', 'ip2:port2', ...],
    process_message_cb=my_process_message_func,
    consumer_config={'group_id': 'test_group'},
    topics=['topic1', 'topic2', ...],
    logger=my_logger
)
- bootstrap_servers: list of Kafka bootstrap server addresses in 'host:port' format.
- process_message_cb: a function that will be called on each message.
from kafka.consumer.fetcher import ConsumerRecord
def my_process_message_func(message: ConsumerRecord):
    print('Received kafka message: key: {}, value: {}'.format(
        message.key,
        message.value
    ))
NOTE: Use an async function for process_message_cb in order to process messages concurrently.
- consumer_config [OPTIONAL]: kafka consumer configuration (see kafka-python documentation).
- topics: list of topics to listen to.
- logger [OPTIONAL]: any logger with standard log methods. If not provided, the standard Python logger is used.
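With the configuration above, the listener can be started like this (a sketch assuming get_listener accepts the ListenerConfig and that listen() is a coroutine, like app.run()):

import asyncio
from kafka_python_app.connector import KafkaConnector

listener = KafkaConnector.get_listener(kafka_listener_config)
asyncio.run(listener.listen())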