diff --git a/examples/usage/CMakeLists.txt b/examples/usage/CMakeLists.txt index d108b3ef..165a6075 100644 --- a/examples/usage/CMakeLists.txt +++ b/examples/usage/CMakeLists.txt @@ -55,3 +55,8 @@ if (KAMPING_ENABLE_SERIALIZATION) target_link_libraries(example_serialization kamping cereal::cereal) target_compile_options(example_serialization PRIVATE ${KAMPING_WARNING_FLAGS}) endif () + +add_executable(example_paper paper_example.cpp) +target_link_libraries(example_paper kamping) +# these examples generate some warning due to conciseness, therefore we do not enable warnings +# target_compile_options(example_paper PRIVATE ${KAMPING_WARNING_FLAGS}) diff --git a/examples/usage/paper_example.cpp b/examples/usage/paper_example.cpp new file mode 100644 index 00000000..49eb2914 --- /dev/null +++ b/examples/usage/paper_example.cpp @@ -0,0 +1,224 @@ +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "kamping/named_parameters.hpp" + +struct MyType { + int a; + double b; + char c; + std::array d; +}; + +namespace kamping { +// using KaMPIng’s built-in struct serializer +template <> +struct mpi_type_traits : struct_type {}; +// or using an explicitly constructed type (alternative kept commented out as a sketch) +// template <> +// struct mpi_type_traits { +// static constexpr bool has_to_be_committed = true; +// static MPI_Datatype data_type() { +// MPI_Datatype type; +// MPI_Type_create_*(..., &type); +// return type; +// } +// }; +} // namespace kamping + +template +auto build_buckets(std::vector& data, std::vector& splitters) -> std::vector> { // places each element of `data` into one of splitters.size() + 1 buckets selected by std::upper_bound over `splitters`, then clears `data` + std::vector> buckets(splitters.size() + 1); + for (auto& element: data) { + auto const bound = std::upper_bound(splitters.begin(), splitters.end(), element); + buckets[bound - splitters.begin()].push_back(element); + } + data.clear(); + return buckets; +} + +// Sorting code for Fig. 
7 +template +void sort(std::vector& data, MPI_Comm comm_) { + using namespace std; + using namespace kamping; + Communicator comm(comm_); + size_t const num_samples = 16 * log2(comm.size()) + 1; + vector lsamples(num_samples); + sample(data.begin(), data.end(), lsamples.begin(), num_samples, mt19937{random_device{}()}); + auto gsamples = comm.allgather(send_buf(lsamples)); + sort(gsamples.begin(), gsamples.end()); + for (size_t i = 0; i < comm.size() - 1; i++) { + gsamples[i] = gsamples[num_samples * (i + 1)]; + } + gsamples.resize(comm.size() - 1); + vector> buckets = build_buckets(data, gsamples); + data.clear(); + vector scounts; + for (auto& bucket: buckets) { + data.insert(data.end(), bucket.begin(), bucket.end()); + scounts.push_back(bucket.size()); + } + data = comm.alltoallv(send_buf(data), send_counts(scounts)); + sort(data.begin(), data.end()); +} + +// These are the examples from the paper. Some examples are not runnable as is, but everything should compile. +// If some change breaks any of these, consider updating the arXiv paper. +auto main() -> int { + kamping::Environment env; + kamping::Communicator comm; + + using namespace kamping; + { + // Fig. 1. + std::vector v = {0.1, 3.14, 4.2, 123.4}; + { + // KaMPIng allows concise code + // with sensible defaults ... (1) + + auto v_global = comm.allgatherv(send_buf(v)); + } + { + // ... or detailed tuning of each parameter (2) + std::vector rc; + auto [v_global, rcounts, rdispls] = comm.allgatherv( + send_buf(v), //(3) + recv_counts_out(std::move(rc)), //(4) + recv_displs_out() //(5) + ); + } + } + { + // Fig. 2. 
+ using T = int; + MPI_Datatype MPI_TYPE = MPI_INT; + MPI_Comm comm = MPI_COMM_WORLD; + std::vector v = {1, 3, 4}; // fill with data + int size, rank; + MPI_Comm_size(comm, &size); + MPI_Comm_rank(comm, &rank); + std::vector rc(size), rd(size); + rc[rank] = v.size(); + // exchange counts (in place: each rank contributes the rc[rank] entry set above) + MPI_Allgather(MPI_IN_PLACE, 0, MPI_DATATYPE_NULL, rc.data(), 1, MPI_INT, comm); + // compute displacements (exclusive prefix sum over the counts) + std::exclusive_scan(rc.begin(), rc.end(), rd.begin(), 0); + int n_glob = rc.back() + rd.back(); + // allocate receive buffer + std::vector v_glob(n_glob); + // exchange + MPI_Allgatherv(v.data(), v.size(), MPI_TYPE, v_glob.data(), rc.data(), rd.data(), MPI_TYPE, comm); + } + { + // Fig. 3. + std::vector v = {1, 3, 4}; // fill with data + using T = int; + + { + // Version 1: using KaMPIng’s interface + std::vector rc(comm.size()), rd(comm.size()); + rc[comm.rank()] = v.size(); + comm.allgather(send_recv_buf(rc)); + std::exclusive_scan(rc.begin(), rc.end(), rd.begin(), 0); + std::vector v_glob(rc.back() + rd.back()); + comm.allgatherv(send_buf(v), recv_buf(v_glob), recv_counts(rc), recv_displs(rd)); + } + { + // Version 2: displacements are computed implicitly + std::vector rc(comm.size()); + rc[comm.rank()] = v.size(); + comm.allgather(send_recv_buf(rc)); + std::vector v_glob; + comm.allgatherv(send_buf(v), recv_buf(v_glob), recv_counts(rc)); + } + { + // Version 3: counts are automatically exchanged + // and result is returned by value + std::vector v_glob = comm.allgatherv(send_buf(v)); + } + } + { + std::vector v = {1, 3, 4}; // fill with data + + // Section III snippets + { + auto result = comm.allgatherv(send_buf(v), recv_counts_out()); + auto recv_buf = result.extract_recv_buf(); + auto counts = result.extract_recv_counts(); + } + { // + auto [recv_buf, counts] = comm.allgatherv(send_buf(v), recv_counts_out()); + } + { + using T = int; + std::vector tmp = {1, 2, 3, 4}; + // tmp is moved to the underlying call where the + // storage is reused for the recv buffer + auto 
recv_buffer = comm.allgatherv(send_buf(v), recv_buf(std::move(tmp))); + } + { + using T = int; + std::vector recv_buffer = {}; + // data is written into recv_buffer directly + comm.allgatherv(send_buf(v), recv_buf(recv_buffer)); + } + { + using T = int; + std::vector recv_buffer; // has to be resized + std::vector counts(comm.size()); // size large enough + comm.allgatherv(send_buf(v), recv_buf(recv_buffer), recv_counts_out(counts)); + } + } + { + // Fig. 4. + // type definition (MyType) is at the top of this file + MyType x{}; + comm.send(send_buf(x), destination(rank::null)); + } + { + // Fig. 5. + using dict = std::unordered_map; + dict data = {{"foo", "bar"}, {"baz", "x"}}; + comm.send(send_buf(as_serialized(data)), destination(rank::null)); + dict recv_dict = comm.recv(recv_buf(as_deserializable())); + } + { + // Fig. 6. + std::vector v = {1, 3, 5}; + auto r1 = comm.isend(send_buf_out(std::move(v)), destination(1)); + v = r1.wait(); // v is moved back to caller after + // request is complete + auto r2 = comm.irecv(recv_count(42)); + std::vector data = r2.wait(); // data only returned + // after request + // is complete + } + { + // Sec. III.G snippet + std::vector data(comm.size()); + data[comm.rank()] = comm.rank(); + data = comm.allgather(send_recv_buf(std::move(data))); + } + { + // Fig. 7. 
+ std::vector data = {13, 1, 7, 18}; + sort(data, MPI_COMM_WORLD); + } + return 0; +} diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 9967f974..5f5ea27a 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -301,6 +301,12 @@ kamping_register_mpi_test( CORES 1 4 ) +kamping_register_mpi_test( + test_examples_from_paper + FILES examples_from_paper_test.cpp + CORES 1 4 +) + # Tests using C++ 20 kamping_register_mpi_test( test_std_span_alltoallv_cpp20 diff --git a/tests/examples_from_paper_test.cpp b/tests/examples_from_paper_test.cpp new file mode 100644 index 00000000..7fb35f25 --- /dev/null +++ b/tests/examples_from_paper_test.cpp @@ -0,0 +1,428 @@ +// This file is part of KaMPIng. +// +// Copyright 2024 The KaMPIng Authors +// +// KaMPIng is free software : you can redistribute it and/or modify it under the terms of the GNU Lesser General Public +// License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later +// version. KaMPIng is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the +// implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License +// for more details. +// +// You should have received a copy of the GNU Lesser General Public License along with KaMPIng. If not, see +// . 
+#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "kamping/collectives/allgather.hpp" +#include "kamping/communicator.hpp" +#include "kamping/environment.hpp" + +using namespace ::testing; + +namespace sorting { +template +auto build_buckets(std::vector& data, std::vector& splitters) -> std::vector> { + std::vector> buckets(splitters.size() + 1); + for (auto& element: data) { + auto const bound = std::upper_bound(splitters.begin(), splitters.end(), element); + buckets[static_cast(bound - splitters.begin())].push_back(element); + } + data.clear(); + return buckets; +} + +// Sorting code for Fig. 7: draws local samples, allgathers and sorts them, keeps comm.size() - 1 of them as splitters, buckets the local data by splitter, exchanges the buckets with alltoallv and finally sorts the received data locally +template +void sort(std::vector& data, MPI_Comm comm_) { + using namespace std; + using namespace kamping; + Communicator comm(comm_); + size_t const num_samples = static_cast(16 * log2(comm.size()) + 1); + vector lsamples(num_samples); + sample(data.begin(), data.end(), lsamples.begin(), num_samples, mt19937{random_device{}()}); + auto gsamples = comm.allgather(send_buf(lsamples)); + sort(gsamples.begin(), gsamples.end()); + for (size_t i = 0; i < comm.size() - 1; i++) { + gsamples[i] = gsamples[num_samples * (i + 1)]; + } + gsamples.resize(comm.size() - 1); + vector> buckets = build_buckets(data, gsamples); + data.clear(); + vector scounts; + for (auto& bucket: buckets) { + data.insert(data.end(), bucket.begin(), bucket.end()); + scounts.push_back(static_cast(bucket.size())); + } + data = comm.alltoallv(send_buf(data), send_counts(scounts)); + sort(data.begin(), data.end()); +} +} // namespace sorting + +namespace bfs { +struct Graph { // one rank's share of the graph: vertices v with v_begin <= v < v_end are local; edges[i] holds the (neighbor, rank) pairs of local vertex i + using Edges = std::vector>; + bool is_local(size_t v) const { + return v_begin <= v && v < v_end; + } + size_t local_id(size_t v) const { + return v - v_begin; + } + size_t local_size() const { + return v_end - v_begin; + } + Edges const& neighbors(size_t local_v) const { + return edges[local_v]; + } + size_t v_begin; + size_t v_end; 
+ std::vector edges; +}; + +Graph init_graph() { + kamping::Communicator comm; + Graph g; + if (comm.rank() == 0) { + g.v_begin = 0; + g.v_end = 2; + g.edges.resize(2); + g.edges[0].emplace_back(1, 0); + g.edges[1].emplace_back(0, 0); + g.edges[1].emplace_back(2, 1); + } + if (comm.rank() == 1) { + g.v_begin = 2; + g.v_end = 4; + g.edges.resize(2); + g.edges[0].emplace_back(1, 0); + g.edges[0].emplace_back(3, 1); + g.edges[1].emplace_back(2, 1); + g.edges[1].emplace_back(4, 2); + } + if (comm.rank() == 2) { + g.v_begin = 4; + g.v_end = 6; + g.edges.resize(2); + g.edges[0].emplace_back(3, 1); + g.edges[0].emplace_back(5, 2); + g.edges[1].emplace_back(4, 2); + g.edges[1].emplace_back(6, 3); + } + if (comm.rank() == 3) { + g.v_begin = 6; + g.v_end = 8; + g.edges.resize(2); + g.edges[0].emplace_back(5, 2); + g.edges[0].emplace_back(7, 3); + g.edges[1].emplace_back(6, 3); + g.edges[1].emplace_back(0, 0); + } + return g; +} + +using namespace kamping; +using VId = size_t; +using VBuf = std::vector; +constexpr VId undef = std::numeric_limits::max(); + +bool is_empty(VBuf const& frontier, Communicator<> const& comm) { + return comm.allreduce_single(send_buf(frontier.empty()), op(std::logical_and<>{})); +} + +// changed signature from auto to std::unordered_map to be C++17 compliant; passes the per-rank vertex lists in `frontier` to comm.alltoallv and returns what this rank receives (with_flattened presumably flattens the map into the alltoallv arguments -- TODO confirm) +VBuf exchange(std::unordered_map frontier, Communicator<> const& comm) { + return with_flattened(frontier, comm.size()).call([&](auto... 
flattened) { + return comm.alltoallv(std::move(flattened)...); + }); +} + +// not part of listing: every frontier vertex whose dist is still undef gets dist = level, and its neighbors are queued for the rank stored with each edge +std::unordered_map +expand_frontier(Graph const& graph, size_t level, VBuf const& frontier, std::vector& dist) { + std::unordered_map next_frontier; + for (auto const& v: frontier) { + auto v_local = graph.local_id(v); + auto& cur_dist = dist[graph.local_id(v)]; + if (cur_dist == undef) { + cur_dist = level; + for (auto const& [u, rank]: graph.neighbors(v_local)) { + next_frontier[rank].push_back(u); + } + } + } + return next_frontier; +} + +std::vector bfs(Graph const& g, VId s, MPI_Comm _comm) { // repeats expand_frontier + exchange starting from source s until the frontier is empty on every rank; returns dist for the local vertices (undef where never reached) + Communicator comm(_comm); + VBuf frontier; + std::unordered_map next_frontier; + std::vector dist(g.local_size(), undef); + size_t level = 0; + if (g.is_local(s)) { + frontier.push_back(s); + } + while (!is_empty(frontier, comm)) { + next_frontier = expand_frontier(g, level, frontier, dist); + frontier = exchange(std::move(next_frontier), comm); + ++level; + } + return dist; +} +} // namespace bfs + +template +auto repeat_n(std::vector const& vec, std::size_t n) { // returns the contents of vec concatenated n times + std::vector result; + for (size_t i = 0; i < n; ++i) { + result.insert(result.end(), vec.begin(), vec.end()); + } + return result; +} + +TEST(ExamplesFromPaperTest, figure1) { + using namespace kamping; + Communicator comm; + std::vector v = {0.1, 3.14, 4.2, 123.4}; + std::vector expected_res = repeat_n(v, comm.size()); + std::vector expected_rcounts(comm.size(), 4); + std::vector expected_rdispls(comm.size()); + std::exclusive_scan(expected_rcounts.begin(), expected_rcounts.end(), expected_rdispls.begin(), 0); + + { + // KaMPIng allows concise code + // with sensible defaults ... (1) + + auto v_global = comm.allgatherv(send_buf(v)); + // test result (not part of listing) + EXPECT_EQ(v_global, expected_res); + } + { + // ... 
or detailed tuning of each parameter (2) + std::vector rc; + auto [v_global, rcounts, rdispls] = comm.allgatherv( + send_buf(v), //(3) + recv_counts_out(std::move(rc)), //(4) + recv_displs_out() //(5) + ); + // test result (not part of listing) + EXPECT_EQ(v_global, expected_res); + EXPECT_EQ(rcounts, expected_rcounts); + EXPECT_EQ(rdispls, expected_rdispls); + } +} + +TEST(ExamplesFromPaperTest, figure2) { + using T = int; + MPI_Datatype MPI_TYPE = MPI_INT; + MPI_Comm comm = MPI_COMM_WORLD; + std::vector v = {1, 3, 4}; // fill with data + int size, rank; + MPI_Comm_size(comm, &size); + MPI_Comm_rank(comm, &rank); + std::vector rc(static_cast(size)), rd(static_cast(size)); + rc[static_cast(rank)] = static_cast(v.size()); + // exchange counts + MPI_Allgather(MPI_IN_PLACE, 0, MPI_DATATYPE_NULL, rc.data(), 1, MPI_INT, comm); + // compute displacements + std::exclusive_scan(rc.begin(), rc.end(), rd.begin(), 0); + int n_glob = rc.back() + rd.back(); + // allocate receive buffer + std::vector v_glob(static_cast(n_glob)); + // exchange + MPI_Allgatherv(v.data(), static_cast(v.size()), MPI_TYPE, v_glob.data(), rc.data(), rd.data(), MPI_TYPE, comm); + + // test result (not part of listing) + EXPECT_EQ(v_glob, repeat_n(v, static_cast(size))); +} + +TEST(ExamplesFromPaperTest, figure3) { + using namespace kamping; + Communicator comm; + + std::vector v = {1, 3, 4}; // fill with data + using T = int; + + { + // Version 1: using KaMPIng’s interface + std::vector rc(comm.size()), rd(comm.size()); + rc[comm.rank()] = static_cast(v.size()); + comm.allgather(send_recv_buf(rc)); + std::exclusive_scan(rc.begin(), rc.end(), rd.begin(), 0); + std::vector v_glob(static_cast(rc.back() + rd.back())); + comm.allgatherv(send_buf(v), recv_buf(v_glob), recv_counts(rc), recv_displs(rd)); + + // test result (not part of listing) + EXPECT_EQ(v_glob, repeat_n(v, comm.size())); + } + { + // Version 2: displacements are computed implicitly + std::vector rc(comm.size()); + rc[comm.rank()] = 
static_cast(v.size()); + comm.allgather(send_recv_buf(rc)); + std::vector v_glob; + comm.allgatherv(send_buf(v), recv_buf(v_glob), recv_counts(rc)); + + // test result (not part of listing) + EXPECT_EQ(v_glob, repeat_n(v, comm.size())); + } + { + // Version 3: counts are automatically exchanged + // and result is returned by value + std::vector v_glob = comm.allgatherv(send_buf(v)); + + // test result (not part of listing) + EXPECT_EQ(v_glob, repeat_n(v, comm.size())); + } +} + +TEST(ExamplesFromPaperTest, section_III_snippets) { + using namespace kamping; + Communicator comm; + std::vector v = {1, 3, 4}; // fill with data + { + auto result = comm.allgatherv(send_buf(v), recv_counts_out()); + auto recv_buf = result.extract_recv_buf(); + auto counts = result.extract_recv_counts(); + + EXPECT_EQ(recv_buf, repeat_n(v, comm.size())); + EXPECT_EQ(counts.size(), comm.size()); + EXPECT_THAT(counts, Each(3)); + } + { + auto [recv_buf, counts] = comm.allgatherv(send_buf(v), recv_counts_out()); + + // test result (not part of listing) + EXPECT_EQ(recv_buf, repeat_n(v, comm.size())); + EXPECT_EQ(counts.size(), comm.size()); + EXPECT_THAT(counts, Each(3)); + } + { + using T = int; + std::vector tmp(comm.size() * v.size()); // ... + // tmp is moved to the underlying call where the + // storage is reused for the recv buffer + auto recv_buffer = comm.allgatherv(send_buf(v), recv_buf(std::move(tmp))); + + // test result (not part of listing) + EXPECT_EQ(recv_buffer, repeat_n(v, comm.size())); + } + { + using T = int; + std::vector recv_buffer(comm.size() * v.size()); //... 
+ // data is written into recv_buffer directly + comm.allgatherv(send_buf(v), recv_buf(recv_buffer)); + + // test result (not part of listing) + EXPECT_EQ(recv_buffer, repeat_n(v, comm.size())); + } + { + using T = int; + std::vector recv_buffer; // has to be resized + std::vector counts(comm.size()); // size large enough + comm.allgatherv(send_buf(v), recv_buf(recv_buffer), recv_counts_out(counts)); + + // test result (not part of listing) + EXPECT_EQ(recv_buffer, repeat_n(v, comm.size())); + } +} + +TEST(ExamplesFromPaperTest, figure5) { + using namespace kamping; + Communicator comm; + if (comm.size() < 2) { // rank 0 sends to rank 1 below, so at least two ranks are needed + return; + } + using dict = std::unordered_map; + dict data = {{"foo", "bar"}, {"baz", "x"}}; + if (comm.rank() == 0) // if is not part of listing + comm.send(send_buf(as_serialized(data)), destination(1)); + if (comm.rank() == 1) { // if is not part of listing + dict recv_dict = comm.recv(recv_buf(as_deserializable())); + // test result (not part of listing) + EXPECT_EQ(recv_dict, data); + } +} + +TEST(ExamplesFromPaperTest, figure6) { + using namespace kamping; + Communicator comm; + if (comm.size() < 2) { + return; + } + + std::vector v = {1, 3, 5}; // ... 
+ std::vector const expected_v = {1, 3, 5}; // not part of listing + if (comm.rank() == 0) { // if is not part of listing + auto r1 = comm.isend(send_buf_out(std::move(v)), destination(1)); + v = r1.wait(); // v is moved back to caller after request is complete + // test result (not part of listing) + EXPECT_EQ(v, expected_v); + } + if (comm.rank() == 1) { // if is not part of listing + auto r2 = comm.irecv(recv_count(42)); + std::vector data = r2.wait(); // data only returned + // after request + // is complete + + // test result (not part of listing) + EXPECT_EQ(data.size(), 42); + EXPECT_THAT(Span(data.begin(), data.begin() + 3), ElementsAre(1, 3, 5)); + } +} + +TEST(ExamplesFromPaperTest, section_III_g) { + using namespace kamping; + Communicator comm; + + std::vector data(comm.size()); + data[comm.rank()] = static_cast(comm.rank()); + data = comm.allgather(send_recv_buf(std::move(data))); + + // test result (not part of listing) + std::vector expected_res(comm.size()); + std::iota(expected_res.begin(), expected_res.end(), 0); + EXPECT_EQ(data, expected_res); +} + +TEST(ExamplesFromPaperTest, figure7) { + std::vector data = {13, 1, 7, 18}; + auto gathered_data = kamping::comm_world().allgatherv(kamping::send_buf(data)); + std::sort(gathered_data.begin(), gathered_data.end()); + + sorting::sort(data, MPI_COMM_WORLD); + + // test result (not part of listing) + using namespace kamping; + Communicator comm; + auto gathered_result = kamping::comm_world().allgatherv(kamping::send_buf(data)); + EXPECT_EQ(gathered_result, gathered_data); +} + +TEST(ExamplesFromPaperTest, figure9) { + using namespace kamping; + Communicator comm; + + if (comm.size() != 4) { // init_graph() only sets up the graph on ranks 0-3 + return; + } + + // test result (not part of listing) + bfs::Graph g = bfs::init_graph(); + auto bfs_levels = bfs::bfs(g, 0, MPI_COMM_WORLD); + + auto gathered_levels = comm.allgatherv(send_buf(bfs_levels)); + EXPECT_THAT(gathered_levels, ElementsAre(0, 1, 2, 3, 4, 5, 6, 7)); +}