High-performance video frame processing library using actor-based parallel processing
Decimatr: High-Performance Video Frame Processing Library
Decimatr is a modern, actor-based video frame processing library that provides a clean separation between frame analysis (tagging) and decision-making (filtering). Built on the xoscar Actor Model, it enables efficient distributed processing across CPU cores with optional GPU acceleration.
Features
- Simple API: Process videos with just a few lines of code
- High Performance: Actor-based parallel processing across CPU cores
- Smart Filtering: Predefined strategies for common use cases (blur removal, duplicate detection, diversity sampling)
- Extensible: Easy to create custom taggers and filters
- Memory Efficient: Lazy evaluation and automatic memory release
- Comprehensive Metrics: Detailed performance tracking and monitoring
- Optional GPU: GPU acceleration for compute-intensive operations (CLIP embeddings)
Installation
CPU-Only (Default)
```
pip install decimatr
```
This installs all dependencies needed for CPU-based processing, including:
- Frame analysis (blur, hash, entropy detection)
- Distributed processing across CPU cores
- All filtering capabilities
With GPU Support (Optional)
```
pip install decimatr[gpu]
```
This adds GPU dependencies for:
- CLIP embeddings
- GPU-accelerated batch processing
Note: GPU support requires CUDA-capable hardware and drivers.
Quick Start
Basic Usage
```python
from decimatr.core.processor import FrameProcessor

# Create processor with blur removal
processor = FrameProcessor.with_blur_removal(threshold=100.0)

# Process video
for frame in processor.process('video.mp4'):
    # Do something with sharp frames
    save_frame(frame)
```
Parallel Processing
```python
# Use 4 CPU cores for faster processing
processor = FrameProcessor.with_blur_removal(
    threshold=100.0,
    n_workers=4
)

for frame in processor.process('video.mp4'):
    save_frame(frame)
```
With Performance Metrics
```python
# Get detailed performance metrics
processor = FrameProcessor.with_blur_removal(threshold=100.0)
frames, result = processor.process('video.mp4', return_result=True)

for frame in frames:
    save_frame(frame)

# Print summary
result.print_summary()
# Output:
# ============================================================
# Processing Session: abc123
# ============================================================
# Frames: 250/1000 selected (25.0%)
# Filtered: 750 frames
# Processing Time: 12.50s
# Throughput: 80.0 fps
# Errors: 0
```
Core Concepts
Taggers (Stateless Analysis)
Taggers analyze individual frames and compute metadata tags without maintaining state:
```python
from decimatr.taggers.blur import BlurTagger
from decimatr.taggers.hash import HashTagger
from decimatr.taggers.entropy import EntropyTagger

# Taggers compute metrics for each frame
blur_tagger = BlurTagger()        # Computes blur score
hash_tagger = HashTagger()        # Computes perceptual hash
entropy_tagger = EntropyTagger()  # Computes Shannon entropy
```
Available Taggers:
- BlurTagger: Laplacian variance for blur detection
- HashTagger: Perceptual hashing for duplicate detection
- EntropyTagger: Shannon entropy for information content
- CLIPTagger: CLIP embeddings (requires GPU dependencies)
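As a concrete illustration of the kind of metric a tagger computes, here is a standalone sketch of Shannon entropy over an 8-bit grayscale frame. It uses plain NumPy and is independent of the library's EntropyTagger implementation:

```python
import numpy as np

def shannon_entropy(gray: np.ndarray) -> float:
    """Shannon entropy (in bits) of an 8-bit grayscale image."""
    hist = np.bincount(gray.ravel(), minlength=256).astype(np.float64)
    p = hist / hist.sum()  # probability of each intensity level
    p = p[p > 0]           # drop empty bins (0 * log 0 is treated as 0)
    return float(-(p * np.log2(p)).sum())

# A uniform-noise frame has high entropy; a flat frame carries no information.
noise = np.random.randint(0, 256, (64, 64), dtype=np.uint8)
flat = np.zeros((64, 64), dtype=np.uint8)
print(shannon_entropy(noise))  # close to 8.0 bits
print(shannon_entropy(flat))   # 0.0
```

Frames with entropy below a threshold (e.g. near-black or uniform frames) are exactly what an entropy-based filter would discard.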
Filters (Decision Making)
Filters make pass/fail decisions based on frame tags:
Stateless Filters
Make decisions based only on current frame:
```python
from decimatr.filters.blur import BlurFilter
from decimatr.filters.entropy import EntropyFilter
from decimatr.filters.threshold import ThresholdFilter

# Filter frames below threshold
blur_filter = BlurFilter(threshold=100.0)
entropy_filter = EntropyFilter(threshold=4.0)

# Generic threshold filter
threshold_filter = ThresholdFilter(
    tag_key='blur_score',
    threshold=100.0,
    operator='>'
)
```
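Conceptually, a generic threshold comparison like the one above can be resolved with Python's operator module. A minimal sketch of the idea (the mapping and function names here are illustrative, not the library's internals):

```python
import operator

# Hypothetical mapping from operator strings to comparison functions
OPS = {
    '>': operator.gt,
    '>=': operator.ge,
    '<': operator.lt,
    '<=': operator.le,
    '==': operator.eq,
}

def passes_threshold(tag_value: float, threshold: float, op: str) -> bool:
    """Return True if tag_value satisfies `op` against the threshold."""
    return OPS[op](tag_value, threshold)

print(passes_threshold(150.0, 100.0, '>'))  # True: sharp frame passes
print(passes_threshold(50.0, 100.0, '>'))   # False: blurry frame is filtered out
```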
Stateful Filters
Make decisions based on temporal context:
```python
from decimatr.filters.duplicate import DuplicateFilter
from decimatr.filters.motion import MotionFilter
from decimatr.filters.diversity import DiversityFilter

# Detect duplicates within sliding window
duplicate_filter = DuplicateFilter(
    threshold=0.05,
    buffer_size=50
)

# Detect scene changes
motion_filter = MotionFilter(
    threshold=0.3,
    buffer_size=10
)

# Maximize diversity
diversity_filter = DiversityFilter(
    window_size=100,
    min_distance=0.1
)
```
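To make the sliding-window idea concrete, here is a simplified, standalone sketch of duplicate detection over a bounded buffer of perceptual hashes, using normalized Hamming distance. This is an illustration of the technique, not decimatr's implementation:

```python
from collections import deque

def hamming_fraction(h1: int, h2: int, bits: int = 64) -> float:
    """Fraction of differing bits between two integer hashes."""
    return bin(h1 ^ h2).count("1") / bits

class SlidingDuplicateCheck:
    """Flag a hash as duplicate if it is near any hash in a bounded window."""

    def __init__(self, threshold: float = 0.05, buffer_size: int = 50):
        self.threshold = threshold
        self.buffer = deque(maxlen=buffer_size)  # old hashes fall out automatically

    def is_duplicate(self, frame_hash: int) -> bool:
        dup = any(hamming_fraction(frame_hash, h) <= self.threshold
                  for h in self.buffer)
        self.buffer.append(frame_hash)
        return dup

check = SlidingDuplicateCheck(threshold=0.05, buffer_size=50)
print(check.is_duplicate(0b1010))  # False: buffer was empty
print(check.is_duplicate(0b1010))  # True: identical hash already in the window
```

The bounded deque is what makes the filter "stateful": the decision for the current frame depends on frames seen earlier, but only within the window.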
Strategies (Predefined Pipelines)
Strategies combine taggers and filters for common use cases:
```python
# Blur removal
processor = FrameProcessor.with_blur_removal(threshold=100.0)

# Duplicate detection
processor = FrameProcessor.with_duplicate_detection(
    threshold=0.05,
    window_size=50
)

# Smart sampling (blur + duplicates + diversity)
processor = FrameProcessor.with_smart_sampling()
```
Custom Pipelines
Create custom processing pipelines by combining taggers and filters:
```python
from decimatr.core.processor import FrameProcessor
from decimatr.taggers.blur import BlurTagger
from decimatr.taggers.hash import HashTagger
from decimatr.filters.blur import BlurFilter
from decimatr.filters.duplicate import DuplicateFilter

# Define custom pipeline
pipeline = [
    # Taggers first: compute metrics
    BlurTagger(),
    HashTagger(),
    # Filters second: make decisions
    BlurFilter(threshold=100.0),
    DuplicateFilter(threshold=0.05, buffer_size=50)
]

# Create processor
processor = FrameProcessor(pipeline=pipeline, n_workers=4)

# Process video
for frame in processor.process('video.mp4'):
    process_frame(frame)
```
Performance Optimizations
Lazy Evaluation
Decimatr automatically skips computing tags that aren't used by any filter:
```python
# Only BlurTagger will execute (HashTagger is unused)
pipeline = [
    BlurTagger(),
    HashTagger(),  # Not used by any filter - automatically skipped!
    BlurFilter(threshold=100.0)
]

processor = FrameProcessor(
    pipeline=pipeline,
    lazy_evaluation=True  # Default
)
```
Performance Impact: Up to 8x faster when taggers produce unused tags.
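Conceptually, this pruning only needs the tag_keys and required_tags contracts described later in this README: a tagger can be skipped when no filter requires any of its tags. A standalone sketch of the rule, using simplified stand-in classes rather than decimatr's code:

```python
# Simplified stand-ins illustrating only the pruning rule, not decimatr's classes.
class FakeTagger:
    def __init__(self, name, tag_keys):
        self.name, self.tag_keys = name, tag_keys

class FakeFilter:
    def __init__(self, name, required_tags):
        self.name, self.required_tags = name, required_tags

def prune_unused_taggers(taggers, filters):
    """Keep only taggers whose tags are required by at least one filter."""
    needed = {tag for f in filters for tag in f.required_tags}
    return [t for t in taggers if needed.intersection(t.tag_keys)]

taggers = [FakeTagger("blur", ["blur_score"]), FakeTagger("hash", ["phash"])]
filters = [FakeFilter("blur_filter", ["blur_score"])]
kept = prune_unused_taggers(taggers, filters)
print([t.name for t in kept])  # ['blur'] - the unused hash tagger is skipped
```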
Memory Release
Automatically frees frame data from filtered frames:
```python
processor = FrameProcessor(
    pipeline=pipeline,
    release_memory=True  # Default
)
```
Memory Savings: Up to 70% reduction in peak memory usage.
Parallel Processing
Distribute processing across CPU cores:
```python
processor = FrameProcessor(
    pipeline=pipeline,
    n_workers=4  # Use 4 CPU cores
)
```
Scaling Guidelines:
- n_workers=1: Single-threaded (default)
- n_workers=4: Good for most workloads
- n_workers=CPU_COUNT: Maximum parallelism
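For the last setting, a common way to derive the worker count is os.cpu_count(). A small, hedged sketch (the helper name is illustrative, not part of decimatr's API):

```python
import os

def pick_n_workers(max_workers=None):
    """Choose a worker count: all available cores, optionally capped."""
    cores = os.cpu_count() or 1  # cpu_count() can return None on some platforms
    return min(cores, max_workers) if max_workers else cores

n = pick_n_workers(max_workers=8)
print(n)  # number of CPU cores, capped at 8
```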
Creating Custom Components
Custom Tagger
```python
from decimatr.taggers.base import Tagger
from decimatr.scheme import VideoFramePacket
import cv2

class CustomTagger(Tagger):
    """Compute custom metric for frames."""

    def compute_tags(self, packet: VideoFramePacket) -> dict:
        # Analyze frame
        gray = cv2.cvtColor(packet.frame_data, cv2.COLOR_BGR2GRAY)
        metric = compute_custom_metric(gray)
        return {"custom_metric": metric}

    @property
    def tag_keys(self) -> list:
        return ["custom_metric"]
```
Custom Filter
```python
from decimatr.filters.base import StatelessFilter
from decimatr.scheme import VideoFramePacket

class CustomFilter(StatelessFilter):
    """Filter frames based on custom metric."""

    def __init__(self, threshold: float):
        self.threshold = threshold

    def should_pass(self, packet: VideoFramePacket) -> bool:
        metric = packet.get_tag("custom_metric")
        return metric > self.threshold

    @property
    def required_tags(self) -> list:
        return ["custom_metric"]
```
Custom Strategy
```python
from decimatr.strategies.base import FilterStrategy

class CustomStrategy(FilterStrategy):
    """Custom processing strategy."""

    def __init__(self, threshold: float = 100.0):
        self.threshold = threshold

    def build_pipeline(self):
        return [
            CustomTagger(),
            CustomFilter(threshold=self.threshold)
        ]

# Use custom strategy
processor = FrameProcessor(strategy=CustomStrategy(threshold=150.0))
```
GPU Acceleration (Optional)
Checking GPU Availability
```python
from decimatr.core.processor import FrameProcessor

# Check if GPU is available
if FrameProcessor.check_gpu_available():
    print("GPU acceleration available")
    info = FrameProcessor.get_gpu_info()
    print(f"CUDA version: {info['cuda_version']}")
    print(f"Device: {info['device_name']}")
else:
    print("GPU not available - using CPU")
```
Using GPU Taggers
```python
from decimatr.taggers.clip import CLIPTagger

# Create GPU-accelerated tagger
clip_tagger = CLIPTagger(
    model_name="ViT-B/32",
    device="cuda"  # or "auto" to auto-detect
)

# Use in pipeline
pipeline = [
    clip_tagger,
    # ... filters ...
]

processor = FrameProcessor(
    pipeline=pipeline,
    use_gpu=True,
    gpu_batch_size=32
)
```
GPU Installation
If GPU dependencies are missing:
```
# Install GPU support
pip install decimatr[gpu]

# Or install manually
pip install torch torchvision --index-url https://download.pytorch.org/whl/cu118
```
API Reference
FrameProcessor
Main API for processing video frames.
```python
FrameProcessor(
    pipeline: Optional[List[Union[Tagger, Filter]]] = None,
    strategy: Optional[FilterStrategy] = None,
    n_workers: int = 1,
    use_gpu: bool = False,
    gpu_batch_size: int = 32,
    lazy_evaluation: bool = True,
    release_memory: bool = True
)
```
Parameters:
- pipeline: Custom pipeline of taggers and filters
- strategy: Predefined strategy (overrides pipeline)
- n_workers: Number of worker actors (1 = single-threaded)
- use_gpu: Enable GPU acceleration
- gpu_batch_size: Batch size for GPU processing
- lazy_evaluation: Compute tags only when needed
- release_memory: Free frame data from filtered frames
Methods:
- process(source, session_id=None, return_result=False): Process frames
- with_blur_removal(threshold, **kwargs): Create with blur removal strategy
- with_duplicate_detection(threshold, window_size, **kwargs): Create with duplicate detection
- with_smart_sampling(**kwargs): Create with smart sampling strategy
- check_gpu_available(): Check if GPU is available (static)
- get_gpu_info(): Get GPU information (static)
ProcessingResult
Summary of processing session.
```python
result = ProcessingResult(...)
```
Attributes:
- session_id: Session identifier
- total_frames: Total frames processed
- selected_frames: Frames that passed filters
- filtered_frames: Frames filtered out
- processing_time: Total time in seconds
- stage_metrics: Per-stage metrics
- actor_metrics: Actor-level metrics
- errors: List of errors
Methods:
- get_throughput(): Get frames per second
- get_selection_rate(): Get selection percentage
- get_summary(): Get complete metrics dictionary
- print_summary(): Print detailed summary
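The derived metrics reduce to simple arithmetic over the attributes listed above. A standalone sketch using a hypothetical stand-in class (not the real ProcessingResult):

```python
from dataclasses import dataclass

@dataclass
class ResultSketch:
    """Hypothetical stand-in mirroring a few ProcessingResult attributes."""
    total_frames: int
    selected_frames: int
    processing_time: float  # seconds

    def get_throughput(self) -> float:
        """Frames processed per second."""
        return self.total_frames / self.processing_time

    def get_selection_rate(self) -> float:
        """Percentage of frames that passed all filters."""
        return 100.0 * self.selected_frames / self.total_frames

# The numbers from the print_summary() example earlier in this README:
r = ResultSketch(total_frames=1000, selected_frames=250, processing_time=12.5)
print(r.get_throughput())      # 80.0 fps
print(r.get_selection_rate())  # 25.0 %
```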
Examples
See the examples/ directory for complete working examples:
- actor_pipeline_demo.py: Parallel processing with actors
- frame_processor_demo.py: Basic frame processing
- performance_optimizations_demo.py: Performance optimization techniques
Documentation
- API Documentation - Complete API reference
- Parallel Processing Guide - Actor-based processing
- Performance Optimizations - Optimization techniques
- GPU Setup Guide - GPU installation and configuration
- Custom Components Guide - Creating custom taggers and filters
Architecture
Decimatr uses a three-layer architecture:
- Tagging Layer: Stateless frame analysis (blur, hash, entropy, etc.)
- Filtering Layer: Decision-making (stateless and stateful)
- Orchestration Layer: Actor-based distributed processing (xoscar)
```
+-------------------------------------------------------------+
|                       FrameProcessor                        |
|  +-------------------------------------------------------+  |
|  |                Pipeline Configuration                 |  |
|  |  [Tagger1] -> [Tagger2] -> [Filter1] -> [Filter2] ... |  |
|  +-------------------------------------------------------+  |
|                             |                               |
|  +-------------------------------------------------------+  |
|  |                  Actor Pool (xoscar)                  |  |
|  |  +----------+  +----------+  +----------+             |  |
|  |  | Worker 1 |  | Worker 2 |  | Worker N |  ...        |  |
|  |  |  (CPU)   |  |  (CPU)   |  |  (GPU)   |             |  |
|  |  +----------+  +----------+  +----------+             |  |
|  +-------------------------------------------------------+  |
|                             |                               |
|  +-------------------------------------------------------+  |
|  |              Frame Stream (Input/Output)              |  |
|  |     VideoFile -> FrameIterator -> ProcessedFrames     |  |
|  +-------------------------------------------------------+  |
+-------------------------------------------------------------+
```
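Stripped of actors and I/O, the tag-then-filter flow of the first two layers can be sketched in a few lines of generic Python (this is a conceptual illustration, not decimatr's API):

```python
def run_pipeline(frames, taggers, filters):
    """Yield frames that pass all filters, after all taggers have run."""
    for frame in frames:
        tags = {}
        for tagger in taggers:  # Tagging layer: stateless analysis
            tags.update(tagger(frame))
        if all(f(frame, tags) for f in filters):  # Filtering layer: decisions
            yield frame

# Toy example: frames are ints and the "blur score" is the value itself.
frames = [10, 200, 50, 300]
taggers = [lambda f: {"blur_score": f}]
filters = [lambda f, tags: tags["blur_score"] > 100.0]
print(list(run_pipeline(frames, taggers, filters)))  # [200, 300]
```

The orchestration layer's job is then to run this same loop concurrently across worker actors.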
Requirements
- Python 3.10+
- NumPy >= 2.2.5
- OpenCV >= 4.11.0
- imagehash >= 4.3.2
- xoscar >= 0.3.0
- decord >= 0.6.0
- loguru >= 0.7.3
Optional (GPU):
- torch >= 2.0.0
- torchvision >= 0.15.0
Contributing
Contributions are welcome! Please open an issue or pull request on GitHub.
License
This project is licensed under the MIT License - see the LICENSE file for details.
Support
- Issues: GitHub Issues
- Documentation: GitHub Repository
File details
Details for the file decimatr-0.1.0.tar.gz.
File metadata
- Download URL: decimatr-0.1.0.tar.gz
- Upload date:
- Size: 240.0 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.12.11
File hashes

| Algorithm | Hash digest |
|---|---|
| SHA256 | 15aeff85afb2b41138cdbd70752e06056f4794d88a7b750288c0bd0468279cf9 |
| MD5 | 7725fe371002bc4de39f95b74c617583 |
| BLAKE2b-256 | 3aa5722690ca17c39930408accdad0caf05082eef8edc9177a2cea8e707be824 |
File details
Details for the file decimatr-0.1.0-py3-none-any.whl.
File metadata
- Download URL: decimatr-0.1.0-py3-none-any.whl
- Upload date:
- Size: 73.5 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.12.11
File hashes

| Algorithm | Hash digest |
|---|---|
| SHA256 | a902087c04090130ffcb02512f01d19a1069230e13e31ecc93ddb49e037c8367 |
| MD5 | 86a6c6a02dc698baa280136d86621ff7 |
| BLAKE2b-256 | 3127c160348a85ef8e1c6e3cbbbb4ad70acfe2d731efb3d6829689005f6add57 |