Skip to content

Commit

Permalink
Have the EventsExecutor use more common code (#2570)
Browse files Browse the repository at this point in the history
* move notify waitable setup to its own function
* move mutex lock to retrieve_entity utility
* use entities_need_rebuild_ atomic bool in events-executors
* remove duplicated set_on_ready_callback for notify_waitable
* use mutex from base class rather than a new recursive mutex
* use current_collection_ member in events-executor
* delay adding notify waitable to collection
* postpone clearing the current collection
* commonize notify waitable and collection
* commonize add/remove node/cbg methods
* fix linter errors

---------

Signed-off-by: Alberto Soragna <[email protected]>
  • Loading branch information
alsora authored Jul 22, 2024
1 parent bdf1f8f commit 54b8f9c
Show file tree
Hide file tree
Showing 9 changed files with 90 additions and 244 deletions.
5 changes: 3 additions & 2 deletions rclcpp/include/rclcpp/executor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -541,8 +541,9 @@ class Executor
*
* \param[in] notify if true will execute a trigger that will wake up a waiting executor
*/
void
trigger_entity_recollect(bool notify);
RCLCPP_PUBLIC
virtual void
handle_updated_entities(bool notify);

/// Spinning state, used to prevent multi threaded calls to spin and to cancel blocking spins.
std::atomic_bool spinning;
Expand Down
11 changes: 11 additions & 0 deletions rclcpp/include/rclcpp/executors/executor_notify_waitable.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,14 @@ class ExecutorNotifyWaitable : public rclcpp::Waitable
void
clear_on_ready_callback() override;

/// Set a new callback to be called whenever this waitable is executed.
/**
* \param[in] on_execute_callback The new callback
*/
RCLCPP_PUBLIC
void
set_execute_callback(std::function<void(void)> on_execute_callback);

/// Remove a guard condition from being waited on.
/**
* \param[in] weak_guard_condition The guard condition to remove.
Expand All @@ -142,7 +150,10 @@ class ExecutorNotifyWaitable : public rclcpp::Waitable
/// Callback to run when waitable executes
std::function<void(void)> execute_callback_;

/// Mutex to procetect the guard conditions
std::mutex guard_condition_mutex_;
/// Mutex to protect the execute callback
std::mutex execute_mutex_;

std::function<void(size_t)> on_ready_callback_;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,6 @@ namespace executors
*/
class EventsExecutor : public rclcpp::Executor
{
friend class EventsExecutorEntitiesCollector;

public:
RCLCPP_SMART_PTR_DEFINITIONS(EventsExecutor)

Expand All @@ -72,7 +70,7 @@ class EventsExecutor : public rclcpp::Executor
* \param[in] options Options used to configure the executor.
*/
RCLCPP_PUBLIC
explicit EventsExecutor(
EventsExecutor(
rclcpp::experimental::executors::EventsQueue::UniquePtr events_queue = std::make_unique<
rclcpp::experimental::executors::SimpleEventsQueue>(),
bool execute_timers_separate_thread = false,
Expand Down Expand Up @@ -128,87 +126,6 @@ class EventsExecutor : public rclcpp::Executor
void
spin_all(std::chrono::nanoseconds max_duration) override;

/// Add a node to the executor.
/**
* \sa rclcpp::Executor::add_node
*/
RCLCPP_PUBLIC
void
add_node(
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr,
bool notify = true) override;

/// Convenience function which takes Node and forwards NodeBaseInterface.
/**
* \sa rclcpp::EventsExecutor::add_node
*/
RCLCPP_PUBLIC
void
add_node(std::shared_ptr<rclcpp::Node> node_ptr, bool notify = true) override;

/// Remove a node from the executor.
/**
* \sa rclcpp::Executor::remove_node
*/
RCLCPP_PUBLIC
void
remove_node(
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr,
bool notify = true) override;

/// Convenience function which takes Node and forwards NodeBaseInterface.
/**
* \sa rclcpp::Executor::remove_node
*/
RCLCPP_PUBLIC
void
remove_node(std::shared_ptr<rclcpp::Node> node_ptr, bool notify = true) override;

/// Add a callback group to an executor.
/**
* \sa rclcpp::Executor::add_callback_group
*/
RCLCPP_PUBLIC
void
add_callback_group(
rclcpp::CallbackGroup::SharedPtr group_ptr,
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr,
bool notify = true) override;

/// Remove callback group from the executor
/**
* \sa rclcpp::Executor::remove_callback_group
*/
RCLCPP_PUBLIC
void
remove_callback_group(
rclcpp::CallbackGroup::SharedPtr group_ptr,
bool notify = true) override;

/// Get callback groups that belong to executor.
/**
* \sa rclcpp::Executor::get_all_callback_groups()
*/
RCLCPP_PUBLIC
std::vector<rclcpp::CallbackGroup::WeakPtr>
get_all_callback_groups() override;

/// Get callback groups that belong to executor.
/**
* \sa rclcpp::Executor::get_manually_added_callback_groups()
*/
RCLCPP_PUBLIC
std::vector<rclcpp::CallbackGroup::WeakPtr>
get_manually_added_callback_groups() override;

/// Get callback groups that belong to executor.
/**
* \sa rclcpp::Executor::get_automatically_added_callback_groups_from_nodes()
*/
RCLCPP_PUBLIC
std::vector<rclcpp::CallbackGroup::WeakPtr>
get_automatically_added_callback_groups_from_nodes() override;

protected:
/// Internal implementation of spin_once
RCLCPP_PUBLIC
Expand All @@ -220,16 +137,21 @@ class EventsExecutor : public rclcpp::Executor
void
spin_some_impl(std::chrono::nanoseconds max_duration, bool exhaustive);

/// Collect entities from callback groups and refresh the current collection with them
RCLCPP_PUBLIC
void
handle_updated_entities(bool notify) override;

private:
RCLCPP_DISABLE_COPY(EventsExecutor)

/// Execute a provided executor event if its associated entities are available
void
execute_event(const ExecutorEvent & event);

/// Collect entities from callback groups and refresh the current collection with them
/// Rebuilds the executor's notify waitable, as we can't use the one built in the base class
void
refresh_current_collection_from_callback_groups();
setup_notify_waitable();

/// Refresh the current collection using the provided new_collection
void
Expand All @@ -253,6 +175,11 @@ class EventsExecutor : public rclcpp::Executor
typename CollectionType::EntitySharedPtr
retrieve_entity(typename CollectionType::Key entity_id, CollectionType & collection)
{
// Note: we lock the mutex because we assume that you are trying to get an element from the
// current collection... If there will be a use-case to retrieve elements also from other
// collections, we can move the mutex back to the calling codes.
std::lock_guard<std::mutex> guard(mutex_);

// Check if the entity_id is in the collection
auto it = collection.find(entity_id);
if (it == collection.end()) {
Expand All @@ -273,16 +200,6 @@ class EventsExecutor : public rclcpp::Executor
/// Queue where entities can push events
rclcpp::experimental::executors::EventsQueue::UniquePtr events_queue_;

std::shared_ptr<rclcpp::executors::ExecutorEntitiesCollector> entities_collector_;
std::shared_ptr<rclcpp::executors::ExecutorNotifyWaitable> notify_waitable_;

/// Mutex to protect the current_entities_collection_
std::recursive_mutex collection_mutex_;
std::shared_ptr<rclcpp::executors::ExecutorEntitiesCollection> current_entities_collection_;

/// Flag used to reduce the number of unnecessary waitable events
std::atomic<bool> notify_waitable_event_pushed_ {false};

/// Timers manager used to track and/or execute associated timers
std::shared_ptr<rclcpp::experimental::TimersManager> timers_manager_;
};
Expand Down
18 changes: 9 additions & 9 deletions rclcpp/src/rclcpp/executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ Executor::~Executor()
}

void
Executor::trigger_entity_recollect(bool notify)
Executor::handle_updated_entities(bool notify)
{
this->entities_need_rebuild_.store(true);

Expand Down Expand Up @@ -174,11 +174,11 @@ Executor::add_callback_group(
this->collector_.add_callback_group(group_ptr);

try {
this->trigger_entity_recollect(notify);
this->handle_updated_entities(notify);
} catch (const rclcpp::exceptions::RCLError & ex) {
throw std::runtime_error(
std::string(
"Failed to trigger guard condition on callback group add: ") + ex.what());
"Failed to handle entities update on callback group add: ") + ex.what());
}
}

Expand All @@ -188,11 +188,11 @@ Executor::add_node(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_pt
this->collector_.add_node(node_ptr);

try {
this->trigger_entity_recollect(notify);
this->handle_updated_entities(notify);
} catch (const rclcpp::exceptions::RCLError & ex) {
throw std::runtime_error(
std::string(
"Failed to trigger guard condition on node add: ") + ex.what());
"Failed to handle entities update on node add: ") + ex.what());
}
}

Expand All @@ -204,11 +204,11 @@ Executor::remove_callback_group(
this->collector_.remove_callback_group(group_ptr);

try {
this->trigger_entity_recollect(notify);
this->handle_updated_entities(notify);
} catch (const rclcpp::exceptions::RCLError & ex) {
throw std::runtime_error(
std::string(
"Failed to trigger guard condition on callback group remove: ") + ex.what());
"Failed to handle entities update on callback group remove: ") + ex.what());
}
}

Expand All @@ -224,11 +224,11 @@ Executor::remove_node(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node
this->collector_.remove_node(node_ptr);

try {
this->trigger_entity_recollect(notify);
this->handle_updated_entities(notify);
} catch (const rclcpp::exceptions::RCLError & ex) {
throw std::runtime_error(
std::string(
"Failed to trigger guard condition on node remove: ") + ex.what());
"Failed to handle entities update on node remove: ") + ex.what());
}
}

Expand Down
14 changes: 10 additions & 4 deletions rclcpp/src/rclcpp/executors/executor_notify_waitable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include <iostream>

#include "rclcpp/exceptions.hpp"
#include "rclcpp/executors/executor_notify_waitable.hpp"

Expand Down Expand Up @@ -91,9 +89,9 @@ ExecutorNotifyWaitable::is_ready(const rcl_wait_set_t & wait_set)
}

void
ExecutorNotifyWaitable::execute(const std::shared_ptr<void> & data)
ExecutorNotifyWaitable::execute(const std::shared_ptr<void> & /*data*/)
{
(void) data;
std::lock_guard<std::mutex> lock(execute_mutex_);
this->execute_callback_();
}

Expand Down Expand Up @@ -149,6 +147,14 @@ ExecutorNotifyWaitable::clear_on_ready_callback()
}
}

RCLCPP_PUBLIC
void
ExecutorNotifyWaitable::set_execute_callback(std::function<void(void)> on_execute_callback)
{
std::lock_guard<std::mutex> lock(execute_mutex_);
execute_callback_ = on_execute_callback;
}

void
ExecutorNotifyWaitable::add_guard_condition(rclcpp::GuardCondition::WeakPtr weak_guard_condition)
{
Expand Down
Loading

0 comments on commit 54b8f9c

Please sign in to comment.