Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding Combination Operators Zip, CombineLatest, WithLatestFrom #330

Open
wants to merge 83 commits into
base: branch-23.07
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
Show all changes
83 commits
Select commit Hold shift + click to select a range
5bc9b35
Fixing CombineLatest and adding Zip operator
mdemoret-nv Jun 2, 2023
293c481
Adding WithLatestFrom
mdemoret-nv Jun 3, 2023
32672c9
IWYU fixes
mdemoret-nv Jun 3, 2023
34d4767
Adding copyright header
mdemoret-nv Jun 21, 2023
cfbafd3
Merge branch 'branch-23.07' into mdd_combining-operators
mdemoret-nv Jun 21, 2023
7cbfe8e
Merge branch 'branch-23.07' into mdd_combining-operators
mdemoret-nv Jul 10, 2023
0178730
In progress adding children to Object classes
mdemoret-nv Jul 12, 2023
e6a320f
Creating branch for v23.11
mdemoret-nv Jul 14, 2023
7ada881
Update Versions for v23.11.00 (#357)
mdemoret-nv Jul 14, 2023
9b47924
Merge pull request #358 from nv-morpheus/branch-23.07
GPUtester Jul 20, 2023
9ae280a
Adopt patched pybind11 (#364)
dagardner-nv Aug 18, 2023
fa5e40c
Use `copy-pr-bot` (#369)
ajschmidt8 Aug 29, 2023
a2a9829
Fixing an issue with `update-versions.sh` which always blocked CI (#377)
mdemoret-nv Sep 6, 2023
79a27e2
Updates for MRC/Morpheus to build in the same RAPIDS devcontainer env…
cwharris Sep 7, 2023
483194d
Make Quick Start Guide not use `make_node_full` (#376)
cwharris Sep 7, 2023
5747320
Revert boost upgrade, and update clang to v16 (#382)
dagardner-nv Sep 11, 2023
9153243
Add missing flags to docker command to mount the working dir and set …
dagardner-nv Sep 11, 2023
18ffe6e
update rapidsai/ci to rapidsai/ci-conda (#396)
AyodeAwe Sep 22, 2023
40d20a3
Safe handling of control plane promises & fix CI (#391)
dagardner-nv Sep 23, 2023
5a4a1a4
Update ObservableProxy::pipe to support any number of operators (#387)
cwharris Sep 25, 2023
079e371
Add local CI scripts & rebase docker image (#394)
dagardner-nv Sep 25, 2023
8b20469
Add test for gc being invoked in a thread finalizer (#365)
dagardner-nv Sep 25, 2023
ba483b1
Fix EdgeHolder from incorrectly reporting an active connection (#402)
dagardner-nv Sep 28, 2023
34bacd1
Update boost versions to match version used in dev env (#404)
dagardner-nv Oct 9, 2023
24865be
Fix libhwloc & stubgen versions to match dev yaml (#405)
dagardner-nv Oct 9, 2023
36460c9
Adding more coroutine components to support async generators and task…
mdemoret-nv Oct 20, 2023
1ebd4e2
Move Pycoro from Morpheus to MRC (#409)
cwharris Oct 25, 2023
62e1834
Add AsyncioRunnable (#411)
cwharris Nov 2, 2023
8aa9216
Use a traditional semaphore in AsyncioRunnable (#412)
cwharris Nov 3, 2023
eda079d
Creating branch for v24.03
mdemoret-nv Dec 1, 2023
b152175
Updating versions for v24.03.00
mdemoret-nv Dec 1, 2023
dbce7f9
23.11 Cleanup
mdemoret-nv Dec 1, 2023
de48cb1
Updating CHANGELOG
mdemoret-nv Dec 1, 2023
960b8d1
Merge branch 'branch-23.11' into branch-24.03-merge-23.11
mdemoret-nv Dec 6, 2023
5d9bd22
Fix auto-merger error merging branch 23.11 into 24.03
mdemoret-nv Dec 6, 2023
dbbdc7c
Use `dependencies.yaml` to generate environment files (#416)
cwharris Dec 7, 2023
fdc0fbe
Add flake8, yapf, and isort pre-commit hooks. (#420)
cwharris Dec 15, 2023
d12e95e
Updating the workspace settings to remove deprecated python options (…
mdemoret-nv Dec 22, 2023
368fe7a
Optionally skip the CI pipeline if the PR contains the skip-ci label …
dagardner-nv Dec 22, 2023
294e087
Remove redundant copy of libmrc_pymrc.so (#429)
dagardner-nv Jan 5, 2024
75e43dd
Unifying cmake exports name across all Morpheus repos (#427)
mdemoret-nv Jan 8, 2024
00dfd7b
RAPIDS 24.02 Upgrade (#433)
cwharris Jan 24, 2024
cf3d20f
Update Conda channels to prioritize `conda-forge` over `nvidia` (#436)
cwharris Feb 7, 2024
33b2ef4
Adopt updated builds of CI runners (#442)
dagardner-nv Feb 9, 2024
cc0dc78
Update MRC to use CCCL instead of libcudacxx (#444)
cwharris Feb 14, 2024
6c4256d
Update ops-bot.yaml (#446)
AyodeAwe Feb 22, 2024
3010601
Add IoScheduler to enable epoll-based Task scheduling (#448)
cwharris Feb 29, 2024
2dbd985
Adding RoundRobinRouter node type for distributing values to downstre…
mdemoret-nv Mar 7, 2024
a920644
Update cast_from_pyobject to throw on unsupported types rather than r…
dagardner-nv Mar 12, 2024
9cf1ebc
Removing the INFO log when creating an AsyncioRunnable (#456)
mdemoret-nv Mar 15, 2024
bd7955e
Add `TestScheduler` to support testing time-based coroutines without …
cwharris Mar 25, 2024
f4e6266
Add JSONValues container for holding Python values as JSON objects if…
dagardner-nv Apr 3, 2024
e080e77
Update CR year (#460)
dagardner-nv Apr 4, 2024
5242760
Add pybind11 type caster for JSONValues (#458)
dagardner-nv Apr 5, 2024
5f9d73b
Merge pull request #463 from nv-morpheus/branch-24.03
mdemoret-nv Apr 5, 2024
f78d50d
Creating branch for v24.06
dagardner-nv Apr 5, 2024
5a6ae3c
Updating versions for v24.06.00
dagardner-nv Apr 5, 2024
029ac49
Update minimum requirements (#467)
dagardner-nv Apr 6, 2024
d64eaa5
Add maximum simultaneous tasks support to `TaskContainer` (#464)
cwharris Apr 6, 2024
99a7add
Disabling the root level RAPIDS "checks" step in CI to allow CI on `b…
mdemoret-nv Apr 7, 2024
edad3c3
Updating CHANGELOG
mdemoret-nv Apr 8, 2024
e1d0e9d
Cleaning up style for release
mdemoret-nv Apr 8, 2024
06171dc
Downgrade doxygen to match Morpheus (#469)
cwharris Apr 10, 2024
0701b13
Consolidate redundant split_string_to_array, split_string_on & split_…
dagardner-nv Apr 10, 2024
2308e23
Merge branch-24.03 into branch-24.06
dagardner-nv Apr 10, 2024
41afd96
Merge pull request #471 from dagardner-nv/branch-24.06-merge-24.03
dagardner-nv Apr 11, 2024
8b3d310
Add auto register helpers to AsyncSink and AsyncSource (#473)
dagardner-nv Apr 16, 2024
84bfffc
Updating CHANGELOG
dagardner-nv Apr 16, 2024
7b599a3
Merge pull request #474 from nv-morpheus/branch-24.03
dagardner-nv Apr 16, 2024
5c52d5c
Merge remote-tracking branch 'upstream/branch-24.06' into mdd_combini…
mdemoret-nv May 1, 2024
439d7df
resolve rapids-dependency-file-generator warning (#482)
jameslamb Jun 17, 2024
f754c78
Creating branch for v24.10
dagardner-nv Jul 3, 2024
246ac97
Updating versions for v24.10.00
dagardner-nv Jul 3, 2024
cc89f5f
Updating CHANGELOG
dagardner-nv Jul 3, 2024
8377f6a
Merge pull request #484 from nv-morpheus/branch-24.06
dagardner-nv Jul 3, 2024
bceb7ef
Ensure proper initialization of `CMAKE_INSTALL_PREFIX` if needed (#485)
dagardner-nv Jul 24, 2024
ca8a73f
Stop a python source once the subscriber is no longer subscribed (#493)
dagardner-nv Aug 29, 2024
8489b45
Define a Python source which receives a reference to a subscriber (#496)
dagardner-nv Sep 11, 2024
ccbcd76
Change `LOG(WARNING)` to `VLOG(1)` when no GPUs are detected (#497)
dagardner-nv Sep 11, 2024
48d17a1
Pass a `mrc.Subscription` object to sources rather than a `mrc.Subscr…
dagardner-nv Sep 17, 2024
cef7f0b
Update to RAPIDS 24.10 (#494)
cwharris Oct 4, 2024
7629d78
Merge remote-tracking branch 'upstream/branch-24.10' into mdd_combini…
mdemoret-nv Oct 6, 2024
61fa294
Fixing merge
mdemoret-nv Oct 6, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions cpp/mrc/include/mrc/channel/status.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

#pragma once

#include <ostream>

namespace mrc::channel {

enum class Status
Expand All @@ -29,4 +31,25 @@ enum class Status
error
};

static inline std::ostream& operator<<(std::ostream& os, const Status& s)
{
switch (s)
{
case Status::success:
return os << "success";
case Status::empty:
return os << "empty";
case Status::full:
return os << "full";
case Status::closed:
return os << "closed";
case Status::timeout:
return os << "timeout";
case Status::error:
return os << "error";
default:
throw std::logic_error("Unsupported channel::Status enum. Was a new value added recently?");
}
}

} // namespace mrc::channel
30 changes: 16 additions & 14 deletions cpp/mrc/include/mrc/core/utils.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

#include "mrc/utils/macros.hpp"

#include <glog/logging.h>

#include <algorithm>
#include <exception>
#include <map>
Expand Down Expand Up @@ -54,47 +56,47 @@ std::set<KeyT> extract_keys(const std::map<KeyT, ValT>& stdmap)
}

// RAII will execute a function when destroyed.
template <typename FunctionT>
class Unwinder
{
public:
explicit Unwinder(std::function<void()> unwind_fn) : m_unwind_fn(std::move(unwind_fn)) {}

~Unwinder()
{
if (!!m_function)
if (!!m_unwind_fn)
{
try
{
(*m_function)();
m_unwind_fn();
} catch (...)
{
LOG(ERROR) << "Fatal error during unwinder function";
std::terminate();
}
}
}

explicit Unwinder(FunctionT* function_arg) : m_function(function_arg) {}

void detach()
{
m_function = nullptr;
m_unwind_fn = nullptr;
}

Unwinder() = delete;
Unwinder(const Unwinder&) = delete;
Unwinder& operator=(const Unwinder&) = delete;

static Unwinder create(std::function<void()> unwind_fn)
{
return Unwinder(std::move(unwind_fn));
}

private:
FunctionT* m_function;
std::function<void()> m_unwind_fn;
};

#define MRC_UNWIND(var_name, function) MRC_UNWIND_EXPLICIT(uw_func_##var_name, var_name, function)

#define MRC_UNWIND_AUTO(function) \
MRC_UNWIND_EXPLICIT(MRC_UNIQUE_VAR_NAME(uw_func_), MRC_UNIQUE_VAR_NAME(un_obj_), function)
#define MRC_UNWIND(unwinder_name, function) mrc::Unwinder unwinder_name(function);

#define MRC_UNWIND_EXPLICIT(function_name, unwinder_name, function) \
auto function_name = (function); \
mrc::Unwinder<decltype(function_name)> unwinder_name(std::addressof(function_name))
#define MRC_UNWIND_AUTO(function) MRC_UNWIND(MRC_UNIQUE_VAR_NAME(__un_obj_), function)

template <typename T>
std::pair<std::set<T>, std::set<T>> set_compare(const std::set<T>& cur_set, const std::set<T>& new_set)
Expand Down
19 changes: 19 additions & 0 deletions cpp/mrc/include/mrc/edge/edge_channel.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include "mrc/edge/edge_readable.hpp"
#include "mrc/edge/edge_writable.hpp"
#include "mrc/edge/forward.hpp"
#include "mrc/utils/macros.hpp"

#include <memory>

Expand Down Expand Up @@ -89,6 +90,24 @@ class EdgeChannel
{
CHECK(m_channel) << "Cannot create an EdgeChannel from an empty pointer";
}

EdgeChannel(EdgeChannel&& other) : m_channel(std::move(other.m_channel)) {}

EdgeChannel& operator=(EdgeChannel&& other)
{
if (this == &other)
{
return *this;
}

m_channel = std::move(other.m_channel);

return *this;
}

// This should not be copyable because it requires passing in a unique_ptr
DELETE_COPYABILITY(EdgeChannel);

virtual ~EdgeChannel() = default;

[[nodiscard]] std::shared_ptr<EdgeChannelReader<T>> get_reader() const
Expand Down
97 changes: 7 additions & 90 deletions cpp/mrc/include/mrc/node/operators/combine_latest.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include "mrc/channel/status.hpp"
#include "mrc/node/sink_properties.hpp"
#include "mrc/node/source_properties.hpp"
#include "mrc/utils/tuple_utils.hpp"
#include "mrc/utils/type_utils.hpp"

#include <boost/fiber/mutex.hpp>
Expand All @@ -31,91 +32,6 @@

namespace mrc::node {

// template <typename... TypesT>
// class ParameterPackIndexer
// {
// public:
// ParameterPackIndexer(TypesT... ts) : ParameterPackIndexer(std::make_index_sequence<sizeof...(TypesT)>{}, ts...)
// {}

// std::tuple<std::tuple<TypesT, std::size_t>...> tup;

// private:
// template <std::size_t... Is>
// ParameterPackIndexer(std::index_sequence<Is...> const& /*unused*/, TypesT... ts) : tup{std::make_tuple(ts,
// Is)...}
// {}
// };

// template <typename TargetT, typename ListHeadT, typename... ListTailsT>
// constexpr size_t getTypeIndexInTemplateList()
// {
// if constexpr (std::is_same<TargetT, ListHeadT>::value)
// {
// return 0;
// }
// else
// {
// return 1 + getTypeIndexInTemplateList<TargetT, ListTailsT...>();
// }
// }

namespace detail {
struct Surely
{
template <class... T>
auto operator()(const T&... t) const -> decltype(std::make_tuple(t.value()...))
{
return std::make_tuple(t.value()...);
}
};
} // namespace detail

// template <class... T>
// inline auto surely(const std::tuple<T...>& tpl) -> decltype(rxcpp::util::apply(tpl, detail::surely()))
// {
// return rxcpp::util::apply(tpl, detail::surely());
// }

template <class... T>
inline auto surely2(const std::tuple<T...>& tpl)
{
return std::apply([](auto... args) {
return std::make_tuple(args.value()...);
});
}

// template <typename... TypesT>
// static auto surely2(const std::tuple<TypesT...>& tpl, std::index_sequence<Is...>)
// {
// return std::make_tuple(std::make_shared<Upstream<Is>>(*self)...);
// }

// template <size_t i, typename T>
// struct IndexTypePair
// {
// static constexpr size_t index{i};
// using Type = T;
// };

// template <typename... T>
// struct make_index_type_tuple_helper
// {
// template <typename V>
// struct idx;

// template <size_t... Indices>
// struct idx<std::index_sequence<Indices...>>
// {
// using tuple_type = std::tuple<IndexTypePair<Indices, T>...>;
// };

// using tuple_type = typename idx<std::make_index_sequence<sizeof...(T)>>::tuple_type;
// };

// template <typename... T>
// using make_index_type_tuple = typename make_index_type_tuple_helper<T...>::tuple_type;

template <typename... TypesT>
class CombineLatest : public WritableAcceptor<std::tuple<TypesT...>>
{
Expand All @@ -128,9 +44,7 @@ class CombineLatest : public WritableAcceptor<std::tuple<TypesT...>>
public:
CombineLatest() :
m_upstream_holders(build_ingress(const_cast<CombineLatest*>(this), std::index_sequence_for<TypesT...>{}))
{
// auto a = build_ingress(const_cast<CombineLatest*>(this), std::index_sequence_for<TypesT...>{});
}
{}

virtual ~CombineLatest() = default;

Expand Down Expand Up @@ -193,9 +107,9 @@ class CombineLatest : public WritableAcceptor<std::tuple<TypesT...>>
// Check if we should push the new value
if (m_values_set == sizeof...(TypesT))
{
// std::tuple<TypesT...> new_val = surely2(m_state);
std::tuple<TypesT...> new_val = utils::tuple_surely(m_state);

// status = this->get_writable_edge()->await_write(std::move(new_val));
status = this->get_writable_edge()->await_write(std::move(new_val));
}

return status;
Expand All @@ -209,6 +123,9 @@ class CombineLatest : public WritableAcceptor<std::tuple<TypesT...>>

if (m_completions == sizeof...(TypesT))
{
// Clear the held tuple to remove any dangling values
m_state = std::tuple<std::optional<TypesT>...>();

WritableAcceptor<std::tuple<TypesT...>>::release_edge_connection();
}
}
Expand Down
Loading