diff --git a/nodelet/CMakeLists.txt b/nodelet/CMakeLists.txt index e2ead2c..918f762 100644 --- a/nodelet/CMakeLists.txt +++ b/nodelet/CMakeLists.txt @@ -1,4 +1,4 @@ -cmake_minimum_required(VERSION 3.0.2) +cmake_minimum_required(VERSION 3.1) project(nodelet) ## Find catkin dependencies @@ -74,3 +74,4 @@ install(TARGETS nodeletlib install(TARGETS nodelet DESTINATION ${CATKIN_PACKAGE_BIN_DESTINATION} ) + diff --git a/nodelet/include/nodelet/nodelet.h b/nodelet/include/nodelet/nodelet.h index ae94892..cbd88a7 100644 --- a/nodelet/include/nodelet/nodelet.h +++ b/nodelet/include/nodelet/nodelet.h @@ -234,6 +234,20 @@ class NODELETLIB_DECL Nodelet ros::CallbackQueueInterface* st_queue = NULL, ros::CallbackQueueInterface* mt_queue = NULL); + /**\brief Whether it is OK to continue working with this nodelet. This function starts returning true right + * before onInit() is called and starts returning false when the nodelet is requested to stop via + * requestStop(). + * \return Status of the nodelet. + * \note This does not take into account `ros::ok()`. This should be queried separately. + */ + bool ok() const; + + /**\brief Request this nodelet to stop. This function returns immediately. Afterwards, ok() returns false. + * This function is automatically called by the nodelet manager when the nodelet is requested to unload + * or is killed. + */ + void requestStop(); + virtual ~Nodelet(); }; diff --git a/nodelet/src/loader.cpp b/nodelet/src/loader.cpp index 6446cbf..14a0ba6 100644 --- a/nodelet/src/loader.cpp +++ b/nodelet/src/loader.cpp @@ -85,6 +85,28 @@ class LoaderROS bond_spinner_.start(); } + ~LoaderROS() + { + // Clean up the bond map with the mutex locked; if we left that just on the destructor, + // the bond map would be destroyed unlocked allowing async callbacks to access it + // meanwhile, which could lead to memory corruption and segfaults. + + boost::mutex::scoped_lock lock(lock_); + for (auto nameAndBond : bond_map_) + { + const auto& name = nameAndBond.first; + auto bond = nameAndBond.second; + + // Set the broken callback to just unload the nodelet and do not touch the bond map. + // It is being cleared explicitly in the next lines. We do not want to erase elements + // from it during its own .clear() call. The NoLock variant can be called because we + // already have locked the mutex above. + bond->setBrokenCallback(boost::bind(&LoaderROS::unloadNoLock, this, name)); + } + + bond_map_.clear(); + } + private: bool serviceLoad(nodelet::NodeletLoad::Request &req, nodelet::NodeletLoad::Response &res) @@ -95,7 +117,7 @@ class LoaderROS M_string remappings; if (req.remap_source_args.size() != req.remap_target_args.size()) { - ROS_ERROR("Bad remapppings provided, target and source of different length"); + ROS_ERROR("Bad remappings provided, target and source of different length"); } else { @@ -114,7 +136,7 @@ class LoaderROS bond::Bond* bond = new bond::Bond(nh_.getNamespace() + "/bond", req.bond_id); bond_map_.insert(req.name, bond); bond->setCallbackQueue(&bond_callback_queue_); - bond->setBrokenCallback(boost::bind(&LoaderROS::unload, this, req.name)); + bond->setBrokenCallback(boost::bind(&LoaderROS::unloadAndEraseBond, this, req.name)); bond->start(); } return res.success; @@ -123,25 +145,35 @@ class LoaderROS bool serviceUnload(nodelet::NodeletUnload::Request &req, nodelet::NodeletUnload::Response &res) { - res.success = unload(req.name); + res.success = unloadAndEraseBond(req.name); return res.success; } - bool unload(const std::string& name) + bool unloadNoLock(const std::string& name) { - boost::mutex::scoped_lock lock(lock_); - bool success = parent_->unload(name); + if (!success) { ROS_ERROR("Failed to find nodelet with name '%s' to unload.", name.c_str()); return success; } + return success; + } + + bool unloadAndEraseBond(const std::string& name) + { + boost::mutex::scoped_lock lock(lock_); + + bool success = unloadNoLock(name); + if (!success) + return success; + // break the bond, if there is one M_stringToBond::iterator it = bond_map_.find(name); if (it != bond_map_.end()) { - // disable callback for broken bond, as we are breaking it intentially now + // disable callback for broken bond, as we are breaking it intentionally now it->second->setBrokenCallback(boost::function()); // erase (and break) bond bond_map_.erase(name); @@ -194,6 +226,7 @@ struct ManagedNodelet : boost::noncopyable ~ManagedNodelet() { + nodelet->requestStop(); callback_manager->removeQueue(st_queue); callback_manager->removeQueue(mt_queue); } @@ -201,8 +234,6 @@ struct ManagedNodelet : boost::noncopyable struct Loader::Impl { - boost::shared_ptr services_; - boost::function (const std::string& lookup_name)> create_instance_; boost::function refresh_classes_; boost::shared_ptr callback_manager_; // Must outlive nodelets_ @@ -210,6 +241,8 @@ struct Loader::Impl typedef boost::ptr_map M_stringToNodelet; M_stringToNodelet nodelets_; /// services_; // Must be destroyed before nodelets_ + Impl() { // Under normal circumstances, we use pluginlib to load any registered nodelet diff --git a/nodelet/src/nodelet_class.cpp b/nodelet/src/nodelet_class.cpp index e53d0d4..c6c8d25 100644 --- a/nodelet/src/nodelet_class.cpp +++ b/nodelet/src/nodelet_class.cpp @@ -45,6 +45,11 @@ Nodelet::Nodelet () Nodelet::~Nodelet() { + // Calling requestStop() would not make sense here. Any downstream class that + // would like to use ok() to check for validity of itself would have already + // been destroyed by its own destructor before getting here. + // requestStop() has to be called by an external manager before actually + // starting the destruction sequence. } ros::CallbackQueueInterface& Nodelet::getSTCallbackQueue () const @@ -134,4 +139,14 @@ void Nodelet::init(const std::string& name, const M_string& remapping_args, cons this->onInit (); } +bool Nodelet::ok() const +{ + return inited_; +} + +void Nodelet::requestStop() +{ + inited_ = false; +} + } // namespace nodelet diff --git a/test_nodelet/CMakeLists.txt b/test_nodelet/CMakeLists.txt index 87ddd0f..c6f33e5 100644 --- a/test_nodelet/CMakeLists.txt +++ b/test_nodelet/CMakeLists.txt @@ -7,13 +7,17 @@ catkin_package() if(CATKIN_ENABLE_TESTING) find_package(Boost REQUIRED thread) - find_package(catkin REQUIRED nodelet pluginlib rostest) + find_package(catkin REQUIRED bond nodelet pluginlib rostest) include_directories(SYSTEM ${BOOST_INCLUDE_DIRS} ${catkin_INCLUDE_DIRS} ) + # find pthread and provide it as Threads::Threads imported target + set(THREADS_PREFER_PTHREAD_FLAG ON) + find_package(Threads REQUIRED) + #common commands for building c++ executables and libraries - add_library(${PROJECT_NAME} src/plus.cpp src/console_tests.cpp src/failing_nodelet.cpp src/nodehandles.cpp) + add_library(${PROJECT_NAME} src/plus.cpp src/console_tests.cpp src/failing_nodelet.cpp src/nodehandles.cpp src/request_stop.cpp) target_link_libraries(${PROJECT_NAME} ${BOOST_LIBRARIES} ${catkin_LIBRARIES} ) @@ -58,4 +62,11 @@ if(CATKIN_ENABLE_TESTING) add_executable(create_instance_cb_error src/create_instance_cb_error.cpp) target_link_libraries(create_instance_cb_error ${catkin_LIBRARIES}) + + add_rostest_gtest(test_nodelet_request_stop test/test_nodelet_request_stop.test test/test_nodelet_request_stop.cpp) + target_link_libraries(test_nodelet_request_stop ${BOOST_LIBRARIES} ${catkin_LIBRARIES} Threads::Threads) + + add_rostest(test/test_request_stop_unload.test DEPENDENCIES ${PROJECT_NAME}) + add_rostest(test/test_request_stop_unload_standalone.test DEPENDENCIES ${PROJECT_NAME}) + add_rostest(test/test_request_stop_unload_kill_manager.test DEPENDENCIES ${PROJECT_NAME}) endif() diff --git a/test_nodelet/package.xml b/test_nodelet/package.xml index 0f2c51b..765eddf 100644 --- a/test_nodelet/package.xml +++ b/test_nodelet/package.xml @@ -21,7 +21,10 @@ rostest std_msgs + bond rosbash + rosnode + rospy diff --git a/test_nodelet/src/request_stop.cpp b/test_nodelet/src/request_stop.cpp new file mode 100644 index 0000000..3040c7f --- /dev/null +++ b/test_nodelet/src/request_stop.cpp @@ -0,0 +1,96 @@ +/* + * Copyright (c) 2023, Open Source Robotics Foundation + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * * Neither the name of the Willow Garage, Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +#include +#include +#include +#include +#include + +namespace test_nodelet +{ + +class RequestStop : public nodelet::Nodelet +{ +private: + void onInit() override + { + ros::NodeHandle pnh = getPrivateNodeHandle(); + std::string managerName {}; + managerName = pnh.param("manager_name", managerName); + waitForBond_ = pnh.param("wait_for_bond", waitForBond_); + + ros::NodeHandle nh = getNodeHandle(); + pub_ = nh.advertise("ready", 1, true); + timer_ = nh.createSteadyTimer(ros::WallDuration(0.1), &RequestStop::callback, this, false); + if (waitForBond_) + sub_ = nh.subscribe(ros::names::append(managerName, "bond"), 4, &RequestStop::bond, this); + } + + void bond(const bond::Status&) + { + numBondMsgs_ += 1; + } + + void callback(const ros::SteadyTimerEvent&) + { + // If running non-standalone, wait for the bonds to establish; otherwise, killing + // this nodelet too early would mean waiting 10 seconds for bond connect timeout, instead + // of waiting just 4 seconds for bond heartbeat timeout. Each end of the bond sends + // one message per second, so we wait for 2 messages to be sent from each end. + + if (waitForBond_ && numBondMsgs_ < 4) + return; + + timer_.stop(); + + std_msgs::String ready; + ready.data = this->getPrivateNodeHandle().getNamespace(); + NODELET_DEBUG("Ready"); + pub_.publish(ready); + + // Run an infinite loop which can only be ended by an async call to requestShutdown(). + while (ros::ok() && this->ok()) + ros::WallDuration(0.1).sleep(); + + ready.data = "interrupted"; + NODELET_INFO("Interrupted"); + std::cout << "Interrupted" << std::endl; + pub_.publish(ready); + } + + bool waitForBond_ {false}; + size_t numBondMsgs_ {0u}; + ros::Publisher pub_; + ros::Subscriber sub_; + ros::SteadyTimer timer_; +}; + +PLUGINLIB_EXPORT_CLASS(test_nodelet::RequestStop, nodelet::Nodelet) +} diff --git a/test_nodelet/test/test_nodelet_request_stop.cpp b/test_nodelet/test/test_nodelet_request_stop.cpp new file mode 100644 index 0000000..c673876 --- /dev/null +++ b/test_nodelet/test/test_nodelet_request_stop.cpp @@ -0,0 +1,515 @@ +/* + * Copyright (c) 2023, Open Source Robotics Foundation + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * * Neither the name of the Willow Garage, Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +#include "gtest/gtest.h" + +#include + +#include +#include +#include + +/** + * \brief A mock of the nodelet::Nodelet class with empty onInit(). + */ +struct TestNodelet : public nodelet::Nodelet +{ + /** + * \brief If wasOk is not null, set the value of this->ok() to it while the object is destroyed. + */ + ~TestNodelet() override + { + if (this->wasOk != nullptr) + *this->wasOk = this->ok(); + } + + /** + * \brief Empty. + */ + void onInit() override + { + } + + bool sleepCb(const ros::Duration& duration, volatile bool& startedExecuting) + { + volatile bool sleepRunning {true}; + volatile bool sleepOk {false}; + + timer = this->getNodeHandle().createWallTimer(ros::WallDuration(0.001), + [=,&sleepOk,&sleepRunning,&startedExecuting](const ros::WallTimerEvent&) { + startedExecuting = true; + sleepOk = this->sleep(duration); + sleepRunning = false; + }, true); + + // no access to this-> past this point; the nodelet might already be destroyed + + while (sleepRunning) + ros::WallDuration(1e-3).sleep(); + + return sleepOk; + } + + /** + * \brief Sleep for the given amount of time, checking this->ok() for possible interruptions. + * \param[in] duration Duration of the sleep. + * \return The value of this->ok() after the sleep. + */ + bool sleep(const ros::Duration& duration) + { + const auto endTime = ros::Time::now() + duration; + + while (ros::Time::now() < endTime && this->ok()) + ros::WallDuration(1e-3).sleep(); + + if (ros::Time::now() < endTime) + NODELET_INFO_STREAM("Ended " << (endTime - ros::Time::now()) << " s before deadline"); + + return this->ok(); + } + + //! \brief The value of this->ok() during destruction. + volatile bool* wasOk {nullptr}; + + ros::WallTimer timer; +}; + +/** + * \brief Test `ok()` and `requestStop()` functions. + */ +TEST(StatefulNodelet, ok) // NOLINT +{ + bool wasOk {true}; + { + TestNodelet nodelet; + nodelet.wasOk = &wasOk; + nodelet.init("test", {}, {}); + EXPECT_TRUE(nodelet.ok()); + + nodelet.requestStop(); + EXPECT_FALSE(nodelet.ok()); + } + EXPECT_FALSE(wasOk); +} + +/** + * \brief Test interrupting `sleep()` function with system time. + */ +TEST(StatefulNodelet, sleepInterruptSystime) // NOLINT +{ + ros::Time::init(); // use system time + ASSERT_TRUE(ros::Time::useSystemTime()); + + // Create a thread that sleeps for 1 second. After 0.1 s, check that it has started, then interrupt it, and after + // another 0.1 s, check that it has finished (if not, `finished` should still be false after the 0.2 s of waiting). + + TestNodelet nodelet; + nodelet.init("test", {}, {}); + bool started = false; + bool finished = false; + std::thread t([&]() + { + started = true; + EXPECT_FALSE(nodelet.sleep(ros::Duration(1))); + finished = true; + }); + + for (size_t i = 0; i < 1000 && !started; ++i) + ros::WallDuration(0.001).sleep(); + EXPECT_TRUE(started); + EXPECT_FALSE(finished); + + nodelet.requestStop(); + ros::WallDuration(0.1).sleep(); + + EXPECT_TRUE(finished); + + t.join(); +} + +/** + * \brief Test interrupting `sleep()` function with simulation time. + */ +TEST(StatefulNodelet, sleepInterruptSimtime) // NOLINT +{ + ros::Time::setNow({10, 0}); // use simulation time + ASSERT_TRUE(ros::Time::isSimTime()); + + // Create a thread that sleeps for 1 second. After 0.1 s, check that it has started, then interrupt it, and after + // another 0.1 s, check that it has finished (if not, `finished` should still be false after the 0.2 s of waiting). + + TestNodelet nodelet; + nodelet.init("test", {}, {}); + bool started = false; + bool finished = false; + std::thread t([&]() + { + started = true; + EXPECT_FALSE(nodelet.sleep({1, 0})); + finished = true; + }); + + for (size_t i = 0; i < 1000 && !started; ++i) + ros::WallDuration(0.001).sleep(); + EXPECT_TRUE(started); + EXPECT_FALSE(finished); + + ros::Time::setNow(ros::Time(10.1)); + + nodelet.requestStop(); + ros::WallDuration(0.1).sleep(); + + EXPECT_TRUE(finished); + + t.detach(); // detach the thread so that it doesn't block if the sleep did not end +} + +/** + * \brief Test finishing `sleep()` function with system time. + */ +TEST(StatefulNodelet, sleepFinishSystime) // NOLINT +{ + ros::Time::init(); // use system time + ASSERT_TRUE(ros::Time::isSystemTime()); + + // Create a thread that sleeps for 1 second. After 0.1 s, check that it has started but not finished. After another + // 1 s, check that it has finished. + + TestNodelet nodelet; + nodelet.init("test", {}, {}); + bool started = false; + bool finished = false; + std::thread t([&]() + { + started = true; + EXPECT_TRUE(nodelet.sleep(ros::Duration(1))); + finished = true; + }); + + for (size_t i = 0; i < 1000 && !started; ++i) + ros::WallDuration(0.001).sleep(); + EXPECT_TRUE(started); + EXPECT_FALSE(finished); + + ros::WallDuration(1.1).sleep(); + + EXPECT_TRUE(finished); + + t.detach(); // detach the thread so that it doesn't block if the sleep did not end +} + +/** + * \brief Test finishing `sleep()` function with simulation time. + */ +TEST(StatefulNodelet, sleepFinishSimtime) // NOLINT +{ + ros::Time::setNow({10, 0}); // use simulation time + ASSERT_TRUE(ros::Time::isSimTime()); + + // Create a thread that sleeps for 1 second. After 0.1 s, check that it has started but not finished. After another + // 1.1 s, check that it has finished. + + TestNodelet nodelet; + nodelet.init("test", {}, {}); + bool started = false; + bool finished = false; + std::thread t([&]() + { + started = true; + EXPECT_TRUE(nodelet.sleep({1, 0})); + finished = true; + }); + + for (size_t i = 0; i < 1000 && !started; ++i) + ros::WallDuration(0.001).sleep(); + EXPECT_TRUE(started); + EXPECT_FALSE(finished); + + ros::Time::setNow(ros::Time(11.1)); + ros::WallDuration(0.1).sleep(); + + EXPECT_TRUE(finished); + + t.detach(); // detach the thread so that it doesn't block if the sleep did not end +} + +/** + * \brief Test `ok()` on an uninitialized and unmanaged nodelet. + */ +TEST(StatefulNodelet, uninitialized) // NOLINT +{ + bool wasOk {true}; + { + TestNodelet nodelet; + nodelet.wasOk = &wasOk; + nodelet.requestStop(); + ros::WallDuration(0.1).sleep(); + } + EXPECT_FALSE(wasOk); +} + +/** + * \brief Test that `sleep()` can be interrupted by nodelet unloading in system time. + */ +TEST(StatefulNodelet, sleepInterruptByUnloadSystime) // NOLINT +{ + ros::Time::init(); + ASSERT_TRUE(ros::Time::isSystemTime()); + + // Can't be shared_ptr - it would lead to a segfault as Loader::unload() destroys the callback queues that the nodelet + // needs for its own destruction (to unregister its subscription helper). If the nodelet would outlive Loader in a + // shared_ptr, its destruction would therefore fail. + TestNodelet* nodelet; + // Loader is the standard class used for managing nodelets in a nodelet manager. We pass it a custom create_instance + // function that disables the normal pluginlib lookup algorithm and allows us to create the instances manually. + // We create it in a separate detached thread so that its destructor (which waits for all worker threads to finish) + // does not block the other tests (in case this test would never finish). One of the worker threads is spinning the + // nodelet's callback queue, so it might happen that it gets stuck. + nodelet::Loader* loaderPointer {nullptr}; + bool stop = false; + std::thread loaderThread([&]() + { + nodelet::Loader l([&](const std::string&) + { + return boost::shared_ptr(nodelet = new TestNodelet); + }); + loaderPointer = &l; + // Keep loaderThread running until the end of this test. + while (!stop && ros::ok()) + { + ros::spinOnce(); + ros::WallDuration(0, 1000000).sleep(); + } + }); + // Wait until loaderThread creates the loader + while (loaderPointer == nullptr) + ros::WallDuration(0, 1000).sleep(); + // Detach the loaderThread so that it doesn't cause a segfault when exiting the program (as we can't join() it). + loaderThread.detach(); + nodelet::Loader& loader = *loaderPointer; + + // Load a nodelet using the Loader. This will trigger the custom create_instance function and set `nodelet`. + EXPECT_TRUE(loader.load("my_nodelet", "MyNodelet", {}, {})); + ASSERT_NE(nullptr, nodelet); + volatile bool wasOk {true}; + nodelet->wasOk = &wasOk; + EXPECT_TRUE(nodelet->ok()); + + // First, try if the sleep will finish if waiting enough time + + volatile bool started = false; + volatile bool startedExecuting = false; + volatile bool finished = false; + std::thread t([&]() + { + started = true; + EXPECT_TRUE(nodelet->sleepCb(ros::Duration(1), startedExecuting)); + EXPECT_TRUE(nodelet->ok()); + finished = true; + }); + + for (size_t i = 0; i < 1000 && !started; ++i) + ros::WallDuration(0.001).sleep(); + EXPECT_TRUE(started); + EXPECT_FALSE(finished); + + for (size_t i = 0; i < 1000 && !startedExecuting; ++i) + ros::WallDuration(0.001).sleep(); + EXPECT_TRUE(startedExecuting); + + ros::WallDuration(1.01).sleep(); + EXPECT_TRUE(finished); + + // Now, test that the sleep can be interrupted. + + started = false; + startedExecuting = false; + finished = false; + std::thread t2([&]() + { + started = true; + EXPECT_FALSE(nodelet->sleepCb({1, 0}, startedExecuting)); + finished = true; + ros::WallDuration(0.1).sleep(); + // cannot test EXPECT_TRUE(nodelet->ok()) here as nodelet is no longer valid + EXPECT_FALSE(wasOk); + }); + + for (size_t i = 0; i < 1000 && !started; ++i) + ros::WallDuration(0.001).sleep(); + EXPECT_TRUE(started); + EXPECT_FALSE(finished); + + for (size_t i = 0; i < 1000 && !startedExecuting; ++i) + ros::WallDuration(0.001).sleep(); + EXPECT_TRUE(startedExecuting); + + EXPECT_TRUE(wasOk); + loader.unload("my_nodelet"); + ros::WallDuration(0.1).sleep(); // wait for the unloading to clean callback queues + EXPECT_FALSE(wasOk); + + ros::WallDuration(0.1).sleep(); + EXPECT_TRUE(finished); + + t.detach(); // detach the threads so that they don't block if the sleep did not end + t2.detach(); + stop = true; // stop the loaderThread +} + +/** + * \brief Test that `sleep()` can be interrupted by nodelet unloading in sim time. + */ +TEST(StatefulNodelet, sleepInterruptByUnloadSimtime) // NOLINT +{ + // The test runs with paused sim time to make the sleep() call infinitely waiting. + ros::Time::setNow({10, 0}); + ASSERT_TRUE(ros::Time::isSimTime()); + ASSERT_EQ(10, ros::Time::now().sec); + + // Can't be shared_ptr - it would lead to a segfault as Loader::unload() destroys the callback queues that the nodelet + // needs for its own destruction (to unregister its subscription helper). If the nodelet would outlive Loader in a + // shared_ptr, its destruction would therefore fail. + TestNodelet* nodelet; + // Loader is the standard class used for managing nodelets in a nodelet manager. We pass it a custom create_instance + // function that disables the normal pluginlib lookup algorithm and allows us to create the instances manually. + // We create it in a separate detached thread so that its destructor (which waits for all worker threads to finish) + // does not block the other tests (in case this test would never finish). One of the worker threads is spinning the + // nodelet's callback queue, so it might happen that it gets stuck. + nodelet::Loader* loaderPointer {nullptr}; + bool stop = false; + std::thread loaderThread([&]() + { + nodelet::Loader l([&](const std::string&) + { + return boost::shared_ptr(nodelet = new TestNodelet); + }); + loaderPointer = &l; + // Keep loaderThread running until the end of this test. + while (!stop && ros::ok()) + { + ros::spinOnce(); + ros::WallDuration(0, 1000000).sleep(); + } + }); + // Wait until loaderThread creates the loader + while (loaderPointer == nullptr) + ros::WallDuration(0, 1000).sleep(); + // Detach the loaderThread so that it doesn't cause a segfault when exiting the program (as we can't join() it). + loaderThread.detach(); + nodelet::Loader& loader = *loaderPointer; + + // Load a nodelet using the Loader. This will trigger the custom create_instance function and set `nodelet`. + EXPECT_TRUE(loader.load("my_nodelet", "MyNodelet", {}, {})); + EXPECT_NE(nullptr, nodelet); + volatile bool wasOk {true}; + nodelet->wasOk = &wasOk; + EXPECT_TRUE(nodelet->ok()); + + // First, try if the sleep will finish if waiting enough time + + bool started = false; + bool startedExecuting = false; + bool finished = false; + std::thread t([&]() + { + EXPECT_TRUE(nodelet->ok()); + started = true; + EXPECT_TRUE(nodelet->sleepCb({1, 0}, startedExecuting)); + finished = true; + EXPECT_TRUE(nodelet->ok()); + }); + + for (size_t i = 0; i < 1000 && !started; ++i) + ros::WallDuration(0.001).sleep(); + EXPECT_TRUE(started); + EXPECT_FALSE(finished); + + for (size_t i = 0; i < 1000 && !startedExecuting; ++i) + ros::WallDuration(0.001).sleep(); + EXPECT_TRUE(startedExecuting); + + EXPECT_TRUE(nodelet->ok()); + + ros::Time::setNow(ros::Time(11.1)); + ros::WallDuration(0.1).sleep(); + EXPECT_TRUE(finished); + + EXPECT_TRUE(nodelet->ok()); + + // Now, test that the sleep can be interrupted. + + ros::Time::setNow(ros::Time(10)); + + started = false; + startedExecuting = false; + finished = false; + std::thread t2([&]() + { + EXPECT_TRUE(nodelet->ok()); + started = true; + EXPECT_FALSE(nodelet->sleepCb({1, 0}, startedExecuting)); + finished = true; + // cannot test EXPECT_TRUE(nodelet->ok()) here as nodelet is no longer valid + ros::WallDuration(0.1).sleep(); + EXPECT_FALSE(wasOk); + }); + + ros::Time::setNow(ros::Time(10.1)); // not enough for the sleep to finish + + for (size_t i = 0; i < 1000 && !started; ++i) + ros::WallDuration(0.001).sleep(); + EXPECT_TRUE(started); + EXPECT_FALSE(finished); + + for (size_t i = 0; i < 1000 && !startedExecuting; ++i) + ros::WallDuration(0.001).sleep(); + EXPECT_TRUE(startedExecuting); + + EXPECT_TRUE(nodelet->ok()); + EXPECT_TRUE(wasOk); + loader.unload("my_nodelet"); + ros::WallDuration(0.1).sleep(); // wait for the unloading to clean callback queues + EXPECT_FALSE(wasOk); + + ros::WallDuration(0.1).sleep(); + EXPECT_TRUE(finished); + + t.detach(); // detach the threads so that they don't block if the sleep did not end + t2.detach(); + stop = true; // stop the loaderThread +} + +int main(int argc, char **argv) +{ + testing::InitGoogleTest(&argc, argv); + ros::init(argc, argv, "test_nodelet_request_stop"); + ros::start(); + return RUN_ALL_TESTS(); +} diff --git a/test_nodelet/test/test_nodelet_request_stop.test b/test_nodelet/test/test_nodelet_request_stop.test new file mode 100644 index 0000000..38eb5b3 --- /dev/null +++ b/test_nodelet/test/test_nodelet_request_stop.test @@ -0,0 +1,3 @@ + + + diff --git a/test_nodelet/test/test_request_stop.py b/test_nodelet/test/test_request_stop.py new file mode 100755 index 0000000..6dee776 --- /dev/null +++ b/test_nodelet/test/test_request_stop.py @@ -0,0 +1,122 @@ +#!/usr/bin/env python + +# Copyright (c) 2023, Open Source Robotics Foundation +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above copyright +# notice, this list of conditions and the following disclaimer in the +# documentation and/or other materials provided with the distribution. +# * Neither the name of the Willow Garage, Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE +# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +# POSSIBILITY OF SUCH DAMAGE. + + +import rosnode +import rospy +import rostest +import sys +from time import sleep +import unittest + +from bond.msg import Status +from nodelet.srv import NodeletUnload, NodeletUnloadRequest +from std_msgs.msg import String + + +class TestRequestStop(unittest.TestCase): + def __init__(self, methodName='runTest'): + super(TestRequestStop, self).__init__(methodName) + self._last_ready_msg = None + self._num_received = 0 + self._stop_method = rospy.get_param("~stop_method", "unload") + self._manager_name = rospy.get_param("~manager_name", "") + self._wait_for_bond = rospy.get_param("~wait_for_bond", False) + if self._wait_for_bond: + self._num_bond_msgs = 0 + self._bond_sub = rospy.Subscriber( + rospy.names.ns_join(self._manager_name, "bond"), Status, self.bond_cb, queue_size=4) + + def bond_cb(self, msg): + self._num_bond_msgs += 1 + + def ready_cb(self, msg): + self._last_ready_msg = msg + self._num_received += 1 + + def test_unload(self): + # The nodelet runs an infinite loop that can only be broken by calling `requestStop()` on the nodelet. + # Test this by unloading the nodelet, which should automatically call `requestStop()`. + # The nodelet publishes one message on /ready topic before entering the loop and one message after it is broken. + # However, if the nodelet is killed instead of unloaded, the `ros::shutdown()` call that happens during the kill + # will make it impossible to send the second message. So the kill tests just tests whether the node has stopped + # responding. + + if self._stop_method == "unload": + unload_srv = rospy.names.ns_join(self._manager_name, "unload_nodelet") + unload = rospy.ServiceProxy(unload_srv, NodeletUnload) + unload.wait_for_service() + + self._last_ready_msg = None + self._num_received = 0 + + sub = rospy.Subscriber("ready", String, self.ready_cb, queue_size=2) + + while not rospy.is_shutdown() and self._num_received < 1: + rospy.logdebug("Waiting for ready message") + sleep(0.1) + + self.assertGreaterEqual(self._num_received, 1) + node_name = self._last_ready_msg.data + + # Give the bonds time to start. If we let one bond end to send exactly one heartbeat, + # we run into https://github.com/ros/bond_core/pull/93 which puts the other end into + # infinite wait. We want to make sure at least two heartbeats are sent. Each end sends + # one message per second to the bond topic, so we expect to see at least 4 messages. + while self._wait_for_bond and not rospy.is_shutdown() and self._num_bond_msgs < 4: + rospy.loginfo("Waiting for bonds. Have %i messages so far." % (self._num_bond_msgs,)) + sleep(0.1) + + if self._stop_method == "unload": + req = NodeletUnloadRequest() + req.name = node_name + res = unload.call(req) + self.assertTrue(res.success) + + while not rospy.is_shutdown() and self._num_received < 2: + rospy.loginfo("Waiting for interrupted message") + sleep(0.1) + + self.assertEqual(self._num_received, 2) + self.assertEqual(self._last_ready_msg.data, "interrupted") + + elif self._stop_method.startswith("kill"): + name_to_kill = node_name if self._stop_method == "kill" else self._manager_name + success, fail = rosnode.kill_nodes((name_to_kill,)) + self.assertEqual(success, [name_to_kill]) + self.assertEqual(fail, []) + + while rosnode.rosnode_ping(node_name, max_count=1): + sleep(0.1) + self.assertFalse(rosnode.rosnode_ping(node_name, max_count=1)) + + +if __name__ == '__main__': + rospy.init_node('test_request_stop') + rostest.rosrun('test_request_stop', 'test_request_stop', TestRequestStop) diff --git a/test_nodelet/test/test_request_stop_unload.test b/test_nodelet/test/test_request_stop_unload.test new file mode 100644 index 0000000..cbca7d9 --- /dev/null +++ b/test_nodelet/test/test_request_stop_unload.test @@ -0,0 +1,12 @@ + + + + + + + + + + + + \ No newline at end of file diff --git a/test_nodelet/test/test_request_stop_unload_kill_manager.test b/test_nodelet/test/test_request_stop_unload_kill_manager.test new file mode 100644 index 0000000..7693835 --- /dev/null +++ b/test_nodelet/test/test_request_stop_unload_kill_manager.test @@ -0,0 +1,14 @@ + + + + + + + + + + + + + + \ No newline at end of file diff --git a/test_nodelet/test/test_request_stop_unload_standalone.test b/test_nodelet/test/test_request_stop_unload_standalone.test new file mode 100644 index 0000000..096d5ca --- /dev/null +++ b/test_nodelet/test/test_request_stop_unload_standalone.test @@ -0,0 +1,7 @@ + + + + + + + diff --git a/test_nodelet/test_nodelet.xml b/test_nodelet/test_nodelet.xml index 4f7a0e1..43876ee 100644 --- a/test_nodelet/test_nodelet.xml +++ b/test_nodelet/test_nodelet.xml @@ -19,4 +19,9 @@ A node that creates public and private nodehandles. + + + A node that indefinitely waits until requested to stop. + +