Aggregator
The Aggregator is a filter that aggregates numeric and categorical data from upstream filters. It supports a range of aggregation operations and works with multiple upstream producers and downstream consumers.
Features
- Multiple Aggregation Operations: Sum, Average, Min, Max, Count, Count Distinct, Median, Standard Deviation, Any, All, Mode operations, and Distinct value collection
- Flexible Configuration: Support for nested fields using dot notation, optional forwarding of extra fields, image forwarding capability, upstream data forwarding option, and customizable output key naming
- Use Cases: Aggregating metrics from multiple sources, computing statistics across multiple frames, collecting unique values from different sources, combining data from parallel processing pipelines, data normalization and consolidation
How It Works
The Aggregator processes multiple input frames and performs statistical aggregations on specified fields, in four steps:
- Input Processing: Receives frames from multiple upstream sources (e.g., different cameras, sensors)
- Data Extraction: Extracts values from specified fields using dot notation (e.g., meta.temperature)
- Aggregation: Applies statistical operations (sum, avg, min, max, etc.) across all input frames
- Output Generation: Creates a main aggregated frame plus optionally forwards original source frames
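The extraction and aggregation steps can be illustrated with a minimal sketch. This is not the filter's actual implementation: the helper names `get_nested` and `aggregate` are hypothetical, and only the operation names `sum`, `avg`, and `distinct` appear in this README's configuration example; the other spellings in the table are assumptions.

```python
import statistics
from typing import Any, Callable, Dict, Iterable, List

_MISSING = object()  # sentinel so that a stored None is not mistaken for "absent"


def get_nested(data: dict, path: str, default: Any = None) -> Any:
    """Follow a dot-separated path such as "meta.temperature" through nested dicts."""
    current: Any = data
    for key in path.split("."):
        if not isinstance(current, dict) or key not in current:
            return default
        current = current[key]
    return current


# Illustrative operation table. "sum", "avg" and "distinct" are taken from the
# configuration example; "min", "max", "count" and "median" are assumed names.
OPERATIONS: Dict[str, Callable[[List[Any]], Any]] = {
    "sum": sum,
    "avg": statistics.fmean,
    "min": min,
    "max": max,
    "count": len,
    "median": statistics.median,
    # dict.fromkeys keeps first-seen order and drops duplicates
    "distinct": lambda values: list(dict.fromkeys(values)),
}


def aggregate(frames: Iterable[dict], path: str, op: str) -> Any:
    """Collect the value at `path` from every frame payload and apply `op`."""
    found = (get_nested(frame, path, _MISSING) for frame in frames)
    values = [value for value in found if value is not _MISSING]
    if not values:
        return None  # assumption: a field missing everywhere yields no result
    return OPERATIONS[op](values)


payloads = [
    {"meta": {"sheeps": 4, "states": "open", "temperature": 25.5}},
    {"meta": {"sheeps": 4, "states": "open", "temperature": 25.5}},
]
print(aggregate(payloads, "meta.sheeps", "sum"))        # 8
print(aggregate(payloads, "meta.temperature", "avg"))   # 25.5
print(aggregate(payloads, "meta.states", "distinct"))   # ['open']
```

Skipping frames that lack the field, rather than raising, is a design choice made for this sketch only; the real filter may treat missing fields differently.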
Example Pipeline
graph TD
A[Video Input] --> B[Echo Filter 1]
A --> C[Echo Filter 2]
B --> D[FilterAggregator]
C --> D
D --> E[Web Visualization]
B -.->|"cam_1 data"| D
C -.->|"cam_2 data"| D
D -.->|"aggregated + original frames"| E
The aggregator receives data from both echo filters and combines it into statistical summaries.
Usage
Using the Demo Script
The scripts/filter_usage.py script demonstrates the Aggregator in action with a complete pipeline:
# Run with default configuration
python scripts/filter_usage.py
# Run with custom output path
python scripts/filter_usage.py --output_path output/my_results.json
# Run with custom video input
VIDEO_INPUT=path/to/your/video.mp4 python scripts/filter_usage.py
The script creates a complete pipeline with:
- Video Input: Reads video frames from a file
- Echo Filters: Two filters that cycle through JSON event data and attach it to video frames
- FilterAggregator: Aggregates data from both echo filters
- Web Visualization: Serves results at http://localhost:8002
Configuration
The Aggregator can be configured using environment variables:
FILTER_AGGREGATIONS='{"meta.sheeps":"sum", "meta.door_time":"avg", "meta.states":"distinct"}'
FILTER_FORWARD_EXTRA_FIELDS=true
FILTER_FORWARD_IMAGE=false
FILTER_APPEND_OP_TO_KEY=true
FILTER_FORWARD_UPSTREAM_DATA=true
FILTER_DEBUG=false
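To make the shape of these settings concrete, the following sketch parses them from an environment-like mapping. It is only an illustration: the `AggregatorConfig` class, the `load_config` and `parse_bool` helpers, the accepted boolean spellings, and the default values are all assumptions, not the filter's real configuration code.

```python
import json
from dataclasses import dataclass, field
from typing import Dict, Mapping

_TRUE_STRINGS = {"1", "true", "yes", "on"}  # assumed spellings of "true"


def parse_bool(raw: str) -> bool:
    """Interpret an environment string as a boolean (case-insensitive)."""
    return raw.strip().lower() in _TRUE_STRINGS


@dataclass
class AggregatorConfig:
    # Maps a dot-notation field path to an operation name, e.g. "meta.sheeps" -> "sum".
    aggregations: Dict[str, str] = field(default_factory=dict)
    # The defaults below are placeholders for this sketch, not documented defaults.
    forward_extra_fields: bool = False
    forward_image: bool = False
    append_op_to_key: bool = False
    forward_upstream_data: bool = False
    debug: bool = False


def load_config(env: Mapping[str, str]) -> AggregatorConfig:
    """Build a config object from FILTER_* variables in `env`."""
    aggregations = json.loads(env.get("FILTER_AGGREGATIONS", "{}"))
    if not isinstance(aggregations, dict):
        raise ValueError("FILTER_AGGREGATIONS must be a JSON object")
    return AggregatorConfig(
        aggregations=aggregations,
        forward_extra_fields=parse_bool(env.get("FILTER_FORWARD_EXTRA_FIELDS", "false")),
        forward_image=parse_bool(env.get("FILTER_FORWARD_IMAGE", "false")),
        append_op_to_key=parse_bool(env.get("FILTER_APPEND_OP_TO_KEY", "false")),
        forward_upstream_data=parse_bool(env.get("FILTER_FORWARD_UPSTREAM_DATA", "false")),
        debug=parse_bool(env.get("FILTER_DEBUG", "false")),
    )


example_env = {
    "FILTER_AGGREGATIONS": '{"meta.sheeps":"sum", "meta.door_time":"avg", "meta.states":"distinct"}',
    "FILTER_FORWARD_EXTRA_FIELDS": "true",
    "FILTER_FORWARD_IMAGE": "false",
    "FILTER_APPEND_OP_TO_KEY": "true",
    "FILTER_FORWARD_UPSTREAM_DATA": "true",
    "FILTER_DEBUG": "false",
}
config = load_config(example_env)
print(config.aggregations["meta.sheeps"])  # sum
```

Passing `os.environ` instead of `example_env` would read the real process environment. Note that `FILTER_AGGREGATIONS` must be valid JSON, so keys and values need double quotes inside the single-quoted shell string.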
Sample Data
The demo script automatically creates sample JSON event files (input/events_1.json and input/events_2.json) with test data including:
- sheeps: Numeric values for counting
- states: Categorical values (open/closed)
- temperature: Numeric values for averaging
- pressure, humidity: Additional metrics
- valid: Boolean values for logical operations
Expected Output
When running the demo, you'll see output like this in the terminal:
{'cam_1': (Frame(960x540xBGR-jpg), {'meta': {'id': '1', 'sheeps': 4, 'states': 'open', 'temperature': 25.5}}),
'cam_2': (Frame(960x540xBGR-jpg), {'meta': {'id': '1', 'sheeps': 4, 'states': 'open', 'temperature': 25.5}}),
'main': (Frame(None), {'_meta': {'frame_count': 1, 'sources': 2},
'meta': {'sheeps_sum': 8, 'states_distinct': ['open'], 'temperature_avg': 25.5}})}
This shows:
- cam_1, cam_2: Original input frames (when forward_upstream_data=True)
- main: Aggregated frame with statistical summaries
- _meta: Metadata about frame count and source count
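The naming of the aggregated keys (sheeps_sum, temperature_avg, and so on) can be reproduced with a short sketch. It assumes that FILTER_APPEND_OP_TO_KEY appends `_<operation>` to the last path segment and that the unsuffixed field name is used otherwise; the `build_main_payload` helper and its `frame_count` parameter are hypothetical and stand in for whatever the filter does internally.

```python
from statistics import fmean
from typing import Any, Callable, Dict, List

# Only the three operations that appear in the sample output are sketched here.
OPS: Dict[str, Callable[[List[Any]], Any]] = {
    "sum": sum,
    "avg": fmean,
    "distinct": lambda values: list(dict.fromkeys(values)),
}


def lookup(payload: dict, path: str) -> Any:
    """Resolve a dot-notation path; raises KeyError if a segment is missing."""
    current: Any = payload
    for key in path.split("."):
        current = current[key]
    return current


def build_main_payload(
    sources: Dict[str, dict],
    aggregations: Dict[str, str],
    append_op_to_key: bool = True,
    frame_count: int = 1,
) -> dict:
    """Assemble a 'main' payload shaped like the sample output above."""
    result: dict = {"_meta": {"frame_count": frame_count, "sources": len(sources)}}
    for path, op in aggregations.items():
        values = [lookup(payload, path) for payload in sources.values()]
        *parents, leaf = path.split(".")
        target = result
        for key in parents:  # recreate the nesting of the input path
            target = target.setdefault(key, {})
        out_key = f"{leaf}_{op}" if append_op_to_key else leaf
        target[out_key] = OPS[op](values)
    return result


sources = {
    "cam_1": {"meta": {"id": "1", "sheeps": 4, "states": "open", "temperature": 25.5}},
    "cam_2": {"meta": {"id": "1", "sheeps": 4, "states": "open", "temperature": 25.5}},
}
aggregations = {"meta.sheeps": "sum", "meta.states": "distinct", "meta.temperature": "avg"}
print(build_main_payload(sources, aggregations))
# {'_meta': {'frame_count': 1, 'sources': 2},
#  'meta': {'sheeps_sum': 8, 'states_distinct': ['open'], 'temperature_avg': 25.5}}
```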
IMPORTANT! This repo is not meant to be cloned; it is meant to be used as a template for your own repo.
IMPORTANT! You need to get access to GCP and ensure that the gcloud CLI is installed and authenticated with your Google SSO credentials.
IMPORTANT! Do NOT try to run the templatize script under Windows.
IMPORTANT! make run is only for development, NOT for deployment to customers. For deployment, create a docker-compose.yaml and run with docker.
The VERY FIRST thing you MUST do, before even creating a venv or any other files in this directory, is run the templatize script:
./templatize
Note: running templatize will:
- Overwrite .github/workflows/ci.yaml with .github/disabled-workflows/ci.yaml, effectively replacing the template's CI with the filter's CI. The template's CI tests the templatization, while the filter's CI tests, builds, and publishes the actual filter and its docs.
- Replace all placeholders with the proper repo/filter names
Requirements
To follow these instructions there are a few prerequisites. You must:
- Be authenticated to GAR:
gcloud auth login
gcloud auth application-default login
- Set your gcloud project to plainsightai-prod and configure docker to use gcloud:
gcloud config set project plainsightai-prod
gcloud auth configure-docker us-west1-docker.pkg.dev
It is assumed you will be running this on a GPU. If not, you will have to comment out the deploy: section in the docker-compose.yaml file, and the unit test will fail since it compares against GPU numbers.
Install
To run the filter locally or to build/publish the Python wheel, first install the package into a virtual environment:
virtualenv venv
source venv/bin/activate
make install
Run locally
Now to run the filter locally do:
make run
Then navigate to http://localhost:8000 and you should see the video looping.
Run in docker
IMPORTANT! If your filter uses the GPU and make compose doesn't automatically add it to the docker-compose.yaml, then make sure to add the following to your filter's section in the compose file:
deploy:
  resources:
    reservations:
      devices:
        - driver: nvidia
          count: all
          capabilities: [gpu]
First, build the filter docker image:
make build-image
If you changed the PIPELINE in the Makefile (if not then skip this step), then rebuild the docker-compose.yaml (you may have to tweak the generated docker-compose.yaml):
make compose
Now run it:
make run-image
Again, navigating to http://localhost:8000 will show you the video.
Unit tests
It is assumed you have installed the packages necessary to run locally (not in docker). Run:
make test
Publish releases
- Ensure the VERSION file at root has a production semver tag (e.g. v1.2.3)
  - If you intend to release a non-production version such as a development, release candidate, or internal release, add a build number and a classification to your version tag (e.g. v1.2.3.4-dev, v1.2.3.0-rc or v1.2.3.47-int)
- Ensure the version tag of the newest entry in RELEASE.md matches the tag in VERSION
  - Important: Our releases are documentation driven. Not updating RELEASE.md will not trigger a release. Filters cannot be merged to main unless RELEASE.md is updated. The RELEASE.md file is validated by our CI and requires version entries to be in the correct descending order.
- Simply merge to main. When a new version is detected in RELEASE.md the CI will:
  - Build and publish the docker image to the GAR OCI registry
  - Build and publish the python wheel to the GAR python registry
  - Push the docs to both production and development documentation sites
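As a quick way to sanity-check a tag before merging, the two tag shapes described above can be expressed as regular expressions. This sketch is inferred only from the example tags in this README (v1.2.3, v1.2.3.4-dev, v1.2.3.0-rc, v1.2.3.47-int); the `classify_tag` helper is hypothetical, and the rules actually enforced by the CI may be stricter or different.

```python
import re

# Production tag: "v" followed by three dot-separated numbers, e.g. v1.2.3
PRODUCTION_TAG = re.compile(r"v\d+\.\d+\.\d+")
# Non-production tag: adds a build number and a classification, e.g. v1.2.3.4-dev
NON_PRODUCTION_TAG = re.compile(r"v\d+\.\d+\.\d+\.\d+-(dev|rc|int)")


def classify_tag(tag: str) -> str:
    """Return "production", "dev", "rc", "int", or "invalid" for a version tag."""
    if PRODUCTION_TAG.fullmatch(tag):
        return "production"
    match = NON_PRODUCTION_TAG.fullmatch(tag)
    if match:
        return match.group(1)
    return "invalid"


for tag in ["v1.2.3", "v1.2.3.4-dev", "v1.2.3.0-rc", "v1.2.3.47-int", "1.2.3"]:
    print(tag, "->", classify_tag(tag))
```

Using `fullmatch` rather than `match` rejects tags with trailing characters, such as a stray newline read from the VERSION file, so strip whitespace before classifying.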
Publish documentation
- Renaming _PUBLISHING_PATH to PUBLISHING_PATH will activate automatic publishing
- When automatic publishing is active, RELEASE.md at the repository's root and the docs directory will be merged and pushed to PlainsightAI/docs-prod and PlainsightAI/docs-dev on every new release
- Upon receiving a commit,
  - PlainsightAI/docs-prod will compile a docs preview and open a PR. Once the PR is manually merged, the docs will be published to the production documentation site (https://plainsightai.info)
  - PlainsightAI/docs-dev will compile a docs preview and open a PR. The PR will be automatically merged and the docs will be published to the internal documentation site (https://dev.plainsightai.info)
- The content of docs/~internal will only be published to the internal documentation website and excluded from production
- Any [Unreleased] or non-production release info in RELEASE.md will only be published to the internal docs website. The production docs site will only contain production tags (e.g. v1.2.3) and exclude non-production tags (e.g. v1.2.3.4-dev, v1.2.3.0-rc or v1.2.3.47-int)