Palantir Event Capture
A production-ready event capture library for data pipelines with decorator-based monitoring, Kafka transport, and standardized error logging.
Features
- 🎯 Decorator-based capture - Easily wrap functions with error/event monitoring
- 📊 Standardized event models - Consistent event structure with Pydantic validation
- 🚀 Kafka transport - Automatic event streaming to Kafka topics
- 🔄 Retry logic - Built-in retry mechanisms with exponential backoff
- 📝 Rich context - Captures stack traces, function metadata, and custom data
- ⚡ Async support - Full support for async/await patterns
- 🚀 Background transport - Asynchronous event sending in a background thread
- 🎛️ Flexible configuration - Environment-based or programmatic configuration
- 🔌 Multiple transports - Console, Kafka, or custom transport layers
- 🐍 Pydantic V2 Compatible - Works with both Pydantic V1 and V2
Installation
```bash
pip install git+https://<USERNAME>:<TOKEN>@git.blackeye.id/sahabista/rca-event-capture.git
```
- Install from git to access the library; make sure you have been added as a maintainer of the repository.
- Use a Personal Access Token (PAT) to avoid authentication errors.
Quick Start
Basic Usage
```python
from PECLibrary.decorators import capture_errors
from PECLibrary.models.captured_models import PipelineComponent, Severity

@capture_errors(
    component=PipelineComponent.KAFKA_PRODUCER,
    severity=Severity.HIGH
)
def send_message(topic: str, message: dict):
    # Your code here
    producer.send(topic, message)
```
Async Functions
```python
from PECLibrary.decorators import async_capture_errors

@async_capture_errors(
    component=PipelineComponent.ELASTICSEARCH,
    severity=Severity.CRITICAL
)
async def index_document(doc: dict):
    await es_client.index(index="logs", document=doc)
```
Retry with Capture
```python
from PECLibrary.decorators import retry_with_capture

@retry_with_capture(
    max_retries=3,
    delay=1.0,
    backoff=2.0,
    component=PipelineComponent.DATABASE
)
def query_database(query: str):
    return db.execute(query)
```
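Assuming the usual exponential-backoff convention, `delay` and `backoff` imply that the n-th retry waits `delay * backoff**n` seconds. A minimal, library-independent sketch (the helper name `backoff_schedule` is illustrative, not part of PECLibrary):

```python
def backoff_schedule(max_retries: int, delay: float, backoff: float) -> list[float]:
    """Wait times before each retry attempt, growing exponentially."""
    return [delay * backoff ** attempt for attempt in range(max_retries)]

# With the decorator arguments shown above:
print(backoff_schedule(max_retries=3, delay=1.0, backoff=2.0))  # [1.0, 2.0, 4.0]
```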
Context Manager
```python
from PECLibrary.utils.context_managers import capture_context

with capture_context(
    component=PipelineComponent.TRANSFORMATION,
    pipeline_name="etl_pipeline",
    engine_name="pandas"
):
    # Your data transformation code
    df = process_data(raw_data)
```
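Under the hood, a capture context manager of this shape can be built with `contextlib`. The sketch below is a generic illustration of the pattern, not PECLibrary's actual implementation; `captured` stands in for a real transport:

```python
from contextlib import contextmanager

captured = []  # stand-in for a real transport

@contextmanager
def capture_context_sketch(component: str, **metadata):
    """Record any exception raised inside the block, then re-raise it."""
    try:
        yield
    except Exception as exc:
        captured.append({"component": component, "error": repr(exc), **metadata})
        raise

# Usage: the error is recorded and still propagates to the caller.
try:
    with capture_context_sketch("TRANSFORMATION", pipeline_name="etl_pipeline"):
        raise ValueError("bad row")
except ValueError:
    pass
```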
Configuration
Environment Variables
Create a .env file:
```ini
# Kafka Configuration
KAFKA_BOOTSTRAP_SERVERS_RCA=localhost:9092
KAFKA_EVENT_TOPIC_RCA=rca.events
KAFKA_ENABLED_RCA=true
KAFKA_COMPRESSION_RCA=gzip

# Logging Configuration
LOG_LEVEL=INFO
LOG_TO_CONSOLE=true
SEND_TO_TRANSPORT=true

# Event Capture Settings
DEFAULT_COMPONENT=CUSTOM
DEFAULT_SEVERITY=MEDIUM
ENABLE_STACK_TRACES=true
USE_BACKGROUND_TRANSPORT=false
```
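PECLibrary presumably loads these values through its config module; for illustration, a `.env` file of this shape can be parsed with the standard library alone (the `parse_env` helper is hypothetical, not part of the library):

```python
def parse_env(text: str) -> dict[str, str]:
    """Parse KEY=VALUE lines, skipping blank lines and # comments."""
    env = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition("=")
        env[key.strip()] = value.strip()
    return env

env = parse_env("""
# Kafka Configuration
KAFKA_BOOTSTRAP_SERVERS_RCA=localhost:9092
KAFKA_ENABLED_RCA=true
""")
# Boolean flags arrive as strings and need explicit coercion.
kafka_enabled = env["KAFKA_ENABLED_RCA"].lower() in ("1", "true", "yes")
```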
Programmatic Configuration
```python
from PECLibrary.config import configure

configure(
    KAFKA_BOOTSTRAP_SERVERS_RCA="localhost:9092",
    kafka_events_topic="rca.events",
    log_level="INFO",
    send_to_transport=True
)
```
Event Model
Events are captured with comprehensive metadata:
```json
{
  "id": "550e8400-e29b-41d4-a716-446655440000",
  "event_type": "error",
  "pipeline_name": "data_ingestion_pipeline",
  "engine_name": "spark",
  "component": "kafka_consumer",
  "component_name": "UserEventsConsumer",
  "severity": "high",
  "message": "Failed to consume messages: Connection timeout",
  "timestamp": 1703001234.567,
  "timestamp_iso": "2024-12-01 12:34:56.789",
  "error_type": "KafkaTimeoutError",
  "stack_trace": "...",
  "function_name": "consume_messages",
  "file_path": "/app/consumers/events.py",
  "line_number": 45,
  "metadata": {
    "topic": "user_events",
    "partition": 0,
    "offset": 12345
  },
  "resolved": false
}
```
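Because the payload is plain JSON, downstream consumers can sanity-check events without importing the library. A minimal stdlib sketch (the field list is taken from the example above; `is_valid_event` is a hypothetical helper):

```python
import json

REQUIRED = {"id", "event_type", "component", "severity", "message", "timestamp"}

def is_valid_event(raw: str) -> bool:
    """True if the payload is JSON and carries the core event fields."""
    try:
        event = json.loads(raw)
    except json.JSONDecodeError:
        return False
    return isinstance(event, dict) and REQUIRED <= event.keys()

raw = ('{"id": "1", "event_type": "error", "component": "kafka_consumer", '
       '"severity": "high", "message": "boom", "timestamp": 1703001234.567}')
print(is_valid_event(raw))  # True
```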
Advanced Usage
Custom Component Names
```python
@capture_errors(
    component=PipelineComponent.CUSTOM,
    component_name="MyCustomETL",
    pipeline_name="daily_batch_job",
    engine_name="custom"
)
def custom_pipeline():
    pass
```
Capture All Class Methods
```python
from PECLibrary.decorators import capture_method_errors

@capture_method_errors(
    component=PipelineComponent.API,
    severity=Severity.HIGH
)
class DataAPI:
    def fetch(self):
        pass

    def save(self):
        pass
```
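A class decorator like this typically rewraps each plain method at decoration time. The generic sketch below illustrates the pattern only; it is not PECLibrary's implementation, and `errors` stands in for the capture service:

```python
import functools

errors = []  # stand-in for the capture service

def capture_all_methods(cls):
    """Wrap every public method so raised exceptions are recorded, then re-raised."""
    for name, attr in list(vars(cls).items()):
        if callable(attr) and not name.startswith("_"):
            @functools.wraps(attr)
            def wrapper(*args, __f=attr, **kwargs):
                try:
                    return __f(*args, **kwargs)
                except Exception as exc:
                    errors.append((__f.__name__, repr(exc)))
                    raise
            setattr(cls, name, wrapper)
    return cls

@capture_all_methods
class FragileAPI:
    def fetch(self):
        raise RuntimeError("backend down")

# The exception is recorded and still propagates.
try:
    FragileAPI().fetch()
except RuntimeError:
    pass
```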
Manual Event Capture
```python
from PECLibrary.services import get_capture_service
from PECLibrary.models.captured_models import CaptureConfig, PipelineComponent, Severity

service = get_capture_service()

try:
    risky_operation()
except Exception as e:
    config = CaptureConfig(
        component=PipelineComponent.POSTGRES,
        severity=Severity.CRITICAL,
        metadata={"query": "SELECT * FROM users"}
    )
    service.capture_exception(e, config=config)
```
Custom Transport
```python
from PECLibrary.services.transport import Transport
from PECLibrary.models.captured_models import ErrorEvent

class CustomTransport(Transport):
    def send(self, event: ErrorEvent) -> bool:
        # Your custom transport logic
        my_logging_system.log(event.dict())
        return True

# Register your transport
from PECLibrary.services import get_capture_service

service = get_capture_service()
service.add_transport(CustomTransport())
```
Pipeline Components
Supported pipeline components:
- Message Brokers: `KAFKA_PRODUCER`, `KAFKA_CONSUMER`, `RABBITMQ`, `BEANSTALK`
- Databases: `ELASTICSEARCH`, `POSTGRES`, `MONGODB`, `REDIS`, `MEMGRAPH`, `NEBULA`, `QDRANT`, `DATABASE`
- Storage: `S3`
- Processing: `TRANSFORMATION`, `DBT`, `PANDAS`
- Orchestration: `AIRFLOW`
- Services: `API`, `WEBHOOK`
- Generic: `CUSTOM`, `UNKNOWN`
Event Severity Levels
- `CRITICAL` - System-breaking errors requiring immediate attention
- `HIGH` - Significant errors affecting functionality
- `MEDIUM` - Moderate issues that should be investigated
- `LOW` - Minor issues or warnings
- `WARNING` - Advisory notifications
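If these severities need to feed into Python's standard `logging` module, a simple lookup table works. The correspondence below is an assumption for illustration, not part of PECLibrary:

```python
import logging

# Assumed mapping from event severities to stdlib logging levels.
SEVERITY_TO_LOGLEVEL = {
    "CRITICAL": logging.CRITICAL,  # system-breaking, page someone
    "HIGH": logging.ERROR,         # functionality broken
    "MEDIUM": logging.WARNING,     # investigate soon
    "LOW": logging.INFO,           # minor issue
    "WARNING": logging.WARNING,    # advisory
}

logger = logging.getLogger("rca")
logger.log(SEVERITY_TO_LOGLEVEL["HIGH"], "Failed to consume messages")
```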
Architecture
See ARCHITECTURE.md for detailed design documentation.
Testing
```bash
# Run tests
pytest

# With coverage
pytest --cov=src --cov-report=html

# Run specific test
pytest tests/test_decorators.py
```
Contributing
Contributions are welcome! Please see our contributing guidelines.
License
MIT License - see LICENSE file for details.
Support
- Documentation: ReadTheDocs
- Issues: GitHub Issues
- Discussions: GitHub Discussions
Project details
Download files
Source Distribution
Built Distribution
File details
Details for the file peclibrary-0.7.0.tar.gz.
File metadata
- Download URL: peclibrary-0.7.0.tar.gz
- Upload date:
- Size: 32.7 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.10.19
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | `ef8160a9717e310051b5b229146aa5717ec36df41de3c6450d91669ba45b95be` |
| MD5 | `d675f46c9b25412ad37ee72ae88d48e7` |
| BLAKE2b-256 | `19d0bda262ad01f32ecab3feb28ad7d8c42d58f39aa210b5838941d103ad3d94` |
File details
Details for the file peclibrary-0.7.0-py3-none-any.whl.
File metadata
- Download URL: peclibrary-0.7.0-py3-none-any.whl
- Upload date:
- Size: 29.5 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.10.19
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | `805d3ef65292e5c3393231abd1b0257bbeb0cce01cb88f0d946e73a03fac0c7d` |
| MD5 | `5dac61351fa11d90e8f8bc9798e4b850` |
| BLAKE2b-256 | `5f8970d720d60df89ec1f8f652eb90d1cf57ed2822c4a30483efc0bd79be838e` |