
High-performance video frame processing library using actor-based parallel processing


Decimatr: High-Performance Video Frame Processing Library


Decimatr is a modern, actor-based video frame processing library that provides a clean separation between frame analysis (tagging) and decision-making (filtering). Built on the xoscar Actor Model, it enables efficient distributed processing across CPU cores with optional GPU acceleration.

Features

  • Simple API: Process videos with just a few lines of code
  • High Performance: Actor-based parallel processing across CPU cores
  • Smart Filtering: Predefined strategies for common use cases (blur removal, duplicate detection, diversity sampling)
  • Extensible: Easy to create custom taggers and filters
  • Memory Efficient: Lazy evaluation and automatic memory release
  • Comprehensive Metrics: Detailed performance tracking and monitoring
  • Optional GPU: GPU acceleration for compute-intensive operations (CLIP embeddings)

Installation

CPU-Only (Default)

pip install decimatr

This installs all dependencies needed for CPU-based processing, including:

  • Frame analysis (blur, hash, entropy detection)
  • Distributed processing across CPU cores
  • All filtering capabilities

With GPU Support (Optional)

pip install decimatr[gpu]

This adds GPU dependencies for:

  • CLIP embeddings
  • GPU-accelerated batch processing

Note: GPU support requires CUDA-capable hardware and drivers.

Quick Start

Basic Usage

from decimatr.core.processor import FrameProcessor

# Create processor with blur removal
processor = FrameProcessor.with_blur_removal(threshold=100.0)

# Process video
for frame in processor.process('video.mp4'):
    # Do something with sharp frames
    save_frame(frame)

Parallel Processing

# Use 4 CPU cores for faster processing
processor = FrameProcessor.with_blur_removal(
    threshold=100.0,
    n_workers=4
)

for frame in processor.process('video.mp4'):
    save_frame(frame)

With Performance Metrics

# Get detailed performance metrics
processor = FrameProcessor.with_blur_removal(threshold=100.0)
frames, result = processor.process('video.mp4', return_result=True)

for frame in frames:
    save_frame(frame)

# Print summary
result.print_summary()
# Output:
# ============================================================
# Processing Session: abc123
# ============================================================
# Frames: 250/1000 selected (25.0%)
# Filtered: 750 frames
# Processing Time: 12.50s
# Throughput: 80.0 fps
# Errors: 0

Core Concepts

Taggers (Stateless Analysis)

Taggers analyze individual frames and compute metadata tags without maintaining state:

from decimatr.taggers.blur import BlurTagger
from decimatr.taggers.hash import HashTagger
from decimatr.taggers.entropy import EntropyTagger

# Taggers compute metrics for each frame
blur_tagger = BlurTagger()        # Computes blur score
hash_tagger = HashTagger()        # Computes perceptual hash
entropy_tagger = EntropyTagger()  # Computes Shannon entropy

Available Taggers:

  • BlurTagger: Laplacian variance for blur detection
  • HashTagger: Perceptual hashing for duplicate detection
  • EntropyTagger: Shannon entropy for information content
  • CLIPTagger: CLIP embeddings (requires GPU dependencies)
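
The CPU taggers boil down to standard image metrics. Here is a self-contained NumPy sketch of each metric (illustrative re-implementations for intuition, not Decimatr's actual tagger code):

```python
import numpy as np

def blur_score(gray: np.ndarray) -> float:
    """Variance of a Laplacian response: low values suggest blur."""
    g = gray.astype(np.float64)
    # 'Valid' 3x3 Laplacian via shifted views (no SciPy needed)
    lap = g[:-2, 1:-1] + g[2:, 1:-1] + g[1:-1, :-2] + g[1:-1, 2:] - 4 * g[1:-1, 1:-1]
    return float(lap.var())

def average_hash(gray: np.ndarray, size: int = 8) -> np.ndarray:
    """aHash-style fingerprint: block means thresholded by the global mean."""
    h, w = gray.shape
    blocks = gray[: h - h % size, : w - w % size] \
        .reshape(size, h // size, size, w // size).mean(axis=(1, 3))
    return (blocks > blocks.mean()).astype(np.uint8).ravel()

def shannon_entropy(gray: np.ndarray) -> float:
    """Shannon entropy (bits) of the 8-bit intensity histogram."""
    hist, _ = np.histogram(gray, bins=256, range=(0, 256))
    p = hist / hist.sum()
    p = p[p > 0]
    return float((p * np.log2(1.0 / p)).sum())

flat = np.full((64, 64), 128, dtype=np.uint8)   # constant frame: no detail
noisy = np.random.default_rng(0).integers(0, 256, (64, 64), dtype=np.uint8)

print(blur_score(flat))        # 0.0: no edges at all
print(shannon_entropy(flat))   # 0.0: a constant frame carries no information
```

A sharp or noisy frame scores high on both metrics, which is exactly why thresholding on `blur_score` separates sharp frames from blurry ones.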

Filters (Decision Making)

Filters make pass/fail decisions based on frame tags:

Stateless Filters

Make decisions based only on the current frame:

from decimatr.filters.blur import BlurFilter
from decimatr.filters.entropy import EntropyFilter
from decimatr.filters.threshold import ThresholdFilter

# Filter frames below threshold
blur_filter = BlurFilter(threshold=100.0)
entropy_filter = EntropyFilter(threshold=4.0)

# Generic threshold filter
threshold_filter = ThresholdFilter(
    tag_key='blur_score',
    threshold=100.0,
    operator='>'
)
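
A generic threshold check like the one `ThresholdFilter` describes can be sketched with Python's `operator` module (a minimal illustration of the idea, not Decimatr's implementation):

```python
import operator

# Map operator strings to comparison functions
OPS = {">": operator.gt, ">=": operator.ge, "<": operator.lt,
       "<=": operator.le, "==": operator.eq}

def passes(tags: dict, tag_key: str, threshold: float, op: str = ">") -> bool:
    """Return True when the tagged value satisfies `value <op> threshold`."""
    return OPS[op](tags[tag_key], threshold)

print(passes({"blur_score": 150.0}, "blur_score", 100.0, ">"))  # True: sharp enough
print(passes({"blur_score": 42.0}, "blur_score", 100.0, ">"))   # False: too blurry
```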

Stateful Filters

Make decisions based on temporal context:

from decimatr.filters.duplicate import DuplicateFilter
from decimatr.filters.motion import MotionFilter
from decimatr.filters.diversity import DiversityFilter

# Detect duplicates within sliding window
duplicate_filter = DuplicateFilter(
    threshold=0.05,
    buffer_size=50
)

# Detect scene changes
motion_filter = MotionFilter(
    threshold=0.3,
    buffer_size=10
)

# Maximize diversity
diversity_filter = DiversityFilter(
    window_size=100,
    min_distance=0.1
)
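
The temporal state behind a filter like `DuplicateFilter` is essentially a bounded hash buffer. A simplified sketch of sliding-window duplicate detection (stand-in code, assuming binary hash strings compared by normalized Hamming distance):

```python
from collections import deque

def hamming(a: str, b: str) -> int:
    """Count differing positions between two equal-length hash strings."""
    return sum(x != y for x, y in zip(a, b))

class WindowDuplicateCheck:
    """Flag a frame as a duplicate if its hash is near any hash in the window."""
    def __init__(self, threshold: float = 0.05, buffer_size: int = 50):
        self.threshold = threshold
        self.buffer = deque(maxlen=buffer_size)  # old hashes fall off automatically

    def is_duplicate(self, frame_hash: str) -> bool:
        dup = any(hamming(frame_hash, h) / len(frame_hash) <= self.threshold
                  for h in self.buffer)
        self.buffer.append(frame_hash)
        return dup

check = WindowDuplicateCheck(threshold=0.05, buffer_size=50)
print(check.is_duplicate("0" * 64))  # False: window is empty
print(check.is_duplicate("0" * 64))  # True: exact repeat within the window
```

The `deque(maxlen=...)` is what makes the state bounded: a frame is only compared against the last `buffer_size` hashes, so memory stays constant no matter how long the video is.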

Strategies (Predefined Pipelines)

Strategies combine taggers and filters for common use cases:

# Blur removal
processor = FrameProcessor.with_blur_removal(threshold=100.0)

# Duplicate detection
processor = FrameProcessor.with_duplicate_detection(
    threshold=0.05,
    window_size=50
)

# Smart sampling (blur + duplicates + diversity)
processor = FrameProcessor.with_smart_sampling()

Custom Pipelines

Create custom processing pipelines by combining taggers and filters:

from decimatr.core.processor import FrameProcessor
from decimatr.taggers.blur import BlurTagger
from decimatr.taggers.hash import HashTagger
from decimatr.filters.blur import BlurFilter
from decimatr.filters.duplicate import DuplicateFilter

# Define custom pipeline
pipeline = [
    # Taggers first: compute metrics
    BlurTagger(),
    HashTagger(),
    # Filters second: make decisions
    BlurFilter(threshold=100.0),
    DuplicateFilter(threshold=0.05, buffer_size=50)
]

# Create processor
processor = FrameProcessor(pipeline=pipeline, n_workers=4)

# Process video
for frame in processor.process('video.mp4'):
    process_frame(frame)

Performance Optimizations

Lazy Evaluation

Decimatr automatically skips computing tags that aren't used by any filter:

# Only BlurTagger will execute (HashTagger is unused)
pipeline = [
    BlurTagger(),
    HashTagger(),  # Not used by any filter - automatically skipped!
    BlurFilter(threshold=100.0)
]

processor = FrameProcessor(
    pipeline=pipeline,
    lazy_evaluation=True  # Default
)

Performance Impact: Up to 8x faster when taggers produce unused tags.
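
The pruning behind lazy evaluation can be sketched by comparing each tagger's `tag_keys` against the union of the filters' `required_tags` (a simplified illustration with stand-in classes, not Decimatr's internals):

```python
class Tagger:
    def __init__(self, name, tag_keys):
        self.name, self.tag_keys = name, tag_keys

class Filter:
    def __init__(self, name, required_tags):
        self.name, self.required_tags = name, required_tags

def prune_unused_taggers(pipeline):
    """Drop taggers whose tags no filter ever reads."""
    needed = {t for s in pipeline if isinstance(s, Filter) for t in s.required_tags}
    return [s for s in pipeline
            if isinstance(s, Filter) or any(k in needed for k in s.tag_keys)]

pipeline = [
    Tagger("blur", ["blur_score"]),
    Tagger("hash", ["frame_hash"]),       # unused: no filter reads frame_hash
    Filter("blur_filter", ["blur_score"]),
]
print([s.name for s in prune_unused_taggers(pipeline)])  # ['blur', 'blur_filter']
```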

Memory Release

Automatically frees frame data from filtered frames:

processor = FrameProcessor(
    pipeline=pipeline,
    release_memory=True  # Default
)

Memory Savings: Up to 70% reduction in peak memory usage.

Parallel Processing

Distribute processing across CPU cores:

processor = FrameProcessor(
    pipeline=pipeline,
    n_workers=4  # Use 4 CPU cores
)

Scaling Guidelines:

  • n_workers=1: Single-threaded (default)
  • n_workers=4: Good for most workloads
  • n_workers=CPU_COUNT: Maximum parallelism
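
A common way to pick `n_workers` is to cap a sensible default against the machine's core count; a small sketch (a heuristic, not a Decimatr requirement):

```python
import os

# Use up to 4 workers, but never more cores than the machine has
n_workers = min(4, os.cpu_count() or 1)
print(n_workers)
```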

Creating Custom Components

Custom Tagger

from decimatr.taggers.base import Tagger
from decimatr.scheme import VideoFramePacket
import cv2

class CustomTagger(Tagger):
    """Compute custom metric for frames."""
    
    def compute_tags(self, packet: VideoFramePacket) -> dict:
        # Analyze the frame (here: intensity standard deviation as the metric;
        # substitute your own analysis)
        gray = cv2.cvtColor(packet.frame_data, cv2.COLOR_BGR2GRAY)
        metric = float(gray.std())
        
        return {"custom_metric": metric}
    
    @property
    def tag_keys(self) -> list:
        return ["custom_metric"]

Custom Filter

from decimatr.filters.base import StatelessFilter
from decimatr.scheme import VideoFramePacket

class CustomFilter(StatelessFilter):
    """Filter frames based on custom metric."""
    
    def __init__(self, threshold: float):
        self.threshold = threshold
    
    def should_pass(self, packet: VideoFramePacket) -> bool:
        metric = packet.get_tag("custom_metric")
        return metric > self.threshold
    
    @property
    def required_tags(self) -> list:
        return ["custom_metric"]

Custom Strategy

from decimatr.strategies.base import FilterStrategy

class CustomStrategy(FilterStrategy):
    """Custom processing strategy."""
    
    def __init__(self, threshold: float = 100.0):
        self.threshold = threshold
    
    def build_pipeline(self):
        return [
            CustomTagger(),
            CustomFilter(threshold=self.threshold)
        ]

# Use custom strategy
processor = FrameProcessor(strategy=CustomStrategy(threshold=150.0))

GPU Acceleration (Optional)

Checking GPU Availability

from decimatr.core.processor import FrameProcessor

# Check if GPU is available
if FrameProcessor.check_gpu_available():
    print("GPU acceleration available")
    info = FrameProcessor.get_gpu_info()
    print(f"CUDA version: {info['cuda_version']}")
    print(f"Device: {info['device_name']}")
else:
    print("GPU not available - using CPU")

Using GPU Taggers

from decimatr.taggers.clip import CLIPTagger

# Create GPU-accelerated tagger
clip_tagger = CLIPTagger(
    model_name="ViT-B/32",
    device="cuda"  # or "auto" to auto-detect
)

# Use in pipeline
pipeline = [
    clip_tagger,
    # ... filters ...
]

processor = FrameProcessor(
    pipeline=pipeline,
    use_gpu=True,
    gpu_batch_size=32
)

GPU Installation

If GPU dependencies are missing:

# Install GPU support
pip install decimatr[gpu]

# Or install manually
pip install torch torchvision --index-url https://download.pytorch.org/whl/cu118

API Reference

FrameProcessor

Main API for processing video frames.

FrameProcessor(
    pipeline: Optional[List[Union[Tagger, Filter]]] = None,
    strategy: Optional[FilterStrategy] = None,
    n_workers: int = 1,
    use_gpu: bool = False,
    gpu_batch_size: int = 32,
    lazy_evaluation: bool = True,
    release_memory: bool = True
)

Parameters:

  • pipeline: Custom pipeline of taggers and filters
  • strategy: Predefined strategy (overrides pipeline)
  • n_workers: Number of worker actors (1=single-threaded)
  • use_gpu: Enable GPU acceleration
  • gpu_batch_size: Batch size for GPU processing
  • lazy_evaluation: Compute tags only when needed
  • release_memory: Free frame data from filtered frames

Methods:

  • process(source, session_id=None, return_result=False): Process frames
  • with_blur_removal(threshold, **kwargs): Create with blur removal strategy
  • with_duplicate_detection(threshold, window_size, **kwargs): Create with duplicate detection
  • with_smart_sampling(**kwargs): Create with smart sampling strategy
  • check_gpu_available(): Check if GPU is available (static)
  • get_gpu_info(): Get GPU information (static)

ProcessingResult

Summary of processing session.

result = ProcessingResult(...)

Attributes:

  • session_id: Session identifier
  • total_frames: Total frames processed
  • selected_frames: Frames that passed filters
  • filtered_frames: Frames filtered out
  • processing_time: Total time in seconds
  • stage_metrics: Per-stage metrics
  • actor_metrics: Actor-level metrics
  • errors: List of errors

Methods:

  • get_throughput(): Get frames per second
  • get_selection_rate(): Get selection percentage
  • get_summary(): Get complete metrics dictionary
  • print_summary(): Print detailed summary
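
The derived metrics follow directly from the attributes above. Using the numbers from the summary output shown earlier:

```python
total_frames, selected_frames = 1000, 250
processing_time = 12.5  # seconds

throughput = total_frames / processing_time           # frames per second
selection_rate = 100.0 * selected_frames / total_frames

print(f"Throughput: {throughput:.1f} fps")   # Throughput: 80.0 fps
print(f"Selected: {selection_rate:.1f}%")    # Selected: 25.0%
```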

Examples

See the examples/ directory for complete working examples:

  • actor_pipeline_demo.py: Parallel processing with actors
  • frame_processor_demo.py: Basic frame processing
  • performance_optimizations_demo.py: Performance optimization techniques

Documentation

Architecture

Decimatr uses a three-layer architecture:

  1. Tagging Layer: Stateless frame analysis (blur, hash, entropy, etc.)
  2. Filtering Layer: Decision-making (stateless and stateful)
  3. Orchestration Layer: Actor-based distributed processing (xoscar)

โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”
โ”‚                        FrameProcessor                            โ”‚
โ”‚  โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”  โ”‚
โ”‚  โ”‚              Pipeline Configuration                        โ”‚  โ”‚
โ”‚  โ”‚  [Tagger1] โ†’ [Tagger2] โ†’ [Filter1] โ†’ [Filter2] โ†’ ...     โ”‚  โ”‚
โ”‚  โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜  โ”‚
โ”‚                              โ†“                                   โ”‚
โ”‚  โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”  โ”‚
โ”‚  โ”‚           Actor Pool (xoscar)                             โ”‚  โ”‚
โ”‚  โ”‚  โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”  โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”  โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”               โ”‚  โ”‚
โ”‚  โ”‚  โ”‚ Worker 1 โ”‚  โ”‚ Worker 2 โ”‚  โ”‚ Worker N โ”‚  ...          โ”‚  โ”‚
โ”‚  โ”‚  โ”‚ (CPU)    โ”‚  โ”‚ (CPU)    โ”‚  โ”‚ (GPU)    โ”‚               โ”‚  โ”‚
โ”‚  โ”‚  โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜  โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜  โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜               โ”‚  โ”‚
โ”‚  โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜  โ”‚
โ”‚                              โ†“                                   โ”‚
โ”‚  โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”  โ”‚
โ”‚  โ”‚         Frame Stream (Input/Output)                       โ”‚  โ”‚
โ”‚  โ”‚  VideoFile โ†’ FrameIterator โ†’ ProcessedFrames             โ”‚  โ”‚
โ”‚  โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜  โ”‚
โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜

Requirements

  • Python 3.10+
  • NumPy >= 2.2.5
  • OpenCV >= 4.11.0
  • imagehash >= 4.3.2
  • xoscar >= 0.3.0
  • decord >= 0.6.0
  • loguru >= 0.7.3

Optional (GPU):

  • torch >= 2.0.0
  • torchvision >= 0.15.0

Contributing

Contributions are welcome! Please open an issue or pull request on GitHub.

License

This project is licensed under the MIT License - see the LICENSE file for details.
