Dynamic MapReduce framework for data processing
Project description
DDR - Dynamic MapReduce Framework
A flexible framework for distributed data processing using MapReduce patterns.
Installation
Prerequisites
This project requires Python 3.13+ and uses conda for dependency management. We recommend using the provided environment.yml file to create a consistent development environment.
Setting up the Conda Environment
The project includes an environment.yml file with the following dependencies:
name: ddr
channels:
- conda-forge
dependencies:
- coffea>=2025.3.0
- fsspec-xrootd>=0.5.1
- ndcctools>=7.15.8
- python>=3.12
- rich>=13.9.4
- uproot>=5.6.0
- xrootd>=5.8.1
- setuptools<81
1. Create the conda environment from the provided environment.yml file:

   conda env create -f environment.yml

2. Activate the environment:

   conda activate ddr

3. Verify the installation:

   python --version                            # Should show Python 3.13.2
   conda list | grep -E "(coffea|ndcctools)"   # Should show the installed packages
From PyPI
pip install dynamic_data_reduction
Installing from Source
Once you have the conda environment set up:
# Clone the repository
git clone https://github.com/cooperative-computing-lab/dynamic_data_reduction.git
cd dynamic_data_reduction
# Activate the conda environment (if not already active)
conda activate ddr
# Install the package in development mode
pip install -e .
Quick Start
Minimal toy example to get started:
from dynamic_data_reduction import DynamicDataReduction
import ndcctools.taskvine as vine
import getpass

# Simple data: process two datasets
data = {
    "datasets": {
        "numbers": {"values": [1, 2, 3, 4, 5]},
        "more_numbers": {"values": [10, 20, 30]},
    }
}

# Define functions
def preprocess(dataset_info, **kwargs):
    for val in dataset_info["values"]:
        yield (val, 1)

def postprocess(val, **kwargs):
    return val  # Just return the value

def processor(x):
    return x * 2  # Double each number

def reducer(a, b):
    return a + b  # Sum the results

# Run
mgr = vine.Manager(port=[9123, 9129], name=f"{getpass.getuser()}-quick-start-ddr")
print(f"Manager started on port {mgr.port}")

ddr = DynamicDataReduction(
    mgr,
    data=data,
    source_preprocess=preprocess,
    source_postprocess=postprocess,
    processors=processor,
    accumulator=reducer,
)

# Use local workers, condor, slurm, or sge for scale
workers = vine.Factory("local", manager=mgr)
workers.max_workers = 2
workers.min_workers = 0
workers.cores = 4
workers.memory = 2000
workers.disk = 8000

with workers:
    result = ddr.compute()
    print(f"Result: {result}")  # Expected: (1+2+3+4+5)*2 + (10+20+30)*2 = 150
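Before starting a manager and workers, you can sanity-check the four user functions from the Quick Start by composing them serially in plain Python. This local driver is only an illustration of the dataflow (preprocess, process, accumulate), not part of the DDR API, and the framework's actual invocation order may differ:

```python
from functools import reduce

# Same toy data and functions as in the Quick Start above
data = {
    "datasets": {
        "numbers": {"values": [1, 2, 3, 4, 5]},
        "more_numbers": {"values": [10, 20, 30]},
    }
}

def preprocess(dataset_info, **kwargs):
    for val in dataset_info["values"]:
        yield (val, 1)

def postprocess(val, **kwargs):
    return val

def processor(x):
    return x * 2

def reducer(a, b):
    return a + b

def local_check(data):
    """Emulate the map/reduce dataflow serially, without TaskVine workers."""
    outputs = []
    for info in data["datasets"].values():
        for val, _count in preprocess(info):
            outputs.append(processor(postprocess(val)))
    return reduce(reducer, outputs)

print(local_check(data))  # 150
```

If this prints the expected value, the same functions should produce the same total when distributed across workers.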
Usage
- General usage example: examples/simple/simple-example.py
- Using coffea processor classes directly: examples/coffea_processor/example_with_preprocess.py
- Coffea in an analysis workflow: examples/cortado/ddr_cortado.py
License
This project is licensed under the Apache License 2.0 - see the LICENSE file for details.
Project details
Release history
Download files
Download the file for your platform.
Source Distribution
Built Distribution
File details
Details for the file dynamic_data_reduction-2025.11.3.tar.gz.
File metadata
- Download URL: dynamic_data_reduction-2025.11.3.tar.gz
- Upload date:
- Size: 27.1 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.13.9
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 8c0dc23996210d1b8f4e566ae5fce3fc93f4eb5caa998758cc98515621645f2a |
| MD5 | 368f08d23ab813a8ba79181176c38da7 |
| BLAKE2b-256 | 9c6206722e66706dd9ea99b1ac4fc5871a44a4ef3ce5130f5e37d64ea2f86094 |
File details
Details for the file dynamic_data_reduction-2025.11.3-py3-none-any.whl.
File metadata
- Download URL: dynamic_data_reduction-2025.11.3-py3-none-any.whl
- Upload date:
- Size: 24.7 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.13.9
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 553e39480ccd5aef063471ed8cdc2548b2ab01fd15a6f28802dc0b3ccc1daebb |
| MD5 | cab1769e09a1e47d866b39894dd75069 |
| BLAKE2b-256 | 8f16141af2ad36aaa8acae1d9334f3373889bc2640fb81880f34909c191bac9c |