A mini-framework for Kafka apps
Kaf
Time for a cup of kaf' (Danish slang for coffee)
Kaf is a small Python framework for creating Kafka apps. It is inspired by Faust, but differs in the following ways:
- Kaf is synchronous (async is future work)
- Kaf is compatible with Azure Event Hubs (via its Kafka interface)
- Kaf is designed to work with different brokers for the consumer and producer
The framework depends on Confluent Kafka.
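As a rough illustration of the last two points, the consumer and producer configs can point at different brokers, one of which may be an Event Hubs namespace. The sketch below uses the standard librdkafka SASL settings that Event Hubs expects on its Kafka endpoint. The helper name `eventhubs_kafka_config`, the namespace `my-namespace` and the connection string placeholder are assumptions made for this example and are not part of kaf.

```python
def eventhubs_kafka_config(namespace, connection_string, group_id=None):
    """Build a librdkafka-style config dict for an Event Hubs namespace.

    Hypothetical helper for illustration; kaf does not ship it.
    """
    config = {
        # Event Hubs exposes its Kafka endpoint on port 9093
        'bootstrap.servers': f'{namespace}.servicebus.windows.net:9093',
        'security.protocol': 'SASL_SSL',
        'sasl.mechanism': 'PLAIN',
        # The username is the literal string '$ConnectionString'
        'sasl.username': '$ConnectionString',
        'sasl.password': connection_string,
    }
    if group_id is not None:
        config['group.id'] = group_id
    return config


# Consume from a self-hosted Kafka cluster, publish to Event Hubs
consumer_config = {'bootstrap.servers': 'kafka:9092', 'group.id': 'myapp'}
producer_config = eventhubs_kafka_config(
    'my-namespace',  # placeholder namespace
    'Endpoint=sb://my-namespace.servicebus.windows.net/;...',  # placeholder
)
```

The two dicts can then be passed as consumer_config and producer_config when creating the app.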
How to use
Minimal example:
import logging

from kaf import KafkaApp

consumer_config = {'bootstrap.servers': 'kafka:9092', 'group.id': 'myapp'}
producer_config = {'bootstrap.servers': 'kafka:9092'}

app = KafkaApp(
    'myapp',
    consumer_config=consumer_config,
    producer_config=producer_config,
    consumer_timeout=5,
    consumer_batch_size=1
)
app.logger.setLevel(logging.INFO)


@app.process(topic='foo', publish_to='bar', accepts='json', returns='json')
def add_one(input):
    number = input['number']
    yield {'result': number+1}, bytes(number)


@app.on_processed
def done(msg, seconds_elapsed):
    app.logger.info(f'Processed message in {seconds_elapsed} seconds')


if __name__ == '__main__':
    app.run()
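To try the handler logic without a broker, something like the following can help. It is only a sketch of what a JSON round trip around a generator handler could look like, not kaf's actual implementation. The helper `run_json_handler` is hypothetical, and treating the second yielded element as the message key is an assumption.

```python
import json


def run_json_handler(handler, raw_value):
    """Decode a JSON payload, run a generator handler, encode each result.

    Hypothetical helper; assumes the handler yields (value, key) pairs.
    """
    payload = json.loads(raw_value)
    outputs = []
    for value, key in handler(payload):
        outputs.append((json.dumps(value).encode('utf-8'), key))
    return outputs


def add_one(input):
    number = input['number']
    # Encode the number explicitly as its decimal text
    yield {'result': number + 1}, str(number).encode('utf-8')


encoded = run_json_handler(add_one, b'{"number": 41}')
print(encoded)
```

Note that in Python 3, bytes(41) returns 41 zero bytes rather than b'41', which is why the sketch encodes the number via str(number).encode('utf-8').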
How errors are handled
Kafka operations are retried until they succeed. If the user function raises an exception, kaf moves on to the next message without committing the message that failed.
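The behaviour described above can be sketched in plain Python. This is an illustration of the policy only, not kaf's code: `retry_until_success`, `process_messages` and the `commit` callable are hypothetical names, and the fixed delay between retries is an assumption.

```python
import time


def retry_until_success(operation, delay_seconds=1.0, sleep=time.sleep):
    """Call operation repeatedly until it returns without raising."""
    while True:
        try:
            return operation()
        except Exception:
            # Assumed policy: wait a fixed delay, then try again
            sleep(delay_seconds)


def process_messages(messages, handler, commit):
    """Run handler on each message and commit only those that succeed."""
    failed = []
    for msg in messages:
        try:
            handler(msg)
        except Exception:
            # User code failed: skip the commit and move on
            failed.append(msg)
            continue
        # The commit is a broker operation, so it is retried
        retry_until_success(lambda: commit(msg))
    return failed


committed = []


def handler(msg):
    if msg == 'bad':
        raise ValueError('cannot process')


failed = process_messages(['a', 'bad', 'b'], handler, committed.append)
```

Here the failing message ends up in failed and is never passed to commit, while the messages before and after it are committed as usual.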
Future work
Features to be added:
- Add decorators for the app events on_consume, on_processed and on_published. This would make it possible to hook up e.g. Datadog metrics to these events.
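One way such event decorators could work is a small callback registry. The sketch below is a guess at the general shape, not a design taken from kaf. The `EventHooks` class is hypothetical, and a plain counter dict stands in for a metrics client such as Datadog.

```python
from collections import defaultdict


class EventHooks:
    """Hypothetical registry mapping event names to callbacks."""

    def __init__(self):
        self._callbacks = defaultdict(list)

    def on(self, event):
        """Return a decorator that registers a callback for event."""
        def register(func):
            self._callbacks[event].append(func)
            return func
        return register

    def emit(self, event, *args, **kwargs):
        """Call every callback registered for event, in order."""
        for callback in self._callbacks[event]:
            callback(*args, **kwargs)


hooks = EventHooks()
counters = defaultdict(int)  # stand-in for a real metrics client


@hooks.on('on_consume')
def count_consumed(msg):
    counters['consumed'] += 1


@hooks.on('on_processed')
def count_processed(msg, seconds_elapsed):
    counters['processed'] += 1


# The framework would emit these at the matching points in its loop
hooks.emit('on_consume', b'payload')
hooks.emit('on_processed', b'payload', 0.01)
```

Keeping the registry separate from the consume loop means a metrics callback can be added or removed without touching the processing code.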
How to deploy a new version
Steps (can maybe be improved):
- Change version and download_url in setup.py
- git add + commit + push
- Create a new release in GitHub (check the source-code link; it should match download_url)
- Run python setup.py sdist
- Run twine upload dist/* --verbose (if twine is not installed, run pip install twine first)