diff --git a/rosrt/CMakeLists.txt b/rosrt/CMakeLists.txt index e09b4ca..91d892a 100644 --- a/rosrt/CMakeLists.txt +++ b/rosrt/CMakeLists.txt @@ -49,3 +49,8 @@ rosbuild_add_executable(test_publisher_no_initialize test/test_publisher_no_init rosbuild_declare_test(test_publisher_no_initialize) target_link_libraries(test_publisher_no_initialize ${PROJECT_NAME}) rosbuild_add_rostest(test/test_publisher_no_initialize.xml) + +rosbuild_add_executable(test_filtered_subscriber EXCLUDE_FROM_ALL test/test_filtered_subscriber.cpp) +target_link_libraries(test_filtered_subscriber ${PROJECT_NAME}) +rosbuild_add_gtest_build_flags(test_filtered_subscriber) +rosbuild_add_rostest(test/test_filtered_subscriber.xml) diff --git a/rosrt/include/rosrt/filtered_subscriber.h b/rosrt/include/rosrt/filtered_subscriber.h new file mode 100644 index 0000000..3fd48c4 --- /dev/null +++ b/rosrt/include/rosrt/filtered_subscriber.h @@ -0,0 +1,238 @@ +/********************************************************************* +* Software License Agreement (BSD License) +* +* Copyright (c) 2010, CLMC Lab, University of Southern California +* All rights reserved. +* +* Redistribution and use in source and binary forms, with or without +* modification, are permitted provided that the following conditions +* are met: +* +* * Redistributions of source code must retain the above copyright +* notice, this list of conditions and the following disclaimer. +* * Redistributions in binary form must reproduce the above +* copyright notice, this list of conditions and the following +* disclaimer in the documentation and/or other materials provided +* with the distribution. +* * Neither the name of the Willow Garage nor the names of its +* contributors may be used to endorse or promote products derived +* from this software without specific prior written permission. +* +* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS +* FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE +* COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, +* INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, +* BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; +* LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +* CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT +* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN +* ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +* POSSIBILITY OF SUCH DAMAGE. +*********************************************************************/ + +#ifndef ROSRT_FILTERED_SUBSCRIBER_H +#define ROSRT_FILTERED_SUBSCRIBER_H + +#include + +#include +#include "detail/pool_gc.h" + +#include +#include +#include + +namespace rosrt +{ + +/** + * \brief A lock-free, filtered subscriber. Allows you to receive ROS messages inside a realtime thread. + * + * This subscriber will pass the message through a user-defined "filter", which will convert the message + * into another user-defined type. The filter function will be called outside of realtime. + * + * This subscriber works in a polling manner rather than the usual callback-based mechanism, e.g.: +\verbatim +bool filter(const MsgConstPtr& msg, const FilteredPtr filtered) +{ + // filtered->data = msg->data; +} + +FilteredSubscriber sub(2, nh, "my_topic", filter); +while (true) +{ + FilteredConstPtr filtered_msg = sub.poll(); + if (filtered_msg) + { + // do something with filtered_msg + ... + } +} +\endverbatim + */ +template +class FilteredSubscriber +{ +public: + /** + * \brief Default constructor. You must call initialize() before doing anything else if you use this constructor. + */ + FilteredSubscriber() + : filtered_pool_(0) + { + } + + /** + * \brief Constructor with initialization. Call subscribe() to subscribe to a topic. + * \param message_pool_size The size of the message pool to use. If this pool fills up no more messages + * will be received until some messages are freed. + */ + FilteredSubscriber(uint32_t message_pool_size) + : filtered_pool_(0) + { + initialize(message_pool_size); + } + + /** + * \brief Constructor with initialization and subscription + * \param message_pool_size The size of the message pool to use. If this pool fills up no more messages + * will be received until some messages are freed. + * \param nh The ros::NodeHandle to use to subscribe + * \param topic The topic to subscribe on + * \param filter The filter function to convert the message into the type Filtered + * \param [optional] transport_hints the transport hints to use + */ + FilteredSubscriber(uint32_t message_pool_size, ros::NodeHandle& nh, const std::string& topic, + boost::function& msg, const boost::shared_ptr filtered)> filter, + const ros::TransportHints& transport_hints = ros::TransportHints()) + : filtered_pool_(0) + { + initialize(message_pool_size); + subscribe(nh, topic, filter, transport_hints); + } + + ~FilteredSubscriber() + { + Filtered const* latest = latest_.exchange(0); + if (latest) + { + filtered_pool_->free(latest); + } + + detail::addPoolToGC((void*)filtered_pool_, detail::deletePool, detail::poolIsDeletable); + } + + /** + * \brief Initialize this subscribe. Only use with the default constructor. + * \param message_pool_size The size of the message pool to use. If this pool fills up no more messages + * will be received until some messages are freed. + */ + void initialize(uint32_t message_pool_size) + { + ROS_ASSERT(message_pool_size > 1); + ROS_ASSERT(!filtered_pool_); + filtered_pool_ = new lockfree::ObjectPool(); + filtered_pool_->initialize(message_pool_size, Filtered()); + latest_.store(0); + } + + /** + * \brief Initialize this subscribe. Only use with the default constructor. + * \param message_pool_size The size of the message pool to use. If this pool fills up no more messages + * will be received until some messages are freed. + * \param nh The ros::NodeHandle to use to subscribe + * \param topic The topic to subscribe on + * \param filter The filter function to convert the message into the type Filtered + * \param [optional] transport_hints the transport hints to use + * \return Whether or not we successfully subscribed + */ + bool initialize(uint32_t message_pool_size, ros::NodeHandle& nh, const std::string& topic, + boost::function& msg, const boost::shared_ptr filtered)> filter, + const ros::TransportHints& transport_hints = ros::TransportHints()) + { + initialize(message_pool_size); + return subscribe(nh, topic, filter, transport_hints); + } + + /** + * \brief Subscribe to a topic + * \param nh The ros::NodeHandle to use to subscribe + * \param topic The topic to subscribe on + * \param filter The filter function to convert the message into the type Filtered + * \param [optional] transport_hints the transport hints to use + * \return Whether or not we successfully subscribed + */ + bool subscribe(ros::NodeHandle& nh, const std::string& topic, + boost::function& msg, const boost::shared_ptr filtered)> filter, + const ros::TransportHints& transport_hints = ros::TransportHints()) + { + ros::SubscribeOptions ops; + ops.template init(topic, 1, boost::bind(&FilteredSubscriber::callback, this, _1)); + ops.callback_queue = detail::getSubscriberCallbackQueue(); + sub_ = nh.subscribe(ops); + filter_ = filter; + return (bool)sub_; + } + + /** + * \brief Retrieve the newest message received. + * + * The same message will only be returned once, i.e: +\verbatim + +msg = poll(); // Returns a valid message +msg = poll(); // Returns NULL +\endverbatim + */ + boost::shared_ptr poll() + { + Filtered* latest = latest_.exchange(0); + if (!latest) + { + return boost::shared_ptr(); + } + + boost::shared_ptr ptr = filtered_pool_->makeShared(latest); + if (!ptr) + { + filtered_pool_->free(latest); + return boost::shared_ptr(); + } + + return ptr; + } + +private: + void callback(const boost::shared_ptr& msg) + { + boost::shared_ptr filtered = filtered_pool_->allocateShared(); + if (!filtered) + { + return; + } + if (!filter_(msg, filtered)) + { + ROS_ERROR("FilteredSubscriber: filter function failed."); + return; + } + + Filtered* latest = filtered_pool_->removeShared(filtered); + Filtered* old = latest_.exchange(latest); + if (old) + { + filtered_pool_->free(old); + } + } + + ros::atomic latest_; + + lockfree::ObjectPool* filtered_pool_; + ros::Subscriber sub_; + boost::function& msg, const boost::shared_ptr filtered)> filter_; +}; + +} // namespace rosrt + +#endif // ROSRT_FILTERED_SUBSCRIBER_H diff --git a/rosrt/include/rosrt/rosrt.h b/rosrt/include/rosrt/rosrt.h index a640667..f35423a 100644 --- a/rosrt/include/rosrt/rosrt.h +++ b/rosrt/include/rosrt/rosrt.h @@ -37,6 +37,7 @@ #include "publisher.h" #include "subscriber.h" +#include "filtered_subscriber.h" #include "malloc_wrappers.h" #include "init.h" diff --git a/rosrt/test/test_filtered_subscriber.cpp b/rosrt/test/test_filtered_subscriber.cpp new file mode 100644 index 0000000..b8db918 --- /dev/null +++ b/rosrt/test/test_filtered_subscriber.cpp @@ -0,0 +1,126 @@ +/********************************************************************* +* Software License Agreement (BSD License) +* +* Copyright (c) 2010, CLMC Lab, University of Southern California +* All rights reserved. +* +* Redistribution and use in source and binary forms, with or without +* modification, are permitted provided that the following conditions +* are met: +* +* * Redistributions of source code must retain the above copyright +* notice, this list of conditions and the following disclaimer. +* * Redistributions in binary form must reproduce the above +* copyright notice, this list of conditions and the following +* disclaimer in the documentation and/or other materials provided +* with the distribution. +* * Neither the name of the Willow Garage nor the names of its +* contributors may be used to endorse or promote products derived +* from this software without specific prior written permission. +* +* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS +* FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE +* COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, +* INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, +* BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; +* LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +* CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT +* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN +* ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +* POSSIBILITY OF SUCH DAMAGE. +*********************************************************************/ + +#include + +#include "rosrt/rosrt.h" + +#include + +#include +#include + +#include + +#ifdef __XENO__ +#include +#include +#endif + +using namespace rosrt; + +void publishThread(ros::Publisher& pub, bool& done) +{ + uint32_t i = 0; + std_msgs::UInt32 msg; + while (!done) + { + msg.data = i; + pub.publish(msg); + ros::WallDuration(0.0001).sleep(); + ++i; + } +} + +bool filter(const boost::shared_ptr& msg, const boost::shared_ptr filtered) +{ + filtered->data = msg->data; + + // some dummy memory allocations + int* x = new int(); + *x=0; + delete x; + + return true; +} + +TEST(FilteredSubscriber, singleSubscriber) +{ + ros::NodeHandle nh; + ros::Publisher pub = nh.advertise("test", 1); + bool done = false; + boost::thread t(boost::bind(publishThread, boost::ref(pub), boost::ref(done))); + + FilteredSubscriber sub; + sub.initialize(4, nh, "test", filter); + + resetThreadAllocInfo(); + + uint32_t count = 0; + int32_t last = -1; + while (count < 10000) + { + std_msgs::UInt64Ptr msg = sub.poll(); + if (msg) + { + ASSERT_GT((int32_t)msg->data, last); + last = msg->data; + ++count; + } +#ifdef __XENO__ + rt_task_sleep(1000000); +#endif + } + + ASSERT_EQ(getThreadAllocInfo().total_ops, 0UL); + + done = true; + t.join(); +} + +int main(int argc, char** argv) +{ +#ifdef __XENO__ + mlockall(MCL_CURRENT | MCL_FUTURE); + rt_task_shadow(NULL, "test_rt_filtered_subscriber", 0, 0); +#endif + + ros::init(argc, argv, "test_rt_filtered_subscriber"); + testing::InitGoogleTest(&argc, argv); + + ros::NodeHandle nh; + rosrt::init(); + + return RUN_ALL_TESTS(); +} diff --git a/rosrt/test/test_filtered_subscriber.xml b/rosrt/test/test_filtered_subscriber.xml new file mode 100644 index 0000000..558f937 --- /dev/null +++ b/rosrt/test/test_filtered_subscriber.xml @@ -0,0 +1,6 @@ + + + + + +