
Multi-GPU and distributed-memory algorithms


RapidsMPF

Collection of multi-GPU, distributed memory algorithms.

Getting started

Building rapidsmpf from source is recommended when running nightly/upstream versions, since dependencies on non-ABI-stable libraries (e.g., pylibcudf) can cause temporary breakage, leading to issues such as segmentation faults. Stable versions can be installed from conda or pip packages.

Build from source

Clone rapidsmpf and install the dependencies in a conda environment:

git clone https://github.com/rapidsai/rapidsmpf.git
cd rapidsmpf

# Choose an environment file that matches your system.
mamba env create --name rapidsmpf-dev --file conda/environments/all_cuda-131_arch-$(uname -m).yaml

# Build
./build.sh

Debug build

Debug builds can be produced by adding the -g flag:

./build.sh -g

AddressSanitizer-enabled build

AddressSanitizer can be enabled by adding the --asan flag:

./build.sh -g --asan

C++ code built with AddressSanitizer should simply work, but there are caveats for CUDA and Python code. Any CUDA code executing with AddressSanitizer requires protect_shadow_gap=0, which can be set via an environment variable:

ASAN_OPTIONS=protect_shadow_gap=0

Python, on the other hand, may require LD_PRELOAD to be set so that AddressSanitizer is loaded before Python itself. In a conda environment, for example, there is usually a $CONDA_PREFIX/lib/libasan.so, so the application may be launched as follows:

LD_PRELOAD=$CONDA_PREFIX/lib/libasan.so python ...

Python applications using CUDA require setting both environment variables described above.

MPI

Run the test suite using MPI:

# When using Open MPI, we need to enable CUDA support.
export OMPI_MCA_opal_cuda_support=1

# Run the suite using two MPI processes.
mpirun -np 2 cpp/build/gtests/mpi_tests

# Alternatively
cd cpp/build && ctest -R mpi_tests_2

We can also run the shuffle benchmark. To assign each MPI rank its own GPU, we use a binder script:

# The binder script requires numactl (install with `mamba install numactl`).
wget https://raw.githubusercontent.com/LStuber/binding/refs/heads/master/binder.sh
chmod a+x binder.sh
mpirun -np 2 ./binder.sh cpp/build/benchmarks/bench_shuffle

UCX

For convenience, the UCX test suite uses MPI to bootstrap, so UCX tests must also be launched with mpirun. Run the test suite using UCX:

# Run the suite using two processes.
mpirun -np 2 cpp/build/gtests/ucxx_tests

rrun - Distributed Launcher

RapidsMPF includes rrun, a lightweight launcher that eliminates the MPI dependency for multi-GPU workloads. This is particularly useful for development, testing, and environments where MPI is not available.

Single-Node Usage

# Build rrun
cd cpp/build
cmake --build . --target rrun

# Launch 2 ranks on the local node
./tools/rrun -n 2 ./benchmarks/bench_comm -C ucxx -O all-to-all

# With verbose output and specific GPUs
./tools/rrun -v -n 4 -g 0,1,2,3 ./benchmarks/bench_comm -C ucxx

Algorithms

Table Shuffle Service

Example of an MPI program that uses the shuffler:

#include <vector>

#include <mpi.h>
#include <unistd.h>

#include <rapidsmpf/memory/packed_data.hpp>
#include <rapidsmpf/communicator/mpi.hpp>
#include <rapidsmpf/error.hpp>
#include <rapidsmpf/integrations/cudf/partition.hpp>
#include <rapidsmpf/shuffler/shuffler.hpp>

#include "../benchmarks/utils/random_data.hpp"

// An example of how to use the shuffler.
int main(int argc, char** argv) {
    // In this example we use the MPI backend. For convenience, rapidsmpf provides an
    // optional MPI-init function that initializes MPI with thread support.
    rapidsmpf::mpi::init(&argc, &argv);

    // Initialize configuration options from environment variables.
    rapidsmpf::config::Options options{rapidsmpf::config::get_environment_variables()};

    // First, we have to create a Communicator, which we will use throughout the
    // example. Note that if you want to run multiple shuffles concurrently, each
    // shuffle should use its own Communicator backed by its own MPI communicator.
    std::shared_ptr<rapidsmpf::Communicator> comm =
        std::make_shared<rapidsmpf::MPI>(MPI_COMM_WORLD, options);

    // Create a statistics instance for the shuffler that tracks useful information.
    auto stats = std::make_shared<rapidsmpf::Statistics>();

    // Then a progress thread where the shuffler event loop executes is created. A single
    // progress thread may be used by multiple shufflers simultaneously.
    std::shared_ptr<rapidsmpf::ProgressThread> progress_thread =
        std::make_shared<rapidsmpf::ProgressThread>(comm->logger(), stats);

    // The Communicator provides a logger.
    auto& log = comm->logger();

    // We will use the same stream, memory, and buffer resource throughout the example.
    rmm::cuda_stream_view stream = cudf::get_default_stream();
    rmm::device_async_resource_ref mr = cudf::get_current_device_resource_ref();
    rapidsmpf::BufferResource br{mr};

    // As input data, we use a helper function from the benchmark suite. It creates a
    // random cudf table with 2 columns and 100 rows. In this example, each MPI rank
    // creates its own local input and we only have one input per rank but each rank
    // could take any number of inputs.
    cudf::table local_input = random_table(2, 100, 0, 10, stream, mr);

    // The total number of inputs equals the number of ranks, in this case.
    auto const total_num_partitions =
        static_cast<rapidsmpf::shuffler::PartID>(comm->nranks());

    // We create a new shuffler instance, which represents a single shuffle. It takes
    // a Communicator, the total number of partitions, and an "owner function", which
    // maps partitions to their destination ranks. All ranks must use the same owner
    // function; in this example we use the included round-robin owner function.
    rapidsmpf::shuffler::Shuffler shuffler(
        comm,
        progress_thread,
        0,  // op_id
        total_num_partitions,
        &br,
        stats,
        rapidsmpf::shuffler::Shuffler::round_robin  // partition owner
    );

    // It is our own responsibility to partition and pack (serialize) the input for
    // the shuffle. The shuffler only handles raw host and device buffers. However, it
    // does provide a convenience function that hash partitions a cudf table and packs
    // each partition. The result is a mapping of `PartID`, globally unique partition
    // identifiers, to their packed partitions.
    std::unordered_map<rapidsmpf::shuffler::PartID, rapidsmpf::PackedData> packed_inputs =
        rapidsmpf::partition_and_pack(
            local_input,
            {0},  // columns_to_hash
            static_cast<int>(total_num_partitions),
            cudf::hash_id::HASH_MURMUR3,
            cudf::DEFAULT_HASH_SEED,
            stream,
            &br
        );

    // Now, we can insert the packed partitions into the shuffler. This operation is
    // non-blocking and we can continue inserting new input partitions. E.g., a pipeline
    // could read, hash-partition, pack, and insert one Parquet file at a time while the
    // distributed shuffle is being processed underneath.
    shuffler.insert(std::move(packed_inputs));

    // When we are finished inserting to a specific partition, we tell the shuffler.
    // Again, this is non-blocking and should be done as soon as we know that we don't
    // have more inputs for a specific partition. In this case, we are finished with all
    // partitions.
    for (rapidsmpf::shuffler::PartID i = 0; i < total_num_partitions; ++i) {
        shuffler.insert_finished(i);
    }

    // Vector to hold the local results of the shuffle operation.
    std::vector<std::unique_ptr<cudf::table>> local_outputs;

    // Wait for and process the shuffle results for each partition.
    while (!shuffler.finished()) {
        // Block until a partition is ready and retrieve its partition ID.
        rapidsmpf::shuffler::PartID finished_partition = shuffler.wait_any();

        // Extract the finished partition's data from the Shuffler.
        auto packed_chunks = shuffler.extract(finished_partition);

        // Unpack (deserialize) and concatenate the chunks into a single table using a
        // convenience function.
        local_outputs.push_back(
            rapidsmpf::unpack_and_concat(
                rapidsmpf::unspill_partitions(std::move(packed_chunks), &br, true),
                stream,
                &br
            )
        );
    }
    // At this point, `local_outputs` contains the local result of the shuffle.
    // Let's log the result.
    log.print("Finished shuffle with ", local_outputs.size(), " local output partitions");

    // Log the statistics report.
    log.print(stats->report());

    // Shutdown the Shuffler explicitly or let it go out of scope for cleanup.
    shuffler.shutdown();

    // Finalize the execution, `RAPIDSMPF_MPI` is a convenience macro that
    // checks for MPI errors.
    RAPIDSMPF_MPI(MPI_Finalize());
}
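The round-robin owner function passed to the Shuffler above maps each partition ID to a destination rank. A minimal Python sketch of such a cyclic mapping, for illustration only (whether `rapidsmpf::shuffler::Shuffler::round_robin` is implemented exactly this way is an assumption):

```python
def round_robin_owner(part_id: int, nranks: int) -> int:
    """Cyclic assignment: partition i is owned by rank i mod nranks."""
    return part_id % nranks

# With 4 partitions across 2 ranks, ownership alternates between the ranks:
owners = [round_robin_owner(p, nranks=2) for p in range(4)]
```

Since every rank evaluates the same deterministic function, all ranks agree on where each partition lives without any extra communication.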

RapidsMPF Configuration Options

RapidsMPF can be configured using a dictionary of options, which may be populated via environment variables. All dictionary keys are automatically converted to lowercase.

Each configuration option includes:

  • Name: The key used in the configuration dictionary.
  • Environment Variable: The corresponding environment variable name.
  • Description: Describes what the option controls, including accepted values.

[!NOTE] Environment variable names are always uppercase and prefixed with RAPIDSMPF_.
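The naming convention can be sketched as a small helper that strips the prefix and lowercases the rest (a hypothetical illustration, not part of the rapidsmpf API):

```python
def env_to_option_key(name: str) -> str:
    """Map an environment variable name to its configuration key.

    E.g. RAPIDSMPF_LOG -> "log": the RAPIDSMPF_ prefix is dropped and
    the remainder is lowercased, matching the convention described above.
    """
    prefix = "RAPIDSMPF_"
    if not name.startswith(prefix):
        raise ValueError(f"not a RapidsMPF environment variable: {name}")
    return name[len(prefix):].lower()
```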

Typically, it is up to the user to read environment variables using code such as:

# Assuming the Python package mirrors the C++ config module layout:
from rapidsmpf.config import Options, get_environment_variables

options = Options()
options.insert_if_absent(get_environment_variables())

However, Dask automatically reads environment variables for any options not set explicitly when calling bootstrap_dask_cluster().

In C++, reading environment variables is always explicit; use something like:

  rapidsmpf::config::Options options{rapidsmpf::config::get_environment_variables()};

Available Options

General

  • log

    • Environment Variable: RAPIDSMPF_LOG
    • Default: WARN
    • Description: Controls the logging verbosity level. Valid values are:
      • NONE: Disable all logging.
      • PRINT: General print messages.
      • WARN: Warning messages (default).
      • INFO: Informational messages.
      • DEBUG: Debug-level messages.
      • TRACE: Fine-grained trace-level messages.
  • memory_reserve_timeout_ms

    • Environment Variable: RAPIDSMPF_MEMORY_RESERVE_TIMEOUT_MS

    • Default: 100

    • Description: Controls the global progress timeout for memory reservation requests, specified in milliseconds.

      The value limits how long the system may go without making progress on any pending memory reservation. When the timeout expires and no reservation has been satisfied, the system forces progress by selecting a pending request and attempting to reserve memory for it. Depending on the context, this may result in an empty reservation, an overbooked reservation, or a failure.

      This option ensures forward progress under memory pressure and prevents the system from stalling indefinitely when memory availability fluctuates.

Dask Integration

  • dask_spill_device

    • Environment Variable: RAPIDSMPF_DASK_SPILL_DEVICE
    • Default: 0.50
    • Description: GPU memory limit for shuffling as a fraction of total device memory.
  • dask_oom_protection

    • Environment Variable: RAPIDSMPF_DASK_OOM_PROTECTION
    • Default: False
    • Description: Enable out-of-memory protection by using managed memory when the device memory pool raises OOM errors.
  • dask_periodic_spill_check

    • Environment Variable: RAPIDSMPF_DASK_PERIODIC_SPILL_CHECK
    • Default: 1e-3
    • Description: Enable periodic spill checks. A dedicated thread continuously checks and performs spilling based on the currently available memory as reported by the buffer resource. The value of dask_periodic_spill_check is used as the pause between checks (in seconds). Use "disabled" to disable periodic spill checks.
  • dask_statistics

    • Environment Variable: RAPIDSMPF_DASK_STATISTICS
    • Default: False
    • Description: Enable RapidsMPF statistics collection.
  • dask_print_statistics

    • Environment Variable: RAPIDSMPF_DASK_PRINT_STATISTICS
    • Default: True
    • Description: Print RapidsMPF statistics to stdout on Dask Worker shutdown when dask_statistics is enabled.
  • dask_staging_spill_buffer

    • Environment Variable: RAPIDSMPF_DASK_STAGING_SPILL_BUFFER
    • Default: 128 MiB
    • Description: Size of the intermediate staging buffer (in bytes) used for device-to-host spilling. This temporary buffer is allocated on the device to reduce memory pressure when transferring Python-managed GPU objects during Dask spilling. Use "disabled" to skip allocation of the staging buffer.
