
ds-event-stream-python-sdk

A Python SDK for producing and consuming events on a Kafka-based event stream, following Grasp Labs' event stream conventions.

Features

  • Dataclass Models: Auto-generated from JSON schemas for type-safe event handling
  • Kafka Producer/Consumer Services: Simple interfaces for sending and receiving events
  • Service Principal Configuration: Centralized config via KafkaConfig for authentication and connection settings
  • Mockable for Testing: All Kafka interactions can be mocked for CI and local testing

Installation

  1. Clone the repository:

    git clone https://github.com/grasp-labs/ds-event-stream-python-sdk.git
    cd ds-event-stream-python-sdk
    
  2. Set up a Python virtual environment:

    python3 -m venv .venv
    source .venv/bin/activate  # On Windows: .venv\Scripts\activate
    
  3. Install the package:

    pip install -e .
    
  4. (Optional) Generate models from schemas:

    make generate-models
    

Quick Start

1. Configure Kafka Connection

from dseventstream.kafka.kafka_config import KafkaConfig

config = KafkaConfig(
    bootstrap_servers="kafka-prod:9092",
    username="service-principal",
    password="secret"
)

2. Create Producer/Consumer

from dseventstream.kafka.kafka_service import KafkaProducerService, KafkaConsumerService

producer = KafkaProducerService(config=config)
consumer = KafkaConsumerService(config=config, group_id="my-group")

3. Send an Event

from dseventstream.models.event import Event

event = Event(
    id="event-id",
    session_id="session-id",
    request_id="request-id",
    tenant_id="tenant-id",
    event_type="type",
    event_source="source",
    metadata={"key": "value"},
    timestamp="2025-09-18T00:00:00Z",
    created_by="user",
    md5_hash="..."
)

producer.send("topic-name", event)

4. Consume Events

def on_message(event_dict):
    print(f"Received event: {event_dict}")

consumer.consume("topic-name", on_message)

Package Structure

dseventstream/
├── models/
│   ├── event.py           # Event dataclass (auto-generated)
│   └── system_topics.py   # System topics enum (auto-generated)
└── kafka/
    ├── kafka_service.py   # Producer/Consumer service classes
    └── kafka_config.py    # Shared config for Kafka principals

docs/                      # Documentation source files
├── README.md             # Main documentation
├── usage.md              # Usage examples
├── api.md                # API reference
├── testing.md            # Testing guide
└── license.md            # License information

tests/                    # Test suite
├── test_kafka_producer.py
├── test_kafka_consumer.py
└── test_kafka_mock.py

schemas/                  # JSON schemas for model generation
├── event.json
└── system-topics.json

.github/workflows/        # CI/CD pipelines
├── deploy-docs.yml       # Documentation deployment
└── deploy-pypi.yml       # PyPI release automation

API Reference

Core Classes

KafkaConfig

Centralized configuration for Kafka connections.

from dseventstream.kafka.kafka_config import KafkaConfig

# Production configuration
config = KafkaConfig(
    bootstrap_servers="kafka-prod:9092",
    username="service-principal",
    password="secret"
)

# Development configuration
config = KafkaConfig(bootstrap_servers="localhost:9092")
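
In practice, credentials are usually injected from the environment rather than hard-coded. A minimal sketch, assuming hypothetical KAFKA_* environment variable names (these are not defined by the SDK):

import os

from dseventstream.kafka.kafka_config import KafkaConfig

# Environment-based configuration; the variable names are illustrative.
config = KafkaConfig(
    bootstrap_servers=os.environ["KAFKA_BOOTSTRAP_SERVERS"],
    username=os.environ.get("KAFKA_USERNAME"),
    password=os.environ.get("KAFKA_PASSWORD"),
)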

KafkaProducerService

Service for publishing events to Kafka topics.

from dseventstream.kafka.kafka_service import KafkaProducerService

producer = KafkaProducerService(config=config)
producer.send("topic-name", event, key="optional-key")

KafkaConsumerService

Service for consuming events from Kafka topics.

from dseventstream.kafka.kafka_service import KafkaConsumerService

consumer = KafkaConsumerService(config=config, group_id="my-group")

def handle_message(event_dict):
    print(f"Received: {event_dict}")

consumer.consume("topic-name", handle_message)

Event

Auto-generated dataclass for event structure.

from dseventstream.models.event import Event

event = Event(
    id="unique-event-id",
    session_id="session-123",
    request_id="request-456",
    tenant_id="tenant-789",
    event_type="user.action",
    event_source="web-app",
    metadata={"key": "value"},
    timestamp="2025-01-01T00:00:00Z",
    created_by="user-id",
    md5_hash="computed-hash"
)
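
Because Event is a standard dataclass, it converts cleanly to a plain dict, which is handy for logging or custom serialization. A minimal sketch using only the standard library:

from dataclasses import asdict
import json

payload = asdict(event)               # field names and values as a plain dict
print(json.dumps(payload, indent=2))  # every field above is JSON-serializable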

Development

Prerequisites

  • Python 3.13+
  • Make (for code generation)

Setup Development Environment

# Install development dependencies
pip install -e ".[dev]"

# Generate models from schemas
make generate-models

# Run tests
make test

Testing

All Kafka interactions are mockable using unittest.mock. Example tests are provided in the tests/ folder.
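
For instance, application code that takes the producer as a dependency can be tested with a MagicMock standing in for KafkaProducerService, so no broker is needed. The publish_greeting function below is a hypothetical example; the send signature follows the usage shown earlier:

import unittest
from unittest.mock import MagicMock


def publish_greeting(producer, topic):
    # Hypothetical code under test: publishes one event via an injected producer.
    producer.send(topic, {"event_type": "greeting"})


class TestPublishGreeting(unittest.TestCase):
    def test_send_called_with_expected_topic_and_event(self):
        producer = MagicMock()  # stands in for KafkaProducerService
        publish_greeting(producer, "topic-name")
        producer.send.assert_called_once_with("topic-name", {"event_type": "greeting"})


if __name__ == "__main__":
    unittest.main()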

# Run all tests
python -m unittest discover -s tests

# Run specific test
python -m unittest tests.test_kafka_producer

Security

  • Authentication uses the SASL_PLAINTEXT protocol with the SCRAM-SHA-512 mechanism, configured via KafkaConfig (as sketched below)
  • For security issues, please see our Security Policy
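
For reference, these settings correspond to a low-level client configuration like the one below. This is an illustrative sketch assuming a kafka-python transport; KafkaConfig handles this wiring for you, so you normally never write it yourself.

from kafka import KafkaProducer

# Low-level equivalent of what KafkaConfig encapsulates (assumed, for illustration).
producer = KafkaProducer(
    bootstrap_servers="kafka-prod:9092",
    security_protocol="SASL_PLAINTEXT",
    sasl_mechanism="SCRAM-SHA-512",
    sasl_plain_username="service-principal",
    sasl_plain_password="secret",
)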

Contributing

We welcome contributions! Here's how to get started:

Development Workflow

  1. Fork and Clone

    git clone https://github.com/your-username/ds-event-stream-python-sdk.git
    cd ds-event-stream-python-sdk
    
  2. Set up Development Environment

    python3 -m venv .venv
    source .venv/bin/activate  # On Windows: .venv\Scripts\activate
    pip install -e ".[dev]"
    
  3. Make Changes and Test

    # Generate models if needed
    make generate-models
    
    # Run tests
    make test
    # or
    python -m unittest discover -s tests
    
  4. Create Pull Request

    • Create a feature branch: git checkout -b feature/your-feature-name
    • Make your changes and commit them
    • Push to your fork and create a PR

Automated Release Process

This project uses automated releases when PRs are merged to main. The version bump type is determined by your PR title:

Version Bump Types

PR Title Contains                    Version Bump   Example
major, breaking, BREAKING CHANGE     Major          0.1.0 → 1.0.0
feat, feature, minor                 Minor          0.1.0 → 0.2.0
Default (bug fixes, docs, etc.)      Patch          0.1.0 → 0.1.1

Examples

# Patch release (bug fix)
PR Title: "Fix kafka connection timeout issue"      Version: 0.1.0 → 0.1.1

# Minor release (new feature)
PR Title: "feat: Add event filtering capabilities"  Version: 0.1.0 → 0.2.0

# Major release (breaking change)
PR Title: "BREAKING CHANGE: Redesign API interface" Version: 0.1.0 → 1.0.0

What Happens When a PR Is Merged

  1. Version automatically bumped in pyproject.toml
  2. Git tag created (v{new_version})
  3. GitHub release created with changelog
  4. Package published to PyPI automatically

Code Style and Standards

  • Follow PEP 8 for Python code style
  • Add type hints where appropriate
  • Write tests for new functionality
  • Update documentation for API changes
  • Use descriptive commit messages

Testing

# Run all tests
python -m unittest discover -s tests

# Run specific test file
python -m unittest tests.test_kafka_producer

# Run with coverage (if installed)
coverage run -m unittest discover -s tests
coverage report

Documentation

Full documentation is available at: https://grasp-labs.github.io/ds-event-stream-python-sdk/

Installation for End Users

From PyPI (Recommended)

pip install ds-event-stream-python-sdk

From Source

git clone https://github.com/grasp-labs/ds-event-stream-python-sdk.git
cd ds-event-stream-python-sdk
pip install -e .

License

This project is licensed under the Apache License 2.0 - see the LICENSE file for details.
