# ds-event-stream-python-sdk

A Python SDK for producing and consuming events on a Kafka-based event stream, following Grasp Labs' event stream conventions.
## Features

- Dataclass Models: Auto-generated from JSON schemas for type-safe event handling
- Kafka Producer/Consumer Services: Simple interfaces for sending and receiving events
- Service Principal Configuration: Centralized config via `KafkaConfig` for authentication and connection settings
- Mockable for Testing: All Kafka interactions can be mocked for CI and local testing
## Installation

1. Clone the repository:

   ```bash
   git clone https://github.com/grasp-labs/ds-event-stream-python-sdk.git
   cd ds-event-stream-python-sdk
   ```

2. Set up a Python virtual environment:

   ```bash
   python3 -m venv .venv
   source .venv/bin/activate  # On Windows: .venv\Scripts\activate
   ```

3. Install the package:

   ```bash
   pip install -e .
   ```

4. (Optional) Generate models from schemas:

   ```bash
   make generate-models
   ```
## Quick Start

### 1. Configure Kafka Connection

```python
from dseventstream.kafka.kafka_config import KafkaConfig

config = KafkaConfig(
    bootstrap_servers="kafka-prod:9092",
    username="service-principal",
    password="secret"
)
```

### 2. Create Producer/Consumer

```python
from dseventstream.kafka.kafka_service import KafkaProducerService, KafkaConsumerService

producer = KafkaProducerService(config=config)
consumer = KafkaConsumerService(config=config, group_id="my-group")
```

### 3. Send an Event

```python
from dseventstream.models.event import Event

event = Event(
    id="event-id",
    session_id="session-id",
    request_id="request-id",
    tenant_id="tenant-id",
    event_type="type",
    event_source="source",
    metadata={"key": "value"},
    timestamp="2025-09-18T00:00:00Z",
    created_by="user",
    md5_hash="..."
)
producer.send("topic-name", event)
```

### 4. Consume Events

```python
def on_message(event_dict):
    print(f"Received event: {event_dict}")

consumer.consume("topic-name", on_message)
```
## Package Structure

```
dseventstream/
├── models/
│   ├── event.py             # Event dataclass (auto-generated)
│   └── system_topics.py     # System topics enum (auto-generated)
└── kafka/
    ├── kafka_service.py     # Producer/Consumer service classes
    └── kafka_config.py      # Shared config for Kafka principals
docs/                        # Documentation source files
├── README.md                # Main documentation
├── usage.md                 # Usage examples
├── api.md                   # API reference
├── testing.md               # Testing guide
└── license.md               # License information
tests/                       # Test suite
├── test_kafka_producer.py
├── test_kafka_consumer.py
└── test_kafka_mock.py
schemas/                     # JSON schemas for model generation
├── event.json
└── system-topics.json
.github/workflows/           # CI/CD pipelines
├── deploy-docs.yml          # Documentation deployment
└── deploy-pypi.yml          # PyPI release automation
```
## API Reference

### Core Classes

#### KafkaConfig

Centralized configuration for Kafka connections.

```python
from dseventstream.kafka.kafka_config import KafkaConfig

# Production configuration
config = KafkaConfig(
    bootstrap_servers="kafka-prod:9092",
    username="service-principal",
    password="secret"
)

# Development configuration
config = KafkaConfig(bootstrap_servers="localhost:9092")
```

#### KafkaProducerService

Service for publishing events to Kafka topics.

```python
from dseventstream.kafka.kafka_service import KafkaProducerService

producer = KafkaProducerService(config=config)
producer.send("topic-name", event, key="optional-key")
```

#### KafkaConsumerService

Service for consuming events from Kafka topics.

```python
from dseventstream.kafka.kafka_service import KafkaConsumerService

consumer = KafkaConsumerService(config=config, group_id="my-group")

def handle_message(event_dict):
    print(f"Received: {event_dict}")

consumer.consume("topic-name", handle_message)
```

#### Event

Auto-generated dataclass for event structure.

```python
from dseventstream.models.event import Event

event = Event(
    id="unique-event-id",
    session_id="session-123",
    request_id="request-456",
    tenant_id="tenant-789",
    event_type="user.action",
    event_source="web-app",
    metadata={"key": "value"},
    timestamp="2025-01-01T00:00:00Z",
    created_by="user-id",
    md5_hash="computed-hash"
)
```
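The `md5_hash` field is typically a digest of the event payload. A minimal sketch of computing such a hash with the standard library; which fields the SDK actually hashes, and its serialization rules, are assumptions here, not documented behavior:

```python
import hashlib
import json

def compute_md5_hash(payload: dict) -> str:
    """Compute a deterministic MD5 digest over a JSON-serialized payload.

    Sorting keys makes the serialization (and therefore the hash) stable
    regardless of dict insertion order.
    """
    canonical = json.dumps(payload, sort_keys=True, separators=(",", ":"))
    return hashlib.md5(canonical.encode("utf-8")).hexdigest()

# Hypothetical payload; the real SDK may hash a different field set.
payload = {"event_type": "user.action", "metadata": {"key": "value"}}
print(compute_md5_hash(payload))  # 32-character hex digest
```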
## Development

### Prerequisites

- Python 3.13+
- Make (for code generation)

### Setup Development Environment

```bash
# Install development dependencies
pip install -e ".[dev]"

# Generate models from schemas
make generate-models

# Run tests
make test
```
### Testing

All Kafka interactions are mockable using `unittest.mock`. Example tests are provided in the `tests/` folder.

```bash
# Run all tests
python -m unittest discover -s tests

# Run a specific test
python -m unittest tests.test_kafka_producer
```
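A mocked test might look like the following sketch. `OrderPublisher` is a hypothetical application class standing in for code that depends on the SDK's producer; the `MagicMock` replaces `KafkaProducerService`, so no broker is needed:

```python
import unittest
from unittest.mock import MagicMock

class OrderPublisher:
    """Hypothetical application class that depends on a producer-like object."""
    def __init__(self, producer):
        self.producer = producer

    def publish(self, order_id: str):
        # Mirrors the SDK's producer.send(topic, event) call shape.
        self.producer.send("orders", {"id": order_id})

class TestOrderPublisher(unittest.TestCase):
    def test_publish_sends_to_orders_topic(self):
        producer = MagicMock()  # stands in for KafkaProducerService
        OrderPublisher(producer).publish("order-1")
        producer.send.assert_called_once_with("orders", {"id": "order-1"})
```

Run it with `python -m unittest` from the project root, the same way the commands above run the bundled tests.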
## Security

- SASL_PLAINTEXT (security protocol) with SCRAM-SHA-512 (SASL mechanism) is used for authentication (see `KafkaConfig`)
- For security issues, please see our Security Policy
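With a client library such as kafka-python, those settings correspond to the connection parameters below. How `KafkaConfig` maps its fields internally is an assumption here; `to_client_kwargs` is an illustrative helper, not part of the SDK:

```python
def to_client_kwargs(bootstrap_servers: str, username: str, password: str) -> dict:
    """Map service-principal credentials to kafka-python connection kwargs."""
    return {
        "bootstrap_servers": bootstrap_servers,
        "security_protocol": "SASL_PLAINTEXT",   # transport: SASL without TLS
        "sasl_mechanism": "SCRAM-SHA-512",       # challenge-response auth
        "sasl_plain_username": username,
        "sasl_plain_password": password,
    }

kwargs = to_client_kwargs("kafka-prod:9092", "service-principal", "secret")
# These kwargs could be passed straight to kafka.KafkaProducer(**kwargs).
```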
## Contributing

We welcome contributions! Here's how to get started:

### Development Workflow

1. Fork and Clone

   ```bash
   git clone https://github.com/your-username/ds-event-stream-py-sdk.git
   cd ds-event-stream-py-sdk
   ```

2. Set up Development Environment

   ```bash
   python3 -m venv .venv
   source .venv/bin/activate  # On Windows: .venv\Scripts\activate
   pip install -e ".[dev]"
   ```

3. Make Changes and Test

   ```bash
   # Generate models if needed
   make generate-models

   # Run tests
   make test  # or: python -m unittest discover -s tests
   ```

4. Create Pull Request

   - Create a feature branch: `git checkout -b feature/your-feature-name`
   - Make your changes and commit them
   - Push to your fork and create a PR
## Automated Release Process

This project uses automated releases when PRs are merged to main. The version bump type is determined by your PR title:

### Version Bump Types

| PR Title Contains | Version Bump | Example |
|---|---|---|
| `major`, `breaking`, `BREAKING CHANGE` | Major | 0.1.0 → 1.0.0 |
| `feat`, `feature`, `minor` | Minor | 0.1.0 → 0.2.0 |
| Default (bug fixes, docs, etc.) | Patch | 0.1.0 → 0.1.1 |
### Examples

```
# Patch release (bug fix)
PR Title: "Fix kafka connection timeout issue"
→ Version: 0.1.0 → 0.1.1

# Minor release (new feature)
PR Title: "feat: Add event filtering capabilities"
→ Version: 0.1.0 → 0.2.0

# Major release (breaking change)
PR Title: "BREAKING CHANGE: Redesign API interface"
→ Version: 0.1.0 → 1.0.0
```
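The title-to-bump mapping can be sketched as follows. This is an illustration of the rule above, not the project's actual CI script:

```python
def bump_type(pr_title: str) -> str:
    """Map a PR title to a semantic-version bump, per the table above."""
    title = pr_title.lower()
    # "BREAKING CHANGE" lowercases to contain "breaking", so one check suffices.
    if any(keyword in title for keyword in ("major", "breaking")):
        return "major"
    if any(keyword in title for keyword in ("feat", "feature", "minor")):
        return "minor"
    return "patch"  # default: bug fixes, docs, etc.

print(bump_type("BREAKING CHANGE: Redesign API interface"))  # major
print(bump_type("feat: Add event filtering capabilities"))   # minor
print(bump_type("Fix kafka connection timeout issue"))       # patch
```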
### What Happens When PR is Merged

- ✅ Version automatically bumped in `pyproject.toml`
- ✅ Git tag created (`v{new_version}`)
- ✅ GitHub release created with changelog
- ✅ Package published to PyPI automatically
### Code Style and Standards
- Follow PEP 8 for Python code style
- Add type hints where appropriate
- Write tests for new functionality
- Update documentation for API changes
- Use descriptive commit messages
### Testing

```bash
# Run all tests
python -m unittest discover -s tests

# Run specific test file
python -m unittest tests.test_kafka_producer

# Run with coverage (if installed)
coverage run -m unittest discover -s tests
coverage report
```
## Documentation
Full documentation is available at: https://grasp-labs.github.io/ds-event-stream-python-sdk/
## Installation for End Users

### From PyPI (Recommended)

```bash
pip install ds-event-stream-python-sdk
```

### From Source

```bash
git clone https://github.com/grasp-labs/ds-event-stream-py-sdk.git
cd ds-event-stream-py-sdk
pip install -e .
```
## License
This project is licensed under the Apache License 2.0 - see the LICENSE file for details.
## File Details

### ds_event_stream_python_sdk-0.1.0.tar.gz

- Size: 12.2 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.1.0 CPython/3.13.7

| Algorithm | Hash digest |
|---|---|
| SHA256 | `fa3a32a1106c0c819dc9ef6a28c3c43b4f327dc431cc842db1cbceead5f11763` |
| MD5 | `565c1aed2aa5ebc8aaf6428feb92d28a` |
| BLAKE2b-256 | `d9b0cc681b4aaf17b8321b7c091c9dff424645cbb8b8951f89cf441adf936ea5` |
### ds_event_stream_python_sdk-0.1.0-py3-none-any.whl

- Size: 12.0 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.1.0 CPython/3.13.7

| Algorithm | Hash digest |
|---|---|
| SHA256 | `8e3d55c9ccd7adc445f7601bb60c6a04ce861d07573b9490bb06f1db4ffbe3a8` |
| MD5 | `23c9cc66e5cb4fbb96a5aaaed3e43873` |
| BLAKE2b-256 | `5944619368363d842e98bf4053e99bc8f9ecd94961ae90ee47047621d285c3d9` |