Skip to main content

Django application to produce/consume events from Kafka

Project description

Django application to produce/consume events from Kafka

django streaming

Build status codecov python version

Installation

pip install django-streams

or with poetry

poetry add django-streams

and add it to INSTALLED_APPS:

INSTALLED_APPS = [
    ...
    "django_streams",
    ...
    "my_streams_app",
    # etc...
]

Documentation

https://kpn.github.io/django-streams/

Usage

create the engine:

# my_streams_app/engine.py
from django_streams import create_engine

from kstreams.backends import Kafka


stream_engine = create_engine(
    title="test-engine",
    backend=Kafka(),
)

To configure the backend follow the kstreams backend documentation

Consuming events

Define your streams:

# my_streams_app/streams.py
from kstreams import ConsumerRecord
from .engine import stream_engine


@stream_engine.stream("dev-kpn-des--hello-kpn", group_id="django-streams-principal-group-id")  # your consumer
async def consumer_task(cr: ConsumerRecord):
    async for cr in stream:
        logger.info(f"Event consumed: headers: {cr.headers}, value: {cr.value}")

and then in your apps.py you must import the python module or your coroutines

# my_streams_app/apps.py
from django.apps import AppConfig


class StreamingAppConfig(AppConfig):
    name = "streaming_app"

    def ready(self):
        from . import streams  # import the streams module

Now you can run the worker:

python manage.py worker

Producing events

Producing events can be sync or async. If you are in a sync context you must use stream_engine.sync_send, otherwise stream_engine.send. For both cases a RecordMetadata is returned.

# streaming_app/views.py
from django.http import HttpResponse
from django.views.generic import View

from de.core.conf import settings
from .engine import stream_engine


class HelloWorldView(View):

    def get(self, request, *args, **kwargs):
        topic = f"{settings.KAFKA_TOPIC_PREFIX}hello-kpn"

        record_metadata = stream_engine.sync_send(
            topic,
            value=b"hello world",
            key="hello",
            partition=None,
            timestamp_ms=None,
            headers=None,
        )

        return HttpResponse(f"Event metadata: {record_metadata}")

Benchmark

Producer:

Total produced events Time (seconds)
1 0.004278898239135742
10 0.030963897705078125
100 0.07049298286437988
1000 0.6609988212585449
10000 6.501222133636475

Running tests

./scrtips/test

Code formating

./scrtips/format

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

django_streams-3.0.0.tar.gz (15.0 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

django_streams-3.0.0-py3-none-any.whl (11.1 kB view details)

Uploaded Python 3

File details

Details for the file django_streams-3.0.0.tar.gz.

File metadata

  • Download URL: django_streams-3.0.0.tar.gz
  • Upload date:
  • Size: 15.0 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/1.8.3 CPython/3.12.4 Darwin/24.2.0

File hashes

Hashes for django_streams-3.0.0.tar.gz
Algorithm Hash digest
SHA256 45b23fe98b2fb688b8172014f90671015551f4212ba4e9274b615a513461136e
MD5 e195a291973b2513c08706afc7741798
BLAKE2b-256 9fe3419222adb3fbcf36fd465364f6c0d9432828e6ddc465a7d7cdcfdd808980

See more details on using hashes here.

File details

Details for the file django_streams-3.0.0-py3-none-any.whl.

File metadata

  • Download URL: django_streams-3.0.0-py3-none-any.whl
  • Upload date:
  • Size: 11.1 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/1.8.3 CPython/3.12.4 Darwin/24.2.0

File hashes

Hashes for django_streams-3.0.0-py3-none-any.whl
Algorithm Hash digest
SHA256 5fed031c84cca41a66be95ac6ab967ad571f59c705b55895de6f752eb1d50e00
MD5 a9f7a1c76325991988f11ebd66efa167
BLAKE2b-256 2e16cb08c8d13c03923a8685ff3dca0ccad08d5c139cdf70ad1c862ace11bae3

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page