
Dagster Kafka Integration

The first and most comprehensive Kafka integration for Dagster, with enterprise-grade features covering all three major serialization formats and production security.

Complete Enterprise Solution

Version 0.9.0 - Dead Letter Queue Release with enterprise-grade error handling:

  • JSON Support: Native JSON message consumption from Kafka topics
  • Avro Support: Full Avro message support with Schema Registry integration
  • Protobuf Support: Complete Protocol Buffers integration with schema management
  • Dead Letter Queue (DLQ): Enterprise-grade error handling with circuit breaker patterns
  • Enterprise Security: Complete SASL/SSL authentication and encryption support
  • Schema Evolution: Comprehensive validation with breaking change detection across all formats
  • Production Monitoring: Real-time alerting with Slack/Email integration
  • High Performance: Advanced caching, batching, and connection pooling
  • Error Recovery: Multiple recovery strategies for production resilience
  • Enterprise Ready: Complete observability and production-grade error handling

133 comprehensive tests passing - Full test coverage across all serialization formats, security configurations, enterprise features, and DLQ functionality.

Three Serialization Formats + Enterprise Security + DLQ

JSON Support

Perfect for APIs and simple data structures with DLQ error handling.

Avro Support

Schema Registry integration with evolution validation and DLQ support.

Protobuf Support

High-performance binary serialization with comprehensive tooling and DLQ handling.

Dead Letter Queue (DLQ) - New in v0.9.0

Enterprise-grade error handling that automatically routes failed messages to dead letter topics (e.g. user-events_dlq in the examples below) for debugging and reprocessing.

Enterprise Security

Complete SASL/SSL authentication and encryption for production deployments.

Installation

pip install git+https://github.com/kingsley-123/dagster-kafka-integration.git

Quick Start

JSON Usage with DLQ

from dagster import asset, Definitions
from dagster_kafka import KafkaResource, KafkaIOManager, DLQStrategy

@asset
def api_events():
    """Consume JSON messages from Kafka topic with DLQ support."""
    pass

defs = Definitions(
    assets=[api_events],
    resources={
        "kafka": KafkaResource(bootstrap_servers="localhost:9092"),
        "io_manager": KafkaIOManager(
            kafka_resource=KafkaResource(bootstrap_servers="localhost:9092"),
            consumer_group_id="my-dagster-pipeline",
            enable_dlq=True,
            dlq_strategy=DLQStrategy.RETRY_THEN_DLQ,
            dlq_max_retries=3
        )
    }
)

Secure Production Usage with DLQ

from dagster import asset, Definitions
from dagster_kafka import KafkaResource, SecurityProtocol, SaslMechanism, KafkaIOManager, DLQStrategy

# Production-grade secure configuration with DLQ
secure_kafka = KafkaResource(
    bootstrap_servers="prod-kafka-01:9092,prod-kafka-02:9092",
    security_protocol=SecurityProtocol.SASL_SSL,
    sasl_mechanism=SaslMechanism.SCRAM_SHA_256,
    sasl_username="production-user",
    sasl_password="secure-password",
    ssl_ca_location="/etc/ssl/certs/kafka-ca.pem",
    ssl_check_hostname=True
)

@asset
def secure_events():
    """Consume messages from secure production Kafka cluster with DLQ."""
    pass

defs = Definitions(
    assets=[secure_events],
    resources={
        "io_manager": KafkaIOManager(
            kafka_resource=secure_kafka,
            consumer_group_id="secure-production-pipeline",
            enable_dlq=True,
            dlq_strategy=DLQStrategy.CIRCUIT_BREAKER,
            dlq_circuit_breaker_failure_threshold=5
        )
    }
)

Avro Usage with Schema Registry and DLQ

from dagster import asset, Config
from dagster_kafka import KafkaResource, avro_kafka_io_manager, DLQStrategy

class UserEventsConfig(Config):
    schema_file: str = "schemas/user.avsc"
    max_messages: int = 100

@asset(io_manager_key="avro_kafka_io_manager")
def user_data(context, config: UserEventsConfig):
    """Load user events using Avro schema with validation and DLQ."""
    io_manager = context.resources.avro_kafka_io_manager
    return io_manager.load_input(
        context,
        topic="user-events",
        schema_file=config.schema_file,
        max_messages=config.max_messages,
        validate_evolution=True
    )

Protobuf Usage with DLQ

from dagster import asset, Definitions
from dagster_kafka import KafkaResource, DLQStrategy
from dagster_kafka.protobuf_io_manager import create_protobuf_kafka_io_manager

@asset(io_manager_key="protobuf_kafka_io_manager")
def user_events():
    """Consume Protobuf messages from Kafka topic with DLQ support."""
    pass

@asset
def processed_data(user_events):
    """Process Protobuf user events."""
    print(f"Processing {len(user_events)} Protobuf events")
    return {"processed_count": len(user_events)}

defs = Definitions(
    assets=[user_events, processed_data],
    resources={
        "protobuf_kafka_io_manager": create_protobuf_kafka_io_manager(
            kafka_resource=KafkaResource(bootstrap_servers="localhost:9092"),
            schema_registry_url="http://localhost:8081",  # Optional
            consumer_group_id="dagster-protobuf-pipeline",
            enable_dlq=True,
            dlq_strategy=DLQStrategy.RETRY_THEN_DLQ,
            dlq_max_retries=3
        )
    }
)

Dead Letter Queue (DLQ) Features (v0.9.0)

DLQ Strategies

  • DISABLED: No DLQ processing
  • IMMEDIATE: Send to DLQ immediately on failure
  • RETRY_THEN_DLQ: Retry N times, then send to DLQ
  • CIRCUIT_BREAKER: Circuit breaker pattern with DLQ fallback

Error Classification

  • DESERIALIZATION_ERROR: Failed to deserialize message
  • SCHEMA_ERROR: Schema validation failed
  • PROCESSING_ERROR: Business logic error
  • CONNECTION_ERROR: Kafka connection issues
  • TIMEOUT_ERROR: Message processing timeout
  • UNKNOWN_ERROR: Unclassified errors

Circuit Breaker Pattern

from dagster_kafka import DLQConfiguration, DLQStrategy

dlq_config = DLQConfiguration(
    strategy=DLQStrategy.CIRCUIT_BREAKER,
    circuit_breaker_failure_threshold=5,      # Open after 5 failures
    circuit_breaker_recovery_timeout_ms=30000, # Test recovery after 30s
    circuit_breaker_success_threshold=2        # Close after 2 successes
)
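
With these settings the breaker starts closed, opens after 5 consecutive failures (while open, failing messages fall through to the DLQ rather than being retried), admits trial messages after 30 seconds, and closes again once 2 consecutive messages succeed.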

DLQ Message Enrichment

DLQ messages include rich metadata for debugging:

{
  "original_message": {
    "topic": "user-events",
    "partition": 0,
    "offset": 12345,
    "key": "user-123",
    "timestamp": 1640995200000
  },
  "error_info": {
    "type": "deserialization_error",
    "message": "JSON decode error",
    "traceback": "Full Python traceback...",
    "failure_timestamp": "2025-01-15T10:30:00Z",
    "retry_count": 3
  },
  "processing_metadata": {
    "consumer_group_id": "my-pipeline",
    "dagster_run_id": "12345-67890",
    "dagster_asset_key": "user_events"
  }
}
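
Because each DLQ record is a plain JSON envelope on a Kafka topic, it can be spot-checked with any consumer, independent of the bundled tooling. A minimal sketch, assuming a local broker, the <topic>_dlq naming used throughout these examples, and the confluent-kafka client, that tallies failures by the error classification listed above:

import json
from collections import Counter

from confluent_kafka import Consumer

consumer = Consumer({
    "bootstrap.servers": "localhost:9092",
    "group.id": "dlq-spot-check",
    "auto.offset.reset": "earliest",
})
consumer.subscribe(["user-events_dlq"])

error_counts = Counter()
for _ in range(100):          # sample up to 100 messages
    msg = consumer.poll(5.0)
    if msg is None:
        break                 # topic drained within the timeout
    if msg.error():
        continue              # skip transport-level errors
    envelope = json.loads(msg.value())
    # Tally by the error classification recorded in error_info.type
    error_counts[envelope["error_info"]["type"]] += 1

consumer.close()
print(error_counts.most_common())

For anything beyond a quick look, the dedicated tooling below (dlq_inspector.py, dlq_monitor.py) covers the same ground with safety controls.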

DLQ Production Tooling Suite (v0.9.0)

Complete enterprise-grade tooling for Dead Letter Queue management and monitoring.

DLQ Inspector (dlq_inspector.py)

Analyze failed messages in DLQ topics with comprehensive error pattern analysis.

# Analyze DLQ messages for error patterns
python dlq_inspector.py --topic user-events --max-messages 20

# Inspect specific DLQ topic
python dlq_inspector.py --dlq-topic payments_dlq --max-messages 50

DLQ Message Replayer (dlq_replayer.py)

Replay failed messages back to original topics with filtering and safety controls.

# Replay messages with filtering
python dlq_replayer.py --source-topic user-events_dlq --target-topic user-events --error-types "timeout_error" --max-messages 100 --confirm

# Replay with rate limiting for production safety  
python dlq_replayer.py --source-topic orders_dlq --target-topic orders --rate-limit 10 --dry-run

DLQ Monitoring Suite

Production monitoring and alerting for DLQ health across multiple topics.

# Monitor DLQ health across topics
python dlq_monitor.py --topics user-events_dlq,orders_dlq,payments_dlq --output-format json

# Set up automated alerting
python dlq_alerts.py --topic critical-events_dlq --max-messages 500 --webhook-url https://hooks.slack.com/...

# Operations dashboard
python dlq_dashboard.py --topics user-events_dlq,orders_dlq --warning-threshold 100 --critical-threshold 1000

Enterprise Security Features (v0.8.0)

Security Protocols Supported

  • PLAINTEXT: For local development and testing
  • SSL: Certificate-based encryption
  • SASL_PLAINTEXT: Username/password authentication
  • SASL_SSL: Combined authentication and encryption (recommended for production)

SASL Authentication Mechanisms

  • PLAIN: Simple username/password authentication
  • SCRAM-SHA-256: Secure challenge-response authentication
  • SCRAM-SHA-512: Enhanced secure authentication
  • GSSAPI: Kerberos authentication for enterprise environments
  • OAUTHBEARER: OAuth-based authentication

Production Security Configuration

from dagster_kafka import KafkaResource, SecurityProtocol, SaslMechanism

# SASL/SSL Production Configuration (Most Secure)
production_kafka = KafkaResource(
    bootstrap_servers="prod-kafka:9092",
    security_protocol=SecurityProtocol.SASL_SSL,
    sasl_mechanism=SaslMechanism.SCRAM_SHA_256,
    sasl_username="dagster-prod-user", 
    sasl_password="secure-production-password",
    ssl_ca_location="/etc/ssl/certs/kafka-ca.pem",
    ssl_check_hostname=True,
    session_timeout_ms=30000,
    additional_config={
        "request.timeout.ms": 30000,
        "retry.backoff.ms": 1000
    }
)

# SSL-Only Configuration  
ssl_kafka = KafkaResource(
    bootstrap_servers="ssl-kafka:9092",
    security_protocol=SecurityProtocol.SSL,
    ssl_ca_location="/etc/ssl/certs/ca.pem",
    ssl_certificate_location="/etc/ssl/certs/client.pem",
    ssl_key_location="/etc/ssl/private/client-key.pem",
    ssl_key_password="client-key-password"
)

# Validate security configuration
production_kafka.validate_security_config()

# Get producer configuration with same security settings
producer_config = production_kafka.get_producer_config()
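
A minimal sketch of reusing that producer configuration, assuming the returned dictionary is librdkafka-style configuration accepted by the confluent-kafka Producer:

from confluent_kafka import Producer

producer = Producer(producer_config)
producer.produce("user-events", key="user-123", value=b'{"id": 1}')
producer.flush()  # block until outstanding messages are delivered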

All Three Formats with Security and DLQ

from dagster import Definitions
from dagster_kafka import KafkaResource, SecurityProtocol, SaslMechanism, KafkaIOManager, avro_kafka_io_manager, DLQStrategy
from dagster_kafka.protobuf_io_manager import create_protobuf_kafka_io_manager

# Secure Kafka resource for all formats
secure_kafka = KafkaResource(
    bootstrap_servers="secure-kafka:9092",
    security_protocol=SecurityProtocol.SASL_SSL,
    sasl_mechanism=SaslMechanism.SCRAM_SHA_256,
    sasl_username="enterprise-user",
    sasl_password="enterprise-password",
    ssl_ca_location="/etc/ssl/kafka-ca.pem"
)

# Assumes json_events, avro_events, protobuf_events, and unified_processing
# are asset definitions like the ones shown above (omitted here for brevity)
defs = Definitions(
    assets=[json_events, avro_events, protobuf_events, unified_processing],
    resources={
        "json_io_manager": KafkaIOManager(
            kafka_resource=secure_kafka,
            consumer_group_id="secure-json-consumer",
            enable_dlq=True,
            dlq_strategy=DLQStrategy.CIRCUIT_BREAKER
        ),
        "avro_io_manager": avro_kafka_io_manager.configured({
            "kafka_resource": secure_kafka,
            "schema_registry_url": "https://secure-schema-registry:8081",
            "enable_schema_validation": True,
            "enable_dlq": True,
            "dlq_strategy": "retry_then_dlq"
        }),
        "protobuf_io_manager": create_protobuf_kafka_io_manager(
            kafka_resource=secure_kafka,
            schema_registry_url="https://secure-schema-registry:8081",
            enable_dlq=True,
            dlq_strategy=DLQStrategy.RETRY_THEN_DLQ
        )
    }
)

Schema Examples

Avro Schema

{
  "type": "record",
  "name": "User",
  "namespace": "com.example.users",
  "fields": [
    {"name": "id", "type": "int"},
    {"name": "name", "type": "string"},
    {"name": "email", "type": "string"},
    {"name": "created_at", "type": "long"},
    {"name": "is_active", "type": "boolean"}
  ]
}
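
A record can be sanity-checked against this schema with a round trip through Avro's binary encoding. A minimal sketch using the fastavro library (an assumption here, not a dependency of this integration):

from io import BytesIO

from fastavro import parse_schema, schemaless_reader, schemaless_writer

schema = parse_schema({
    "type": "record",
    "name": "User",
    "namespace": "com.example.users",
    "fields": [
        {"name": "id", "type": "int"},
        {"name": "name", "type": "string"},
        {"name": "email", "type": "string"},
        {"name": "created_at", "type": "long"},
        {"name": "is_active", "type": "boolean"},
    ],
})

record = {
    "id": 1,
    "name": "Ada",
    "email": "ada@example.com",
    "created_at": 1640995200000,
    "is_active": True,
}

# Round-trip the record through Avro's schemaless binary encoding
buf = BytesIO()
schemaless_writer(buf, schema, record)
buf.seek(0)
assert schemaless_reader(buf, schema) == record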

Protobuf Schema

syntax = "proto3";

package examples;

message User {
  int32 id = 1;
  string name = 2;
  string email = 3;
  int32 age = 4;
  bool is_active = 5;
  repeated string tags = 6;
  Address address = 7;
  int64 created_at = 8;
  int64 updated_at = 9;
}

message Address {
  string street = 1;
  string city = 2;
  string state = 3;
  string postal_code = 4;
  string country = 5;
}

enum EventType {
  USER_CREATED = 0;
  USER_UPDATED = 1;
  USER_DELETED = 2;
  USER_LOGIN = 3;
  USER_LOGOUT = 4;
}

message UserEvent {
  EventType event_type = 1;
  User user = 2;
  int64 timestamp = 3;
  string source_system = 4;
  map<string, string> metadata = 5;
}
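
To work with these messages from Python, the schema is compiled with protoc and the generated module is used directly. A minimal sketch, assuming the schema is saved as schemas/user.proto (the module and field names follow from the definitions above):

# One-time compile step:
#   protoc --proto_path=schemas --python_out=. schemas/user.proto
from user_pb2 import USER_CREATED, User, UserEvent

user = User(id=1, name="Ada", email="ada@example.com", is_active=True)
event = UserEvent(event_type=USER_CREATED, user=user, source_system="demo")
event.metadata["region"] = "eu-west-1"

payload = event.SerializeToString()  # bytes ready for a Kafka producer
decoded = UserEvent.FromString(payload)
print(decoded.user.name, decoded.event_type)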

Schema Evolution Management

Compatibility Levels

Support for all major compatibility levels across JSON, Avro, and Protobuf:

  • BACKWARD: New schema can read old data
  • FORWARD: Old schema can read new data
  • FULL: Both backward and forward compatible
  • BACKWARD_TRANSITIVE: Compatible with all previous versions
  • FORWARD_TRANSITIVE: Compatible with all future versions
  • FULL_TRANSITIVE: Both backward and forward transitive
  • NONE: No compatibility checking
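
For example, under BACKWARD compatibility a consumer on the new schema must still read data written with the old one, so a field added to the User record shown earlier needs a default; adding the (hypothetical) field below without its "default" would be flagged as a breaking change:

{"name": "signup_source", "type": "string", "default": "unknown"}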

Breaking Change Detection

from dagster_kafka import CompatibilityLevel, SchemaEvolutionValidator

# schema_registry_client: an already-configured Schema Registry client
validator = SchemaEvolutionValidator(schema_registry_client)

result = validator.validate_schema_compatibility(
    "user-events-value",   # subject name
    new_schema,            # candidate schema to be registered
    CompatibilityLevel.BACKWARD
)

if not result["compatible"]:
    print(f"Breaking changes detected: {result['reason']}")

Production Monitoring and Alerting

Real-time Monitoring

from dagster_kafka import SchemaEvolutionMonitor, slack_alert_handler

monitor = SchemaEvolutionMonitor()
monitor.add_alert_callback(slack_alert_handler("https://hooks.slack.com/your-webhook"))

monitor.record_validation_attempt(
    subject="user-events",
    success=True,
    duration=2.5,
    breaking_changes_count=0
)

Performance Optimization

from dagster_kafka import PerformanceOptimizer, CacheStrategy, BatchStrategy

optimizer = PerformanceOptimizer(
    cache_config={
        "max_size": 10000,
        "strategy": CacheStrategy.LRU,
        "ttl_seconds": 300
    },
    batch_config={
        "strategy": BatchStrategy.ADAPTIVE,
        "max_batch_size": 1000
    },
    pool_config={
        "max_connections": 20
    }
)

Configuration Options

KafkaResource with Security

KafkaResource(
    # Core Configuration
    bootstrap_servers="localhost:9092",  # Required: Kafka cluster endpoints
    
    # Security Configuration
    security_protocol=SecurityProtocol.SASL_SSL,  # Security protocol
    sasl_mechanism=SaslMechanism.SCRAM_SHA_256,   # SASL mechanism
    sasl_username="username",                      # SASL username
    sasl_password="password",                      # SASL password
    
    # SSL Configuration
    ssl_ca_location="/path/to/ca.pem",            # CA certificate
    ssl_certificate_location="/path/to/cert.pem", # Client certificate
    ssl_key_location="/path/to/key.pem",          # Private key
    ssl_key_password="key-password",               # Key password
    ssl_check_hostname=True,                       # Hostname verification
    
    # Advanced Configuration
    session_timeout_ms=30000,                     # Session timeout
    auto_offset_reset="earliest",                 # Offset reset policy
    additional_config={"request.timeout.ms": 30000}  # Additional config
)

Advanced AvroKafkaIOManager Configuration with DLQ

avro_kafka_io_manager.configured({
    "kafka_resource": secure_kafka_resource,      # Secure Kafka resource
    "schema_registry_url": "https://registry:8081", # Secure schema registry
    "enable_schema_validation": True,
    "compatibility_level": "BACKWARD",
    "enable_caching": True,
    "cache_ttl": 300,
    "max_retries": 3,
    "retry_backoff": 1.0,
    # DLQ Configuration
    "enable_dlq": True,
    "dlq_strategy": "circuit_breaker",
    "dlq_max_retries": 3,
    "dlq_circuit_breaker_failure_threshold": 5
})

Protobuf Configuration Options with DLQ

from dagster_kafka import DLQStrategy
# Import path for ProtobufKafkaIOManager assumed alongside the factory
from dagster_kafka.protobuf_io_manager import (
    ProtobufKafkaIOManager,
    create_protobuf_kafka_io_manager,
)

# secure_kafka_resource: a KafkaResource configured as in the security section

# Simple Protobuf usage with security and DLQ
simple_manager = create_protobuf_kafka_io_manager(
    kafka_resource=secure_kafka_resource,
    consumer_group_id="my-protobuf-consumer",
    enable_dlq=True,
    dlq_strategy=DLQStrategy.RETRY_THEN_DLQ,
    dlq_max_retries=3
)

# Advanced Protobuf with Schema Registry, security, and DLQ
advanced_manager = ProtobufKafkaIOManager(
    kafka_resource=secure_kafka_resource,
    schema_registry_url="https://secure-registry:8081",
    enable_schema_validation=True,
    compatibility_level="BACKWARD",
    consumer_group_id="enterprise-protobuf",
    enable_dlq=True,
    dlq_strategy=DLQStrategy.CIRCUIT_BREAKER,
    dlq_circuit_breaker_failure_threshold=5
)

Examples Directory Structure

examples/
├── json_examples/              # JSON message examples
│   ├── simple_json_test.py
│   └── README.md
├── avro_examples/              # Avro schema examples
│   ├── simple_avro_test.py
│   ├── production_schema_migration.py
│   ├── schemas/
│   └── README.md
├── protobuf_examples/          # Protobuf examples
│   ├── simple_protobuf_example.py
│   ├── advanced_protobuf_example.py
│   ├── schemas/
│   │   ├── user.proto
│   │   └── product.proto
│   └── README.md
├── dlq_examples/               # Complete DLQ tooling suite  
│   ├── dlq_inspector.py        # Analyze failed messages
│   ├── dlq_replayer.py         # Replay messages with safety controls
│   ├── dlq_monitor.py          # Core monitoring and metrics
│   ├── dlq_alerts.py           # Configurable alerting system
│   ├── dlq_dashboard.py        # Operations dashboard
│   ├── create_test_data.py     # Generate test data
│   ├── create_stress_test.py   # Stress testing utilities
│   └── README.md
├── security_examples/          # Enterprise security examples
│   ├── production_security_example.py
│   └── README.md
├── performance_examples/       # Performance optimization
├── production_examples/        # Enterprise deployment patterns
└── docker-compose.yml         # Local testing setup

Security, Serialization Format, and DLQ Comparison

Feature                | JSON          | Avro           | Protobuf        | Security       | DLQ
Schema Evolution       | Basic         | Advanced       | Advanced        | N/A            | Error Routing
Performance            | Good          | Better         | Best            | Overhead       | Minimal
Schema Registry        | No            | Yes            | Yes             | HTTPS          | Topic-based
Backward Compatibility | Manual        | Automatic      | Automatic      | Maintained     | Preserved
Binary Format          | No            | Yes            | Yes             | Encrypted      | JSON
Human Readable         | Yes           | No             | No              | No             | Yes
Cross-Language         | Yes           | Yes            | Yes             | Yes            | Yes
Authentication         | Basic         | SASL/SSL       | SASL/SSL        | Full           | Secured
Error Handling         | DLQ           | DLQ            | DLQ             | Monitored      | Core Feature
Use Case               | APIs, Logging | Analytics, ETL | High-perf, gRPC | All Production | Error Recovery

Development & Testing

Local Development Setup

git clone https://github.com/kingsley-123/dagster-kafka-integration.git
cd dagster-kafka-integration

# Install dependencies (includes Protobuf, security, and DLQ support)
pip install -r requirements.txt

# Install in development mode
pip install -e .

Comprehensive Testing

# Start Kafka and Schema Registry
docker-compose up -d

# Run all 133 tests across all formats, security configurations, and DLQ functionality
python -m pytest tests/ -v

# Test specific modules
python -m pytest tests/test_avro_io_manager.py -v      # Avro tests
python -m pytest tests/test_protobuf_io_manager.py -v  # Protobuf tests
python -m pytest tests/test_dlq.py -v                 # DLQ tests (NEW)
python -m pytest tests/test_security.py -v            # Security tests
python -m pytest tests/test_schema_evolution.py -v    # Schema evolution
python -m pytest tests/test_monitoring.py -v          # Monitoring
python -m pytest tests/test_performance.py -v         # Performance

Running Examples

# JSON examples
python examples/json_examples/simple_json_test.py

# Avro examples
python examples/avro_examples/simple_avro_test.py
python examples/avro_examples/production_schema_migration.py

# Protobuf examples
python examples/protobuf_examples/simple_protobuf_example.py
python examples/protobuf_examples/advanced_protobuf_example.py

# DLQ examples (Complete Production Tooling)
python examples/dlq_examples/dlq_inspector.py --topic user-events --max-messages 20
python examples/dlq_examples/dlq_replayer.py --source-topic orders_dlq --target-topic orders --dry-run
python examples/dlq_examples/dlq_monitor.py --topics user-events_dlq,orders_dlq --output-format json
python examples/dlq_examples/dlq_alerts.py --topic critical-events_dlq --max-messages 500
python examples/dlq_examples/dlq_dashboard.py --topics user-events_dlq,orders_dlq

# Security examples
python examples/security_examples/production_security_example.py

# Performance examples
python examples/performance_examples/high_throughput_pipeline.py

Schema Registry Support

Multiple Schema Registry providers are supported for both Avro and Protobuf, with secure connectivity:

  • Confluent Schema Registry (most common) - HTTP/HTTPS
  • AWS Glue Schema Registry - IAM authentication
  • Azure Schema Registry - Azure AD authentication
  • Custom implementations - Flexible authentication

Error Handling and Recovery

The integration includes comprehensive error handling for all serialization formats, security configurations, and DLQ functionality:

  • Connection failures: Graceful timeouts and retries with security context
  • Authentication failures: Clear error messages for SASL/SSL issues
  • Schema errors: Clear error messages for missing/invalid schemas
  • Deserialization errors: Automatic DLQ routing with retry logic
  • Schema evolution failures: Multiple recovery strategies with DLQ fallback
  • Performance degradation: Automatic optimization recommendations
  • Security validation: Comprehensive configuration validation
  • Circuit breaker protection: Automatic failure detection and recovery

Production Features

Error Recovery Strategies

  • Fail Fast: Immediate failure on errors
  • Fallback Schema: Automatic fallback to previous schema versions
  • Skip Validation: Continue processing with validation disabled
  • Graceful Degradation: Accept minor breaking changes
  • Retry with Backoff: Exponential backoff retry logic
  • Security Retry: Automatic credential refresh and retry
  • Dead Letter Queue: Automatic routing of failed messages (NEW)

Performance Optimization

  • High-Performance Caching: LRU, TTL, and write-through strategies
  • Adaptive Batching: Dynamic batch size optimization
  • Connection Pooling: Efficient resource management with security context
  • Metrics Collection: Comprehensive performance monitoring
  • DLQ Optimization: Minimal overhead error handling (NEW)

Monitoring and Alerting

  • Real-time Metrics: Validation attempts, cache hit rates, throughput
  • Security Metrics: Authentication success/failure rates
  • DLQ Metrics: Error rates, retry counts, circuit breaker states (NEW)
  • Alert Integration: Slack, email, and custom webhooks
  • Threshold Management: Configurable alert thresholds
  • Historical Analysis: Performance trends and optimization insights

Security Features

  • Authentication: SASL mechanisms (PLAIN, SCRAM, GSSAPI, OAUTHBEARER)
  • Encryption: SSL/TLS with certificate management
  • Authorization: Kafka ACL support through security protocols
  • Validation: Comprehensive security configuration validation
  • Monitoring: Security-aware logging and metrics
  • DLQ Security: Secure DLQ topic access and encryption (NEW)

Roadmap

Completed Features (v0.9.0)

  • JSON Support - Complete native integration
  • Avro Support - Full Schema Registry + evolution validation
  • Protobuf Support - Complete Protocol Buffers integration
  • Enterprise Security - Complete SASL/SSL authentication and encryption
  • Schema Evolution - All compatibility levels across formats
  • Production Monitoring - Real-time alerting and metrics
  • High-Performance Optimization - Caching, batching, pooling
  • Dead Letter Queues - Advanced error handling with circuit breaker (NEW)
  • Complete DLQ Tooling Suite - Inspector, Replayer, Monitoring, Alerting (NEW)
  • Comprehensive Testing - 133 tests across all features

Upcoming Features

  • PyPI Distribution - Official package release
  • JSON Schema Support - 4th serialization format
  • Confluent Connect - Native connector integration
  • Kafka Streams - Stream processing integration

Future Enhancements

  • Additional Formats - MessagePack, Apache Arrow
  • Advanced Consumers - Custom partition assignment
  • Cloud Integrations - AWS MSK, Confluent Cloud native support
  • Official Dagster Integration - Potential core inclusion

Why Choose This Integration

Complete Solution

  • Only integration supporting all 3 major formats (JSON, Avro, Protobuf)
  • Enterprise-grade security with SASL/SSL support
  • Production-ready with comprehensive monitoring
  • Advanced error handling with Dead Letter Queue support (NEW)
  • Complete DLQ Tooling Suite for enterprise operations (NEW)

Developer Experience

  • Familiar Dagster patterns - feels native to the platform
  • Comprehensive examples for all use cases including security and DLQ
  • Extensive documentation and testing
  • Production-ready tooling for DLQ management (NEW)

Production Ready

  • 133 comprehensive tests covering all scenarios including security and DLQ
  • Real-world deployment patterns and examples
  • Performance optimization tools and monitoring
  • Enterprise security for production Kafka clusters
  • Bulletproof error handling with circuit breaker patterns (NEW)
  • Complete operational tooling for DLQ management (NEW)

Community Driven

  • Active development based on user feedback
  • Open source with transparent roadmap
  • Enterprise support options available

Contributing

Contributions are welcome! This project aims to be the definitive Kafka integration for Dagster.

Ways to contribute:

  • Report issues - Found a bug? Let us know!
  • Feature requests - What would make this more useful?
  • Documentation - Help improve examples and guides
  • Code contributions - PRs welcome for any improvements
  • Security testing - Help test security configurations
  • DLQ testing - Help test error handling scenarios (NEW)

License

Apache 2.0 - see LICENSE file for details.

Community & Support

Acknowledgments

  • Dagster Community: For the initial feature request and continued feedback
  • Contributors: Thanks to all who provided feedback, testing, and code contributions
  • Enterprise Users: Built in response to real production deployment needs
  • Security Community: Special thanks for security testing and validation
  • Slack Community: Special thanks for validation and feature suggestions

The Complete Enterprise Solution

The first and most comprehensive Kafka integration for Dagster - supporting all three major serialization formats (JSON, Avro, Protobuf) with enterprise-grade production features, complete security, advanced Dead Letter Queue error handling, and complete operational tooling suite.

Version 0.9.0 - Dead Letter Queue Release with Complete Tooling Suite - Built by Kingsley Okonkwo

Solving real data engineering problems with comprehensive open source solutions.


