Build simple kafka streams applications
Project description
Kstreams
kstreams is a library/micro framework to use with kafka. It has simple kafka streams implementation that gives certain guarantees, see below.
Documentation: https://kpn.github.io/kstreams/
Installation
pip install kstreams
You will need a worker, we recommend aiorun
pip install aiorun
Usage
import aiorun
from kstreams import create_engine, ConsumerRecord
stream_engine = create_engine(title="my-stream-engine")
@stream_engine.stream("local--kstream")
async def consume(cr: ConsumerRecord):
print(f"Event consumed: headers: {cr.headers}, payload: {cr.value}")
async def produce():
payload = b'{"message": "Hello world!"}'
for i in range(5):
metadata = await stream_engine.send("local--kstreams", value=payload)
print(f"Message sent: {metadata}")
async def start():
await stream_engine.start()
await produce()
async def shutdown(loop):
await stream_engine.stop()
if __name__ == "__main__":
aiorun.run(start(), stop_on_unhandled_errors=True, shutdown_callback=shutdown)
Features
- Produce events
- Consumer events with
Streams - Subscribe to topics by
pattern -
Prometheusmetrics and custom monitoring - TestClient
- Custom Serialization and Deserialization
- Easy to integrate with any
asyncframework. No tied to any library!! - Yield events from streams
- Opentelemetry Instrumentation
- Middlewares
- Hooks (on_startup, on_stop, after_startup, after_stop)
- Store (kafka streams pattern)
- Stream Join
- Windowing
Development
This repo requires the use of poetry instead of pip.
Note: If you want to have the virtualenv in the same path as the project first you should run poetry config --local virtualenvs.in-project true
To install the dependencies just execute:
poetry install
Then you can activate the virtualenv with
poetry shell
Run test:
./scripts/test
Run code formatting with ruff:
./scripts/format
Commit messages
We use conventional commits for the commit message.
The use of commitizen is recommended. Commitizen is part of the dev dependencies.
cz commit
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file kstreams-0.30.1.tar.gz.
File metadata
- Download URL: kstreams-0.30.1.tar.gz
- Upload date:
- Size: 33.9 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: poetry/2.3.1 CPython/3.14.2 Linux/6.11.0-1018-azure
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
5756fcdbe8d26b15e5fa0afd38cc4a9a0bb0367c3844fe1793d9260832c389b8
|
|
| MD5 |
e787cd7e466b41c128ccccbf462ad5e0
|
|
| BLAKE2b-256 |
ce374d1522d0495d3f694282b6c52be8ea1ab2602804258d987668b082ce656d
|
File details
Details for the file kstreams-0.30.1-py3-none-any.whl.
File metadata
- Download URL: kstreams-0.30.1-py3-none-any.whl
- Upload date:
- Size: 42.6 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: poetry/2.3.1 CPython/3.14.2 Linux/6.11.0-1018-azure
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
78e1d3e6a5476487b639766f933c48aad5201ea1036814162eaf56a529e53db4
|
|
| MD5 |
405c3ec7c173ea2c1b59ea747f849079
|
|
| BLAKE2b-256 |
6ad58b8dfbbab7cdf5e87e102ffd4fdb462df3b66fcb90727cc7cd1f14792c1a
|