-
Notifications
You must be signed in to change notification settings - Fork 179
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
cuda.parallel: Add optional stream argument to reduce_into() #3348
base: main
Are you sure you want to change the base?
cuda.parallel: Add optional stream argument to reduce_into() #3348
Conversation
🟩 CI finished in 24m 14s: Pass: 100%/1 | Total: 24m 14s | Avg: 24m 14s | Max: 24m 14s
|
Project | |
---|---|
CCCL Infrastructure | |
libcu++ | |
CUB | |
Thrust | |
CUDA Experimental | |
+/- | python |
CCCL C Parallel Library | |
Catch2Helper |
Modifications in project or dependencies?
Project | |
---|---|
CCCL Infrastructure | |
libcu++ | |
CUB | |
Thrust | |
CUDA Experimental | |
+/- | python |
CCCL C Parallel Library | |
Catch2Helper |
🏃 Runner counts (total jobs: 1)
# | Runner |
---|---|
1 | linux-amd64-gpu-v100-latest-1 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looking good! A few minor suggestions
if not hasattr(stream, "__cuda_stream__"): | ||
raise TypeError( | ||
f"stream argument {stream} does not implement the '__cuda_stream__' protocol" | ||
) | ||
|
||
stream_property = stream.__cuda_stream__ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In general, EAFP is more idiomatic, and also as being discussed in NVIDIA/cuda-python#348, much faster:
So instead of:
if not hasattr(obj, "attr"):
raise TypeError(...)
attr = obj.attr
Prefer:
try:
attr = obj.attr
except AttributeError:
raise TypeError(...)
As a rule of thumb, we should avoid hasattr
in particular as it can be very slow.
(side note: IIRC there's at least one other place in the codebase that we're using hasattr
, and likely that needs to be changed as well)
if ( | ||
isinstance(stream_property, tuple) | ||
and len(stream_property) == 2 | ||
and all(isinstance(i, int) for i in stream_property) | ||
): | ||
version, handle = stream_property | ||
return handle | ||
|
||
raise TypeError( | ||
f"__cuda_stream__ property of '{stream}' must return a 'Tuple[int, int]'; got {stream_property} instead" | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: Personally, I'd be a bit more lax here and really only ensure that handle
is an int. If someone returns a list
rather than a tuple
, that's probably not relevant to us:
if ( | |
isinstance(stream_property, tuple) | |
and len(stream_property) == 2 | |
and all(isinstance(i, int) for i in stream_property) | |
): | |
version, handle = stream_property | |
return handle | |
raise TypeError( | |
f"__cuda_stream__ property of '{stream}' must return a 'Tuple[int, int]'; got {stream_property} instead" | |
) | |
_, handle = stream_property # (version, handle) | |
if not isinstance(handle, int): | |
raise TypeError(...) | |
return handle |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should also check if version is 0, because we could change it to say 3-tuple in a future version of the protocol.
@@ -46,6 +46,30 @@ def _dtype_validation(dt1, dt2): | |||
raise TypeError(f"dtype mismatch: __init__={dt1}, __call__={dt2}") | |||
|
|||
|
|||
def _validate_and_get_stream(stream) -> Optional[int]: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In terms of where this function should live, here are a couple of suggestions:
- Move it to
_utils/stream.py
- Rename
_utils/cai.py
to_utils/protocols.py
and move it there (this module could be general utilities for working with protocol objects like__cuda_array_interface__
and__cuda_stream__
)
reduce_into(d_temp_storage, d_in, d_out, d_in.size, h_init, stream=stream_wrapper) | ||
np.testing.assert_allclose(d_in.sum().get(), d_out.get()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should call stream.synchronize()
after the call to reduce_into
.
Perhaps our wrapper Stream
type should have a .synchronize()
method that calls self.cupy_stream.synchronize()
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Wearing my CuPy hat: Just call
cupy.asnumpy(..., stream=stream, blocking=True)
or
with stream:
cupy.asnumpy(..., blocking=True)
to perform a stream-ordered, blocking copy to host. No need to add .synchronize()
this way.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Even better is call cp.testing.assert_allclose(...)
then no need to copy it does the copy internally for us, just need to stream-order it:
with stream:
cp.testing.assert_allclose(...)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it does the copy internally for us, just need to stream-order it:
In general, is it recommended that we rely on this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes it's public API.
# null stream is allowed | ||
if stream is None: | ||
return None |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So I think this would be a common source of bugs. The first naive question is would bindings.cccl_device_reduce()
take None
for the stream argument? But more importantly, I expect that by the time of integration (with CuPy & co) we must take an explicit stream, e.g.
reduce_into(..., stream=cp.cuda.get_current_stream())
in order to preserve the respective library's stream ordering. If it's the case, we probably should explicitly forbid stream=None
(as we do in cuda.core
, btw) so as to avoid the mistakes that someone integrating the code forgets to fetch the library stream and pass it to cuda.par.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
FYI, in nvmath-python stream=None
is used to mean "look up the input arrays' library and take the library's current stream (or whatever default it uses)," which has the same semantics as the snippet above, but is achieved in a much more complex and laborious way that we hope to simplify with __cuda_stream__
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
but is achieved in a much more complex and laborious way that we hope to simplify with cuda_stream.
Could you expand on this a bit? How would __cuda_stream__
help here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess I see what you're asking. For interpreting a provided stream, the protocol will help. For understanding the stream semantics of each array library (ex: does it have the notion of a current stream?), no the protocol would not help.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The first naive question is would bindings.cccl_device_reduce() take
None
for the stream argument?
Yes - this would match the API of the corresponding C++ function where the stream
argument is explicitly optional and defaults to 0
.
we probably should explicitly forbid stream=None (as we do in cuda.core, btw) so as to avoid the mistakes that someone integrating the code forgets to fetch the library stream and pass it to cuda.par.
Please correct me if I'm wrong, but my understanding is that your concern is about the use of APIs like CuPy's Stream
or PyTorch's stream
, which are both context managers, and set a library-specific "current stream":
with torch.cuda.stream() as s:
torch.some_function(...) # no need to pass a stream explicitly, uses `s` implicitly
The above works great as long as I'm only using PyTorch, but if I want to combine PyTorch with e.g., CuPy:
with torch.cuda.stream() as s:
torch.some_function(...) # no need to pass a stream explicitly, uses `s` implicitly
# need to pass stream explicitly, as cupy doesn't know about
# PyTorch's "current stream":
cupy.some_other_function(..., s)
I certainly agree that what you're describing is a concern, but I don't feel that the API decisions of downstream libraries like CuPy or PyTorch should influence the APIs of upstream libraries like cuda.core
or cuda.parallel
.
The default stream is a reasonable default and 'just works' across the ecosystem for the majority of users who don't necessarily want to use CUDA streams. I would prefer we keep things easy for them.
If someone opts in to using streams, then I think it's fine to require that they take additional care to pass streams appropriately to the various functions they use across libraries (which is something they need to do already).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In both cuda.core
and cudax
in this repo, every place that could take a stream (ex: launch()
) requires an explicit stream. If a user wants to use the default stream (legacy or PTDS) they can do so by passing it explicitly (ex: with cuda.core
it'd be stream=Device().default_stream
). All of our "modern" designs have been gearing toward explicitness, and the default stream in "old CUDA" is nothing but a named stream, no more special (apart from all of the usual caveats) than other user-created streams.
One simple example where a default stream
choice could fail miserably:
with stream as s:
arr = cp.random.random(...)
reduce_into(..., d_in=arr, ...) # forgot to pass s here for whatever reason
It is obvious that the input array creation/reduction are not ordered properly. I don't want us to honor any library's stream context like nvmath-python did (perhaps other than cuda.core
's, once we decide to do it there), but I want the above snippet to raise an error so that users know they need to do this:
with stream as s:
arr = cp.random.random(...)
reduce_into(..., d_in=arr, ..., stream=s)
That is, I was suggesting cuda.parallel
to always require a stream explicitly.
But, before we spend more time to discuss this choice, it'd be nice to know if the current APIs are final (i.e. end-user facing). I was under the impression that they are not, so perhaps this discussion is not that relevant/urgent as far as this PR is concerned?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
But, before we spend more time to discuss this choice, it'd be nice to know if the current APIs are final
The current API is meant to be low-level. We've aligned on making this low-level API as close to the underlying thrust/CUB APIs as possible.
so perhaps this discussion is not that relevant/urgent as far as this PR is concerned?
Yeah I agree - let's revisit when we're designing the higher level "user-facing" API. For this PR, I'd prefer if we're more conservative and not commit to requiring streams (just like the underlying C++ API).
@@ -85,7 +109,9 @@ def __init__( | |||
if error != enums.CUDA_SUCCESS: | |||
raise ValueError("Error building reduce") | |||
|
|||
def __call__(self, temp_storage, d_in, d_out, num_items: int, h_init: np.ndarray): | |||
def __call__( | |||
self, temp_storage, d_in, d_out, num_items: int, h_init: np.ndarray, stream=None |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't have strong opinion in this, just FYI: in some places in cuda.core
we have the following signature
def func(..., *, stream):
so that we enforce stream
to be passed as a kw arg, but has no default value. The consideration back then was to ensure users to pass a stream explicitly.
Description
Closes #3080
This PR adds an optional
stream
argument tocuda.parallel
'sreduce_into()
. The default value of this argument isNone
, which replicates the original behavior prior to this PR, so existing code is unaffected. The stream object must either beNone
(i.e. the default stream) or implement the__cuda_stream__
protocol. Passing in an object that either doesn't implement the protocol or implements it but incorrectly results in an error.Also included are two tests, one with a
cupy
stream and one for invalid streams. I have not added documentation yet because I am unsure where to add it.Checklist