Aggregator

License: Apache 2.0

The Aggregator is a filter that aggregates numeric and categorical data from upstream filters. It supports a range of aggregation operations and works with multiple upstream producers and downstream consumers.

Features

  • Multiple Aggregation Operations: Sum, Average, Min, Max, Count, Count Distinct, Median, Standard Deviation, Any, All, Mode operations, and Distinct value collection
  • Flexible Configuration: Support for nested fields using dot notation, optional forwarding of extra fields, image forwarding capability, upstream data forwarding option, and customizable output key naming
  • Use Cases: Aggregating metrics from multiple sources, computing statistics across multiple frames, collecting unique values from different sources, combining data from parallel processing pipelines, data normalization and consolidation
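The operations listed above map closely onto Python's standard library. The following is a minimal sketch of that mapping, not the filter's implementation. The names "sum", "avg", "min", "max" and "distinct" appear elsewhere in this README; the other operation names, the choice of population standard deviation, and the sorted ordering of distinct values are assumptions made for illustration.

```python
import statistics

# "sum", "avg", "min", "max" and "distinct" are names used in this README.
# The remaining keys are hypothetical labels chosen for this sketch.
OPERATIONS = {
    "sum": sum,
    "avg": statistics.fmean,
    "min": min,
    "max": max,
    "count": len,
    "count_distinct": lambda values: len(set(values)),
    "median": statistics.median,
    "std": statistics.pstdev,  # population standard deviation (assumption)
    "any": any,
    "all": all,
    "mode": statistics.mode,
    "distinct": lambda values: sorted(set(values)),  # ordering is an assumption
}


def aggregate(values, op):
    """Apply one named operation to a non-empty list of values."""
    return OPERATIONS[op](values)


sheeps_total = aggregate([4, 4], "sum")
states_seen = aggregate(["open", "closed", "open"], "distinct")
```

A lookup table keeps each operation a plain callable, so adding another one is a one-line change.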

How It Works

The Aggregator processes multiple input frames and performs statistical aggregations on specified fields, in four steps:

  1. Input Processing: Receives frames from multiple upstream sources (e.g., different cameras, sensors)
  2. Data Extraction: Extracts values from specified fields using dot notation (e.g., meta.temperature)
  3. Aggregation: Applies statistical operations (sum, avg, min, max, etc.) across all input frames
  4. Output Generation: Creates a main aggregated frame plus optionally forwards original source frames
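The data extraction step can be sketched as a small dot-notation lookup applied to each source. This is an illustration under stated assumptions: the helper names get_nested and collect are hypothetical, the frame data is made up, and skipping sources that lack the field is a choice made for this sketch rather than documented behaviour of the filter.

```python
_MISSING = object()  # sentinel so that a stored None still counts as a value


def get_nested(data, path, default=None):
    """Return the value at a dot-separated path such as "meta.temperature"."""
    current = data
    for key in path.split("."):
        if not isinstance(current, dict) or key not in current:
            return default
        current = current[key]
    return current


def collect(frames, path):
    """Gather the value at `path` from every source that has it."""
    values = []
    for frame_data in frames.values():
        value = get_nested(frame_data, path, _MISSING)
        if value is not _MISSING:
            values.append(value)
    return values


# Hypothetical frame data keyed by source name.
frames = {
    "cam_1": {"meta": {"temperature": 25.5}},
    "cam_2": {"meta": {"temperature": 24.5}},
    "cam_3": {"meta": {}},  # no temperature; skipped in this sketch
}
temps = collect(frames, "meta.temperature")
average = sum(temps) / len(temps)
```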

Example Pipeline

graph TD
    A[Video Input] --> B[Echo Filter 1]
    A --> C[Echo Filter 2]
    B --> D[FilterAggregator]
    C --> D
    D --> E[Web Visualization]
    
    B -.->|"cam_1 data"| D
    C -.->|"cam_2 data"| D
    D -.->|"aggregated + original frames"| E

The aggregator receives data from both echo filters and combines it into statistical summaries.

Usage

Using the Demo Script

The scripts/filter_usage.py script demonstrates the Aggregator in action with a complete pipeline:

# Run with default configuration
python scripts/filter_usage.py

# Run with custom output path
python scripts/filter_usage.py --output_path output/my_results.json

# Run with custom video input
VIDEO_INPUT=path/to/your/video.mp4 python scripts/filter_usage.py

The script creates a complete pipeline with:

  • Video Input: Reads video frames from a file
  • Echo Filters: Two filters that cycle through JSON event data and attach it to video frames
  • FilterAggregator: Aggregates data from both echo filters
  • Web Visualization: Serves results at http://localhost:8002

Configuration

The Aggregator can be configured using environment variables:

FILTER_AGGREGATIONS='{"meta.sheeps":"sum", "meta.door_time":"avg", "meta.states":"distinct"}'
FILTER_FORWARD_EXTRA_FIELDS=true
FILTER_FORWARD_IMAGE=false
FILTER_APPEND_OP_TO_KEY=true
FILTER_FORWARD_UPSTREAM_DATA=true
FILTER_DEBUG=false
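Because FILTER_AGGREGATIONS carries a JSON object inside a shell string, quoting mistakes are easy to make. The sketch below shows one way to check the variables before launching the filter. It is not the filter's own parsing code: the helper names, the set of accepted true strings, and the False fallback for unset variables are all assumptions.

```python
import json
import os

# Strings treated as true in this sketch; the filter may accept a different set.
TRUE_STRINGS = {"1", "true", "yes", "on"}

BOOL_VARS = (
    "FILTER_FORWARD_EXTRA_FIELDS",
    "FILTER_FORWARD_IMAGE",
    "FILTER_APPEND_OP_TO_KEY",
    "FILTER_FORWARD_UPSTREAM_DATA",
    "FILTER_DEBUG",
)


def env_bool(name, env, default=False):
    """Read a boolean variable; `default` is a placeholder, not the filter's default."""
    raw = env.get(name)
    if raw is None:
        return default
    return raw.strip().lower() in TRUE_STRINGS


def load_config(env=os.environ):
    """Parse the variables shown above into a plain dict."""
    aggregations = json.loads(env.get("FILTER_AGGREGATIONS", "{}"))
    if not isinstance(aggregations, dict):
        raise ValueError("FILTER_AGGREGATIONS must be a JSON object")
    config = {"FILTER_AGGREGATIONS": aggregations}
    for name in BOOL_VARS:
        config[name] = env_bool(name, env)
    return config


if __name__ == "__main__":
    print(load_config())
```

A malformed FILTER_AGGREGATIONS value raises json.JSONDecodeError here, which surfaces the quoting problem before the pipeline starts.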

Sample Data

The demo script automatically creates sample JSON event files (input/events_1.json and input/events_2.json) with test data including:

  • sheeps: Numeric values for counting
  • states: Categorical values (open/closed)
  • temperature: Numeric values for averaging
  • pressure, humidity: Additional metrics
  • valid: Boolean values for logical operations

Expected Output

When running the demo, you'll see output like this in the terminal:

{'cam_1': (Frame(960x540xBGR-jpg), {'meta': {'id': '1', 'sheeps': 4, 'states': 'open', 'temperature': 25.5}}),
 'cam_2': (Frame(960x540xBGR-jpg), {'meta': {'id': '1', 'sheeps': 4, 'states': 'open', 'temperature': 25.5}}),
 'main': (Frame(None), {'_meta': {'frame_count': 1, 'sources': 2},
         'meta': {'sheeps_sum': 8, 'states_distinct': ['open'], 'temperature_avg': 25.5}})}

This shows:

  • cam_1, cam_2: Original input frames (when forward_upstream_data=True)
  • main: Aggregated frame with statistical summaries
  • _meta: Metadata about frame count and source count
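The key names in the main entry follow a field-plus-operation pattern (sheeps_sum, states_distinct, temperature_avg). The sketch below rebuilds the aggregated values from the two source entries shown above to make that naming concrete. The helpers output_path and set_nested are hypothetical, the behaviour with append_op=False is an assumption, and frame_count is left out because this README does not say how it is computed.

```python
def output_path(field, op, append_op=True):
    """Name the output key: "meta.sheeps" with "sum" becomes "meta.sheeps_sum"."""
    return f"{field}_{op}" if append_op else field


def set_nested(target, path, value):
    """Store `value` at a dot-separated path, creating dicts as needed."""
    *parents, leaf = path.split(".")
    for key in parents:
        target = target.setdefault(key, {})
    target[leaf] = value


# Values copied from the cam_1 and cam_2 entries in the sample output.
sources = {
    "cam_1": {"meta": {"id": "1", "sheeps": 4, "states": "open", "temperature": 25.5}},
    "cam_2": {"meta": {"id": "1", "sheeps": 4, "states": "open", "temperature": 25.5}},
}
metas = [data["meta"] for data in sources.values()]

results = {
    ("meta.sheeps", "sum"): sum(m["sheeps"] for m in metas),
    ("meta.states", "distinct"): sorted({m["states"] for m in metas}),
    ("meta.temperature", "avg"): sum(m["temperature"] for m in metas) / len(metas),
}

main = {"_meta": {"sources": len(sources)}}
for (field, op), value in results.items():
    set_nested(main, output_path(field, op), value)
```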

Install

To run the filter locally, or to build and publish the Python wheel, first create a virtual environment and install the package:

virtualenv venv
source venv/bin/activate
make install

Run locally

To run the filter locally:

make run

Then navigate to http://localhost:8000 and you should see the video looping.

Unit tests

The tests assume you have installed the packages needed to run locally (not in Docker). Run:

make test

Documentation

For more detailed information, configuration examples, and advanced usage scenarios, see the comprehensive documentation.

License

See LICENSE file for details.


Built Distribution

filter_aggregator-1.1.6-py3-none-any.whl (11.3 kB, Python 3)

Hashes for filter_aggregator-1.1.6-py3-none-any.whl
Algorithm Hash digest
SHA256 aee5133be9a695305b9088a54d299f005611e867c6a03ca2478763ee4cc97982
MD5 f33a5e54a1490ef288ceac7ec56e5605
BLAKE2b-256 1d6df82829974ee60688ca06187d885f90aae096f9f5bcbc25dc8fe1616f45ff
