Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP Example allgatherv: Add serialization #697

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 75 additions & 26 deletions examples/usage/allgatherv_example.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,11 @@
#include <numeric>
#include <vector>

#include <kamping/serialization.hpp>
#include <mpi.h>

#include "cereal/types/string.hpp"
#include "cereal/types/unordered_map.hpp"
#include "helpers_for_examples.hpp"
#include "kamping/checking_casts.hpp"
#include "kamping/collectives/allgather.hpp"
Expand All @@ -27,44 +30,90 @@

int main() {
using namespace kamping;
kamping::Environment e;

// The Environment class is a RAII wrapper around MPI_Init and MPI_Finalize.
kamping::Environment e;

// A kamping::Communicator abstracts away an MPI_Comm; here MPI_COMM_WORLD.
kamping::Communicator comm;
std::vector<int> input(comm.rank(), comm.rank_signed());

{
// simply return the recv buffer
auto recv_buffer = comm.allgatherv(send_buf(input));
print_result(recv_buffer, comm);
// Note, that the size of the input vector is different for each rank.
std::vector<int> input(comm.rank(), comm.rank_signed());

{ // Basic use case; gather the inputs across all ranks to all ranks.
auto const output = comm.allgatherv(send_buf(input));
print_result_on_root(output, comm);
print_on_root("-----", comm);
}

{
// return recv buffer and recv_counts
{ // We can also request the number of elements received from each rank. The recv_buf will always be the first out
// parameter. After that, the output parameters are ordered as they appear in the function call.
// Communicating KaMPIng calls like allgatherv return a result object which can be decomposed using structured
// bindings (here) or explicit extract_*() calls (see below).
auto [recv_buffer, recv_counts] = comm.allgatherv(send_buf(input), recv_counts_out());
print_result(recv_buffer, comm);
print_result(recv_counts, comm);
}

{
// write result to an exisiting container
std::vector<int> recv_buffer;
comm.allgatherv(send_buf(input), recv_buf<resize_to_fit>(recv_buffer));
print_result(recv_buffer, comm);
{ // To re-use memory, we can provide an already allocated container to the MPI call.
std::vector<int> output;
// Let KaMPIng resize the recv_buffer to the correct size. Other possibilities are no_resize and grow_only.
comm.allgatherv(send_buf(input), recv_buf<resize_to_fit>(output));

// We can also re-use already allocated containers for the other output parameters, e.g. recv_counts.
std::vector<int> output_counts(comm.size());
std::iota(output_counts.begin(), output_counts.end(), 0);
comm.allgatherv(send_buf(input), recv_buf<resize_to_fit>(output), output_counts(output_counts));

// additionally, receive counts and/or receive displacements can be provided
std::vector<int> recv_counts(comm.size());
std::iota(recv_counts.begin(), recv_counts.end(), 0);
std::vector<int> recv_displs(comm.size());
std::exclusive_scan(recv_counts.begin(), recv_counts.end(), recv_displs.begin(), 0);
recv_buffer.clear();
std::vector<int> displacements(comm.size());
std::exclusive_scan(output_counts.begin(), output_counts.end(), displacements.begin(), 0);
output.clear();

// In this example, we combine all of the concepts mentioned above:
// - Use input as the send buffer
// - Receive all elements into recv_buffer, resizing it to fit exactly the number of elements received.
// - Output the number of elements received from each rank into recv_counts.
// - Output the displacement of the first element received from each rank into recv_displs.
comm.allgatherv(
send_buf(input),
recv_buf<resize_to_fit>(recv_buffer),
kamping::recv_counts(recv_counts),
kamping::recv_displs(recv_displs)
recv_buf<resize_to_fit>(output),
recv_counts(output_counts),
recv_displs(displacements)
);
print_result(recv_buffer, comm);
}

{ // It is also possible to use result.extract_*() calls instead of decomposing the result object using structured
// bindings in order to increase readability.
auto result = comm.allgatherv(send_buf(input), recv_counts_out(), recv_displs_out());
auto const recv_buffer = result.extract_recv_buffer();
auto const recv_counts = result.extract_recv_counts();
auto const recv_displs = result.extract_recv_displs();
}

return 0;
{ // C++ views can be used to send parts of the data.
input.resize(comm.rank() + 1, comm.rank_signed());

// Note, if you're on C++ >= 20 you can use std::span instead.
comm.allgatherv(send_buf(kamping::Span(input).subspan(0, comm.rank())));

// Alternatively
comm.allgatherv(send_buf(input), send_count(comm.rank_signed()));
}

{ // KaMPIng also provides serialization/deserialization using Cereal
using dict_type = std::unordered_map<std::string, uint64_t>;
dict_type data = {{"rank", comm.rank()}};
if (comm.root()) {
data.insert({"size", comm.size()});
data.insert({"root", comm.root()});
}

auto const output = comm.allgatherv(send_buf(as_serialized(data)), recv_buf(as_deserializable<dict_type>()));

if (comm.root()) {
for (auto const& [key, value]: output) {
std::cout << key << " -> " << value << std::endl;
}
}
}

return 0;
}
Loading