
Dynamic MapReduce framework for data processing


DDR - Dynamic MapReduce Framework

A flexible framework for distributed data processing using MapReduce patterns.

Installation

Prerequisites

This project requires Python 3.12 or newer, as pinned in the provided environment.yml, and uses conda for dependency management. We recommend creating the development environment from environment.yml so that all dependencies are consistent.

Setting up the Conda Environment

The project includes an environment.yml file with the following contents:

name: ddr
channels:
  - conda-forge
dependencies:
  - coffea>=2025.3.0
  - fsspec-xrootd>=0.5.1
  - ndcctools>=7.15.8
  - python>=3.12
  - rich>=13.9.4
  - uproot>=5.6.0
  - xrootd>=5.8.1
  - setuptools<81

  1. Create the conda environment from the provided environment.yml file:

    conda env create -f environment.yml
    
  2. Activate the environment:

    conda activate ddr
    
  3. Verify the installation:

    python --version  # Should report Python 3.12 or newer (e.g. Python 3.13.2)
    conda list | grep -E "(coffea|ndcctools)"  # Should show the installed packages
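The checks in step 3 can also be run from Python, which is handy inside a batch job or a notebook. The snippet below is a sketch rather than part of the package: check_environment is a hypothetical helper, the (3, 12) minimum comes from the python>=3.12 pin in environment.yml, and the module list is an assumption based on the dependencies above. importlib.util.find_spec reports whether a top-level module can be found without actually importing it.

```python
import importlib.util
import sys


def check_environment(min_version=(3, 12), modules=("coffea", "ndcctools", "uproot", "rich")):
    """Hypothetical helper: report whether the interpreter is new enough
    and which of the given top-level modules cannot be found."""
    version_ok = sys.version_info[:2] >= tuple(min_version)
    # find_spec returns None when a top-level module is not installed
    missing = [name for name in modules if importlib.util.find_spec(name) is None]
    return version_ok, missing


if __name__ == "__main__":
    ok, missing = check_environment()
    print(f"Python {sys.version.split()[0]} (>= 3.12: {ok})")
    print("Missing modules:", ", ".join(missing) if missing else "none")
```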
    

From PyPI

pip install dynamic_data_reduction

Installing from Source

Once you have the conda environment set up:

# Clone the repository
git clone https://github.com/cooperative-computing-lab/dynamic_data_reduction.git
cd dynamic_data_reduction

# Activate the conda environment (if not already active)
conda activate ddr

# Install the package in development mode
pip install -e .

Quick Start

Minimal toy example to get started:

from dynamic_data_reduction import DynamicDataReduction
import ndcctools.taskvine as vine
import getpass

# Simple data: process two datasets
data = {
    "datasets": {
        "numbers": {"values": [1, 2, 3, 4, 5]},
        "more_numbers": {"values": [10, 20, 30]}
    }
}

# Define functions
def preprocess(dataset_info, **kwargs):
    for val in dataset_info["values"]:
        yield (val, 1)

def postprocess(val, **kwargs):
    return val  # Just return the value

def processor(x):
    return x * 2  # Double each number

def reducer(a, b):
    return a + b  # Sum the results

# Run
mgr = vine.Manager(port=[9123, 9129], name=f"{getpass.getuser()}-quick-start-ddr")
print(f"Manager started on port {mgr.port}")
ddr = DynamicDataReduction(mgr,
                           data=data,
                           source_preprocess=preprocess, 
                           source_postprocess=postprocess,
                           processors=processor, 
                           accumulator=reducer)

# Start local workers; pass "condor", "slurm", or "sge" instead of "local" to scale out on a batch system
workers = vine.Factory("local", manager=mgr)
workers.max_workers = 2
workers.min_workers = 0
workers.cores = 4
workers.memory = 2000
workers.disk = 8000
with workers:
    result = ddr.compute()

print(f"Result: {result}")  # Expected: (1+2+3+4+5)*2 + (10+20+30)*2 = 150
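To sanity-check the expected value without a TaskVine manager or workers, the same functions can be chained in a plain sequential loop. This is only a sketch: run_locally is a hypothetical helper, not part of dynamic_data_reduction, and it assumes a data flow this README does not spell out, namely that the first element of each tuple yielded by source_preprocess is passed to source_postprocess, the processor is applied to that result, and all processor outputs are folded together with the accumulator.

```python
from functools import reduce

# Same toy data and functions as in the Quick Start example
data = {
    "datasets": {
        "numbers": {"values": [1, 2, 3, 4, 5]},
        "more_numbers": {"values": [10, 20, 30]},
    }
}


def preprocess(dataset_info, **kwargs):
    for val in dataset_info["values"]:
        yield (val, 1)


def postprocess(val, **kwargs):
    return val


def processor(x):
    return x * 2


def reducer(a, b):
    return a + b


def run_locally(data, preprocess, postprocess, processor, reducer):
    """Hypothetical sequential stand-in for ddr.compute().

    Assumes each tuple yielded by preprocess carries the item in its first
    position; the second element is ignored here because its meaning is not
    documented in this README.
    """
    outputs = []
    for dataset_info in data["datasets"].values():
        for item, _ in preprocess(dataset_info):
            outputs.append(processor(postprocess(item)))
    # Fold all processor outputs with the accumulator (the "reduce" step)
    return reduce(reducer, outputs)


print(run_locally(data, preprocess, postprocess, processor, reducer))  # 150
```

A reducer that is associative and commutative, like the addition used here, gives the same result regardless of the order in which partial results are combined, which is what makes it safe to merge outputs arriving from different workers.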

Usage

License

This project is licensed under the Apache License 2.0 - see the LICENSE file for details.


Download files


Source Distribution

dynamic_data_reduction-2025.10.2.tar.gz (27.0 kB)

Built Distribution

dynamic_data_reduction-2025.10.2-py3-none-any.whl (24.6 kB)

File details

Hashes for dynamic_data_reduction-2025.10.2.tar.gz

Algorithm     Hash digest
SHA256        253b5c3c272c3f75da95944cf1411055cbc51e6eb4a45e00c157871d304f78d7
MD5           61d33d6bfd1d2f22b7a27b5de905d5ac
BLAKE2b-256   90089c3001f3e96701d0e30be72df65b8b5d0dd41336a6e50a52738f15595f0e

Hashes for dynamic_data_reduction-2025.10.2-py3-none-any.whl

Algorithm     Hash digest
SHA256        bb6ec71cdac2d99bc458d895658150b062ee8023fd6431fd5586d097fe594455
MD5           ddf5027d5383617a1558ec1636cc5b67
BLAKE2b-256   cb2f92be4b86e3d02d6d93a432ef3e4064fb171de479d7a534f3262cceaebfca
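To confirm that a downloaded file matches the SHA256 digest listed for it, hash the file locally and compare. This sketch uses only Python's standard hashlib; sha256_of is a hypothetical helper name, and the script assumes the downloaded files sit in the current directory.

```python
import hashlib
from pathlib import Path


def sha256_of(path, chunk_size=1 << 20):
    """Return the hex SHA256 digest of a file, reading it in 1 MiB chunks."""
    digest = hashlib.sha256()
    with open(path, "rb") as f:
        for chunk in iter(lambda: f.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()


# SHA256 digests published for release 2025.10.2
EXPECTED = {
    "dynamic_data_reduction-2025.10.2.tar.gz":
        "253b5c3c272c3f75da95944cf1411055cbc51e6eb4a45e00c157871d304f78d7",
    "dynamic_data_reduction-2025.10.2-py3-none-any.whl":
        "bb6ec71cdac2d99bc458d895658150b062ee8023fd6431fd5586d097fe594455",
}

if __name__ == "__main__":
    for name, expected in EXPECTED.items():
        path = Path(name)
        if not path.exists():
            print(f"{name}: not found")
            continue
        status = "OK" if sha256_of(path) == expected else "MISMATCH"
        print(f"{name}: {status}")
```

Reading in chunks keeps memory use flat for large files. As an alternative, pip hash <file> prints a sha256 digest that can be compared by eye.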
