Event Sink Filter
The Event Sink Filter is a production-ready OpenFilter component that collects events from filter pipelines and reliably delivers them to a CloudEvents-compatible ingestion endpoint, such as the one exposed by the Plainsight API.
Features
- CloudEvents v1.0 Compliant: Industry-standard event format with Plainsight extensions
- Intelligent Batch Processing: Automatic batching based on size (5 MiB), count (1000 events), or time (5s)
- Gzip Compression: 70-80% bandwidth reduction for efficient network usage
- Reliable Delivery: Exponential backoff retry logic for resilient event delivery
- Flexible Event Collection: Multiple extraction patterns, topic filtering
- Production Ready: Comprehensive error handling, logging, and graceful shutdown
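The retry behavior listed above can be illustrated with a minimal sketch of exponential backoff. The base delay, the cap, and the function names are assumptions made for illustration; this README does not state the delays the filter actually uses.

```python
import time
from typing import Callable, List


def backoff_delays(max_retries: int = 3, base: float = 1.0, cap: float = 30.0) -> List[float]:
    """Delay in seconds before each retry: base * 2**attempt, limited by cap."""
    # Assumption: base and cap are illustrative values, not documented defaults.
    return [min(cap, base * (2 ** attempt)) for attempt in range(max_retries)]


def send_with_retry(
    send: Callable[[], bool],
    max_retries: int = 3,
    base: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
) -> bool:
    """Call send() once, then retry up to max_retries times with growing delays."""
    if send():
        return True
    for delay in backoff_delays(max_retries, base):
        sleep(delay)  # wait longer after each failed attempt
        if send():
            return True
    return False  # all attempts failed; the caller decides what to do with the batch
```

Passing `sleep` as a parameter keeps the sketch testable without real waiting. A real client would typically also add jitter and retry only on transient errors.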
Architecture
Main Thread (Filter)          Background Thread (EventSinkThread)
─────────────────────         ───────────────────────────────────
Receive frames                Accumulate events from queue
Extract events    ───────→    Check flush conditions:
Queue events                    - Size >= 5 MiB
                                - Count >= 1000
                                - Time >= 5s
                                      ↓
                              Build CloudEvents batch
                              Gzip compress
                              HTTP POST to API
                              Retry on failure
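The flush conditions in the diagram can be sketched as a small buffer class. This is an illustration only: `BatchBuffer` and its methods are hypothetical names, and measuring size as the length of each event's JSON encoding is an assumption about how batch size is counted.

```python
import json
import time
from typing import Any, Callable, Dict, List


class BatchBuffer:
    """Accumulates events and reports when a size, count, or time limit is reached."""

    def __init__(
        self,
        max_bytes: int = 5 * 1024 * 1024,  # 5 MiB
        max_events: int = 1000,
        flush_interval: float = 5.0,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self.max_bytes = max_bytes
        self.max_events = max_events
        self.flush_interval = flush_interval
        self._clock = clock
        self._events: List[Dict[str, Any]] = []
        self._size = 0
        self._last_flush = clock()

    def add(self, event: Dict[str, Any]) -> None:
        self._events.append(event)
        # Assumption: size is the UTF-8 length of the event's JSON encoding.
        self._size += len(json.dumps(event).encode("utf-8"))

    def should_flush(self) -> bool:
        if not self._events:
            return False  # nothing to send, even if the interval has elapsed
        return (
            self._size >= self.max_bytes
            or len(self._events) >= self.max_events
            or self._clock() - self._last_flush >= self.flush_interval
        )

    def drain(self) -> List[Dict[str, Any]]:
        """Return the pending events and reset the counters and the flush timer."""
        batch, self._events = self._events, []
        self._size = 0
        self._last_flush = self._clock()
        return batch
```

A background loop would call `add` for each queued event and, whenever `should_flush` returns true, pass the result of `drain` to the delivery step.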
Prerequisites
If you are exporting events to the Plainsight API, you'll need:
- Plainsight API Token: Generate an API token from the Plainsight dashboard with the filterpipeline:create scope
- Plainsight API Filter Pipeline: Create a filter pipeline in the Plainsight dashboard
Quick Start
1. Install Dependencies
python3 -m venv venv
source venv/bin/activate
make install
2. Configure Environment
Copy the example environment file and fill in your values:
cp .env.example .env
# Edit .env with your API credentials
3. Run Locally
# Run the filter
source venv/bin/activate
source .env && make run
Or in one line:
source .env && source venv/bin/activate && make run
4. Run in Docker
# Build image
make build-image
# Run with docker-compose
source .env && make run-image
Configuration
Required Parameters
| Parameter | Environment Variable | Description |
|---|---|---|
| api_endpoint | FILTER_API_ENDPOINT | Full API endpoint URL including pipeline name and query params (e.g., https://api.plainsight.ai/filter-pipelines/my-pipeline/events?project=uuid) |
| api_token | FILTER_API_TOKEN | API token (format: ps_...) |
| api_custom_headers | FILTER_API_CUSTOM_HEADERS | Custom HTTP headers (optional, comma-separated "Header: value" pairs, e.g., "X-Scope-OrgID: uuid") |
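To make the custom header format concrete, here is a minimal sketch of how a comma-separated string of "Header: value" pairs could be turned into a dictionary. `parse_custom_headers` is a hypothetical helper, not the filter's own parser, and it assumes header values contain no commas.

```python
from typing import Dict


def parse_custom_headers(raw: str) -> Dict[str, str]:
    """Parse 'Name: value, Other: value' into a dict of header names to values."""
    headers: Dict[str, str] = {}
    for pair in raw.split(","):
        pair = pair.strip()
        if not pair:
            continue  # tolerate empty input and trailing commas
        # Split on the first colon only, so values may themselves contain colons.
        name, sep, value = pair.partition(":")
        if not sep or not name.strip():
            raise ValueError(f"Malformed header pair: {pair!r}")
        headers[name.strip()] = value.strip()
    return headers
```

For example, `parse_custom_headers("X-Scope-OrgID: uuid")` yields a dictionary with the single key `X-Scope-OrgID`.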
Optional Parameters
| Parameter | Environment Variable | Default | Description |
|---|---|---|---|
| event_topics | FILTER_EVENT_TOPICS | * | Topics to collect (comma-separated) |
| max_batch_size_bytes | FILTER_MAX_BATCH_SIZE_BYTES | 5242880 | Max batch size (5 MiB) |
| max_batch_events | FILTER_MAX_BATCH_EVENTS | 1000 | Max events per batch |
| flush_interval_seconds | FILTER_FLUSH_INTERVAL_SECONDS | 5.0 | Max time between flushes |
| enable_gzip | FILTER_ENABLE_GZIP | true | Enable gzip compression |
| request_timeout_seconds | FILTER_REQUEST_TIMEOUT_SECONDS | 30.0 | HTTP request timeout |
| max_retries | FILTER_MAX_RETRIES | 3 | Max retry attempts |
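As an illustration of how the optional parameters and their defaults fit together, the sketch below reads them from environment variables. `SinkConfig`, `load_config`, and the accepted boolean spellings are assumptions for this example; the filter's own configuration loading may differ.

```python
import os
from dataclasses import dataclass
from typing import List, Mapping


@dataclass
class SinkConfig:
    event_topics: List[str]
    max_batch_size_bytes: int
    max_batch_events: int
    flush_interval_seconds: float
    enable_gzip: bool
    request_timeout_seconds: float
    max_retries: int


def _as_bool(value: str) -> bool:
    # Assumption: common truthy spellings; the filter's own parsing may differ.
    return value.strip().lower() in {"1", "true", "yes", "on"}


def load_config(env: Mapping[str, str] = os.environ) -> SinkConfig:
    """Build a config from FILTER_* variables, falling back to the documented defaults."""
    topics = env.get("FILTER_EVENT_TOPICS", "*")
    return SinkConfig(
        event_topics=[t.strip() for t in topics.split(",") if t.strip()],
        max_batch_size_bytes=int(env.get("FILTER_MAX_BATCH_SIZE_BYTES", "5242880")),
        max_batch_events=int(env.get("FILTER_MAX_BATCH_EVENTS", "1000")),
        flush_interval_seconds=float(env.get("FILTER_FLUSH_INTERVAL_SECONDS", "5.0")),
        enable_gzip=_as_bool(env.get("FILTER_ENABLE_GZIP", "true")),
        request_timeout_seconds=float(env.get("FILTER_REQUEST_TIMEOUT_SECONDS", "30.0")),
        max_retries=int(env.get("FILTER_MAX_RETRIES", "3")),
    )
```

Accepting any mapping instead of reading `os.environ` directly makes it easy to exercise the defaults in a test.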
Event Formats
The filter automatically detects and extracts events from frame.data.
Example
frame.data = {
'meta': {
'count': 5,
'classes': ['person', 'vehicle'],
'custom_field': 'value'
}
}
CloudEvent Schema
Events are sent as CloudEvents v1.0 with Plainsight extensions:
{
"id": "550e8400-e29b-41d4-a716-446655440000",
"type": "com.plainsight.event.generic",
"source": "filter://macbookpro-lucas.local-0153261e-9551-4e4a-8740-c71b6dc9c506/ObjectDetector/main",
"specversion": "1.0",
"time": "2025-10-27T22:00:00Z",
"datacontenttype": "application/json",
"data": {
"meta": {
"count": 5,
"classes": ["person", "vehicle"],
"custom_field": "value"
}
},
"pipelineid": "macbookpro-lucas.local-0153261e-9551-4e4a-8740-c71b6dc9c506",
"filtername": "ObjectDetector",
"filtertopic": "detections"
}
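The schema above can be reproduced with a short sketch that wraps a frame's data in a CloudEvent envelope. `build_cloud_event` is a hypothetical helper. The layout of the source URI (pipeline ID, filter name, then a topic segment passed as `source_topic`) is inferred from the single example above and is an assumption.

```python
import uuid
from datetime import datetime, timezone
from typing import Any, Dict


def build_cloud_event(
    data: Dict[str, Any],
    pipeline_id: str,
    filter_name: str,
    filter_topic: str,
    source_topic: str = "main",  # assumption: last segment of the source URI
    event_type: str = "com.plainsight.event.generic",
) -> Dict[str, Any]:
    """Wrap event data in a CloudEvents v1.0 envelope with the extension attributes."""
    return {
        # Required CloudEvents v1.0 attributes
        "id": str(uuid.uuid4()),
        "type": event_type,
        "source": f"filter://{pipeline_id}/{filter_name}/{source_topic}",
        "specversion": "1.0",
        # Optional CloudEvents attributes
        "time": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
        "datacontenttype": "application/json",
        "data": data,
        # Extension attributes shown in the schema above
        "pipelineid": pipeline_id,
        "filtername": filter_name,
        "filtertopic": filter_topic,
    }
```

Extension attribute names are lowercase alphanumeric, which matches the CloudEvents naming rules for attributes.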
Testing
Run the comprehensive test suite:
# Run all tests
make test
# Run with coverage report
make test-coverage
Deployment
Docker Compose Example
services:
  filter_event_sink:
    image: containers.openfilter.io/plainsightai/openfilter-event-sink:v1.1.0
    environment:
      LOG_LEVEL: INFO
      FILTER_ID: EventSink
      FILTER_SOURCES: tcp://upstream_filter:5550??;>VideoIn
      # Event Sink Configuration
      FILTER_API_ENDPOINT: "https://api.prod.plainsight.tech/filter-pipelines/production-line-1/events?project=your-project-uuid"
      FILTER_API_TOKEN: "${PLAINSIGHT_API_TOKEN}"
      FILTER_API_CUSTOM_HEADERS: "X-Scope-OrgID: 48eec17d-3089-4d13-a107-24f5f4cf84c7" # Optional
      FILTER_EVENT_TOPICS: "detections,alerts"
      FILTER_FLUSH_INTERVAL_SECONDS: "5.0"
    volumes:
      - ./cache:/app/cache
      - ./telemetry:/app/telemetry
    networks:
      - filter-network
Publishing Releases
- Update VERSION file with semver tag (e.g., v1.2.3)
- Update RELEASE.md with version entry matching VERSION
- Merge to main - CI will:
  - Build and publish Docker image to GAR
  - Build and publish Python wheel to GAR
  - Push docs to documentation sites
Monitoring
The filter provides structured logging:
- Info: Successful batch posts, thread lifecycle
- Warning: Retries, queue near capacity
- Error: Failed posts, dropped events, auth failures
Performance
- Throughput: 1000+ events/second with batching
- Memory: Bounded queue (10,000 events)
- Network: 70-80% bandwidth reduction with gzip
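The effect of gzip on a batch can be checked with a small sketch that encodes a list of events and, optionally, compresses it. `encode_batch` is a hypothetical helper. The `application/cloudevents-batch+json` content type comes from the CloudEvents batched JSON format; whether the Plainsight endpoint expects it is an assumption. Actual savings depend on how repetitive the event payloads are.

```python
import gzip
import json
from typing import Any, Dict, List, Tuple


def encode_batch(
    events: List[Dict[str, Any]], enable_gzip: bool = True
) -> Tuple[bytes, Dict[str, str]]:
    """Serialize events as a JSON array and optionally gzip the request body."""
    body = json.dumps(events, separators=(",", ":")).encode("utf-8")
    # Assumption: the endpoint accepts the CloudEvents batched JSON content type.
    headers = {"Content-Type": "application/cloudevents-batch+json"}
    if enable_gzip:
        body = gzip.compress(body)
        headers["Content-Encoding"] = "gzip"  # tells the server to decompress
    return body, headers
```

Comparing `len(body)` with and without `enable_gzip` on a representative batch gives a direct measurement of the bandwidth reduction for a given workload.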
Troubleshooting
Queue Full Warnings
- Solution: Increase event_queue_size or reduce event rate
- Events will be dropped when queue is full
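The drop-when-full behavior can be sketched with a bounded queue from the standard library. `enqueue_event` is a hypothetical helper that shows the general pattern and is not the filter's actual code.

```python
import queue
from typing import Any, Dict


def enqueue_event(q: "queue.Queue[Dict[str, Any]]", event: Dict[str, Any]) -> bool:
    """Try to enqueue without blocking; return False if the event was dropped."""
    try:
        q.put_nowait(event)
        return True
    except queue.Full:
        # The producer must not block frame processing, so the event is dropped.
        return False


# Usage: a queue bounded at 2 events drops the third one.
events_queue: "queue.Queue[Dict[str, Any]]" = queue.Queue(maxsize=2)
results = [enqueue_event(events_queue, {"n": n}) for n in range(3)]
```

Dropping instead of blocking keeps memory bounded and the frame-processing thread responsive, at the cost of losing events when the consumer cannot keep up.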
API Authentication Errors (401/403)
- Solution: Verify FILTER_API_TOKEN is valid and has correct scopes
- Check token expiration
Network Timeouts
- Solution: Increase request_timeout_seconds
- Check network connectivity to API endpoint
Events Not Appearing in BigQuery
- Solution: Verify filter pipeline exists in Plainsight
- Check API endpoint URL is correct
- Monitor filter logs for POST errors
Documentation
- docs/overview.md: User-facing documentation
- RELEASE.md: Release notes and changelog
License
Copyright © 2025 Plainsight AI
Support
For issues and questions:
- Create an issue in this repository
- Contact Plainsight support
Version: v1.0.1 Status: Production Ready Maintainer: Plainsight AI