Django application to produce/consume events from Kafka
Installation
pip install django-streams
or with poetry
poetry add django-streams
and add it to INSTALLED_APPS:
INSTALLED_APPS = [
    ...
    "django_streams",
    ...
    "my_streams_app",
    # etc...
]
Documentation
https://kpn.github.io/django-streams/
Usage
Create the engine:
# my_streams_app/engine.py
from django_streams import create_engine
from kstreams.backends import Kafka

stream_engine = create_engine(
    title="test-engine",
    backend=Kafka(),
)
To configure the backend, follow the kstreams backend documentation.
Consuming events
Define your streams:
# my_streams_app/streams.py
import logging

from kstreams import ConsumerRecord

from .engine import stream_engine

logger = logging.getLogger(__name__)


# your consumer: the coroutine is called once for each consumed record
@stream_engine.stream("dev-kpn-des--hello-kpn", group_id="django-streams-principal-group-id")
async def consumer_task(cr: ConsumerRecord):
    logger.info(f"Event consumed: headers: {cr.headers}, value: {cr.value}")
Then, in your apps.py, import the Python module that defines your coroutines inside ready(), so that the streams are loaded when Django starts:
# my_streams_app/apps.py
from django.apps import AppConfig
class StreamingAppConfig(AppConfig):
    name = "my_streams_app"

    def ready(self):
        from . import streams  # import the streams module
Now you can run the worker:
python manage.py worker
Producing events
Events can be produced from sync or async code. In a sync context you must use stream_engine.sync_send; in an async context, await stream_engine.send. In both cases a RecordMetadata is returned.
# my_streams_app/views.py
from django.conf import settings
from django.http import HttpResponse
from django.views.generic import View

from .engine import stream_engine


class HelloWorldView(View):
    def get(self, request, *args, **kwargs):
        # KAFKA_TOPIC_PREFIX is a project-specific setting
        topic = f"{settings.KAFKA_TOPIC_PREFIX}hello-kpn"
        record_metadata = stream_engine.sync_send(
            topic,
            value=b"hello world",
            key="hello",
            partition=None,
            timestamp_ms=None,
            headers=None,
        )
        return HttpResponse(f"Event metadata: {record_metadata}")
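The sync/async split described above can be illustrated without a broker. The sketch below uses a hypothetical in-memory `FakeEngine` and `FakeRecordMetadata` (neither is part of django-streams or kstreams) and assumes the sync method simply drives the async one to completion; the real library may implement this differently.

```python
import asyncio
from dataclasses import dataclass
from typing import Dict, Optional


@dataclass
class FakeRecordMetadata:
    """Hypothetical stand-in for the RecordMetadata returned by a send."""

    topic: str
    partition: int
    offset: int


class FakeEngine:
    """Hypothetical in-memory engine; NOT django-streams' implementation."""

    def __init__(self) -> None:
        self._offsets: Dict[str, int] = {}

    async def send(
        self, topic: str, value: bytes, key: Optional[str] = None
    ) -> FakeRecordMetadata:
        # Async path: awaited from coroutines such as async views.
        await asyncio.sleep(0)  # placeholder for network I/O
        offset = self._offsets.get(topic, 0)
        self._offsets[topic] = offset + 1
        return FakeRecordMetadata(topic=topic, partition=0, offset=offset)

    def sync_send(
        self, topic: str, value: bytes, key: Optional[str] = None
    ) -> FakeRecordMetadata:
        # Sync path: run the coroutine to completion from blocking code.
        return asyncio.run(self.send(topic, value, key=key))


engine = FakeEngine()

# Sync context, for example a regular Django view.
sync_metadata = engine.sync_send("hello-kpn", value=b"hello world", key="hello")


async def produce_async() -> FakeRecordMetadata:
    # Async context: await the send directly.
    return await engine.send("hello-kpn", value=b"hello again", key="hello")


async_metadata = asyncio.run(produce_async())
print(sync_metadata)
print(async_metadata)
```

Both calls return the same kind of metadata object; only the calling convention differs, which is why the choice depends on the context you are in rather than on the event being sent.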
Benchmark
Producer:
| Total produced events | Time (seconds) |
|---|---|
| 1 | 0.004278898239135742 |
| 10 | 0.030963897705078125 |
| 100 | 0.07049298286437988 |
| 1000 | 0.6609988212585449 |
| 10000 | 6.501222133636475 |
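The raw timings are easier to compare as throughput. The following sketch derives events per second and average time per event from the figures in the table; the `throughput` helper is introduced here for illustration only.

```python
# (total produced events, elapsed seconds), copied from the producer table
BENCHMARK = [
    (1, 0.004278898239135742),
    (10, 0.030963897705078125),
    (100, 0.07049298286437988),
    (1000, 0.6609988212585449),
    (10000, 6.501222133636475),
]


def throughput(events: int, seconds: float) -> float:
    """Return the average number of events produced per second."""
    return events / seconds


for events, seconds in BENCHMARK:
    rate = throughput(events, seconds)
    per_event_ms = seconds / events * 1000
    print(f"{events:>6} events: {rate:8.0f} events/s, {per_event_ms:.3f} ms/event")
```

For the largest run this works out to roughly 1,500 events per second, while the single-event run is dominated by fixed per-call overhead.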
Running tests
./scripts/test
Code formatting
./scripts/format