Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ROS1 Bridge doesn't work with Multithreaded Executor on a ROS2 service #314

Open
joelbudu opened this issue May 18, 2021 · 14 comments · May be fixed by #315
Open

ROS1 Bridge doesn't work with Multithreaded Executor on a ROS2 service #314

joelbudu opened this issue May 18, 2021 · 14 comments · May be fixed by #315
Assignees

Comments

@joelbudu
Copy link

joelbudu commented May 18, 2021

Bug report

Required Info:

  • Operating System: Ubuntu 18.04
  • Installation type: Binary
  • Version or commit hash: dashing
  • DDS implementation: Default
  • Client library (if applicable): N/A

Steps to reproduce issue

  1. Create a service with a Multithreaded executor in ROS2
  2. Connect to ROS1 via the ros1_bridge
  3. Call the ROS2 service through the bridge.
    Only a single thread runs on the service

#include <chrono>
#include <functional>
#include <memory>
#include <string>
#include <thread>

#include "rclcpp/rclcpp.hpp"
#include "std_msgs/msg/string.hpp"
#include "sensor_msgs/msg/laser_scan.hpp"
#include "geometry_msgs/msg/vector3_stamped.hpp"
#include "std_srvs/srv/empty.hpp"

using namespace std::chrono_literals;

/**
 * A small convenience function for converting a thread ID to a string
 **/
std::string string_thread_id()
{
  auto hashed = std::hash<std::thread::id>()(std::this_thread::get_id());
  return std::to_string(hashed);
}



class PublisherNode : public rclcpp::Node
{
public:
  PublisherNode()
  : Node("PublisherNode")
  {
    publisher_ = this->create_publisher<geometry_msgs::msg::Vector3Stamped>("topic", 10);
    auto timer_callback =
      [this]() -> void {
        auto message = geometry_msgs::msg::Vector3Stamped();
        message.header.stamp = this->now();

        this->publisher_->publish(message);
      };
    timer_ = this->create_wall_timer(500ms, timer_callback);
  }

private:
  rclcpp::TimerBase::SharedPtr timer_;
  rclcpp::Publisher<geometry_msgs::msg::Vector3Stamped>::SharedPtr publisher_;
};

class DualThreadedNode : public rclcpp::Node
{
public:
  DualThreadedNode()
  : Node("DualThreadedNode")
  {
    /* These define the callback groups
     * They don't really do much on their own, but they have to exist in order to
     * assign callbacks to them. They're also what the executor looks for when trying to run multiple threads
     */
    callback_group_subscriber1_ = this->create_callback_group(
      rclcpp::CallbackGroupType::MutuallyExclusive);
    callback_group_subscriber2_ = this->create_callback_group(
      rclcpp::CallbackGroupType::MutuallyExclusive);

     auto service = [this](const std::shared_ptr<rmw_request_id_t> request_header,
      const std::shared_ptr<std_srvs::srv::Empty::Request> req,
      const std::shared_ptr<std_srvs::srv::Empty::Response> res) -> void {
      this->test_service(request_header, req, res);
    };

    callback_group4_ = this->create_callback_group(
    rclcpp::callback_group::CallbackGroupType::MutuallyExclusive);
    test_srv_ = this->create_service<std_srvs::srv::Empty>("test_service", service, rmw_qos_profile_default, callback_group4_);

    auto sub1_opt = rclcpp::SubscriptionOptions();
    sub1_opt.callback_group = callback_group_subscriber1_;

    subscription1_ = this->create_subscription<geometry_msgs::msg::Vector3Stamped>(
      "topic",
      rclcpp::QoS(10),
    
      std::bind(
        &DualThreadedNode::subscriber1_cb, 
        this,                             
        std::placeholders::_1),           
                      
      sub1_opt);                 
  }

private:
  

  /**
   * Every time the Publisher publishes something, all subscribers to the topic get poked
   * This function gets called when Subscriber1 is poked (due to the std::bind we used when defining it)
   */
  void subscriber1_cb(const geometry_msgs::msg::Vector3Stamped::SharedPtr msg)
  {
    
    vector_msg_ = msg;
  }

  void test_service(
  const std::shared_ptr<rmw_request_id_t> request_header,
  const std::shared_ptr<std_srvs::srv::Empty::Request> request,
  const std::shared_ptr<std_srvs::srv::Empty::Response> response)
{
    
    RCLCPP_INFO(this->get_logger(), "Received message in callback1");

    RCLCPP_INFO(this->get_logger(), "msg timestamp : %f", rclcpp::Time(vector_msg_->header.stamp ).seconds());

    auto received_time = this->get_clock()->now();

    while (received_time +  rclcpp::Duration(5,0) > rclcpp::Time(vector_msg_->header.stamp) && rclcpp::ok())
    {  
        std::this_thread::sleep_for(std::chrono::milliseconds(50));
    }

    RCLCPP_INFO(this->get_logger(), "Exitted cb after 5 secs");

}

  rclcpp::CallbackGroup::SharedPtr callback_group_subscriber1_;
  rclcpp::CallbackGroup::SharedPtr callback_group_subscriber2_;
  rclcpp::CallbackGroup::SharedPtr callback_group4_;
  rclcpp::Subscription<geometry_msgs::msg::Vector3Stamped>::SharedPtr subscription1_;
  rclcpp::Subscription<std_msgs::msg::String>::SharedPtr subscription2_;

  rclcpp::Service<std_srvs::srv::Empty>::SharedPtr test_srv_;

  geometry_msgs::msg::Vector3Stamped::SharedPtr vector_msg_;
  rclcpp::Time stamp_;

};

int main(int argc, char * argv[])
{
  rclcpp::init(argc, argv);

  // You MUST use the MultiThreadedExecutor to use, well, multiple threads
  rclcpp::executors::MultiThreadedExecutor executor;
  auto pubnode = std::make_shared<PublisherNode>();
  auto subnode = std::make_shared<DualThreadedNode>();  // This contains BOTH subscriber callbacks.
                                                        // They will still run on different threads
                                                        // One Node. Two callbacks. Two Threads
  executor.add_node(pubnode);
  executor.add_node(subnode);
  executor.spin();
  rclcpp::shutdown();
  return 0;
}

# source ros1 workspace
# source ros2 workspace
ros2 ros1_bridge dynamic_bridge --bridge-all-topics;
# source ros1 workspace
rosservice call  /test_service std_srvs/srv/Empty;

Expected behaviour

Multithreaded executor should run multiple nodes in separate threads.

Actual behavior

ROS2 service is run in a single thread

Additional information


Feature request

ros1_bridge should support nodes with the Multithreaded executor

@iuhilnehc-ynos
Copy link

@joelbudu

Create a service with a Multithreaded executor in ROS2
Multithreaded executor should run multiple nodes in separate threads.

Do you expect that a service created by a node running on a multithreaded executor of ros2 can deal with multiple requests from ros1 at the same time? Or two more different services created by a node or nodes? I just want to make sure you make the right code (IIRC, the flag of Callback group, exclusive, will be checked inside if you use the default callback group only in one node) in ROS2.

About ros1_bridge, there are some locations that seem to be updated if we want to support dealing with multiple requests from ros1 at the same time. such as,

  1. use CallbackGroup with Reentrant (It just bridge the data for the service, so I think we can use reentrant here.) for

    bridge.server = ros2_node->create_service<ROS2_T>(name, f);

  2. use MultipleThreadedExecutor for

    rclcpp::executors::SingleThreadedExecutor executor;

BTW: could you provide an example and more detailed steps to reproduce what you expect?

@iuhilnehc-ynos
Copy link

Could you use foxy or rolling? Because it seems the EOL of dashing is on May, 2021.

@joelbudu
Copy link
Author

@iuhilnehc-ynos Thank you for your response. My use case is a scenario where you have a long-running blocking service that depends on other threads updating a member variable before returning the response. It is a single service per node but the node will have other callbacks from topics that need to be processed concurrently.

You have also pointed very correctly to the locations I thought would need updating to support this.

I would update the Issue with more detailed steps to reproduce as well.

With respect to the ros version I'm thinking it may be easy to merge the PR in both branches (including dashing).
If it's not possible I will have to create a fork and have the change in there to be able to work in my current environment

@iuhilnehc-ynos
Copy link

You have also pointed very correctly to the locations

I am sorry, item 1 pointed before is the bridge for ros2 client -> ros1 service. (For your case, it should be

bridge.client = ros2_node->create_client<ROS2_T>(name);
)

@iuhilnehc-ynos
Copy link

@joelbudu

My use case is a scenario where you have a long-running blocking service that depends on other threads updating a member variable before returning the response. It is a single service per node but the node will have other callbacks from topics that need to be processed concurrently.

Thank you for sharing this. At least you didn't use the same callback group (because of creating a service per node) for service callback, so we expect that the service callback can run on separate threads when using MultiThreadedExecutor.

@iuhilnehc-ynos
Copy link

And use ros::AsyncSpinner async_spinner(a_proper_number); (not 1, refer to http://wiki.ros.org/roscpp/Overview/Callbacks%20and%20Spinning#Multi-threaded_Spinning) for

ros::AsyncSpinner async_spinner(1);

@joelbudu
Copy link
Author

You have also pointed very correctly to the locations

I am sorry, item 1 pointed before is the bridge for ros2 client -> ros1 service. (For your case, it should be

bridge.client = ros2_node->create_client<ROS2_T>(name);

)

I might need a bit of clarity on how this works. I thought the Multithreading/Callback setup would be setup on the ROS2 service ?

@iuhilnehc-ynos
Copy link

@joelbudu

In your case, you just cared about the ros2 service callback that can run on separate threads, so we just need to make sure the ros1 spin with multiple threads which can call ros1_service_callback in ros1_bridge to request ros2 service at the same time.

Could you just confirm if updating
https://github.com/ros2/ros1_bridge/blob/dashing/src/dynamic_bridge.cpp#L778
with ros::AsyncSpinner async_spinner(2); is work for you?

@joelbudu
Copy link
Author

Yes @iuhilnehc-ynos I can confirm updating https://github.com/ros2/ros1_bridge/blob/dashing/src/dynamic_bridge.cpp#L778
with ros::AsyncSpinner async_spinner(2); works for me

@iuhilnehc-ynos
Copy link

@joelbudu , Thank you for your confirmation. Before creating a PR for this feature, I'll go deep to check if it introduces some potential problems.

@iuhilnehc-ynos
Copy link

Hi, @joelbudu, I found the example you updated on #314 (comment),
only containing one service (test_srv_) with callback_group4_(MutuallyExclusive) in one node (DualThreadedNode), I am a little confused about what you expected.

** Actual behavior **
ROS2 service is run in a single thread

@iuhilnehc-ynos iuhilnehc-ynos linked a pull request May 25, 2021 that will close this issue
@joelbudu
Copy link
Author

Actually, there is also a geometry_msgs::msg::Vector3Stamped publisher and subscriber running in different callback groups. So there is a shared variable that is being updated by the subscriber and the value of that variable is checked in the blocking while loop of the service

@joelbudu
Copy link
Author

joelbudu commented Jul 5, 2021

@iuhilnehc-ynos I've been following the PR request linked to this issue. I just wanted to confirm if for my scenario only changing the ros::AsyncSpinner async_spinner(2) would still suffice or is there any other change you would advice to change? Thanks

@iuhilnehc-ynos
Copy link

@joelbudu

If you just implemented a demo or a trial, I think the answer is yes, otherwise, I don't recommend using loop wait inside a service callback, because the loop in the service callback will make ros1_bridge occupy one ros::AsyncSpinner thread. If there are many such waiting loops all over other services, even setting 100 for ros::AsyncSpinner is not enough.

  void test_service(
  const std::shared_ptr<rmw_request_id_t> request_header,
  const std::shared_ptr<std_srvs::srv::Empty::Request> request,
  const std::shared_ptr<std_srvs::srv::Empty::Response> response)
{
    
    RCLCPP_INFO(this->get_logger(), "Received message in callback1");

    RCLCPP_INFO(this->get_logger(), "msg timestamp : %f", rclcpp::Time(vector_msg_->header.stamp ).seconds());

    auto received_time = this->get_clock()->now();

    while (received_time +  rclcpp::Duration(5,0) > rclcpp::Time(vector_msg_->header.stamp) && rclcpp::ok())
    // it seems an anti-pattern here
    {  
        std::this_thread::sleep_for(std::chrono::milliseconds(50));
    }

    RCLCPP_INFO(this->get_logger(), "Exitted cb after 5 secs");

}

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging a pull request may close this issue.

2 participants