
Multi-GPU and distributed-memory algorithms

Project description

RapidsMPF

Collection of multi-GPU, distributed memory algorithms.

Getting started

Building rapidsmpf from source is recommended when running nightly/upstream versions, since dependencies on non-ABI-stable libraries (e.g., pylibcudf) can cause temporary breakage, leading to issues such as segmentation faults. Stable versions can be installed from conda or pip packages.

Build from source

Clone rapidsmpf and install the dependencies in a conda environment:

git clone https://github.com/rapidsai/rapidsmpf.git
cd rapidsmpf

# Choose an environment file that matches your system.
mamba env create --name rapidsmpf-dev --file conda/environments/all_cuda-130_arch-$(uname -m).yaml

# Build
./build.sh

Debug build

Debug builds can be produced by adding the -g flag:

./build.sh -g

AddressSanitizer-enabled build

Enabling the AddressSanitizer is also possible with the --asan flag:

./build.sh -g --asan

C++ code built with AddressSanitizer should work out of the box, but there are caveats for CUDA and Python code. Any CUDA code running under AddressSanitizer requires protect_shadow_gap=0, which can be set via an environment variable:

ASAN_OPTIONS=protect_shadow_gap=0

On the other hand, Python may require LD_PRELOAD to be set so that AddressSanitizer is loaded before the Python interpreter itself. In a conda environment, for example, there is usually a $CONDA_PREFIX/lib/libasan.so, so the application may be launched as follows:

LD_PRELOAD=$CONDA_PREFIX/lib/libasan.so python ...

Python applications using CUDA require setting both environment variables described above.
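To make the combination concrete, here is a minimal sketch of assembling both variables before spawning a Python + CUDA application. The `asan_environment` helper and the `/opt/conda` prefix are illustrative, not part of rapidsmpf; the actual libasan path depends on your environment.

```python
import os

def asan_environment(conda_prefix: str) -> dict[str, str]:
    """Environment additions for running a Python + CUDA app under ASan."""
    return {
        # CUDA kernels cannot launch while ASan's shadow gap is protected.
        "ASAN_OPTIONS": "protect_shadow_gap=0",
        # Load libasan before the Python interpreter itself.
        "LD_PRELOAD": os.path.join(conda_prefix, "lib", "libasan.so"),
    }

# Merge into the current environment before spawning the application, e.g.:
# subprocess.run(["python", "my_app.py"],
#                env={**os.environ, **asan_environment("/opt/conda")})
```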

MPI

Run the test suite using MPI:

# When using Open MPI, we need to enable CUDA support.
export OMPI_MCA_opal_cuda_support=1

# Run the suite using two MPI processes.
mpirun -np 2 cpp/build/gtests/mpi_tests

# Alternatively
cd cpp/build && ctest -R mpi_tests_2

We can also run the shuffle benchmark. To assign each MPI rank its own GPU, we use a binder script:

# The binder script requires numactl (`mamba install numactl`).
wget https://raw.githubusercontent.com/LStuber/binding/refs/heads/master/binder.sh
chmod a+x binder.sh
mpirun -np 2 ./binder.sh cpp/build/benchmarks/bench_shuffle
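The essential job of a binder script is to pin each local MPI rank to its own GPU before the benchmark starts. The sketch below illustrates that idea in Python; it is not the logic of binder.sh itself (which also handles NUMA binding via numactl), and the round-robin mapping is just one reasonable policy.

```python
import os

def gpu_for_rank(local_rank: int, num_gpus: int) -> str:
    """Map a node-local MPI rank to a single GPU index (round-robin)."""
    return str(local_rank % num_gpus)

# A binder-style wrapper would read the node-local rank that Open MPI
# exports and restrict the process to one device before exec'ing the target:
local_rank = int(os.environ.get("OMPI_COMM_WORLD_LOCAL_RANK", "0"))
os.environ["CUDA_VISIBLE_DEVICES"] = gpu_for_rank(local_rank, num_gpus=2)
```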

UCX

For convenience, the UCX test suite uses MPI to bootstrap, so the UCX tests must also be launched with mpirun. Run the test suite using UCX:

# Run the suite using two processes.
mpirun -np 2 cpp/build/gtests/ucxx_tests

rrun - Distributed Launcher

RapidsMPF includes rrun, a lightweight launcher that eliminates the MPI dependency for multi-GPU workloads. This is particularly useful for development, testing, and environments where MPI is not available.

Single-Node Usage

# Build rrun
cd cpp/build
cmake --build . --target rrun

# Launch 2 ranks on the local node
./tools/rrun -n 2 ./benchmarks/bench_comm -C ucxx -O all-to-all

# With verbose output and specific GPUs
./tools/rrun -v -n 4 -g 0,1,2,3 ./benchmarks/bench_comm -C ucxx

Algorithms

Table Shuffle Service

Example of an MPI program that uses the shuffler:

#include <vector>

#include <mpi.h>
#include <unistd.h>

#include <rapidsmpf/buffer/packed_data.hpp>
#include <rapidsmpf/communicator/mpi.hpp>
#include <rapidsmpf/error.hpp>
#include <rapidsmpf/integrations/cudf/partition.hpp>
#include <rapidsmpf/shuffler/shuffler.hpp>

#include "../benchmarks/utils/random_data.hpp"

// An example of how to use the shuffler.
int main(int argc, char** argv) {
    // In this example we use the MPI backend. For convenience, rapidsmpf provides an
    // optional MPI-init function that initializes MPI with thread support.
    rapidsmpf::mpi::init(&argc, &argv);

    // First, we have to create a Communicator, which we will use throughout the example.
    // Notice, if you want to do multiple shuffles concurrently, each shuffle should use
    // its own Communicator backed by its own MPI communicator.
    std::shared_ptr<rapidsmpf::Communicator> comm =
        std::make_shared<rapidsmpf::MPI>(MPI_COMM_WORLD);

    // The Communicator provides a logger.
    auto& log = comm->logger();

    // We will use the same stream and memory resource throughout the example.
    rmm::cuda_stream_view stream = cudf::get_default_stream();
    rmm::device_async_resource_ref mr = cudf::get_current_device_resource_ref();

    // As input data, we use a helper function from the benchmark suite. It creates a
    // random cudf table with 2 columns and 100 rows. In this example, each MPI rank
    // creates its own local input and we only have one input per rank but each rank
    // could take any number of inputs.
    cudf::table local_input = random_table(2, 100, 0, 10, stream, mr);

    // The total number of inputs equals the number of ranks, in this case.
    rapidsmpf::shuffler::PartID const total_num_partitions = comm->nranks();

    // We create a new shuffler instance, which represents a single shuffle. It takes
    // a Communicator, the total number of partitions, and an "owner function", which
    // maps partitions to their destination ranks. All ranks must use the same owner
    // function; in this example we use the included round-robin owner function.
    rapidsmpf::shuffler::Shuffler shuffler(
        comm, total_num_partitions, rapidsmpf::shuffler::Shuffler::round_robin, stream, mr
    );

    // It is our own responsibility to partition and pack (serialize) the input for
    // the shuffle. The shuffler only handles raw host and device buffers. However, it
    // does provide a convenience function that hash-partitions a cudf table and packs
    // each partition. The result is a mapping of `PartID`s, globally unique partition
    // identifiers, to their packed partitions.
    std::unordered_map<rapidsmpf::shuffler::PartID, rapidsmpf::PackedData> packed_inputs =
        rapidsmpf::partition_and_pack(
            local_input,
            {0},
            total_num_partitions,
            cudf::hash_id::HASH_MURMUR3,
            cudf::DEFAULT_HASH_SEED,
            stream,
            mr
        );

    // Now, we can insert the packed partitions into the shuffler. This operation is
    // non-blocking and we can continue inserting new input partitions. E.g., a pipeline
    // could read, hash-partition, pack, and insert one parquet file at a time while the
    // distributed shuffle is being processed underneath.
    shuffler.insert(std::move(packed_inputs));

    // When we are finished inserting to a specific partition, we tell the shuffler.
    // Again, this is non-blocking and should be done as soon as we know that we don't
    // have more inputs for a specific partition. In this case, we are finished with all
    // partitions.
    for (rapidsmpf::shuffler::PartID i = 0; i < total_num_partitions; ++i) {
        shuffler.insert_finished(i);
    }

    // Vector to hold the local results of the shuffle operation.
    std::vector<std::unique_ptr<cudf::table>> local_outputs;

    // Wait for and process the shuffle results for each partition.
    while (!shuffler.finished()) {
        // Block until a partition is ready and retrieve its partition ID.
        rapidsmpf::shuffler::PartID finished_partition = shuffler.wait_any();

        // Extract the finished partition's data from the Shuffler.
        auto packed_chunks = shuffler.extract(finished_partition);

        // Unpack (deserialize) and concatenate the chunks into a single table using a
        // convenience function.
        local_outputs.push_back(
            rapidsmpf::unpack_and_concat(std::move(packed_chunks))
        );
    }
    // At this point, `local_outputs` contains the local result of the shuffle.
    // Let's log the result.
    log.print("Finished shuffle with ", local_outputs.size(), " local output partitions");

    // Shutdown the Shuffler explicitly or let it go out of scope for cleanup.
    shuffler.shutdown();

    // Finalize the execution, `RAPIDSMPF_MPI` is a convenience macro that
    // checks for MPI errors.
    RAPIDSMPF_MPI(MPI_Finalize());
}
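The owner function deserves emphasis: every rank must compute the identical PartID-to-rank mapping, or chunks will be sent to the wrong destinations. A minimal Python sketch of the round-robin policy used above (the real implementation is `rapidsmpf::shuffler::Shuffler::round_robin` in C++; this is only an illustration of its behavior):

```python
def round_robin_owner(part_id: int, nranks: int) -> int:
    """Illustrative owner function: partition i is owned by rank i % nranks.
    All ranks must agree on this mapping for the shuffle to be consistent."""
    return part_id % nranks

# With 4 partitions over 2 ranks, partitions 0 and 2 land on rank 0,
# and partitions 1 and 3 on rank 1.
owners = [round_robin_owner(p, nranks=2) for p in range(4)]
```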

RapidsMPF Configuration Options

RapidsMPF can be configured using a dictionary of options, which may be populated via environment variables. All dictionary keys are automatically converted to lowercase.

Each configuration option includes:

  • Name: The key used in the configuration dictionary.
  • Environment Variable: The corresponding environment variable name.
  • Description: Describes what the option controls, including accepted values.

[!NOTE] Environment variable names are always uppercase and prefixed with RAPIDSMPF_.
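The naming convention above is mechanical: lowercase dictionary keys on one side, uppercase RAPIDSMPF_-prefixed variables on the other. A small sketch of that round trip; the helper names `env_var_for` and `option_for` are hypothetical, not part of the rapidsmpf API.

```python
PREFIX = "RAPIDSMPF_"

def env_var_for(option_name: str) -> str:
    """Lowercase configuration key -> environment variable name."""
    return PREFIX + option_name.upper()

def option_for(env_var: str) -> str:
    """Environment variable name -> lowercase configuration key."""
    if not env_var.startswith(PREFIX):
        raise ValueError(f"not a RapidsMPF variable: {env_var}")
    return env_var[len(PREFIX):].lower()
```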

Typically, it is up to the user to read environment variables using code such as:

options = Options()
options.insert_if_absent(get_environment_variables())

However, Dask automatically reads environment variables for any options not set explicitly when calling bootstrap_dask_cluster().

In C++ it is always explicit; use something like:

  rapidsmpf::config::Options options{rapidsmpf::config::get_environment_variables()};

Available Options

General

  • log
    • Environment Variable: RAPIDSMPF_LOG
    • Default: WARN
    • Description: Controls the logging verbosity level. Valid values are:
      • NONE: Disable all logging.
      • PRINT: General print messages.
      • WARN: Warning messages (default).
      • INFO: Informational messages.
      • DEBUG: Debug-level messages.
      • TRACE: Fine-grained trace-level messages.

Dask Integration

  • dask_spill_device

    • Environment Variable: RAPIDSMPF_DASK_SPILL_DEVICE
    • Default: 0.50
    • Description: GPU memory limit for shuffling as a fraction of total device memory.
  • dask_oom_protection

    • Environment Variable: RAPIDSMPF_DASK_OOM_PROTECTION
    • Default: False
    • Description: Enable out-of-memory protection by using managed memory when the device memory pool raises OOM errors.
  • dask_periodic_spill_check

    • Environment Variable: RAPIDSMPF_DASK_PERIODIC_SPILL_CHECK
    • Default: 1e-3
    • Description: Enable periodic spill checks. A dedicated thread continuously checks and performs spilling based on the currently available memory as reported by the buffer resource. The value of dask_periodic_spill_check is the pause between checks (in seconds). Use "disabled" to disable periodic spill checks.
  • dask_statistics

    • Environment Variable: RAPIDSMPF_DASK_STATISTICS
    • Default: False
    • Description: Enable RapidsMPF statistics collection.
  • dask_print_statistics

    • Environment Variable: RAPIDSMPF_DASK_PRINT_STATISTICS
    • Default: True
    • Description: Print RapidsMPF statistics to stdout on Dask Worker shutdown when dask_statistics is enabled.
  • dask_staging_spill_buffer

    • Environment Variable: RAPIDSMPF_DASK_STAGING_SPILL_BUFFER
    • Default: 128 MiB
    • Description: Size of the intermediate staging buffer (in bytes) used for device-to-host spilling. This temporary buffer is allocated on the device to reduce memory pressure when transferring Python-managed GPU objects during Dask spilling. Use disabled to skip allocation of the staging buffer.
