
Multi-GPU and distributed-memory algorithms

RapidsMPF

Collection of multi-GPU, distributed memory algorithms.

Getting started

Building rapidsmpf from source is recommended when running nightly/upstream versions, since dependencies on non-ABI-stable libraries (e.g., pylibcudf) can cause temporary breakage leading to issues such as segmentation faults. Stable versions can be installed from conda or pip packages, as shown below.
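For example, the CUDA 13 build published on PyPI can be installed with pip. The package name below is taken from this release's wheels; pick the variant matching your CUDA major version (conda package and channel names may differ on your setup):

pip install rapidsmpf-cu13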

Build from source

Clone rapidsmpf and install the dependencies in a conda environment:

git clone https://github.com/rapidsai/rapidsmpf.git
cd rapidsmpf

# Choose an environment file that matches your system.
mamba env create --name rapidsmpf-dev --file conda/environments/all_cuda-131_arch-$(uname -m).yaml

# Build
./build.sh

Debug build

Debug builds can be produced by adding the -g flag:

./build.sh -g

AddressSanitizer-enabled build

Enabling the AddressSanitizer is also possible with the --asan flag:

./build.sh -g --asan

C++ code built with AddressSanitizer should simply work, but there are caveats for CUDA and Python code. Any CUDA code executing with AddressSanitizer requires protect_shadow_gap=0, which can be set via an environment variable:

ASAN_OPTIONS=protect_shadow_gap=0

On the other hand, Python may require LD_PRELOAD to be set so that the AddressSanitizer is loaded before Python. In a conda environment, for example, there is usually a $CONDA_PREFIX/lib/libasan.so, so the application may be launched as follows:

LD_PRELOAD=$CONDA_PREFIX/lib/libasan.so python ...

Python applications using CUDA require setting both environment variables described above.
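For example, combining the two variables from the commands above (your_app.py is a placeholder for your application):

ASAN_OPTIONS=protect_shadow_gap=0 LD_PRELOAD=$CONDA_PREFIX/lib/libasan.so python your_app.py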

MPI

Run the test suite using MPI:

# When using Open MPI, we need to enable CUDA support.
export OMPI_MCA_opal_cuda_support=1

# Run the suite using two MPI processes.
mpirun -np 2 cpp/build/gtests/mpi_tests

# Alternatively
cd cpp/build && ctest -R mpi_tests_2

We can also run the shuffle benchmark. To assign each MPI rank its own GPU, we use a binder script:

# The binder script requires numactl (install it with `mamba install numactl`).
wget https://raw.githubusercontent.com/LStuber/binding/refs/heads/master/binder.sh
chmod a+x binder.sh
mpirun -np 2 ./binder.sh cpp/build/benchmarks/bench_shuffle

UCX

For convenience, the UCX test suite uses MPI to bootstrap, so its tests must also be launched with mpirun. Run the test suite using UCX:

# Run the suite using two processes.
mpirun -np 2 cpp/build/gtests/ucxx_tests

rrun - Distributed Launcher

RapidsMPF includes rrun, a lightweight launcher that eliminates the MPI dependency for multi-GPU workloads. This is particularly useful for development, testing, and environments where MPI is not available.

Single-Node Usage

# Build rrun
cd cpp/build
cmake --build . --target rrun

# Launch 2 ranks on the local node
./tools/rrun -n 2 ./benchmarks/bench_comm -C ucxx -O all-to-all

# With verbose output and specific GPUs
./tools/rrun -v -n 4 -g 0,1,2,3 ./benchmarks/bench_comm -C ucxx

Algorithms

Table Shuffle Service

Example of an MPI program that uses the shuffler:

#include <vector>

#include <mpi.h>
#include <unistd.h>

#include <rapidsmpf/memory/packed_data.hpp>
#include <rapidsmpf/communicator/mpi.hpp>
#include <rapidsmpf/error.hpp>
#include <rapidsmpf/integrations/cudf/partition.hpp>
#include <rapidsmpf/shuffler/shuffler.hpp>

#include "../benchmarks/utils/random_data.hpp"

// An example of how to use the shuffler.
int main(int argc, char** argv) {
    // In this example we use the MPI backend. For convenience, rapidsmpf provides an
    // optional MPI-init function that initializes MPI with thread support.
    rapidsmpf::mpi::init(&argc, &argv);

    // Initialize configuration options from environment variables.
    rapidsmpf::config::Options options{rapidsmpf::config::get_environment_variables()};

    // First, we have to create a Communicator, which we will use throughout the
    // example. Note: if you want to run multiple shuffles concurrently, each shuffle
    // should use its own Communicator backed by its own MPI communicator.
    std::shared_ptr<rapidsmpf::Communicator> comm =
        std::make_shared<rapidsmpf::MPI>(MPI_COMM_WORLD, options);

    // Create a statistics instance for the shuffler that tracks useful information.
    auto stats = std::make_shared<rapidsmpf::Statistics>();

    // Then we create a progress thread on which the shuffler event loop executes. A
    // single progress thread may be used by multiple shufflers simultaneously.
    std::shared_ptr<rapidsmpf::ProgressThread> progress_thread =
        std::make_shared<rapidsmpf::ProgressThread>(comm->logger(), stats);

    // The Communicator provides a logger.
    auto& log = comm->logger();

    // We will use the same stream, memory, and buffer resource throughout the example.
    rmm::cuda_stream_view stream = cudf::get_default_stream();
    rmm::device_async_resource_ref mr = cudf::get_current_device_resource_ref();
    rapidsmpf::BufferResource br{mr};

    // As input data, we use a helper function from the benchmark suite. It creates a
    // random cudf table with 2 columns and 100 rows. In this example, each MPI rank
    // creates its own local input. We only have one input per rank here, but each
    // rank could contribute any number of inputs.
    cudf::table local_input = random_table(2, 100, 0, 10, stream, mr);

    // The total number of partitions equals the number of ranks, in this case.
    auto const total_num_partitions =
        static_cast<rapidsmpf::shuffler::PartID>(comm->nranks());

    // We create a new shuffler instance, which represents a single shuffle. It takes
    // a Communicator, the total number of partitions, and an "owner function", which
    // maps partitions to their destination ranks. All ranks must use the same owner
    // function; in this example we use the included round-robin owner function.
    rapidsmpf::shuffler::Shuffler shuffler(
        comm,
        progress_thread,
        0,  // op_id
        total_num_partitions,
        &br,
        stats,
        rapidsmpf::shuffler::Shuffler::round_robin  // partition owner
    );

    // It is our own responsibility to partition and pack (serialize) the input for
    // the shuffle. The shuffler only handles raw host and device buffers. However, it
    // does provide a convenience function that hash partitions a cudf table and packs
    // each partition. The result is a mapping of `PartID`, globally unique partition
    // identifiers, to their packed partitions.
    std::unordered_map<rapidsmpf::shuffler::PartID, rapidsmpf::PackedData> packed_inputs =
        rapidsmpf::partition_and_pack(
            local_input,
            {0},  // columns_to_hash
            static_cast<int>(total_num_partitions),
            cudf::hash_id::HASH_MURMUR3,
            cudf::DEFAULT_HASH_SEED,
            stream,
            &br
        );

    // Now, we can insert the packed partitions into the shuffler. This operation is
    // non-blocking and we can continue inserting new input partitions. E.g., a pipeline
    // could read, hash-partition, pack, and insert one Parquet file at a time while the
    // distributed shuffle is being processed underneath.
    shuffler.insert(std::move(packed_inputs));

    // When we are finished inserting to a specific partition, we tell the shuffler.
    // Again, this is non-blocking and should be done as soon as we know that we don't
    // have more inputs for a specific partition. In this case, we are finished with all
    // partitions.
    for (rapidsmpf::shuffler::PartID i = 0; i < total_num_partitions; ++i) {
        shuffler.insert_finished(i);
    }

    // Vector to hold the local results of the shuffle operation.
    std::vector<std::unique_ptr<cudf::table>> local_outputs;

    // Wait for and process the shuffle results for each partition.
    while (!shuffler.finished()) {
        // Block until a partition is ready and retrieve its partition ID.
        rapidsmpf::shuffler::PartID finished_partition = shuffler.wait_any();

        // Extract the finished partition's data from the Shuffler.
        auto packed_chunks = shuffler.extract(finished_partition);

        // Unpack (deserialize) and concatenate the chunks into a single table using a
        // convenience function.
        local_outputs.push_back(
            rapidsmpf::unpack_and_concat(
                rapidsmpf::unspill_partitions(std::move(packed_chunks), &br, true),
                stream,
                &br
            )
        );
    }
    // At this point, `local_outputs` contains the local result of the shuffle.
    // Let's log the result.
    log.print("Finished shuffle with ", local_outputs.size(), " local output partitions");

    // Log the statistics report.
    log.print(stats->report());

    // Shut down the Shuffler explicitly or let it go out of scope for cleanup.
    shuffler.shutdown();

    // Finalize the execution, `RAPIDSMPF_MPI` is a convenience macro that
    // checks for MPI errors.
    RAPIDSMPF_MPI(MPI_Finalize());
}
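A sketch of launching the example, assuming it has been compiled against rapidsmpf and libcudf into an executable named shuffle_example (the name is a placeholder); it is started with mpirun just like the MPI tests above:

mpirun -np 2 ./shuffle_example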

RapidsMPF Configuration Options

RapidsMPF can be configured using a dictionary of options, which may be populated via environment variables. All dictionary keys are automatically converted to lowercase.

Each configuration option includes:

  • Name: The key used in the configuration dictionary.
  • Environment Variable: The corresponding environment variable name.
  • Description: Describes what the option controls, including accepted values.

Note: Environment variable names are always uppercase and prefixed with RAPIDSMPF_.

Typically, it is up to the user to read environment variables using code such as:

from rapidsmpf.config import Options, get_environment_variables  # assuming this module path

options = Options()
options.insert_if_absent(get_environment_variables())

However, Dask automatically reads environment variables for any options not set explicitly when calling bootstrap_dask_cluster().

In C++, reading environment variables is always explicit; use something like:

  rapidsmpf::config::Options options{rapidsmpf::config::get_environment_variables()};

Available Options

General

  • log

    • Environment Variable: RAPIDSMPF_LOG
    • Default: WARN
    • Description: Controls the logging verbosity level. Valid values are:
      • NONE: Disable all logging.
      • PRINT: General print messages.
      • WARN: Warning messages (default).
      • INFO: Informational messages.
      • DEBUG: Debug-level messages.
      • TRACE: Fine-grained trace-level messages.
  • memory_reserve_timeout_ms

    • Environment Variable: RAPIDSMPF_MEMORY_RESERVE_TIMEOUT_MS

    • Default: 100

    • Description: Controls the global progress timeout for memory reservation requests, specified in milliseconds.

      The value limits how long the system may go without making progress on any pending memory reservation. When the timeout expires and no reservation has been satisfied, the system forces progress by selecting a pending request and attempting to reserve memory for it. Depending on the context, this may result in an empty reservation, an overbooked reservation, or a failure.

      This option ensures forward progress under memory pressure and prevents the system from stalling indefinitely when memory availability fluctuates. See the example following this list.
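For instance, both general options can be set through their environment variables before launching the application (the values here are illustrative):

export RAPIDSMPF_LOG=DEBUG
export RAPIDSMPF_MEMORY_RESERVE_TIMEOUT_MS=250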

Dask Integration

  • dask_spill_device

    • Environment Variable: RAPIDSMPF_DASK_SPILL_DEVICE
    • Default: 0.50
    • Description: GPU memory limit for shuffling as a fraction of total device memory.
  • dask_oom_protection

    • Environment Variable: RAPIDSMPF_DASK_OOM_PROTECTION
    • Default: False
    • Description: Enable out-of-memory protection by using managed memory when the device memory pool raises OOM errors.
  • dask_periodic_spill_check

    • Environment Variable: RAPIDSMPF_DASK_PERIODIC_SPILL_CHECK
    • Default: 1e-3
    • Description: Enable periodic spill checks. A dedicated thread continuously checks and performs spilling based on the currently available memory as reported by the buffer resource. The value of dask_periodic_spill_check is used as the pause between checks (in seconds). Use "disabled" to disable periodic spill checks.
  • dask_statistics

    • Environment Variable: RAPIDSMPF_DASK_STATISTICS
    • Default: False
    • Description: Enable RapidsMPF statistics collection.
  • dask_print_statistics

    • Environment Variable: RAPIDSMPF_DASK_PRINT_STATISTICS
    • Default: True
    • Description: Print RapidsMPF statistics to stdout on Dask Worker shutdown when dask_statistics is enabled.
  • dask_staging_spill_buffer

    • Environment Variable: RAPIDSMPF_DASK_STAGING_SPILL_BUFFER
    • Default: 128 MiB
    • Description: Size of the intermediate staging buffer (in bytes) used for device-to-host spilling. This temporary buffer is allocated on the device to reduce memory pressure when transferring Python-managed GPU objects during Dask spilling. Use "disabled" to skip allocation of the staging buffer. See the example following this list.
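Similarly, the Dask integration options can be provided via environment variables before the cluster is bootstrapped; bootstrap_dask_cluster() picks up any option not set explicitly (the values below are illustrative):

export RAPIDSMPF_DASK_SPILL_DEVICE=0.75
export RAPIDSMPF_DASK_STATISTICS=True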


