From 6a2240609ddfa07455518a7db96855e0026e8ba0 Mon Sep 17 00:00:00 2001 From: iory Date: Tue, 23 Aug 2022 21:52:02 +0900 Subject: [PATCH 1/2] [jsk_topic_tools] Add filtered relay --- jsk_topic_tools/scripts/filtered_relay.py | 91 +++++++++++++++++++++++ 1 file changed, 91 insertions(+) create mode 100755 jsk_topic_tools/scripts/filtered_relay.py diff --git a/jsk_topic_tools/scripts/filtered_relay.py b/jsk_topic_tools/scripts/filtered_relay.py new file mode 100755 index 000000000..f884ace2a --- /dev/null +++ b/jsk_topic_tools/scripts/filtered_relay.py @@ -0,0 +1,91 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +from importlib import import_module + +import roslib.message +import rospy + + +def expr_eval(expr, modules): + def eval_fn(topic, m, t): + return eval(expr, modules, {'m': m}) + return eval_fn + + +class FilteredRelay(object): + + def __init__(self): + import_list = rospy.get_param('~import', []) + self.modules = {} + for module in import_list: + try: + mod = import_module(module) + except ImportError: + rospy.logerr('Failed to import module: %s' % module) + else: + self.modules[module] = mod + + self.filter_expression = rospy.get_param('~filter', None) + if self.filter_expression is not None: + self.filter_expression = expr_eval( + self.filter_expression, self.modules) + + self.output_type = rospy.get_param('~output_type') + self.output_class = roslib.message.get_message_class(self.output_type) + + self.transform_expression = rospy.get_param('~transform', 'm') + self.pub_transform = rospy.Publisher( + '~output', + self.output_class, queue_size=1) + + self.topic_name = rospy.resolve_name('~input') + self.sub = rospy.Subscriber( + self.topic_name, + rospy.AnyMsg, + callback=lambda msg, tn=self.topic_name: self.callback(tn, msg), + queue_size=1) + + def publish(self, m): + if self.output_type == 'std_msgs/Empty': + self.pub_transform.publish() + return + if self.transform_expression is not None: + try: + res = eval(self.transform_expression, self.modules, {'m': m}) + except NameError as e: + rospy.logerr("Expression using variables other than 'm': %s" + % e.message) + except UnboundLocalError as e: + rospy.logerr('Wrong expression:%s' % e.message) + except Exception as e: + rospy.logerr(str(e)) + else: + if not isinstance(res, (list, tuple)): + res = [res] + self.pub_transform.publish(*res) + else: + self.pub_transform.publish(m) + + def callback(self, topic_name, msg): + if isinstance(msg, rospy.msg.AnyMsg): + package, msg_type = msg._connection_header['type'].split('/') + ros_pkg = package + '.msg' + msg_class = getattr(import_module(ros_pkg), msg_type) + self.sub.unregister() + deserialized_sub = rospy.Subscriber( + topic_name, msg_class, + lambda msg, tn=topic_name: self.callback(tn, msg)) + self.sub = deserialized_sub + msg = msg_class().deserialize(msg._buff) + if self.filter_expression is not None: + if self.filter_expression(topic_name, msg, rospy.Time.now()): + self.publish(msg) + else: + self.publish(msg) + + +if __name__ == '__main__': + rospy.init_node('filtered_relay') + node = FilteredRelay() + rospy.spin() From 932b980dd7a00230a7b2dd4af3d5ef5d5a99a612 Mon Sep 17 00:00:00 2001 From: iory Date: Wed, 24 Aug 2022 16:45:38 +0900 Subject: [PATCH 2/2] [jsk_topic_tools/filtered_relay] Add test and document --- doc/jsk_topic_tools/scripts/filtered_relay.md | 65 +++++++++++++++++++ jsk_topic_tools/CMakeLists.txt | 1 + .../sample/sample_filtered_relay.launch | 57 ++++++++++++++++ jsk_topic_tools/test/test_filtered_relay.test | 19 ++++++ 4 files changed, 142 insertions(+) create mode 100644 doc/jsk_topic_tools/scripts/filtered_relay.md create mode 100644 jsk_topic_tools/sample/sample_filtered_relay.launch create mode 100644 jsk_topic_tools/test/test_filtered_relay.test diff --git a/doc/jsk_topic_tools/scripts/filtered_relay.md b/doc/jsk_topic_tools/scripts/filtered_relay.md new file mode 100644 index 000000000..efcd433aa --- /dev/null +++ b/doc/jsk_topic_tools/scripts/filtered_relay.md @@ -0,0 +1,65 @@ +# filtered_relay.py + +## Description + +`rostopic echo` has a filter function, but it cannot publish a topic. +`topic_tools/transform` can format a topic and publish it, but it does not have the ability to filter. +Transforming while filtering a topic is simple and powerful. +This node provides a filter and publish (i.e. relay) functions. + +## Subscribing Topic + +* `~input` (`rospy/AnyMsg`) + + Input message. + +## Publishing Topic + +* `~output` (`~output_type`) + + Output topic. You can specify `~output_type` param to publish topic. + +## Parameters + +* `~output_type` (`String`, required) + + Message type of output_topic like `std_msgs/Float32`. This is the input for the [get_message_class](http://docs.ros.org/en/diamondback/api/roslib/html/python/roslib.message-module.html#get_message_class) function. + +* `~filter` (`String`, default: `None`) + + Condition of messages that match a specified Python expression. + + The Python expression can access any of the Python builtins plus: ``topic`` (the topic of the message), ``m`` (the message) and ``t`` (time of message). + + For example, ``~input`` topic is ``std_msgs/String`` and if you want to check whether a sentence is a ``hello``, you can do the following. + +```bash +filter: m.data == 'hello' +``` + + Note that, use escape sequence when using the following symbols ``<(<)``, ``>(>)``, ``&(&)``, ``'(')`` and ``"(")`` in launch file. + +* `~transform` (`String`, default: `m`) + + Python expression that transform the input messages, which are given in the variable m. The default expression is `m`, which results in forwarding input (which can be a topic field) into output_topic. + +* `~import` (`List[String]`, default: `[]`) + + List of Python modules to import and use in the expression. + +## Usage + +```bash +$ rosrun jsk_topic_tools filtered_relay.py ~input:=/right_endeffector/wrench \ + ~output:=/right_endeffector/force_norm \ + _filter:='numpy.linalg.norm([m.wrench.force.x, m.wrench.force.y, m.wrench.force.z]) > 10.0' \ + _output_type:=geometry_msgs/WrenchStamped _import:="[geometry_msgs, numpy]" +``` + +## Example + +The following example subscribe to `/right_endeffector/wrench` and only those with a force norm greater than 10 are published as `/right_endeffector/force_norm`. + +```bash +roslaunch jsk_topic_tools sample_filtered_relay.launch +``` diff --git a/jsk_topic_tools/CMakeLists.txt b/jsk_topic_tools/CMakeLists.txt index 9234c0732..921b95133 100644 --- a/jsk_topic_tools/CMakeLists.txt +++ b/jsk_topic_tools/CMakeLists.txt @@ -166,6 +166,7 @@ if (CATKIN_ENABLE_TESTING) endmacro() find_package(rostest REQUIRED) jsk_topic_tools_add_rostest(test/test_boolean_node.test) + jsk_topic_tools_add_rostest(test/test_filtered_relay.test) jsk_topic_tools_add_rostest(test/test_topic_buffer.test) jsk_topic_tools_add_rostest(test/test_topic_buffer_close_wait.test) # topic_buffer tests are tend to fail in GA because of CPU high load. diff --git a/jsk_topic_tools/sample/sample_filtered_relay.launch b/jsk_topic_tools/sample/sample_filtered_relay.launch new file mode 100644 index 000000000..2320ddc52 --- /dev/null +++ b/jsk_topic_tools/sample/sample_filtered_relay.launch @@ -0,0 +1,57 @@ + + + + + + + + + + + + + + + output_type: std_msgs/Float32 + import: [std_msgs, geometry_msgs, numpy] + filter: "numpy.linalg.norm([m.wrench.force.x, m.wrench.force.y, m.wrench.force.z]) > 10.0" + transform: "numpy.linalg.norm([m.wrench.force.x, m.wrench.force.y, m.wrench.force.z])" + + + + + + + + + + + output_type: geometry_msgs/WrenchStamped + import: [geometry_msgs, numpy] + filter: "numpy.linalg.norm([m.wrench.force.x, m.wrench.force.y, m.wrench.force.z]) > 10.0" + + + + + + + + output_type: geometry_msgs/WrenchStamped + import: [geometry_msgs] + + + + diff --git a/jsk_topic_tools/test/test_filtered_relay.test b/jsk_topic_tools/test/test_filtered_relay.test new file mode 100644 index 000000000..447d491e9 --- /dev/null +++ b/jsk_topic_tools/test/test_filtered_relay.test @@ -0,0 +1,19 @@ + + + + + + + + topic_0: /right_endeffector/force_norm + timeout_0: 30 + topic_1: /right_endeffector/filtered_wrench + timeout_1: 30 + topic_2: /right_endeffector/relayed_wrench + timeout_2: 30 + + + +