Build simple kafka streams applications
Project description
Kstreams
kstreams
is a library/micro framework to use with kafka
. It has simple kafka streams implementation that gives certain guarantees, see below.
Documentation: https://kpn.github.io/kstreams/
Installation
pip install kstreams
You will need a worker, we recommend aiorun
pip install aiorun
Usage
import aiorun
from kstreams import create_engine, ConsumerRecord
stream_engine = create_engine(title="my-stream-engine")
@stream_engine.stream("local--kstream")
async def consume(cr: ConsumerRecord):
print(f"Event consumed: headers: {cr.headers}, payload: {cr.value}")
async def produce():
payload = b'{"message": "Hello world!"}'
for i in range(5):
metadata = await stream_engine.send("local--kstreams", value=payload)
print(f"Message sent: {metadata}")
async def start():
await stream_engine.start()
await produce()
async def shutdown(loop):
await stream_engine.stop()
if __name__ == "__main__":
aiorun.run(start(), stop_on_unhandled_errors=True, shutdown_callback=shutdown)
Features
- Produce events
- Consumer events with
Streams
- Subscribe to topics by
pattern
-
Prometheus
metrics and custom monitoring - TestClient
- Custom Serialization and Deserialization
- Easy to integrate with any
async
framework. No tied to any library!! - Yield events from streams
- Opentelemetry Instrumentation
- Middlewares
- Hooks (on_startup, on_stop, after_startup, after_stop)
- Store (kafka streams pattern)
- Stream Join
- Windowing
Development
This repo requires the use of poetry instead of pip.
Note: If you want to have the virtualenv
in the same path as the project first you should run poetry config --local virtualenvs.in-project true
To install the dependencies just execute:
poetry install
Then you can activate the virtualenv
with
poetry shell
Run test:
./scripts/test
Run code formatting with ruff:
./scripts/format
Commit messages
We use conventional commits for the commit message.
The use of commitizen is recommended. Commitizen is part of the dev dependencies.
cz commit
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
File details
Details for the file kstreams-0.26.0.tar.gz
.
File metadata
- Download URL: kstreams-0.26.0.tar.gz
- Upload date:
- Size: 30.2 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: poetry/1.8.4 CPython/3.12.7 Linux/6.5.0-1025-azure
File hashes
Algorithm | Hash digest | |
---|---|---|
SHA256 | 16d0121f3d7f4fd0ad389944764c36da4a444776e105c1775eeb44a50fc00476 |
|
MD5 | 4212c30508c23a72468872003572a12f |
|
BLAKE2b-256 | 90fb8b5bacbd207d3425f166b0a923aa7567d257e71926387f244de059dc74d3 |
File details
Details for the file kstreams-0.26.0-py3-none-any.whl
.
File metadata
- Download URL: kstreams-0.26.0-py3-none-any.whl
- Upload date:
- Size: 37.0 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: poetry/1.8.4 CPython/3.12.7 Linux/6.5.0-1025-azure
File hashes
Algorithm | Hash digest | |
---|---|---|
SHA256 | b33c4ec873939c653ff611ad2f76683bf4f794430f7279fe232ca1e195fbc3d7 |
|
MD5 | 6a4f8e626fefdadba4a4fdae4b66572d |
|
BLAKE2b-256 | 0b96b1e08ba5bc2c87ecfc7ac3b87c15305b9c15c73d635480b15f7bc3879f16 |