Skip to main content

Complete Kafka integration for Dagster with enterprise DLQ tooling and JSON Schema validation

Project description

Dagster Kafka Integration

The most comprehensively validated Kafka integration for Dagster with enterprise-grade features supporting all four major serialization formats and production security.

Enterprise Validation Completed

Version 1.2.0 - Most validated Kafka integration package ever created with JSON Schema support:

11-Phase Comprehensive Validation - Unprecedented testing methodology
Exceptional Performance - 1,199 messages/second peak throughput proven
Security Hardened - Complete credential validation + network security
Stress Tested - 100% success rate (305/305 operations over 8+ minutes)
Enterprise Ready - Complete DLQ tooling suite with 5 CLI tools
Zero Critical Issues - Across all validation phases
NEW: JSON Schema Validation - 4th serialization format with data quality enforcement

Complete Enterprise Solution

  • JSON Support: Native JSON message consumption from Kafka topics
  • JSON Schema Support: Data validation with schema evolution checking (NEW in v1.2.0)
  • Avro Support: Full Avro message support with Schema Registry integration
  • Protobuf Support: Complete Protocol Buffers integration with schema management
  • Dead Letter Queue (DLQ): Enterprise-grade error handling with circuit breaker patterns
  • Enterprise Security: Complete SASL/SSL authentication and encryption support
  • Schema Evolution: Comprehensive validation with breaking change detection across all formats
  • Production Monitoring: Real-time alerting with Slack/Email integration
  • High Performance: Advanced caching, batching, and connection pooling
  • Error Recovery: Multiple recovery strategies for production resilience

Installation

pip install dagster-kafka
JSON Schema Validation (NEW)
pythonfrom dagster import asset, Definitions
from dagster_kafka import KafkaResource, create_json_schema_kafka_io_manager, DLQStrategy

# Define schema for data quality enforcement
user_schema = {
    "type": "object",
    "properties": {
        "user_id": {"type": "string"},
        "event_type": {"type": "string", "enum": ["login", "logout", "click"]},
        "timestamp": {"type": "string", "format": "date-time"}
    },
    "required": ["user_id", "event_type", "timestamp"]
}

@asset(io_manager_key="json_schema_io_manager")
def validated_user_events():
    '''Consume user events with automatic JSON Schema validation.'''
    pass

defs = Definitions(
    assets=[validated_user_events],
    resources={
        "kafka": KafkaResource(bootstrap_servers="localhost:9092"),
        "json_schema_io_manager": create_json_schema_kafka_io_manager(
            kafka_resource=KafkaResource(bootstrap_servers="localhost:9092"),
            schema_dict=user_schema,
            enable_schema_validation=True,
            strict_validation=True,
            enable_dlq=True,
            dlq_strategy=DLQStrategy.CIRCUIT_BREAKER
        )
    }
)
Enterprise DLQ Tooling Suite
Complete operational tooling available immediately after installation:
bash# Analyze failed messages with comprehensive error pattern analysis
dlq-inspector --topic user-events --max-messages 20

# Replay messages with filtering and safety controls  
dlq-replayer --source-topic orders_dlq --target-topic orders --dry-run

# Monitor DLQ health across multiple topics
dlq-monitor --topics user-events_dlq,orders_dlq --output-format json

# Set up automated alerting
dlq-alerts --topic critical-events_dlq --max-messages 500

# Operations dashboard for DLQ health monitoring
dlq-dashboard --topics user-events_dlq,orders_dlq
Quick Start
pythonfrom dagster import asset, Definitions
from dagster_kafka import KafkaResource, KafkaIOManager, DLQStrategy

@asset
def api_events():
    '''Consume JSON messages from Kafka topic with DLQ support.'''
    pass

defs = Definitions(
    assets=[api_events],
    resources={
        "kafka": KafkaResource(bootstrap_servers="localhost:9092"),
        "io_manager": KafkaIOManager(
            kafka_resource=KafkaResource(bootstrap_servers="localhost:9092"),
            consumer_group_id="my-dagster-pipeline",
            enable_dlq=True,
            dlq_strategy=DLQStrategy.RETRY_THEN_DLQ,
            dlq_max_retries=3
        )
    }
)
Validation Results Summary
PhaseTest TypeResultKey MetricsPhase 5Performance TestingPASS1,199 msgs/sec peak throughputPhase 7Integration TestingPASSEnd-to-end message flow validatedPhase 9Compatibility TestingPASSPython 3.12 + Dagster 1.11.3Phase 10Security AuditPASSCredential + network securityPhase 11Stress TestingEXCEPTIONAL100% success rate, 305 operations
Enterprise Security
Security Protocols Supported

SASL_SSL: Combined authentication and encryption (recommended for production)
SSL: Certificate-based encryption
SASL_PLAINTEXT: Username/password authentication
PLAINTEXT: For local development and testing

SASL Authentication Mechanisms

SCRAM-SHA-256: Secure challenge-response authentication
SCRAM-SHA-512: Enhanced secure authentication
PLAIN: Simple username/password authentication
GSSAPI: Kerberos authentication for enterprise environments
OAUTHBEARER: OAuth-based authentication

Why Choose This Integration
Complete Solution

Only integration supporting all 4 major formats (JSON, JSON Schema, Avro, Protobuf)
Enterprise-grade security with SASL/SSL support
Production-ready with comprehensive monitoring
Advanced error handling with Dead Letter Queue support
Complete DLQ Tooling Suite for enterprise operations

Enterprise Ready

11-phase comprehensive validation covering all scenarios
Real-world deployment patterns and examples
Performance optimization tools and monitoring
Enterprise security for production Kafka clusters
Bulletproof error handling with circuit breaker patterns

Unprecedented Validation

Most validated package in the Dagster ecosystem
Performance proven: 1,199 msgs/sec peak throughput
Stability proven: 100% success rate under stress
Security proven: Complete credential and network validation
Enterprise proven: Exceptional rating across all dimensions

Repository
GitHub: https://github.com/kingsley-123/dagster-kafka-integration
License
Apache 2.0

The most comprehensively validated Kafka integration for Dagster - Version 1.2.0 with JSON Schema Validation Support.
Built by Kingsley Okonkwo - Solving real data engineering problems with comprehensive open source solutions.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

dagster_kafka-1.2.0.tar.gz (73.6 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

dagster_kafka-1.2.0-py3-none-any.whl (62.7 kB view details)

Uploaded Python 3

File details

Details for the file dagster_kafka-1.2.0.tar.gz.

File metadata

  • Download URL: dagster_kafka-1.2.0.tar.gz
  • Upload date:
  • Size: 73.6 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.12.10

File hashes

Hashes for dagster_kafka-1.2.0.tar.gz
Algorithm Hash digest
SHA256 a94e2bc4ccd1400d57fceb51b5f567248c2ba1c7f0a17d6a4165aef489df3f4f
MD5 df7baee9401fdbbca35e32b9652c5f65
BLAKE2b-256 857e9636adb7abea1596868b24d588aed7066b74906837040a0ccfd28ac832c5

See more details on using hashes here.

File details

Details for the file dagster_kafka-1.2.0-py3-none-any.whl.

File metadata

  • Download URL: dagster_kafka-1.2.0-py3-none-any.whl
  • Upload date:
  • Size: 62.7 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.12.10

File hashes

Hashes for dagster_kafka-1.2.0-py3-none-any.whl
Algorithm Hash digest
SHA256 022719337912d6a4a581a412b06fe1832b9b77b4c3b4ee372b7369c87d03bac2
MD5 e96eb84aa41bcecdb53cc427d3553dd9
BLAKE2b-256 ddcc65f3dce046e2aa414c32bc17871bac061ad495854f02c861fcdd89566953

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page