Skip to main content

A mini-framework for Kafka apps

Project description

Kaf

Så er det tid til en kop kaf'

Kaf is a small Python framework for creating Kafka apps. It is inspired by Faust, but differs in the following ways:

  • Kaf is synchronous (async is future work)
  • Kaf is compatible with Azure Eventhubs (over Kafka interface)
  • Kaf is designed to work with different brokers for the consumer and producer

The framework depends on Confluent Kafka.

How to use

Minimal example:

import logging

from kaf import KafkaApp

consumer_config = {'bootstrap.servers': 'kafka:9092', 'group.id': 'myapp'}
producer_config = {'bootstrap.servers': 'kafka:9092'}

app = KafkaApp(
    'myapp',
    consumer_config=consumer_config,
    producer_config=producer_config,
    consumer_timeout=5,
    consumer_batch_size=1    
)
app.logger.setLevel(logging.INFO)

@app.process(topic='foo', publish_to='bar', accepts='json', returns='json')
def add_one(input):
    number = input['number']
    yield {'result':  number+1}, bytes(number)

@app.on_processed
def done(msg, seconds_elapsed):
    app.logger.info(f'Processed message in {seconds_elapsed} seconds')


if __name__ == '__main__':
  app.run()

How errors are handled

Kafka functions keep trying until they succeed. Each user function will get one chance to process each incoming message. Any exception raised by a user functions will only be logged, but otherwise ignored. If a user wants to implement retrying they can do that, but it will stall the pipeline until the function returns.

Future work:

Features to be added:

  • Add decorators for app events on_consume, on_processed and on_publised. This will allow to hook up e.g. Datadog metrics to these events.

How to deploy a new version

Steps (can maybe be improved):

  1. change version and download_url in setup.py
  2. git add + commit + push
  3. create new release in GitHub (check source-code link, should match download_url)
  4. Run python setup.py sdist
  5. Run twine upload dist/* --verbose (if not installed, pip install twine first)

Useful links used:

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

kaf-0.2.3.tar.gz (5.7 kB view details)

Uploaded Source

File details

Details for the file kaf-0.2.3.tar.gz.

File metadata

  • Download URL: kaf-0.2.3.tar.gz
  • Upload date:
  • Size: 5.7 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/3.2.0 pkginfo/1.5.0.1 requests/2.24.0 setuptools/41.2.0 requests-toolbelt/0.9.1 tqdm/4.49.0 CPython/3.7.7

File hashes

Hashes for kaf-0.2.3.tar.gz
Algorithm Hash digest
SHA256 d900ada4f7e8eeccd31d3bff9c38003b3a27d80c9b01318a9979bad67a63bba0
MD5 e4471374ae2406a4282d50de549ed4b8
BLAKE2b-256 01f5b329cdc1085c4108288836566ef03404c6da799aa19969d9377dd8fdbe3b

See more details on using hashes here.

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page