
Multi-GPU and distributed-memory algorithms


RapidsMPF

Collection of multi-GPU, distributed memory algorithms.

Getting started

Building rapidsmpf from source is recommended when running nightly/upstream versions, since dependencies on non-ABI-stable libraries (e.g., pylibcudf) can temporarily break and cause issues such as segmentation faults. Stable versions can be installed from conda or pip packages.

Build from source

Clone rapidsmpf and install the dependencies in a conda environment:

git clone https://github.com/rapidsai/rapidsmpf.git
cd rapidsmpf

# Choose an environment file that matches your system.
mamba env create --name rapidsmpf-dev --file conda/environments/all_cuda-130_arch-$(uname -m).yaml

# Build
./build.sh

Debug build

Debug builds can be produced by adding the -g flag:

./build.sh -g

AddressSanitizer-enabled build

Enabling the AddressSanitizer is also possible with the --asan flag:

./build.sh -g --asan

C++ code built with AddressSanitizer should simply work, but there are caveats for CUDA and Python code. Any CUDA code executing with AddressSanitizer requires protect_shadow_gap=0, which can be set via an environment variable:

ASAN_OPTIONS=protect_shadow_gap=0

On the other hand, Python may require LD_PRELOAD to be set so that AddressSanitizer is loaded before Python. In a conda environment, for example, there is usually a $CONDA_PREFIX/lib/libasan.so, so the application may be launched as follows:

LD_PRELOAD=$CONDA_PREFIX/lib/libasan.so python ...

Python applications using CUDA will require setting both environment variables described above.
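For example, a CUDA-using Python application under AddressSanitizer might be launched with both variables set at once (`my_app.py` is a placeholder; paths shown are the typical conda locations, adjust for your environment):

```shell
# Load ASan before Python and relax the shadow gap for CUDA, in one launch.
ASAN_OPTIONS=protect_shadow_gap=0 \
LD_PRELOAD=$CONDA_PREFIX/lib/libasan.so \
python my_app.py
```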

MPI

Run the test suite using MPI:

# When using Open MPI, we need to enable CUDA support.
export OMPI_MCA_opal_cuda_support=1

# Run the suite using two MPI processes.
mpirun -np 2 cpp/build/gtests/mpi_tests

# Alternatively
cd cpp/build && ctest -R mpi_tests_2

We can also run the shuffle benchmark. To assign each MPI rank its own GPU, we use a binder script:

# The binder script requires numactl (`mamba install numactl`).
wget https://raw.githubusercontent.com/LStuber/binding/refs/heads/master/binder.sh
chmod a+x binder.sh
mpirun -np 2 ./binder.sh cpp/build/benchmarks/bench_shuffle
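The binder script's core idea, pinning each node-local rank to a single GPU before any CUDA context is created, can be sketched as follows. This is a simplified illustration, not the script itself; it assumes Open MPI's `OMPI_COMM_WORLD_LOCAL_RANK` variable is set by `mpirun`:

```python
import os

def gpu_for_local_rank(num_gpus: int) -> str:
    """Map this process's node-local MPI rank to a single GPU index."""
    local_rank = int(os.environ.get("OMPI_COMM_WORLD_LOCAL_RANK", "0"))
    return str(local_rank % num_gpus)

# Restrict the process to its GPU before any CUDA context is created.
os.environ["CUDA_VISIBLE_DEVICES"] = gpu_for_local_rank(num_gpus=2)
```

The real script additionally uses numactl to bind CPU cores and memory to the NUMA node closest to the chosen GPU.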

UCX

For convenience, the UCX test suite uses MPI to bootstrap, so UCX tests must also be launched with mpirun. Run the test suite using UCX:

# Run the suite using two processes.
mpirun -np 2 cpp/build/gtests/ucxx_tests

rrun - Distributed Launcher

RapidsMPF includes rrun, a lightweight launcher that eliminates the MPI dependency for multi-GPU workloads. This is particularly useful for development, testing, and environments where MPI is not available.

Single-Node Usage

# Build rrun
cd cpp/build
cmake --build . --target rrun

# Launch 2 ranks on the local node
./tools/rrun -n 2 ./benchmarks/bench_comm -C ucxx -O all-to-all

# With verbose output and specific GPUs
./tools/rrun -v -n 4 -g 0,1,2,3 ./benchmarks/bench_comm -C ucxx

Algorithms

Table Shuffle Service

Example of an MPI program that uses the shuffler:

#include <vector>

#include <mpi.h>
#include <unistd.h>

#include <rapidsmpf/buffer/packed_data.hpp>
#include <rapidsmpf/communicator/mpi.hpp>
#include <rapidsmpf/error.hpp>
#include <rapidsmpf/integrations/cudf/partition.hpp>
#include <rapidsmpf/shuffler/shuffler.hpp>

#include "../benchmarks/utils/random_data.hpp"

// An example of how to use the shuffler.
int main(int argc, char** argv) {
    // In this example we use the MPI backend. For convenience, rapidsmpf provides an
    // optional MPI-init function that initializes MPI with thread support.
    rapidsmpf::mpi::init(&argc, &argv);

    // First, we have to create a Communicator, which we will use throughout the example.
    // Note that if you want to run multiple shuffles concurrently, each shuffle should
    // use its own Communicator backed by its own MPI communicator.
    std::shared_ptr<rapidsmpf::Communicator> comm =
        std::make_shared<rapidsmpf::MPI>(MPI_COMM_WORLD);

    // The Communicator provides a logger.
    auto& log = comm->logger();

    // We will use the same stream and memory resource throughout the example.
    rmm::cuda_stream_view stream = cudf::get_default_stream();
    rmm::device_async_resource_ref mr = cudf::get_current_device_resource_ref();

    // As input data, we use a helper function from the benchmark suite. It creates a
    // random cudf table with 2 columns and 100 rows. In this example, each MPI rank
    // creates its own local input and we only have one input per rank but each rank
    // could take any number of inputs.
    cudf::table local_input = random_table(2, 100, 0, 10, stream, mr);

    // The total number of inputs equals the number of ranks, in this case.
    rapidsmpf::shuffler::PartID const total_num_partitions = comm->nranks();

    // We create a new shuffler instance, which represents a single shuffle. It takes
    // a Communicator, the total number of partitions, and an "owner function", which
    // maps partitions to their destination ranks. All ranks must use the same owner
    // function; in this example we use the included round-robin owner function.
    rapidsmpf::shuffler::Shuffler shuffler(
        comm, total_num_partitions, rapidsmpf::shuffler::Shuffler::round_robin, stream, mr
    );

    // It is our own responsibility to partition and pack (serialize) the input for
    // the shuffle. The shuffler only handles raw host and device buffers. However, it
    // does provide a convenience function that hash-partitions a cudf table and packs
    // each partition. The result is a mapping from `PartID`, a globally unique
    // partition identifier, to its packed partition.
    std::unordered_map<rapidsmpf::shuffler::PartID, rapidsmpf::PackedData> packed_inputs =
        rapidsmpf::partition_and_pack(
            local_input,
            {0},
            total_num_partitions,
            cudf::hash_id::HASH_MURMUR3,
            cudf::DEFAULT_HASH_SEED,
            stream,
            mr
        );

    // Now, we can insert the packed partitions into the shuffler. This operation is
    // non-blocking and we can continue inserting new input partitions. E.g., a pipeline
    // could read, hash-partition, pack, and insert one Parquet file at a time while the
    // distributed shuffle is being processed underneath.
    shuffler.insert(std::move(packed_inputs));

    // When we are finished inserting to a specific partition, we tell the shuffler.
    // Again, this is non-blocking and should be done as soon as we know that we don't
    // have more inputs for a specific partition. In this case, we are finished with all
    // partitions.
    for (rapidsmpf::shuffler::PartID i = 0; i < total_num_partitions; ++i) {
        shuffler.insert_finished(i);
    }

    // Vector to hold the local results of the shuffle operation.
    std::vector<std::unique_ptr<cudf::table>> local_outputs;

    // Wait for and process the shuffle results for each partition.
    while (!shuffler.finished()) {
        // Block until a partition is ready and retrieve its partition ID.
        rapidsmpf::shuffler::PartID finished_partition = shuffler.wait_any();

        // Extract the finished partition's data from the Shuffler.
        auto packed_chunks = shuffler.extract(finished_partition);

        // Unpack (deserialize) and concatenate the chunks into a single table using a
        // convenience function.
        local_outputs.push_back(
            rapidsmpf::unpack_and_concat(std::move(packed_chunks))
        );
    }
    // At this point, `local_outputs` contains the local result of the shuffle.
    // Let's log the result.
    log.print("Finished shuffle with ", local_outputs.size(), " local output partitions");

    // Shutdown the Shuffler explicitly or let it go out of scope for cleanup.
    shuffler.shutdown();

    // Finalize the execution, `RAPIDSMPF_MPI` is a convenience macro that
    // checks for MPI errors.
    RAPIDSMPF_MPI(MPI_Finalize());
}
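To make the data flow concrete, here is a plain-Python sketch of the two mappings the example relies on: hash partitioning rows into `total_num_partitions` buckets, and a round-robin owner function assigning each `PartID` to a destination rank. This illustrates the concept only; the actual rapidsmpf implementations (MurmurHash3 on the GPU, `Shuffler::round_robin`) differ in detail:

```python
def hash_partition(rows, num_partitions):
    """Assign each row to a partition by hashing its key column (column 0)."""
    parts = {pid: [] for pid in range(num_partitions)}
    for row in rows:
        pid = hash(row[0]) % num_partitions
        parts[pid].append(row)
    return parts

def round_robin_owner(pid, nranks):
    """Owner function: maps a PartID to its destination rank."""
    return pid % nranks

rows = [(k, k * 10) for k in range(8)]
parts = hash_partition(rows, num_partitions=4)
owners = {pid: round_robin_owner(pid, nranks=2) for pid in parts}
# With 4 partitions over 2 ranks, partitions 0 and 2 go to rank 0, 1 and 3 to rank 1.
```

Because every rank applies the same hash and the same owner function, all rows with equal keys end up on the same rank, which is exactly what the shuffle guarantees.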

RapidsMPF Configuration Options

RapidsMPF can be configured using a dictionary of options, which may be populated via environment variables. All dictionary keys are automatically converted to lowercase.

Each configuration option includes:

  • Name: The key used in the configuration dictionary.
  • Environment Variable: The corresponding environment variable name.
  • Description: Describes what the option controls, including accepted values.

[!NOTE] Environment variable names are always uppercase and prefixed with RAPIDSMPF_.

Typically, it is up to the user to read environment variables using code such as:

options = Options()
options.insert_if_absent(get_environment_variables())

However, Dask automatically reads environment variables for any options not set explicitly when calling bootstrap_dask_cluster().

In C++, this is always explicit; use something like:

  rapidsmpf::config::Options options{rapidsmpf::config::get_environment_variables()};
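The environment-variable convention can be sketched in plain Python: collect every variable prefixed with RAPIDSMPF_, strip the prefix, and lowercase the key. This is a simplified illustration of what `get_environment_variables()` does, not the library code itself:

```python
import os

def collect_rapidsmpf_env(environ=os.environ):
    """Gather RAPIDSMPF_* variables into a lowercase-keyed options dict."""
    prefix = "RAPIDSMPF_"
    return {
        key[len(prefix):].lower(): value
        for key, value in environ.items()
        if key.startswith(prefix)
    }

# e.g. RAPIDSMPF_LOG=DEBUG becomes {"log": "DEBUG"}
```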

Available Options

General

  • log
    • Environment Variable: RAPIDSMPF_LOG
    • Default: WARN
    • Description: Controls the logging verbosity level. Valid values are:
      • NONE: Disable all logging.
      • PRINT: General print messages.
      • WARN: Warning messages (default).
      • INFO: Informational messages.
      • DEBUG: Debug-level messages.
      • TRACE: Fine-grained trace-level messages.

Dask Integration

  • dask_spill_device

    • Environment Variable: RAPIDSMPF_DASK_SPILL_DEVICE
    • Default: 0.50
    • Description: GPU memory limit for shuffling as a fraction of total device memory.
  • dask_oom_protection

    • Environment Variable: RAPIDSMPF_DASK_OOM_PROTECTION
    • Default: False
    • Description: Enable out-of-memory protection by using managed memory when the device memory pool raises OOM errors.
  • dask_periodic_spill_check

    • Environment Variable: RAPIDSMPF_DASK_PERIODIC_SPILL_CHECK
    • Default: 1e-3
    • Description: Enable periodic spill checks. A dedicated thread continuously checks and performs spilling based on the currently available memory as reported by the buffer resource. The value of dask_periodic_spill_check is used as the pause between checks (in seconds). Use "disabled" to disable periodic spill checks.
  • dask_statistics

    • Environment Variable: RAPIDSMPF_DASK_STATISTICS
    • Default: False
    • Description: Enable RapidsMPF statistics collection.
  • dask_print_statistics

    • Environment Variable: RAPIDSMPF_DASK_PRINT_STATISTICS
    • Default: True
    • Description: Print RapidsMPF statistics to stdout on Dask Worker shutdown when dask_statistics is enabled.
  • dask_staging_spill_buffer

    • Environment Variable: RAPIDSMPF_DASK_STAGING_SPILL_BUFFER
    • Default: 128 MiB
    • Description: Size of the intermediate staging buffer (in bytes) used for device-to-host spilling. This temporary buffer is allocated on the device to reduce memory pressure when transferring Python-managed GPU objects during Dask spilling. Use "disabled" to skip allocation of the staging buffer.
