Build simple Kafka streams applications

Project description

Kstreams

kstreams is a library/micro-framework for working with Kafka. It provides a simple Kafka streams implementation that gives certain guarantees; see below.

Documentation: https://kpn.github.io/kstreams/


Installation

pip install kstreams

You will need a worker; we recommend aiorun:

pip install aiorun

Usage

import aiorun
from kstreams import ConsumerRecord, create_engine

# One constant for the topic, so the producer and the consumer stay in sync.
TOPIC = "local--kstreams"

stream_engine = create_engine(title="my-stream-engine")


@stream_engine.stream(TOPIC)
async def consume(cr: ConsumerRecord):
    # Called for every record received from the topic.
    print(f"Event consumed: headers: {cr.headers}, payload: {cr.value}")


async def produce():
    payload = b'{"message": "Hello world!"}'

    # Send the same payload five times and print the returned metadata.
    for _ in range(5):
        metadata = await stream_engine.send(TOPIC, value=payload)
        print(f"Message sent: {metadata}")


async def start():
    # Start the engine before producing.
    await stream_engine.start()
    await produce()


async def shutdown(loop):
    # aiorun passes the event loop to the shutdown callback.
    await stream_engine.stop()


if __name__ == "__main__":
    aiorun.run(start(), stop_on_unhandled_errors=True, shutdown_callback=shutdown)
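
The payload in the usage example is JSON encoded as bytes, so a stream handler would typically decode cr.value before working with it. The following is a minimal sketch using only the standard library. decode_payload is a hypothetical helper, not part of kstreams, and it assumes the value arrives as UTF-8 JSON bytes.

```python
import json


def decode_payload(value: bytes) -> dict:
    """Decode a UTF-8 JSON payload (hypothetical helper, not a kstreams API)."""
    return json.loads(value.decode("utf-8"))


# Same bytes the producer in the usage example sends.
payload = b'{"message": "Hello world!"}'
event = decode_payload(payload)
print(event["message"])
```

Inside a handler, the same call could be applied to cr.value, provided no custom deserializer has already converted it.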

Features

  • Produce events
  • Consume events with Streams
  • Subscribe to topics by pattern
  • Prometheus metrics and custom monitoring
  • TestClient
  • Custom Serialization and Deserialization
  • Easy to integrate with any async framework. Not tied to any library.
  • Yield events from streams
  • Opentelemetry Instrumentation
  • Middlewares
  • Hooks (on_startup, on_stop, after_startup, after_stop)
  • Store (kafka streams pattern)
  • Stream Join
  • Windowing
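
To illustrate what subscribing to topics by pattern means, the sketch below selects topic names with a regular expression, which is how Kafka consumers generally resolve pattern subscriptions. It uses only the standard library and does not show the kstreams API itself. The topic names and the matching_topics helper are made up for the example.

```python
import re


def matching_topics(pattern: str, topics: list[str]) -> list[str]:
    """Return the topics whose name matches the regex (hypothetical helper)."""
    compiled = re.compile(pattern)
    return [topic for topic in topics if compiled.match(topic)]


# Made-up topic names for illustration.
available = ["local--kstreams", "local--orders", "dev--orders"]
print(matching_topics(r"^local--.*$", available))
```

A pattern like this would pick up newly created topics with the same prefix without any change to the subscribing code.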

Development

This repo uses poetry instead of pip. Note: if you want the virtualenv to live inside the project directory, first run poetry config --local virtualenvs.in-project true

To install the dependencies just execute:

poetry install

Then you can activate the virtualenv with

poetry shell

Run the tests:

./scripts/test

Run code formatting with ruff:

./scripts/format

Commit messages

We use Conventional Commits for commit messages.

Using commitizen is recommended; it is part of the dev dependencies.

cz commit
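
For readers unfamiliar with the format, a Conventional Commits header has the shape type(optional scope): description, with an optional "!" marking a breaking change. The sketch below checks only that header shape with a simplified regular expression. is_conventional is a hypothetical helper, and the rules commitizen actually enforces for this repo may be stricter.

```python
import re

# Simplified header: type, optional (scope), optional "!", then ": description".
HEADER = re.compile(r"^[a-z]+(\([^)]+\))?!?: .+$")


def is_conventional(message: str) -> bool:
    """Check only the first line of a commit message (illustrative, not exhaustive)."""
    lines = message.splitlines()
    return bool(lines) and HEADER.match(lines[0]) is not None


print(is_conventional("fix(stream): handle empty payload"))
print(is_conventional("Fixed a bug"))
```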

Download files

Source Distribution

kstreams-0.24.8.tar.gz (29.7 kB)

Uploaded Source

Built Distribution

kstreams-0.24.8-py3-none-any.whl (36.5 kB)

Uploaded Python 3

File details

Details for the file kstreams-0.24.8.tar.gz.

File metadata

  • Download URL: kstreams-0.24.8.tar.gz
  • Upload date:
  • Size: 29.7 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/1.8.4 CPython/3.12.7 Linux/6.5.0-1025-azure

File hashes

Hashes for kstreams-0.24.8.tar.gz

Algorithm    Hash digest
SHA256       c0b1cb7f2171a893afae53223ea4764b2fa620e65850b9ef5a2f31b75a5a182c
MD5          632bc9b00fe1a9ccaefb22335f82e9d5
BLAKE2b-256  a61c7a3607571c0d897c0fd8a4eadd1d69f549c60618aaae401812340c84cc1b
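
The published digests can be used to check a downloaded file. The sketch below shows one way to do that with the standard library's hashlib. sha256_of is a hypothetical helper, and the file path is an assumption about where the archive was saved.

```python
import hashlib
from pathlib import Path

# SHA256 digest published for kstreams-0.24.8.tar.gz.
EXPECTED_SHA256 = "c0b1cb7f2171a893afae53223ea4764b2fa620e65850b9ef5a2f31b75a5a182c"


def sha256_of(path: Path) -> str:
    """Hash the file in chunks so large archives need not fit in memory."""
    digest = hashlib.sha256()
    with path.open("rb") as handle:
        for chunk in iter(lambda: handle.read(65536), b""):
            digest.update(chunk)
    return digest.hexdigest()


if __name__ == "__main__":
    archive = Path("kstreams-0.24.8.tar.gz")  # assumed download location
    if archive.exists():
        print("match" if sha256_of(archive) == EXPECTED_SHA256 else "MISMATCH")
    else:
        print(f"{archive} not found; download it first")
```

The same approach works for the wheel by swapping in its file name and its SHA256 digest.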


File details

Details for the file kstreams-0.24.8-py3-none-any.whl.

File metadata

  • Download URL: kstreams-0.24.8-py3-none-any.whl
  • Upload date:
  • Size: 36.5 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/1.8.4 CPython/3.12.7 Linux/6.5.0-1025-azure

File hashes

Hashes for kstreams-0.24.8-py3-none-any.whl

Algorithm    Hash digest
SHA256       3f0d6f4a5b0a3bc2e1dd8d8cfcd2bb070d58e5b75d056e0b76ea878113308a90
MD5          32c5056b0c2464186b00d516facdd9a0
BLAKE2b-256  6af0fdd8adbae81eb2a3a9c770e47619b6aa6aebaf313b7beaaad1e07b6b2a32

