Skip to content

Commit

Permalink
Update jazzy latency topic (#214)
Browse files Browse the repository at this point in the history
* fix asset_topic_latency action (#210)

* fix action

* add support for retrigger

* add text case

* fix typo

* add param type_check

* fix

* docs

* fix
  • Loading branch information
Nikhil-Singhal-06 authored Oct 17, 2024
1 parent 3bb5e93 commit 93ec1dc
Show file tree
Hide file tree
Showing 2 changed files with 86 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,32 +18,34 @@
from collections import deque
import py_trees # pylint: disable=import-error
from rclpy.node import Node
import importlib
import time
from scenario_execution_ros.actions.conversions import get_comparison_operator, get_qos_preset_profile
from scenario_execution.actions.base_action import BaseAction, ActionError
from rosidl_runtime_py.utilities import get_message


class AssertTopicLatency(BaseAction):

def __init__(self, topic_name: str, topic_type: str, latency: float, comparison_operator: bool, rolling_average_count: int, wait_for_first_message: bool):
def __init__(self, topic_name: str, topic_type: str, wait_for_first_message: bool):
super().__init__()
self.topic_name = topic_name
self.topic_type = topic_type
self.latency = latency
self.comparison_operator_feedback = comparison_operator[0]
self.comparison_operator = get_comparison_operator(comparison_operator)
self.rolling_average_count = rolling_average_count
self.rolling_average_count_queue = deque(maxlen=rolling_average_count)
self.latency = None
self.comparison_operator_feedback = None
self.comparison_operator = None
self.rolling_average_count = None
self.rolling_average_count_queue = None
self.wait_for_first_message = wait_for_first_message
self.initial_wait_for_first_message = wait_for_first_message
self.first_message_received = False
self.node = None
self.subscription = None
self.last_receive_time = 0
self.msg_count = 0
self.average_latency = 0.
self.timer = 0
self.timer = time.time()
self.is_topic = False
self.retrigger = None

def setup(self, **kwargs):
try:
Expand All @@ -59,13 +61,24 @@ def setup(self, **kwargs):
elif not success and not self.wait_for_first_message:
raise ActionError("Topic type must be specified. Please provide a valid topic type.", action=self)

def execute(self):
if self.timer != 0:
raise ActionError("Action does not yet support to get retriggered", action=self)
def execute(self, latency: float, comparison_operator: bool, rolling_average_count: int):
self.latency = latency
self.comparison_operator_feedback = comparison_operator[0]
self.comparison_operator = get_comparison_operator(comparison_operator)
self.rolling_average_count = rolling_average_count
self.rolling_average_count_queue = deque(maxlen=rolling_average_count)
self.timer = time.time()
self.last_receive_time = 0
self.msg_count = 0
self.average_latency = 0.
self.is_topic = False
self.retrigger = True

def update(self) -> py_trees.common.Status:
result = py_trees.common.Status.FAILURE
if self.retrigger:
self.wait_for_first_message = self.initial_wait_for_first_message
self.retrigger = False
if not self.is_topic:
self.check_topic()
self.logger.info(f"Waiting for the topic '{self.topic_name}' to become available")
Expand All @@ -85,7 +98,10 @@ def update(self) -> py_trees.common.Status:
self.feedback_message = f"No message received on the topic '{self.topic_name}'" # pylint: disable= attribute-defined-outside-init
result = py_trees.common.Status.RUNNING
elif self.msg_count > 1:
if self.comparison_operator(self.average_latency, self.latency):
if self.comparison_operator(self.latency, time.time() - self.last_receive_time):
self.feedback_message = f"Failed to receive message within the expected latency threshold ({self.latency} seconds)" # pylint: disable= attribute-defined-outside-init
result = py_trees.common.Status.FAILURE
elif self.comparison_operator(self.average_latency, self.latency):
result = py_trees.common.Status.RUNNING
self.feedback_message = f'Latency within range: expected {self.comparison_operator_feedback} {self.latency} s, actual {self.average_latency} s' # pylint: disable= attribute-defined-outside-init
else:
Expand All @@ -96,45 +112,48 @@ def update(self) -> py_trees.common.Status:
return result

def check_topic(self):
if self.wait_for_first_message:
available_topics = self.node.get_topic_names_and_types()
for name, topic_type in available_topics:
if name == self.topic_name:
topic_type = topic_type[0].replace('/', '.')
if self.topic_type:
if self.topic_type == topic_type:
self.call_subscriber()
self.is_topic = True
return True
else:
return False # Invalid topic or type speficied.
else:
self.topic_type = topic_type
self.call_subscriber()
self.is_topic = True # 'topic_type' is not specified, the process will continue with the found one
return True
# Topic not available wait....
return True
else:
result = True
if not self.wait_for_first_message:
if not self.topic_type:
return False # Topic type must be specified. (wait_for_first_message == False)
result = False # Topic type must be specified. (wait_for_first_message == False)
else:
self.call_subscriber()
self.is_topic = True # wait_for_first_message' is set to false, the process will proceed with the specified topic and type
return True
else:
available_topics = self.node.get_topic_names_and_types()
topic_found = False
for name, topic_types in available_topics:
if name == self.topic_name:
topic_found = True
if not topic_types:
result = False
break
current_topic_type = topic_types[0].replace('/', '.')
if not self.topic_type:
self.topic_type = current_topic_type # 'topic_type' is not specified, the process will continue with the found one
self.call_subscriber()
self.is_topic = True
elif self.topic_type != current_topic_type:
result = False
else:
self.call_subscriber()
self.is_topic = True
break
if not topic_found:
pass # Topic not available wait....
return result

def call_subscriber(self):
datatype_in_list = self.topic_type.split(".")
topic_type = getattr(
importlib.import_module(".".join(datatype_in_list[:-1])),
datatype_in_list[-1]
)

topic_type = self.topic_type.replace('.', '/')
message_class = get_message(topic_type)
if message_class is None:
raise ValueError(f"Message type '{topic_type}' could not be found.")
self.subscription = self.node.create_subscription(
msg_type=topic_type,
msg_type=message_class,
topic=self.topic_name,
callback=self._callback,
qos_profile=get_qos_preset_profile(['sensor_data']))
qos_profile=get_qos_preset_profile(['sensor_data'])
)

def _callback(self, msg):
self.first_message_received = True
Expand Down
25 changes: 25 additions & 0 deletions scenario_execution_ros/test/test_assert_topic_latency.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,9 @@ def tearDown(self):
# Case 13: Test continues running if message arrives within the latency time.
# Case 14: Test fails if the topic provided doesn't show up in the latency time.

# 5. Retrigger
# Case 15: Test succeeds with timeout as action fails everytime (as recorded latency is greater than actual latency) and repeat gets triggered because of failure_is_success modifier.


def test_case_1(self):
scenario_content = """
Expand Down Expand Up @@ -339,3 +342,25 @@ def test_case_14(self):
"""
self.execute(scenario_content)
self.assertFalse(self.scenario_execution_ros.process_results())

def test_case_15(self):
scenario_content = """
import osc.ros
import osc.helpers
scenario test_assert_topic_latency:
do parallel:
serial:
repeat()
assert_topic_latency(
topic_name: '/bla',
latency: 0.5s,
topic_type: 'std_msgs.msg.String') with:
failure_is_success()
time_out: serial:
wait elapsed(10s)
emit end
"""
self.execute(scenario_content)
self.assertTrue(self.scenario_execution_ros.process_results())

0 comments on commit 93ec1dc

Please sign in to comment.