A modern Python SDK for Kafka event streaming (Grasp Labs)

ds-event-stream-python-sdk

A Python SDK for producing and consuming events on a Kafka-based event stream, following Grasp Labs' event stream conventions.

Features

  • Dataclass Models: Auto-generated from JSON schemas for type-safe event handling
  • Kafka Producer/Consumer Services: Simple interfaces for sending and receiving events
  • Service Principal Configuration: Centralized config via KafkaConfig for authentication and connection settings
  • Mockable for Testing: All Kafka interactions can be mocked for CI and local testing

Installation

  1. Clone the repository:

    git clone https://github.com/grasp-labs/ds-event-stream-python-sdk.git
    cd ds-event-stream-python-sdk
    
  2. Set up a Python virtual environment:

    python3 -m venv .venv
    source .venv/bin/activate  # On Windows: .venv\Scripts\activate
    
  3. Install the package:

    pip install -e .
    
  4. (Optional) Generate models from schemas:

    make generate-models
    

Quick Start

1. Configure Kafka Connection

from dseventstream.kafka.kafka_config import KafkaConfig

config = KafkaConfig(
    bootstrap_servers="kafka-prod:9092",
    username="service-principal",
    password="secret"
)

2. Create Producer/Consumer

from dseventstream.kafka.kafka_service import KafkaProducerService, KafkaConsumerService

producer = KafkaProducerService(config=config)
consumer = KafkaConsumerService(config=config, group_id="my-group")

3. Send an Event

from dseventstream.models.event import Event

event = Event(
    id="event-id",
    session_id="session-id",
    request_id="request-id",
    tenant_id="tenant-id",
    event_type="type",
    event_source="source",
    metadata={"key": "value"},
    timestamp="2025-09-18T00:00:00Z",
    created_by="user",
    md5_hash="..."
)

producer.send("topic-name", event)
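The SDK does not document here how md5_hash is derived, so the `"..."` placeholder above is left as-is. One plausible convention is an MD5 digest of the canonically serialized payload; the helper below is a sketch under that assumption, not the SDK's actual contract:

```python
import hashlib
import json

def compute_md5_hash(payload: dict) -> str:
    """Hypothetical helper: MD5 of the canonically serialized payload.

    The exact hashing convention is an assumption -- check the event
    schema or Grasp Labs' conventions before relying on this.
    """
    canonical = json.dumps(payload, sort_keys=True, separators=(",", ":"))
    return hashlib.md5(canonical.encode("utf-8")).hexdigest()

digest = compute_md5_hash({"event_type": "type", "metadata": {"key": "value"}})
print(digest)  # 32-character hex digest
```

Sorting keys makes the digest independent of dict insertion order, so semantically equal payloads hash identically.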

4. Consume Events

def on_message(event_dict):
    print(f"Received event: {event_dict}")

consumer.consume("topic-name", on_message)

Package Structure

dseventstream/
├── models/
│   ├── event.py           # Event dataclass (auto-generated)
│   └── system_topics.py   # System topics enum (auto-generated)
└── kafka/
    ├── kafka_service.py   # Producer/Consumer service classes
    └── kafka_config.py    # Shared config for Kafka principals

docs/                      # Documentation source files
├── README.md             # Main documentation
├── usage.md              # Usage examples
├── api.md                # API reference
├── testing.md            # Testing guide
└── license.md            # License information

tests/                    # Test suite
├── test_kafka_producer.py
├── test_kafka_consumer.py
└── test_kafka_mock.py

schemas/                  # JSON schemas for model generation
├── event.json
└── system-topics.json

.github/workflows/        # CI/CD pipelines
├── deploy-docs.yml       # Documentation deployment
└── deploy-pypi.yml       # PyPI release automation

API Reference

Core Classes

KafkaConfig

Centralized configuration for Kafka connections.

from dseventstream.kafka.kafka_config import KafkaConfig

# Production configuration
config = KafkaConfig(
    bootstrap_servers="kafka-prod:9092",
    username="service-principal",
    password="secret"
)

# Development configuration
config = KafkaConfig(bootstrap_servers="localhost:9092")
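Hard-coding credentials as in the production example above is best avoided outside of docs. A sketch that pulls them from environment variables before constructing the config (the variable names here are illustrative, not defined by the SDK):

```python
import os

# Illustrative environment variable names -- not SDK conventions
kafka_kwargs = {
    "bootstrap_servers": os.environ.get("KAFKA_BOOTSTRAP_SERVERS", "localhost:9092"),
}
username = os.environ.get("KAFKA_USERNAME")
password = os.environ.get("KAFKA_PASSWORD")
if username and password:
    # Only pass credentials when both are set (e.g. local dev may omit them)
    kafka_kwargs.update(username=username, password=password)

# config = KafkaConfig(**kafka_kwargs)
```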

KafkaProducerService

Service for publishing events to Kafka topics.

from dseventstream.kafka.kafka_service import KafkaProducerService

producer = KafkaProducerService(config=config)
producer.send("topic-name", event, key="optional-key")

KafkaConsumerService

Service for consuming events from Kafka topics.

from dseventstream.kafka.kafka_service import KafkaConsumerService

consumer = KafkaConsumerService(config=config, group_id="my-group")

def handle_message(event_dict):
    print(f"Received: {event_dict}")

consumer.consume("topic-name", handle_message)

Event

Auto-generated dataclass for event structure.

from dseventstream.models.event import Event

event = Event(
    id="unique-event-id",
    session_id="session-123",
    request_id="request-456",
    tenant_id="tenant-789",
    event_type="user.action",
    event_source="web-app",
    metadata={"key": "value"},
    timestamp="2025-01-01T00:00:00Z",
    created_by="user-id",
    md5_hash="computed-hash"
)
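Since Event is a dataclass, it can be converted to a plain dict with `dataclasses.asdict` for logging or manual serialization. Sketched here with a minimal stand-in dataclass, since the real Event is auto-generated from the JSON schema:

```python
import json
from dataclasses import asdict, dataclass, field

@dataclass
class Event:  # minimal stand-in for dseventstream.models.event.Event
    id: str
    event_type: str
    metadata: dict = field(default_factory=dict)

event = Event(id="unique-event-id", event_type="user.action", metadata={"key": "value"})
event_dict = asdict(event)  # nested dataclasses become plain dicts
print(json.dumps(event_dict))
```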

Development

Prerequisites

  • Python 3.13+
  • Make (for code generation)

Setup Development Environment

# Install development dependencies
pip install -e ".[dev]"

# Generate models from schemas
make generate-models

# Run tests
make test

Testing

All Kafka interactions are mockable using unittest.mock. Example tests are provided in the tests/ folder.

# Run all tests
python -m unittest discover -s tests

# Run specific test
python -m unittest tests.test_kafka_producer
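Because the producer is a plain Python object, code under test can receive a mock double instead of a live service. A sketch of the pattern, using unittest.mock.MagicMock as a stand-in for KafkaProducerService (the method name `send` is taken from the Quick Start; no broker or SDK import is needed):

```python
from unittest.mock import MagicMock

def publish_user_action(producer, event_dict):
    """Example code under test: publishes to a fixed topic."""
    producer.send("user-actions", event_dict)

# Stand-in for KafkaProducerService
producer = MagicMock()
publish_user_action(producer, {"event_type": "user.action"})

# Verify the interaction without ever touching Kafka
producer.send.assert_called_once_with("user-actions", {"event_type": "user.action"})
```

The same approach works for the consumer side: pass a MagicMock in place of KafkaConsumerService and assert on the calls your code makes to it.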

Security

  • Authentication uses the SASL_PLAINTEXT security protocol with the SCRAM-SHA-512 mechanism (see KafkaConfig)
  • For security issues, please see our Security Policy

Contributing

We welcome contributions! Here's how to get started:

Development Workflow

  1. Fork and Clone

    git clone https://github.com/your-username/ds-event-stream-python-sdk.git
    cd ds-event-stream-python-sdk
    
  2. Set up Development Environment

    python3 -m venv .venv
    source .venv/bin/activate  # On Windows: .venv\Scripts\activate
    pip install -e ".[dev]"
    
  3. Make Changes and Test

    # Generate models if needed
    make generate-models
    
    # Run tests
    make test
    # or
    python -m unittest discover -s tests
    
  4. Create Pull Request

    • Create a feature branch: git checkout -b feature/your-feature-name
    • Make your changes and commit them
    • Push to your fork and create a PR

Automated Release Process

This project uses automated releases when PRs are merged to main. The version bump type is determined by your PR title:

Version Bump Types

PR Title Contains                     Version Bump   Example
major, breaking, BREAKING CHANGE      Major          0.1.0 → 1.0.0
feat, feature, minor                  Minor          0.1.0 → 0.2.0
(default: bug fixes, docs, etc.)      Patch          0.1.0 → 0.1.1
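The release workflow's actual implementation isn't shown here, but the table above roughly corresponds to logic like the following hypothetical sketch:

```python
def bump_type(pr_title: str) -> str:
    """Map a PR title to a semantic-version bump, per the table above.

    Hypothetical sketch -- not the workflow's actual code.
    """
    title = pr_title.lower()
    if any(kw in title for kw in ("major", "breaking")):
        return "major"
    if any(kw in title for kw in ("feat", "feature", "minor")):
        return "minor"
    return "patch"

print(bump_type("feat: Add event filtering capabilities"))  # minor
```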

Examples

# Patch release (bug fix)
PR Title: "Fix kafka connection timeout issue" → Version: 0.1.0 → 0.1.1

# Minor release (new feature)
PR Title: "feat: Add event filtering capabilities" → Version: 0.1.0 → 0.2.0

# Major release (breaking change)
PR Title: "BREAKING CHANGE: Redesign API interface" → Version: 0.1.0 → 1.0.0

What Happens When PR is Merged

  1. Version automatically bumped in pyproject.toml
  2. Git tag created (v{new_version})
  3. GitHub release created with changelog
  4. Package published to PyPI automatically

Code Style and Standards

  • Follow PEP 8 for Python code style
  • Add type hints where appropriate
  • Write tests for new functionality
  • Update documentation for API changes
  • Use descriptive commit messages

Testing

# Run all tests
python -m unittest discover -s tests

# Run specific test file
python -m unittest tests.test_kafka_producer

# Run with coverage (if installed)
coverage run -m unittest discover -s tests
coverage report

Documentation

Full documentation is available at: https://grasp-labs.github.io/ds-event-stream-python-sdk/

Installation for End Users

From PyPI (Recommended)

pip install ds-event-stream-python-sdk

From Source

git clone https://github.com/grasp-labs/ds-event-stream-python-sdk.git
cd ds-event-stream-python-sdk
pip install -e .

License

This project is licensed under the Apache License 2.0 - see the LICENSE file for details.

Download files

Source distribution: ds_event_stream_python_sdk-0.1.1.tar.gz (12.2 kB)
Built distribution:  ds_event_stream_python_sdk-0.1.1-py3-none-any.whl (12.0 kB)

File hashes

Hashes for ds_event_stream_python_sdk-0.1.1.tar.gz:

  Algorithm    Hash digest
  SHA256       98654adffd4be0f9b7ecfdf800b0f227dbe86b336e0dbb49b15789682a9e81eb
  MD5          635dfb94a184e54da1c865bb6248f92f
  BLAKE2b-256  6fde938f74b75ceebf3d4b8caab27abf847b1417111f496e68343bfca7b11c9c

Hashes for ds_event_stream_python_sdk-0.1.1-py3-none-any.whl:

  Algorithm    Hash digest
  SHA256       d39228d6bd48f7c201be897a11fdaa1cdc9da5ed779f2727866acf84d370e09c
  MD5          dd67fd7fa585f31691e9b810fb04605e
  BLAKE2b-256  e64e8f08a7ccd1cfe0deb7d6c94ff5f59e8be822da12496258b06bdcdb155644
