
Event capture library for data pipelines with Kafka transport and decorator-based monitoring

Project description

Palantir Event Capture

A production-ready event capture library for data pipelines with decorator-based monitoring, Kafka transport, and standardized error logging.

Features

  • 🎯 Decorator-based capture - Easily wrap functions with error/event monitoring
  • 📊 Standardized event models - Consistent event structure with Pydantic validation
  • 🚀 Kafka transport - Automatic event streaming to Kafka topics
  • 🔄 Retry logic - Built-in retry mechanisms with exponential backoff
  • 📝 Rich context - Captures stack traces, function metadata, and custom data
  • ⚡ Async support - Full support for async/await patterns
  • 🚀 Background Transport - Asynchronous event sending in background thread
  • 🎛️ Flexible configuration - Environment-based or programmatic configuration
  • 🔌 Multiple transports - Console, Kafka, or custom transport layers
  • 🐍 Pydantic V2 Compatible - Works with both Pydantic V1 and V2

Installation

pip install git+https://<USERNAME>:<TOKEN>@git.blackeye.id/sahabista/rca-event-capture.git
  • Install via git to access the library, and make sure you have been added as a maintainer of the repository.
  • Use a Personal Access Token (PAT) to avoid authentication errors.

Quick Start

Basic Usage

from PECLibrary.decorators import capture_errors
from PECLibrary.models.captured_models import PipelineComponent, Severity

@capture_errors(
    component=PipelineComponent.KAFKA_PRODUCER,
    severity=Severity.HIGH
)
def send_message(topic: str, message: dict):
    # Your code here; `producer` stands in for your own Kafka producer client
    producer.send(topic, message)

Async Functions

from PECLibrary.decorators import async_capture_errors

@async_capture_errors(
    component=PipelineComponent.ELASTICSEARCH,
    severity=Severity.CRITICAL
)
async def index_document(doc: dict):
    await es_client.index(index="logs", document=doc)

Retry with Capture

from PECLibrary.decorators import retry_with_capture

@retry_with_capture(
    max_retries=3,
    delay=1.0,
    backoff=2.0,
    component=PipelineComponent.DATABASE
)
def query_database(query: str):
    return db.execute(query)
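
With these settings the call is retried up to three times, waiting roughly 1 s, then 2 s, then 4 s between attempts, assuming the delay is multiplied by the backoff factor after each failure (the exponential backoff described under Features).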

Context Manager

from PECLibrary.utils.context_managers import capture_context

with capture_context(
    component=PipelineComponent.TRANSFORMATION,
    pipeline_name="etl_pipeline",
    engine_name="pandas"
):
    # Your data transformation code
    df = process_data(raw_data)

Configuration

Environment Variables

Create a .env file:

# Kafka Configuration
KAFKA_BOOTSTRAP_SERVERS_RCA=localhost:9092
KAFKA_EVENT_TOPIC_RCA=rca.events
KAFKA_ENABLED_RCA=true
KAFKA_COMPRESSION_RCA=gzip

# Logging Configuration
LOG_LEVEL=INFO
LOG_TO_CONSOLE=true
SEND_TO_TRANSPORT=true

# Event Capture Settings
DEFAULT_COMPONENT=CUSTOM
DEFAULT_SEVERITY=MEDIUM
ENABLE_STACK_TRACES=true
USE_BACKGROUND_TRANSPORT=false
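
If your application does not already load .env files, a common approach is python-dotenv. This is a generic sketch, not part of PECLibrary itself; the library may also read these variables straight from the process environment:

# Generic .env loading with python-dotenv (pip install python-dotenv).
# Illustration only: PECLibrary may pick the variables up from os.environ directly.
from dotenv import load_dotenv

load_dotenv()  # copies the key=value pairs from .env into os.environ

from PECLibrary.decorators import capture_errors  # import after the env is loaded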

Programmatic Configuration

from PECLibrary.config import configure

configure(
    KAFKA_BOOTSTRAP_SERVERS_RCA="localhost:9092",
    kafka_events_topic="rca.events",
    log_level="INFO",
    send_to_transport=True
)

Event Model

Events are captured with comprehensive metadata:

{
    "id": "550e8400-e29b-41d4-a716-446655440000",
    "event_type": "error",
    "pipeline_name": "data_ingestion_pipeline",
    "engine_name": "spark",
    "component": "kafka_consumer",
    "component_name": "UserEventsConsumer",
    "severity": "high",
    "message": "Failed to consume messages: Connection timeout",
    "timestamp": 1703001234.567,
    "timestamp_iso": "2024-12-01 12:34:56.789",
    "error_type": "KafkaTimeoutError",
    "stack_trace": "...",
    "function_name": "consume_messages",
    "file_path": "/app/consumers/events.py",
    "line_number": 45,
    "metadata": {
        "topic": "user_events",
        "partition": 0,
        "offset": 12345
    },
    "resolved": false
}
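
Because events are serialized as plain JSON validated by Pydantic, downstream consumers can parse them back into the same model. A minimal consumer sketch, assuming kafka-python is installed and that ErrorEvent's fields match the JSON above:

import json
from kafka import KafkaConsumer  # pip install kafka-python (assumed client)
from PECLibrary.models.captured_models import ErrorEvent

consumer = KafkaConsumer(
    "rca.events",                        # topic from KAFKA_EVENT_TOPIC_RCA
    bootstrap_servers="localhost:9092",  # KAFKA_BOOTSTRAP_SERVERS_RCA
    value_deserializer=lambda raw: json.loads(raw.decode("utf-8")),
)

for record in consumer:
    event = ErrorEvent(**record.value)   # Pydantic validation of the payload
    if not event.resolved:
        print(event.severity, event.message)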

Advanced Usage

Custom Component Names

@capture_errors(
    component=PipelineComponent.CUSTOM,
    component_name="MyCustomETL",
    pipeline_name="daily_batch_job",
    engine_name="custom"
)
def custom_pipeline():
    pass

Capture All Class Methods

from PECLibrary.decorators import capture_method_errors

@capture_method_errors(
    component=PipelineComponent.API,
    severity=Severity.HIGH
)
class DataAPI:
    def fetch(self):
        pass
  
    def save(self):
        pass

Manual Event Capture

from PECLibrary.services import get_capture_service
from PECLibrary.models.captured_models import CaptureConfig, PipelineComponent, Severity

service = get_capture_service()

try:
    risky_operation()
except Exception as e:
    config = CaptureConfig(
        component=PipelineComponent.POSTGRES,
        severity=Severity.CRITICAL,
        metadata={"query": "SELECT * FROM users"}
    )
    service.capture_exception(e, config=config)

Custom Transport

from PECLibrary.services.transport import Transport
from PECLibrary.models.captured_models import ErrorEvent

class CustomTransport(Transport):
    def send(self, event: ErrorEvent) -> bool:
        # Your custom transport logic
        my_logging_system.log(event.dict())
        return True

# Register your transport
from PECLibrary.services import get_capture_service
service = get_capture_service()
service.add_transport(CustomTransport())
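
Since add_transport registers a transport rather than replacing one, several transports can presumably run side by side; the console and Kafka transports listed under Features implement the same send(event) interface.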

Pipeline Components

Supported pipeline components:

  • Message Brokers: KAFKA_PRODUCER, KAFKA_CONSUMER, RABBITMQ, BEANSTALK
  • Databases: ELASTICSEARCH, POSTGRES, MONGODB, REDIS, MEMGRAPH, NEBULA, QDRANT, DATABASE
  • Storage: S3
  • Processing: TRANSFORMATION, DBT, PANDAS
  • Orchestration: AIRFLOW
  • Services: API, WEBHOOK
  • Generic: CUSTOM, UNKNOWN
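
These values are members of the PipelineComponent enum used throughout the examples above. To see exactly which members your installed version ships (a generic sketch, assuming a standard Python Enum):

from PECLibrary.models.captured_models import PipelineComponent

for component in PipelineComponent:
    print(component.name, "->", component.value)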

Event Severity Levels

  • CRITICAL - System-breaking errors requiring immediate attention
  • HIGH - Significant errors affecting functionality
  • MEDIUM - Moderate issues that should be investigated
  • LOW - Minor issues or warnings
  • WARNING - Advisory notifications

Architecture

See ARCHITECTURE.md for detailed design documentation.

Testing

# Run tests
pytest

# With coverage
pytest --cov=src --cov-report=html

# Run specific test
pytest tests/test_decorators.py

Contributing

Contributions are welcome! Please see our contributing guidelines.

License

MIT License - see LICENSE file for details.



Download files

Download the file for your platform.

Source Distribution

peclibrary-0.7.0.tar.gz (32.7 kB)

Built Distribution

peclibrary-0.7.0-py3-none-any.whl (29.5 kB)

File details

Details for the file peclibrary-0.7.0.tar.gz.

File metadata

  • Download URL: peclibrary-0.7.0.tar.gz
  • Size: 32.7 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.10.19

File hashes

Hashes for peclibrary-0.7.0.tar.gz:

  • SHA256: ef8160a9717e310051b5b229146aa5717ec36df41de3c6450d91669ba45b95be
  • MD5: d675f46c9b25412ad37ee72ae88d48e7
  • BLAKE2b-256: 19d0bda262ad01f32ecab3feb28ad7d8c42d58f39aa210b5838941d103ad3d94


File details

Details for the file peclibrary-0.7.0-py3-none-any.whl.

File metadata

  • Download URL: peclibrary-0.7.0-py3-none-any.whl
  • Size: 29.5 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.10.19

File hashes

Hashes for peclibrary-0.7.0-py3-none-any.whl:

  • SHA256: 805d3ef65292e5c3393231abd1b0257bbeb0cce01cb88f0d946e73a03fac0c7d
  • MD5: 5dac61351fa11d90e8f8bc9798e4b850
  • BLAKE2b-256: 5f8970d720d60df89ec1f8f652eb90d1cf57ed2822c4a30483efc0bd79be838e

