diff --git a/scenario_execution_ros/scenario_execution_ros/actions/assert_topic_latency.py b/scenario_execution_ros/scenario_execution_ros/actions/assert_topic_latency.py index 0670483e..c9c3dc54 100644 --- a/scenario_execution_ros/scenario_execution_ros/actions/assert_topic_latency.py +++ b/scenario_execution_ros/scenario_execution_ros/actions/assert_topic_latency.py @@ -18,10 +18,10 @@ from collections import deque import py_trees # pylint: disable=import-error from rclpy.node import Node -import importlib import time from scenario_execution_ros.actions.conversions import get_comparison_operator, get_qos_preset_profile from scenario_execution.actions.base_action import BaseAction, ActionError +from rosidl_runtime_py.utilities import get_message class AssertTopicLatency(BaseAction): @@ -112,45 +112,48 @@ def update(self) -> py_trees.common.Status: return result def check_topic(self): - if self.wait_for_first_message: - available_topics = self.node.get_topic_names_and_types() - for name, topic_type in available_topics: - if name == self.topic_name: - topic_type = topic_type[0].replace('/', '.') - if self.topic_type: - if self.topic_type == topic_type: - self.call_subscriber() - self.is_topic = True - return True - else: - return False # Invalid topic or type speficied. - else: - self.topic_type = topic_type - self.call_subscriber() - self.is_topic = True # 'topic_type' is not specified, the process will continue with the found one - return True - # Topic not available wait.... - return True - else: + result = True + if not self.wait_for_first_message: if not self.topic_type: - return False # Topic type must be specified. (wait_for_first_message == False) + result = False # Topic type must be specified. (wait_for_first_message == False) else: self.call_subscriber() self.is_topic = True # wait_for_first_message' is set to false, the process will proceed with the specified topic and type - return True + else: + available_topics = self.node.get_topic_names_and_types() + topic_found = False + for name, topic_types in available_topics: + if name == self.topic_name: + topic_found = True + if not topic_types: + result = False + break + current_topic_type = topic_types[0].replace('/', '.') + if not self.topic_type: + self.topic_type = current_topic_type # 'topic_type' is not specified, the process will continue with the found one + self.call_subscriber() + self.is_topic = True + elif self.topic_type != current_topic_type: + result = False + else: + self.call_subscriber() + self.is_topic = True + break + if not topic_found: + pass # Topic not available wait.... + return result def call_subscriber(self): - datatype_in_list = self.topic_type.split(".") - topic_type = getattr( - importlib.import_module(".".join(datatype_in_list[:-1])), - datatype_in_list[-1] - ) - + topic_type = self.topic_type.replace('.', '/') + message_class = get_message(topic_type) + if message_class is None: + raise ValueError(f"Message type '{topic_type}' could not be found.") self.subscription = self.node.create_subscription( - msg_type=topic_type, + msg_type=message_class, topic=self.topic_name, callback=self._callback, - qos_profile=get_qos_preset_profile(['sensor_data'])) + qos_profile=get_qos_preset_profile(['sensor_data']) + ) def _callback(self, msg): self.first_message_received = True