Complete Kafka integration for Dagster with enterprise DLQ tooling
Project description
Dagster Kafka Integration
The first and most comprehensive Kafka integration for Dagster with complete enterprise-grade features supporting all three major serialization formats and production security.
Complete Enterprise Solution
Version 1.0.0 - Dead Letter Queue Release with enterprise-grade error handling:
๐ NOW LIVE ON PyPI - Successfully published and validated! Install with pip install dagster-kafka and use CLI tools immediately.
- JSON Support: Native JSON message consumption from Kafka topics
- Avro Support: Full Avro message support with Schema Registry integration
- Protobuf Support: Complete Protocol Buffers integration with schema management
- Dead Letter Queue (DLQ): Enterprise-grade error handling with circuit breaker patterns
- Enterprise Security: Complete SASL/SSL authentication and encryption support
- Schema Evolution: Comprehensive validation with breaking change detection across all formats
- Production Monitoring: Real-time alerting with Slack/Email integration
- High Performance: Advanced caching, batching, and connection pooling
- Error Recovery: Multiple recovery strategies for production resilience
- Enterprise Ready: Complete observability and production-grade error handling
133 comprehensive tests passing - Full test coverage across all serialization formats, security configurations, enterprise features, and DLQ functionality.
Three Serialization Formats + Enterprise Security + DLQ
JSON Support
Perfect for APIs and simple data structures with DLQ error handling.
Avro Support
Schema Registry integration with evolution validation and DLQ support.
Protobuf Support
High-performance binary serialization with comprehensive tooling and DLQ handling.
Dead Letter Queue (DLQ) - New in v1.0.0
Enterprise-grade error handling with automatic routing of failed messages to dead letter topics for debugging and reprocessing.
Enterprise Security
Complete SASL/SSL authentication and encryption for production deployments.
Installation
pip install dagster-kafka
โ Validated Installation: Successfully tested in fresh environments. CLI tools work immediately after installation.
Quick Start
JSON Usage with DLQ
from dagster import asset, Definitions
from dagster_kafka import KafkaResource, KafkaIOManager, DLQStrategy
@asset
def api_events():
"""Consume JSON messages from Kafka topic with DLQ support."""
pass
defs = Definitions(
assets=[api_events],
resources={
"kafka": KafkaResource(bootstrap_servers="localhost:9092"),
"io_manager": KafkaIOManager(
kafka_resource=KafkaResource(bootstrap_servers="localhost:9092"),
consumer_group_id="my-dagster-pipeline",
enable_dlq=True,
dlq_strategy=DLQStrategy.RETRY_THEN_DLQ,
dlq_max_retries=3
)
}
)
Secure Production Usage with DLQ
from dagster import asset, Definitions
from dagster_kafka import KafkaResource, SecurityProtocol, SaslMechanism, KafkaIOManager, DLQStrategy
# Production-grade secure configuration with DLQ
secure_kafka = KafkaResource(
bootstrap_servers="prod-kafka-01:9092,prod-kafka-02:9092",
security_protocol=SecurityProtocol.SASL_SSL,
sasl_mechanism=SaslMechanism.SCRAM_SHA_256,
sasl_username="production-user",
sasl_password="secure-password",
ssl_ca_location="/etc/ssl/certs/kafka-ca.pem",
ssl_check_hostname=True
)
@asset
def secure_events():
"""Consume messages from secure production Kafka cluster with DLQ."""
pass
defs = Definitions(
assets=[secure_events],
resources={
"io_manager": KafkaIOManager(
kafka_resource=secure_kafka,
consumer_group_id="secure-production-pipeline",
enable_dlq=True,
dlq_strategy=DLQStrategy.CIRCUIT_BREAKER,
dlq_circuit_breaker_failure_threshold=5
)
}
)
Avro Usage with Schema Registry and DLQ
from dagster import asset, Config
from dagster_kafka import KafkaResource, avro_kafka_io_manager, DLQStrategy
class UserEventsConfig(Config):
schema_file: str = "schemas/user.avsc"
max_messages: int = 100
@asset(io_manager_key="avro_kafka_io_manager")
def user_data(context, config: UserEventsConfig):
"""Load user events using Avro schema with validation and DLQ."""
io_manager = context.resources.avro_kafka_io_manager
return io_manager.load_input(
context,
topic="user-events",
schema_file=config.schema_file,
max_messages=config.max_messages,
validate_evolution=True
)
Protobuf Usage with DLQ
from dagster import asset, Definitions
from dagster_kafka import KafkaResource, DLQStrategy
from dagster_kafka.protobuf_io_manager import create_protobuf_kafka_io_manager
@asset(io_manager_key="protobuf_kafka_io_manager")
def user_events():
"""Consume Protobuf messages from Kafka topic with DLQ support."""
pass
@asset
def processed_data(user_events):
"""Process Protobuf user events."""
print(f"Processing {len(user_events)} Protobuf events")
return {"processed_count": len(user_events)}
defs = Definitions(
assets=[user_events, processed_data],
resources={
"protobuf_kafka_io_manager": create_protobuf_kafka_io_manager(
kafka_resource=KafkaResource(bootstrap_servers="localhost:9092"),
schema_registry_url="http://localhost:8081", # Optional
consumer_group_id="dagster-protobuf-pipeline",
enable_dlq=True,
dlq_strategy=DLQStrategy.RETRY_THEN_DLQ,
dlq_max_retries=3
)
}
)
Dead Letter Queue (DLQ) Features (v1.0.0)
DLQ Strategies
- DISABLED: No DLQ processing
- IMMEDIATE: Send to DLQ immediately on failure
- RETRY_THEN_DLQ: Retry N times, then send to DLQ
- CIRCUIT_BREAKER: Circuit breaker pattern with DLQ fallback
Error Classification
- DESERIALIZATION_ERROR: Failed to deserialize message
- SCHEMA_ERROR: Schema validation failed
- PROCESSING_ERROR: Business logic error
- CONNECTION_ERROR: Kafka connection issues
- TIMEOUT_ERROR: Message processing timeout
- UNKNOWN_ERROR: Unclassified errors
Circuit Breaker Pattern
from dagster_kafka import DLQConfiguration, DLQStrategy
dlq_config = DLQConfiguration(
strategy=DLQStrategy.CIRCUIT_BREAKER,
circuit_breaker_failure_threshold=5, # Open after 5 failures
circuit_breaker_recovery_timeout_ms=30000, # Test recovery after 30s
circuit_breaker_success_threshold=2 # Close after 2 successes
)
DLQ Message Enrichment
DLQ messages include rich metadata for debugging:
{
"original_message": {
"topic": "user-events",
"partition": 0,
"offset": 12345,
"key": "user-123",
"timestamp": 1640995200000
},
"error_info": {
"type": "deserialization_error",
"message": "JSON decode error",
"traceback": "Full Python traceback...",
"failure_timestamp": "2025-01-15T10:30:00Z",
"retry_count": 3
},
"processing_metadata": {
"consumer_group_id": "my-pipeline",
"dagster_run_id": "12345-67890",
"dagster_asset_key": "user_events"
}
}
DLQ Production Tooling Suite (v1.0.0)
Complete enterprise-grade tooling for Dead Letter Queue management and monitoring.
DLQ Inspector
Analyze failed messages in DLQ topics with comprehensive error pattern analysis.
# Analyze DLQ messages for error patterns
dlq-inspector --topic user-events --max-messages 20
# Inspect specific DLQ topic
dlq-inspector --dlq-topic payments_dlq --max-messages 50
DLQ Message Replayer
Replay failed messages back to original topics with filtering and safety controls.
# Replay messages with filtering
dlq-replayer --source-topic user-events_dlq --target-topic user-events --error-types "timeout_error" --max-messages 100 --confirm
# Replay with rate limiting for production safety
dlq-replayer --source-topic orders_dlq --target-topic orders --rate-limit 10 --dry-run
DLQ Monitoring Suite
Production monitoring and alerting for DLQ health across multiple topics.
# Monitor DLQ health across topics
dlq-monitor --topics user-events_dlq,orders_dlq,payments_dlq --output-format json
# Set up automated alerting
dlq-alerts --topic critical-events_dlq --max-messages 500 --webhook-url https://hooks.slack.com/...
# Operations dashboard
dlq-dashboard --topics user-events_dlq,orders_dlq --warning-threshold 100 --critical-threshold 1000
Enterprise Security Features (v1.0.0)
Security Protocols Supported
- PLAINTEXT: For local development and testing
- SSL: Certificate-based encryption
- SASL_PLAINTEXT: Username/password authentication
- SASL_SSL: Combined authentication and encryption (recommended for production)
SASL Authentication Mechanisms
- PLAIN: Simple username/password authentication
- SCRAM-SHA-256: Secure challenge-response authentication
- SCRAM-SHA-512: Enhanced secure authentication
- GSSAPI: Kerberos authentication for enterprise environments
- OAUTHBEARER: OAuth-based authentication
Production Security Configuration
from dagster_kafka import KafkaResource, SecurityProtocol, SaslMechanism
# SASL/SSL Production Configuration (Most Secure)
production_kafka = KafkaResource(
bootstrap_servers="prod-kafka:9092",
security_protocol=SecurityProtocol.SASL_SSL,
sasl_mechanism=SaslMechanism.SCRAM_SHA_256,
sasl_username="dagster-prod-user",
sasl_password="secure-production-password",
ssl_ca_location="/etc/ssl/certs/kafka-ca.pem",
ssl_check_hostname=True,
session_timeout_ms=30000,
additional_config={
"request.timeout.ms": 30000,
"retry.backoff.ms": 1000
}
)
# SSL-Only Configuration
ssl_kafka = KafkaResource(
bootstrap_servers="ssl-kafka:9092",
security_protocol=SecurityProtocol.SSL,
ssl_ca_location="/etc/ssl/certs/ca.pem",
ssl_certificate_location="/etc/ssl/certs/client.pem",
ssl_key_location="/etc/ssl/private/client-key.pem",
ssl_key_password="client-key-password"
)
# Validate security configuration
production_kafka.validate_security_config()
# Get producer configuration with same security settings
producer_config = production_kafka.get_producer_config()
All Three Formats with Security and DLQ
from dagster import Definitions
from dagster_kafka import KafkaResource, SecurityProtocol, SaslMechanism, KafkaIOManager, avro_kafka_io_manager, DLQStrategy
from dagster_kafka.protobuf_io_manager import create_protobuf_kafka_io_manager
# Secure Kafka resource for all formats
secure_kafka = KafkaResource(
bootstrap_servers="secure-kafka:9092",
security_protocol=SecurityProtocol.SASL_SSL,
sasl_mechanism=SaslMechanism.SCRAM_SHA_256,
sasl_username="enterprise-user",
sasl_password="enterprise-password",
ssl_ca_location="/etc/ssl/kafka-ca.pem"
)
defs = Definitions(
assets=[json_events, avro_events, protobuf_events, unified_processing],
resources={
"json_io_manager": KafkaIOManager(
kafka_resource=secure_kafka,
consumer_group_id="secure-json-consumer",
enable_dlq=True,
dlq_strategy=DLQStrategy.CIRCUIT_BREAKER
),
"avro_io_manager": avro_kafka_io_manager.configured({
"kafka_resource": secure_kafka,
"schema_registry_url": "https://secure-schema-registry:8081",
"enable_schema_validation": True,
"enable_dlq": True,
"dlq_strategy": "retry_then_dlq"
}),
"protobuf_io_manager": create_protobuf_kafka_io_manager(
kafka_resource=secure_kafka,
schema_registry_url="https://secure-schema-registry:8081",
enable_dlq=True,
dlq_strategy=DLQStrategy.RETRY_THEN_DLQ
)
}
)
Schema Examples
Avro Schema
{
"type": "record",
"name": "User",
"namespace": "com.example.users",
"fields": [
{"name": "id", "type": "int"},
{"name": "name", "type": "string"},
{"name": "email", "type": "string"},
{"name": "created_at", "type": "long"},
{"name": "is_active", "type": "boolean"}
]
}
Protobuf Schema
syntax = "proto3";
package examples;
message User {
int32 id = 1;
string name = 2;
string email = 3;
int32 age = 4;
bool is_active = 5;
repeated string tags = 6;
Address address = 7;
int64 created_at = 8;
int64 updated_at = 9;
}
message Address {
string street = 1;
string city = 2;
string state = 3;
string postal_code = 4;
string country = 5;
}
enum EventType {
USER_CREATED = 0;
USER_UPDATED = 1;
USER_DELETED = 2;
USER_LOGIN = 3;
USER_LOGOUT = 4;
}
message UserEvent {
EventType event_type = 1;
User user = 2;
int64 timestamp = 3;
string source_system = 4;
map<string, string> metadata = 5;
}
Schema Evolution Management
Compatibility Levels
Support for all major compatibility levels across JSON, Avro, and Protobuf:
- BACKWARD: New schema can read old data
- FORWARD: Old schema can read new data
- FULL: Both backward and forward compatible
- BACKWARD_TRANSITIVE: Compatible with all previous versions
- FORWARD_TRANSITIVE: Compatible with all future versions
- FULL_TRANSITIVE: Both backward and forward transitive
- NONE: No compatibility checking
Breaking Change Detection
from dagster_kafka import SchemaEvolutionValidator
validator = SchemaEvolutionValidator(schema_registry_client)
result = validator.validate_schema_compatibility(
"user-events-value",
new_schema,
CompatibilityLevel.BACKWARD
)
if not result["compatible"]:
print(f"Breaking changes detected: {result['reason']}")
Production Monitoring and Alerting
Real-time Monitoring
from dagster_kafka import SchemaEvolutionMonitor, slack_alert_handler
monitor = SchemaEvolutionMonitor()
monitor.add_alert_callback(slack_alert_handler("https://hooks.slack.com/your-webhook"))
monitor.record_validation_attempt(
subject="user-events",
success=True,
duration=2.5,
breaking_changes_count=0
)
Performance Optimization
from dagster_kafka import PerformanceOptimizer, CacheStrategy, BatchStrategy
optimizer = PerformanceOptimizer(
cache_config={
"max_size": 10000,
"strategy": CacheStrategy.LRU,
"ttl_seconds": 300
},
batch_config={
"strategy": BatchStrategy.ADAPTIVE,
"max_batch_size": 1000
},
pool_config={
"max_connections": 20
}
)
Configuration Options
KafkaResource with Security
KafkaResource(
# Core Configuration
bootstrap_servers="localhost:9092", # Required: Kafka cluster endpoints
# Security Configuration
security_protocol=SecurityProtocol.SASL_SSL, # Security protocol
sasl_mechanism=SaslMechanism.SCRAM_SHA_256, # SASL mechanism
sasl_username="username", # SASL username
sasl_password="password", # SASL password
# SSL Configuration
ssl_ca_location="/path/to/ca.pem", # CA certificate
ssl_certificate_location="/path/to/cert.pem", # Client certificate
ssl_key_location="/path/to/key.pem", # Private key
ssl_key_password="key-password", # Key password
ssl_check_hostname=True, # Hostname verification
# Advanced Configuration
session_timeout_ms=30000, # Session timeout
auto_offset_reset="earliest", # Offset reset policy
additional_config={"request.timeout.ms": 30000} # Additional config
)
Advanced AvroKafkaIOManager Configuration with DLQ
avro_kafka_io_manager.configured({
"kafka_resource": secure_kafka_resource, # Secure Kafka resource
"schema_registry_url": "https://registry:8081", # Secure schema registry
"enable_schema_validation": True,
"compatibility_level": "BACKWARD",
"enable_caching": True,
"cache_ttl": 300,
"max_retries": 3,
"retry_backoff": 1.0,
# DLQ Configuration
"enable_dlq": True,
"dlq_strategy": "circuit_breaker",
"dlq_max_retries": 3,
"dlq_circuit_breaker_failure_threshold": 5
})
Protobuf Configuration Options with DLQ
# Simple Protobuf usage with security and DLQ
simple_manager = create_protobuf_kafka_io_manager(
kafka_resource=secure_kafka_resource,
consumer_group_id="my-protobuf-consumer",
enable_dlq=True,
dlq_strategy=DLQStrategy.RETRY_THEN_DLQ,
dlq_max_retries=3
)
# Advanced Protobuf with Schema Registry, security, and DLQ
advanced_manager = ProtobufKafkaIOManager(
kafka_resource=secure_kafka_resource,
schema_registry_url="https://secure-registry:8081",
enable_schema_validation=True,
compatibility_level="BACKWARD",
consumer_group_id="enterprise-protobuf",
enable_dlq=True,
dlq_strategy=DLQStrategy.CIRCUIT_BREAKER,
dlq_circuit_breaker_failure_threshold=5
)
Examples Directory Structure
examples/
โโโ json_examples/ # JSON message examples
โ โโโ simple_json_test.py
โ โโโ README.md
โโโ avro_examples/ # Avro schema examples
โ โโโ simple_avro_test.py
โ โโโ production_schema_migration.py
โ โโโ schemas/
โ โโโ README.md
โโโ protobuf_examples/ # Protobuf examples
โ โโโ simple_protobuf_example.py
โ โโโ advanced_protobuf_example.py
โ โโโ schemas/
โ โ โโโ user.proto
โ โ โโโ product.proto
โ โโโ README.md
โโโ dlq_examples/ # Complete DLQ tooling suite
โ โโโ dlq_inspector.py # Analyze failed messages
โ โโโ dlq_replayer.py # Replay messages with safety controls
โ โโโ dlq_monitor.py # Core monitoring and metrics
โ โโโ dlq_alerts.py # Configurable alerting system
โ โโโ dlq_dashboard.py # Operations dashboard
โ โโโ create_test_data.py # Generate test data
โ โโโ create_stress_test.py # Stress testing utilities
โ โโโ README.md
โโโ security_examples/ # Enterprise security examples
โ โโโ production_security_example.py
โ โโโ README.md
โโโ performance_examples/ # Performance optimization
โโโ production_examples/ # Enterprise deployment patterns
โโโ docker-compose.yml # Local testing setup
Security, Serialization Format, and DLQ Comparison
| Feature | JSON | Avro | Protobuf | Security | DLQ |
|---|---|---|---|---|---|
| Schema Evolution | Basic | Advanced | Advanced | N/A | Error Routing |
| Performance | Good | Better | Best | Overhead | Minimal |
| Schema Registry | No | Yes | Yes | HTTPS | Topic-based |
| Backward Compatibility | Manual | Automatic | Automatic | Maintained | Preserved |
| Binary Format | No | Yes | Yes | Encrypted | JSON |
| Human Readable | Yes | No | No | No | Yes |
| Cross-Language | Yes | Yes | Yes | Yes | Yes |
| Authentication | Basic | SASL/SSL | SASL/SSL | Full | Secured |
| Error Handling | DLQ | DLQ | DLQ | Monitored | Core Feature |
| Use Case | APIs, Logging | Analytics, ETL | High-perf, gRPC | All Production | Error Recovery |
Development & Testing
Production Installation
# For production use
pip install dagster-kafka
Development Setup
# For development and contributions
git clone https://github.com/kingsley-123/dagster-kafka-integration.git
cd dagster-kafka-integration
# Install dependencies (includes Protobuf, security, and DLQ support)
pip install -r requirements.txt
# Install in development mode
pip install -e .
Comprehensive Testing
# Start Kafka and Schema Registry (for development)
docker-compose up -d
# Run all 133 tests across all formats, security configurations, and DLQ functionality
python -m pytest tests/ -v
# Test specific modules
python -m pytest tests/test_avro_io_manager.py -v # Avro tests
python -m pytest tests/test_protobuf_io_manager.py -v # Protobuf tests
python -m pytest tests/test_dlq.py -v # DLQ tests
python -m pytest tests/test_security.py -v # Security tests
python -m pytest tests/test_schema_evolution.py -v # Schema evolution
python -m pytest tests/test_monitoring.py -v # Monitoring
python -m pytest tests/test_performance.py -v # Performance
CLI Tools Usage
# DLQ Inspector - Analyze failed messages
dlq-inspector --help
dlq-inspector --topic user-events --max-messages 20
# DLQ Replayer - Replay messages with safety controls
dlq-replayer --help
dlq-replayer --source-topic orders_dlq --target-topic orders --dry-run
# DLQ Monitor - Core monitoring and metrics
dlq-monitor --help
dlq-monitor --topics user-events_dlq,orders_dlq --output-format json
# DLQ Alerts - Configurable alerting system
dlq-alerts --help
dlq-alerts --topic critical-events_dlq --max-messages 500
# DLQ Dashboard - Operations dashboard
dlq-dashboard --help
dlq-dashboard --topics user-events_dlq,orders_dlq
Running Examples (Development)
# JSON examples
python examples/json_examples/simple_json_test.py
# Avro examples
python examples/avro_examples/simple_avro_test.py
python examples/avro_examples/production_schema_migration.py
# Protobuf examples
python examples/protobuf_examples/simple_protobuf_example.py
python examples/protobuf_examples/advanced_protobuf_example.py
# Security examples
python examples/security_examples/production_security_example.py
# Performance examples
python examples/performance_examples/high_throughput_pipeline.py
# Note: Source files available in examples/ directory for development reference
Schema Registry Support
Supports multiple Schema Registry providers across Avro and Protobuf with security:
- Confluent Schema Registry (most common) - HTTP/HTTPS
- AWS Glue Schema Registry - IAM authentication
- Azure Schema Registry - Azure AD authentication
- Custom implementations - Flexible authentication
Error Handling and Recovery
The integration includes comprehensive error handling for all serialization formats, security configurations, and DLQ functionality:
- Connection failures: Graceful timeouts and retries with security context
- Authentication failures: Clear error messages for SASL/SSL issues
- Schema errors: Clear error messages for missing/invalid schemas
- Deserialization errors: Automatic DLQ routing with retry logic
- Schema evolution failures: Multiple recovery strategies with DLQ fallback
- Performance degradation: Automatic optimization recommendations
- Security validation: Comprehensive configuration validation
- Circuit breaker protection: Automatic failure detection and recovery
Production Features
Error Recovery Strategies
- Fail Fast: Immediate failure on errors
- Fallback Schema: Automatic fallback to previous schema versions
- Skip Validation: Continue processing with validation disabled
- Graceful Degradation: Accept minor breaking changes
- Retry with Backoff: Exponential backoff retry logic
- Security Retry: Automatic credential refresh and retry
- Dead Letter Queue: Automatic routing of failed messages
Performance Optimization
- High-Performance Caching: LRU, TTL, and write-through strategies
- Adaptive Batching: Dynamic batch size optimization
- Connection Pooling: Efficient resource management with security context
- Metrics Collection: Comprehensive performance monitoring
- DLQ Optimization: Minimal overhead error handling
Monitoring and Alerting
- Real-time Metrics: Validation attempts, cache hit rates, throughput
- Security Metrics: Authentication success/failure rates
- DLQ Metrics: Error rates, retry counts, circuit breaker states
- Alert Integration: Slack, email, and custom webhooks
- Threshold Management: Configurable alert thresholds
- Historical Analysis: Performance trends and optimization insights
Security Features
- Authentication: SASL mechanisms (PLAIN, SCRAM, GSSAPI, OAUTHBEARER)
- Encryption: SSL/TLS with certificate management
- Authorization: Kafka ACL support through security protocols
- Validation: Comprehensive security configuration validation
- Monitoring: Security-aware logging and metrics
- DLQ Security: Secure DLQ topic access and encryption
Roadmap
Completed Features (v1.0.0)
- JSON Support - Complete native integration
- Avro Support - Full Schema Registry + evolution validation
- Protobuf Support - Complete Protocol Buffers integration
- Enterprise Security - Complete SASL/SSL authentication and encryption
- Schema Evolution - All compatibility levels across formats
- Production Monitoring - Real-time alerting and metrics
- High-Performance Optimization - Caching, batching, pooling
- Dead Letter Queues - Advanced error handling with circuit breaker
- Complete DLQ Tooling Suite - Inspector, Replayer, Monitoring, Alerting
- Comprehensive Testing - 133 tests across all features
- PyPI Distribution - โ LIVE: Official package published and validated
Upcoming Features
- JSON Schema Support - 4th serialization format
- Confluent Connect - Native connector integration
- Kafka Streams - Stream processing integration
Future Enhancements
- Additional Formats - MessagePack, Apache Arrow
- Advanced Consumers - Custom partition assignment
- Cloud Integrations - AWS MSK, Confluent Cloud native support
- Official Dagster Integration - Potential core inclusion
Why Choose This Integration
Complete Solution
- Only integration supporting all 3 major formats (JSON, Avro, Protobuf)
- Enterprise-grade security with SASL/SSL support
- Production-ready with comprehensive monitoring
- Advanced error handling with Dead Letter Queue support
- Complete DLQ Tooling Suite for enterprise operations
Developer Experience
- Familiar Dagster patterns - feels native to the platform
- Comprehensive examples for all use cases including security and DLQ
- Extensive documentation and testing
- Production-ready CLI tooling for DLQ management
Production Ready
- 133 comprehensive tests covering all scenarios including security and DLQ
- Real-world deployment patterns and examples
- Performance optimization tools and monitoring
- Enterprise security for production Kafka clusters
- Bulletproof error handling with circuit breaker patterns
- Complete operational tooling for DLQ management
Community Driven
- Active development based on user feedback
- Open source with transparent roadmap
- Enterprise support options available
Contributing
Contributions are welcome! This project aims to be the definitive Kafka integration for Dagster.
Ways to contribute:
- Report issues - Found a bug? Let us know!
- Feature requests - What would make this more useful?
- Documentation - Help improve examples and guides
- Code contributions - PRs welcome for any improvements
- Security testing - Help test security configurations
- DLQ testing - Help test error handling scenarios
License
Apache 2.0 - see LICENSE file for details.
Community & Support
- GitHub Issues: Report bugs and request features
- GitHub Discussions: Share use cases and get help
- Star the repo: If this helped your project!
Acknowledgments
- Dagster Community: For the initial feature request and continued feedback
- Contributors: Thanks to all who provided feedback, testing, and code contributions
- Enterprise Users: Built in response to real production deployment needs
- Security Community: Special thanks for security testing and validation
- Slack Community: Special thanks for validation and feature suggestions
The Complete Enterprise Solution
The first and most comprehensive Kafka integration for Dagster - supporting all three major serialization formats (JSON, Avro, Protobuf) with enterprise-grade production features, complete security, advanced Dead Letter Queue error handling, and complete operational tooling suite.
Version 1.0.0 - Dead Letter Queue Release with Complete Tooling Suite - Built by Kingsley Okonkwo
Solving real data engineering problems with comprehensive open source solutions.
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file dagster_kafka-1.1.0.tar.gz.
File metadata
- Download URL: dagster_kafka-1.1.0.tar.gz
- Upload date:
- Size: 78.0 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.1.0 CPython/3.12.10
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
caa0676f4f625f972bf47725c646ade3506f76713e03e24fe3d0feeed9112920
|
|
| MD5 |
25149dc3cebb17dbbaebd119f79a93e4
|
|
| BLAKE2b-256 |
82d9087eaaaef1ab78353fc863f494c4fe60312b98a520d3bddd5d2370d1510c
|
File details
Details for the file dagster_kafka-1.1.0-py3-none-any.whl.
File metadata
- Download URL: dagster_kafka-1.1.0-py3-none-any.whl
- Upload date:
- Size: 64.2 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.1.0 CPython/3.12.10
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
86af13b8a40b4420c951239221efe8695cd8462d7337f16f73e9d42ebece3ed2
|
|
| MD5 |
b4e252145ce25b0bf397b27a4c97cac8
|
|
| BLAKE2b-256 |
9d02a636f1458c76906e9d4cd2b47be5c48970c9a421a16dcf1b56e15f84c66f
|