Skip to content

Commit

Permalink
Expose stream-ordering in copying APIs (rapidsai#17435)
Browse files Browse the repository at this point in the history
Adds stream parameter to
```
cudf::contiguous_split
cudf::chunked_pack::chunked_pack
cudf::chunked_pack::create
cudf::pack
```
Added stream gtests to verify correct stream forwarding.

Reference: rapidsai#13744

Authors:
  - Shruti Shivakumar (https://github.com/shrshi)

Approvers:
  - David Wendt (https://github.com/davidwendt)
  - Alessandro Bellina (https://github.com/abellina)

URL: rapidsai#17435
  • Loading branch information
shrshi authored Jan 15, 2025
1 parent 960c723 commit e272f1e
Show file tree
Hide file tree
Showing 7 changed files with 55 additions and 19 deletions.
4 changes: 2 additions & 2 deletions cpp/benchmarks/copying/contiguous_split.cu
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019-2024, NVIDIA CORPORATION.
* Copyright (c) 2019-2025, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -36,7 +36,7 @@ void chunked_pack(cudf::table_view const& src_table, std::vector<cudf::size_type
auto const mr = cudf::get_current_device_resource_ref();
auto const stream = cudf::get_default_stream();
auto user_buffer = rmm::device_uvector<std::uint8_t>(100L * 1024 * 1024, stream, mr);
auto chunked_pack = cudf::chunked_pack::create(src_table, user_buffer.size(), mr);
auto chunked_pack = cudf::chunked_pack::create(src_table, user_buffer.size());
while (chunked_pack->has_next()) {
auto iter_size = chunked_pack->next(user_buffer);
}
Expand Down
10 changes: 9 additions & 1 deletion cpp/include/cudf/contiguous_split.hpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2023-2024, NVIDIA CORPORATION.
* Copyright (c) 2023-2025, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -114,13 +114,15 @@ struct packed_table {
*
* @param input View of a table to split
* @param splits A vector of indices where the view will be split
* @param stream CUDA stream used for device memory operations and kernel launches
* @param mr An optional memory resource to use for all returned device allocations
* @return The set of requested views of `input` indicated by the `splits` and the viewed memory
* buffer
*/
std::vector<packed_table> contiguous_split(
cudf::table_view const& input,
std::vector<size_type> const& splits,
rmm::cuda_stream_view stream = cudf::get_default_stream(),
rmm::device_async_resource_ref mr = cudf::get_current_device_resource_ref());

namespace detail {
Expand Down Expand Up @@ -198,12 +200,14 @@ class chunked_pack {
* @param input source `table_view` to pack
* @param user_buffer_size buffer size (in bytes) that will be passed on `next`. Must be
* at least 1MB
* @param stream CUDA stream used for device memory operations and kernel launches
* @param temp_mr An optional memory resource to be used for temporary and scratch allocations
* only
*/
explicit chunked_pack(
cudf::table_view const& input,
std::size_t user_buffer_size,
rmm::cuda_stream_view stream = cudf::get_default_stream(),
rmm::device_async_resource_ref temp_mr = cudf::get_current_device_resource_ref());

/**
Expand Down Expand Up @@ -263,12 +267,14 @@ class chunked_pack {
* @param input source `table_view` to pack
* @param user_buffer_size buffer size (in bytes) that will be passed on `next`. Must be
* at least 1MB
* @param stream CUDA stream used for device memory operations and kernel launches
* @param temp_mr RMM memory resource to be used for temporary and scratch allocations only
* @return a unique_ptr of chunked_pack
*/
[[nodiscard]] static std::unique_ptr<chunked_pack> create(
cudf::table_view const& input,
std::size_t user_buffer_size,
rmm::cuda_stream_view stream = cudf::get_default_stream(),
rmm::device_async_resource_ref temp_mr = cudf::get_current_device_resource_ref());

private:
Expand All @@ -284,11 +290,13 @@ class chunked_pack {
* `cudf::unpack` to deserialize.
*
* @param input View of the table to pack
* @param stream CUDA stream used for device memory operations and kernel launches
* @param mr An optional memory resource to use for all returned device allocations
* @return packed_columns A struct containing the serialized metadata and data in contiguous host
* and device memory respectively
*/
packed_columns pack(cudf::table_view const& input,
rmm::cuda_stream_view stream = cudf::get_default_stream(),
rmm::device_async_resource_ref mr = cudf::get_current_device_resource_ref());

/**
Expand Down
12 changes: 7 additions & 5 deletions cpp/src/copying/contiguous_split.cu
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019-2024, NVIDIA CORPORATION.
* Copyright (c) 2019-2025, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -28,7 +28,6 @@
#include <cudf/structs/structs_column_view.hpp>
#include <cudf/table/table_view.hpp>
#include <cudf/utilities/bit.hpp>
#include <cudf/utilities/default_stream.hpp>
#include <cudf/utilities/memory_resource.hpp>

#include <rmm/cuda_stream_view.hpp>
Expand Down Expand Up @@ -2072,22 +2071,24 @@ std::vector<packed_table> contiguous_split(cudf::table_view const& input,

std::vector<packed_table> contiguous_split(cudf::table_view const& input,
std::vector<size_type> const& splits,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr)
{
CUDF_FUNC_RANGE();
return detail::contiguous_split(input, splits, cudf::get_default_stream(), mr);
return detail::contiguous_split(input, splits, stream, mr);
}

chunked_pack::chunked_pack(cudf::table_view const& input,
std::size_t user_buffer_size,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref temp_mr)
{
CUDF_EXPECTS(user_buffer_size >= desired_batch_size,
"The output buffer size must be at least 1MB in size");
// We pass `std::nullopt` for the first `mr` in `contiguous_split_state` to indicate
// that it does not allocate any user-bound data for the `chunked_pack` case.
state = std::make_unique<detail::contiguous_split_state>(
input, user_buffer_size, cudf::get_default_stream(), std::nullopt, temp_mr);
input, user_buffer_size, stream, std::nullopt, temp_mr);
}

// required for the unique_ptr to work with a incomplete type (contiguous_split_state)
Expand All @@ -2112,9 +2113,10 @@ std::unique_ptr<std::vector<uint8_t>> chunked_pack::build_metadata() const

std::unique_ptr<chunked_pack> chunked_pack::create(cudf::table_view const& input,
std::size_t user_buffer_size,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref temp_mr)
{
return std::make_unique<chunked_pack>(input, user_buffer_size, temp_mr);
return std::make_unique<chunked_pack>(input, user_buffer_size, stream, temp_mr);
}

}; // namespace cudf
9 changes: 5 additions & 4 deletions cpp/src/copying/pack.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2021-2024, NVIDIA CORPORATION.
* Copyright (c) 2021-2025, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -17,7 +17,6 @@
#include <cudf/contiguous_split.hpp>
#include <cudf/detail/contiguous_split.hpp>
#include <cudf/detail/nvtx/ranges.hpp>
#include <cudf/utilities/default_stream.hpp>

#include <rmm/cuda_stream_view.hpp>

Expand Down Expand Up @@ -257,10 +256,12 @@ void metadata_builder::clear() { return impl->clear(); }
/**
* @copydoc cudf::pack
*/
packed_columns pack(cudf::table_view const& input, rmm::device_async_resource_ref mr)
packed_columns pack(cudf::table_view const& input,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr)
{
CUDF_FUNC_RANGE();
return detail::pack(input, cudf::get_default_stream(), mr);
return detail::pack(input, stream, mr);
}

/**
Expand Down
5 changes: 3 additions & 2 deletions cpp/tests/copying/split_tests.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019-2024, NVIDIA CORPORATION.
* Copyright (c) 2019-2025, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -1394,7 +1394,8 @@ std::vector<cudf::packed_table> do_chunked_pack(cudf::table_view const& input)
auto bounce_buff_span =
cudf::device_span<uint8_t>(static_cast<uint8_t*>(bounce_buff.data()), bounce_buff.size());

auto chunked_pack = cudf::chunked_pack::create(input, bounce_buff_span.size(), mr);
auto chunked_pack =
cudf::chunked_pack::create(input, bounce_buff_span.size(), cudf::get_default_stream(), mr);

// right size the final buffer
rmm::device_buffer final_buff(
Expand Down
25 changes: 24 additions & 1 deletion cpp/tests/streams/copying_test.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2023, NVIDIA CORPORATION.
* Copyright (c) 2023-2025, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -20,6 +20,7 @@
#include <cudf_test/iterator_utilities.hpp>
#include <cudf_test/type_lists.hpp>

#include <cudf/contiguous_split.hpp>
#include <cudf/copying.hpp>
#include <cudf/detail/null_mask.hpp>

Expand Down Expand Up @@ -337,3 +338,25 @@ TEST_F(CopyingTest, PurgeNonEmptyNulls)

cudf::purge_nonempty_nulls(*input, cudf::test::get_default_stream());
}

TEST_F(CopyingTest, ContiguousSplit)
{
std::vector<cudf::size_type> splits{
2, 16, 31, 35, 64, 97, 158, 190, 638, 899, 900, 901, 996, 4200, 7131, 8111};

cudf::size_type size = 10002;
auto iter = cudf::detail::make_counting_transform_iterator(
0, [](auto i) { return static_cast<double>(i); });

std::vector<std::string> base_strings(
{"banana", "pear", "apple", "pecans", "vanilla", "cat", "mouse", "green"});
auto string_randomizer = thrust::make_transform_iterator(
thrust::make_counting_iterator(0),
[&base_strings](cudf::size_type i) { return base_strings[rand() % base_strings.size()]; });

cudf::test::fixed_width_column_wrapper<double> col(iter, iter + size);
std::vector<std::string> strings(string_randomizer, string_randomizer + size);
cudf::test::strings_column_wrapper col2(strings.begin(), strings.end());
cudf::table_view tbl({col, col2});
auto result = cudf::contiguous_split(tbl, splits, cudf::test::get_default_stream());
}
9 changes: 5 additions & 4 deletions java/src/main/native/src/TableJni.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4181,10 +4181,11 @@ JNIEXPORT jlong JNICALL Java_ai_rapids_cudf_Table_makeChunkedPack(
cudf::table_view* n_table = reinterpret_cast<cudf::table_view*>(input_table);
// `temp_mr` is the memory resource that `cudf::chunked_pack` will use to create temporary
// and scratch memory only.
auto temp_mr = memoryResourceHandle != 0
? reinterpret_cast<rmm::mr::device_memory_resource*>(memoryResourceHandle)
: cudf::get_current_device_resource_ref();
auto chunked_pack = cudf::chunked_pack::create(*n_table, bounce_buffer_size, temp_mr);
auto temp_mr = memoryResourceHandle != 0
? reinterpret_cast<rmm::mr::device_memory_resource*>(memoryResourceHandle)
: cudf::get_current_device_resource();
auto chunked_pack =
cudf::chunked_pack::create(*n_table, bounce_buffer_size, cudf::get_default_stream(), temp_mr);
return reinterpret_cast<jlong>(chunked_pack.release());
}
CATCH_STD(env, 0);
Expand Down

0 comments on commit e272f1e

Please sign in to comment.