Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix events-executor warm-up bug and add unit-tests #2591

Merged
merged 8 commits into from
Sep 11, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,6 @@ EventsExecutor::setup_notify_waitable()
// ---> we need to wake up the executor so that it can terminate
// - a node or callback group guard condition is triggered:
// ---> the entities collection is changed, we need to update callbacks
entities_need_rebuild_ = false;
this->handle_updated_entities(false);
});

Expand Down Expand Up @@ -168,6 +167,14 @@ EventsExecutor::spin_some_impl(std::chrono::nanoseconds max_duration, bool exhau
return false;
};

// If this spin is not exhaustive (e.g. spin_some), we need to explicitly check
// if entities need to be rebuilt here rather than letting the notify waitable event do it.
// A non-exhaustive spin would not check for work a second time, thus delaying the execution
// of some entities to the next invocation of spin.
if (!exhaustive) {
this->handle_updated_entities(false);
}

// Get the number of events and timers ready at start
const size_t ready_events_at_start = events_queue_->size();
size_t executed_events = 0;
Expand Down Expand Up @@ -314,6 +321,17 @@ void
EventsExecutor::handle_updated_entities(bool notify)
{
(void)notify;

// Do not rebuild if we don't need to.
// A rebuild event could be generated, but then
// this function could end up being called from somewhere else
// before that event gets processed, for example if
// a node or callback group is manually added to the executor.
const bool notify_waitable_triggered = entities_need_rebuild_.exchange(false);
if (!notify_waitable_triggered && !this->collector_.has_pending()) {
return;
}

// Build the new collection
this->collector_.update_collections();
auto callback_groups = this->collector_.get_all_callback_groups();
Expand Down
10 changes: 10 additions & 0 deletions rclcpp/test/rclcpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -507,6 +507,16 @@ if(TARGET test_executors_busy_waiting)
target_link_libraries(test_executors_busy_waiting ${PROJECT_NAME})
endif()

ament_add_gtest(
test_executors_warmup
executors/test_executors_warmup.cpp
executors/test_waitable.cpp
APPEND_LIBRARY_DIRS "${append_library_dirs}"
TIMEOUT 180)
if(TARGET test_executors_warmup)
target_link_libraries(test_executors_warmup ${PROJECT_NAME} ${test_msgs_TARGETS})
endif()

ament_add_gtest(test_static_single_threaded_executor executors/test_static_single_threaded_executor.cpp
APPEND_LIBRARY_DIRS "${append_library_dirs}")
ament_add_test_label(test_static_single_threaded_executor mimick)
Expand Down
244 changes: 244 additions & 0 deletions rclcpp/test/rclcpp/executors/test_executors_warmup.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,244 @@
// Copyright 2024 Open Source Robotics Foundation, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

/**
* This test checks all implementations of rclcpp::executor to check they pass they basic API
* tests. Anything specific to any executor in particular should go in a separate test file.
*/

#include <gtest/gtest.h>

#include <chrono>
#include <cstddef>
#include <memory>
#include <string>
#include <type_traits>
#include <utility>

#include "rclcpp/rclcpp.hpp"

#include "test_msgs/msg/empty.hpp"

#include "./executor_types.hpp"

using namespace std::chrono_literals;

template<typename T>
class TestExecutorsWarmup : public ::testing::Test
{
public:
void SetUp()
{
rclcpp::init(0, nullptr);
}

void TearDown()
{
rclcpp::shutdown();
}
};

TYPED_TEST_SUITE(TestExecutorsWarmup, ExecutorTypes, ExecutorTypeNames);

// This test verifies that spin_all is correctly collecting work multiple times
// even when one of the items of work is a notifier waitable event and thus results in
// rebuilding the entities collection.
// When spin_all goes back to collect more work, it should see the ready items from
// the new added entities
TYPED_TEST(TestExecutorsWarmup, spin_all_doesnt_require_warmup)
{
using ExecutorType = TypeParam;
ExecutorType executor;

// Enable intra-process to guarantee deterministic and synchronous delivery of the message / event
auto node_options = rclcpp::NodeOptions().use_intra_process_comms(true);
auto node = std::make_shared<rclcpp::Node>("test_node", node_options);

// Add node to the executor before creating the entities
executor.add_node(node);

// Create entities, this will produce a notifier waitable event, telling the executor to refresh
// the entities collection
auto publisher = node->create_publisher<test_msgs::msg::Empty>("test_topic", rclcpp::QoS(10));
size_t callback_count = 0;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

include <cstddef>

auto callback = [&callback_count](test_msgs::msg::Empty::ConstSharedPtr) {callback_count++;};
auto subscription =
node->create_subscription<test_msgs::msg::Empty>(
"test_topic", rclcpp::QoS(10), std::move(callback));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

include <utility>


ASSERT_EQ(callback_count, 0u);

// Publish a message so that the new entities (i.e. the subscriber) already have work to do
publisher->publish(test_msgs::msg::Empty());

// We need to select a duration that is greater than
// the time taken to refresh the entities collection and rebuild the waitset.
// spin-all is expected to process the notifier waitable event, rebuild the collection,
// and then collect more work, finding the subscription message event.
// This duration has been selected empirically.
executor.spin_all(std::chrono::milliseconds(500));

// Verify that the callback is called as part of the spin above
EXPECT_EQ(callback_count, 1u);
}

// Same test as `spin_all_doesnt_require_warmup`, but uses a callback group
// This test reproduces the bug reported by https://github.com/ros2/rclcpp/issues/2589
TYPED_TEST(TestExecutorsWarmup, spin_all_doesnt_require_warmup_with_cbgroup)
{
using ExecutorType = TypeParam;

// TODO(alsora): Enable when https://github.com/ros2/rclcpp/pull/2595 gets merged
if (
std::is_same<ExecutorType, rclcpp::executors::SingleThreadedExecutor>() ||
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

include <type_traits>

std::is_same<ExecutorType, rclcpp::executors::MultiThreadedExecutor>())
{
GTEST_SKIP();
}

ExecutorType executor;

// Enable intra-process to guarantee deterministic and synchronous delivery of the message / event
auto node_options = rclcpp::NodeOptions().use_intra_process_comms(true);
auto node = std::make_shared<rclcpp::Node>("test_node", node_options);

auto callback_group = node->create_callback_group(
rclcpp::CallbackGroupType::MutuallyExclusive,
false);

// Add callback group to the executor before creating the entities
executor.add_callback_group(callback_group, node->get_node_base_interface());

// Create entities, this will produce a notifier waitable event, telling the executor to refresh
// the entities collection
auto publisher = node->create_publisher<test_msgs::msg::Empty>("test_topic", rclcpp::QoS(10));
size_t callback_count = 0;
auto callback = [&callback_count](test_msgs::msg::Empty::ConstSharedPtr) {callback_count++;};
rclcpp::SubscriptionOptions sub_options;
sub_options.callback_group = callback_group;
auto subscription =
node->create_subscription<test_msgs::msg::Empty>(
"test_topic", rclcpp::QoS(10), std::move(callback), sub_options);

ASSERT_EQ(callback_count, 0u);

// Publish a message so that the new entities (i.e. the subscriber) already have work to do
publisher->publish(test_msgs::msg::Empty());

// We need to select a duration that is greater than
// the time taken to refresh the entities collection and rebuild the waitset.
// spin-all is expected to process the notifier waitable event, rebuild the collection,
// and then collect more work, finding the subscription message event.
// This duration has been selected empirically.
executor.spin_all(std::chrono::milliseconds(500));

// Verify that the callback is called as part of the spin above
EXPECT_EQ(callback_count, 1u);
}

TYPED_TEST(TestExecutorsWarmup, spin_some_doesnt_require_warmup)
{
using ExecutorType = TypeParam;

// TODO(alsora): currently only the events-executor passes this test.
// Enable single-threaded and multi-threaded executors
// when https://github.com/ros2/rclcpp/pull/2595 gets merged
if (
!std::is_same<ExecutorType, rclcpp::experimental::executors::EventsExecutor>())
{
GTEST_SKIP();
}

ExecutorType executor;

// Enable intra-process to guarantee deterministic and synchronous delivery of the message / event
auto node_options = rclcpp::NodeOptions().use_intra_process_comms(true);
auto node = std::make_shared<rclcpp::Node>("test_node", node_options);

// Add node to the executor before creating the entities
executor.add_node(node);

// Create entities, this will produce a notifier waitable event, telling the executor to refresh
// the entities collection
auto publisher = node->create_publisher<test_msgs::msg::Empty>("test_topic", rclcpp::QoS(10));
size_t callback_count = 0;
auto callback = [&callback_count](test_msgs::msg::Empty::ConstSharedPtr) {callback_count++;};
auto subscription =
node->create_subscription<test_msgs::msg::Empty>(
"test_topic", rclcpp::QoS(10), std::move(callback));

ASSERT_EQ(callback_count, 0u);

// Publish a message so that the new entities (i.e. the subscriber) already have work to do
publisher->publish(test_msgs::msg::Empty());

// NOTE: intra-process communication is enabled, so the subscription will immediately see
// the new message, no risk of race conditions where spin_some gets called before the
// message has been delivered.
executor.spin_some();

// Verify that the callback is called as part of the spin above
EXPECT_EQ(callback_count, 1u);
}

TYPED_TEST(TestExecutorsWarmup, spin_some_doesnt_require_warmup_with_cbgroup)
{
using ExecutorType = TypeParam;

// TODO(alsora): currently only the events-executor passes this test.
// Enable single-threaded and multi-threaded executors
// when https://github.com/ros2/rclcpp/pull/2595 gets merged
if (
!std::is_same<ExecutorType, rclcpp::experimental::executors::EventsExecutor>())
{
GTEST_SKIP();
}

ExecutorType executor;

// Enable intra-process to guarantee deterministic and synchronous delivery of the message / event
auto node_options = rclcpp::NodeOptions().use_intra_process_comms(true);
auto node = std::make_shared<rclcpp::Node>("test_node", node_options);

auto callback_group = node->create_callback_group(
rclcpp::CallbackGroupType::MutuallyExclusive,
false);

// Add callback group to the executor before creating the entities
executor.add_callback_group(callback_group, node->get_node_base_interface());

// Create entities, this will produce a notifier waitable event, telling the executor to refresh
// the entities collection
auto publisher = node->create_publisher<test_msgs::msg::Empty>("test_topic", rclcpp::QoS(10));
size_t callback_count = 0;
auto callback = [&callback_count](test_msgs::msg::Empty::ConstSharedPtr) {callback_count++;};
rclcpp::SubscriptionOptions sub_options;
sub_options.callback_group = callback_group;
auto subscription =
node->create_subscription<test_msgs::msg::Empty>(
"test_topic", rclcpp::QoS(10), std::move(callback), sub_options);

ASSERT_EQ(callback_count, 0u);

// Publish a message so that the new entities (i.e. the subscriber) already have work to do
publisher->publish(test_msgs::msg::Empty());

// NOTE: intra-process communication is enabled, so the subscription will immediately see
// the new message, no risk of race conditions where spin_some gets called before the
// message has been delivered.
executor.spin_some();

// Verify that the callback is called as part of the spin above
EXPECT_EQ(callback_count, 1u);
}