test_subscriber.cpp
Go to the documentation of this file.
00001 /*********************************************************************
00002 * Software License Agreement (BSD License)
00003 *
00004 *  Copyright (c) 2010, Willow Garage, Inc.
00005 *  All rights reserved.
00006 *
00007 *  Redistribution and use in source and binary forms, with or without
00008 *  modification, are permitted provided that the following conditions
00009 *  are met:
00010 *
00011 *   * Redistributions of source code must retain the above copyright
00012 *     notice, this list of conditions and the following disclaimer.
00013 *   * Redistributions in binary form must reproduce the above
00014 *     copyright notice, this list of conditions and the following
00015 *     disclaimer in the documentation and/or other materials provided
00016 *     with the distribution.
00017 *   * Neither the name of the Willow Garage nor the names of its
00018 *     contributors may be used to endorse or promote products derived
00019 *     from this software without specific prior written permission.
00020 *
00021 *  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
00022 *  "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
00023 *  LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
00024 *  FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
00025 *  COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
00026 *  INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
00027 *  BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
00028 *  LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
00029 *  CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
00030 *  LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
00031 *  ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00032 *  POSSIBILITY OF SUCH DAMAGE.
00033 *********************************************************************/
00034 
00035 #include <gtest/gtest.h>
00036 
00037 #include "rosrt/rosrt.h"
00038 
00039 #include <ros/ros.h>
00040 
00041 #include <std_msgs/UInt32.h>
00042 
00043 #include <boost/thread.hpp>
00044 
00045 using namespace rosrt;
00046 
00047 void publishThread(ros::Publisher& pub, bool& done)
00048 {
00049   uint32_t i = 0;
00050   std_msgs::UInt32 msg;
00051   while (!done)
00052   {
00053     msg.data = i;
00054     pub.publish(msg);
00055     ros::WallDuration(0.0001).sleep();
00056     ++i;
00057   }
00058 }
00059 
00060 TEST(Subscriber, singleSubscriber)
00061 {
00062   ros::NodeHandle nh;
00063   ros::Publisher pub = nh.advertise<std_msgs::UInt32>("test", 1);
00064   bool done = false;
00065   boost::thread t(boost::bind(publishThread, boost::ref(pub), boost::ref(done)));
00066 
00067   Subscriber<std_msgs::UInt32> sub(2, nh, "test");
00068 
00069   resetThreadAllocInfo();
00070 
00071   uint32_t count = 0;
00072   int32_t last = -1;
00073   while (count < 10000)
00074   {
00075     std_msgs::UInt32ConstPtr msg = sub.poll();
00076     if (msg)
00077     {
00078       ASSERT_GT((int32_t)msg->data, last);
00079       last = msg->data;
00080       ++count;
00081     }
00082   }
00083 
00084   ASSERT_EQ(getThreadAllocInfo().total_ops, 0UL);
00085 
00086   done = true;
00087   t.join();
00088 }
00089 
00090 TEST(Subscriber, multipleSubscribersSameTopic)
00091 {
00092   ros::NodeHandle nh;
00093   ros::Publisher pub = nh.advertise<std_msgs::UInt32>("test", 1);
00094   bool done = false;
00095   boost::thread t(boost::bind(publishThread, boost::ref(pub), boost::ref(done)));
00096 
00097   const size_t sub_count = 10;
00098   Subscriber<std_msgs::UInt32> subs[sub_count];
00099   uint32_t counts[sub_count];
00100   int32_t lasts[sub_count];
00101   for (size_t i = 0; i < sub_count; ++i)
00102   {
00103     subs[i].initialize(2);
00104     subs[i].subscribe(nh, "test");
00105     counts[i] = 0;
00106     lasts[i] = -1;
00107   }
00108 
00109   resetThreadAllocInfo();
00110 
00111   bool all_done = false;
00112   while (!all_done)
00113   {
00114     all_done = true;
00115 
00116     for (size_t i = 0; i < sub_count; ++i)
00117     {
00118       std_msgs::UInt32ConstPtr msg = subs[i].poll();
00119       if (msg)
00120       {
00121         ASSERT_GT((int32_t)msg->data, lasts[i]);
00122         lasts[i] = msg->data;
00123         ++counts[i];
00124       }
00125 
00126       if (counts[i] < 10000)
00127       {
00128         all_done = false;
00129       }
00130     }
00131   }
00132 
00133   ASSERT_EQ(getThreadAllocInfo().total_ops, 0UL);
00134 
00135   done = true;
00136   t.join();
00137 }
00138 
00139 TEST(Subscriber, multipleSubscribersMultipleTopics)
00140 {
00141   const size_t sub_count = 10;
00142 
00143   ros::NodeHandle nh;
00144   bool done = false;
00145   boost::thread_group tg;
00146 
00147   ros::Publisher pubs[sub_count];
00148   Subscriber<std_msgs::UInt32> subs[sub_count];
00149   uint32_t counts[sub_count];
00150   int32_t lasts[sub_count];
00151   for (size_t i = 0; i < sub_count; ++i)
00152   {
00153     std::stringstream topic;
00154     topic << "test" << i;
00155     pubs[i] = nh.advertise<std_msgs::UInt32>(topic.str(), 1);
00156     tg.create_thread(boost::bind(publishThread, boost::ref(pubs[i]), boost::ref(done)));
00157 
00158     subs[i].initialize(2);
00159     subs[i].subscribe(nh, topic.str());
00160     counts[i] = 0;
00161     lasts[i] = -1;
00162   }
00163 
00164 
00165   bool all_done = false;
00166   while (!all_done)
00167   {
00168     all_done = true;
00169 
00170     for (size_t i = 0; i < sub_count; ++i)
00171     {
00172       std_msgs::UInt32ConstPtr msg = subs[i].poll();
00173       if (msg)
00174       {
00175         ASSERT_GT((int32_t)msg->data, lasts[i]);
00176         lasts[i] = msg->data;
00177         ++counts[i];
00178       }
00179 
00180       if (counts[i] < 10000)
00181       {
00182         all_done = false;
00183       }
00184     }
00185   }
00186 
00187   done = true;
00188   tg.join_all();
00189 }
00190 
00191 void subscribeThread(Subscriber<std_msgs::UInt32>& sub, bool& failed)
00192 {
00193   resetThreadAllocInfo();
00194 
00195   const int count = 10000;
00196   int my_count = 0;
00197   int32_t last = -1;
00198   while (true)
00199   {
00200     std_msgs::UInt32ConstPtr msg = sub.poll();
00201     if (msg)
00202     {
00203       if (last >= (int32_t)msg->data)
00204       {
00205         ROS_ERROR_STREAM("Thread " << boost::this_thread::get_id() << " last is greater than msg " << last << " >= " << msg->data);
00206         failed = true;
00207         break;
00208       }
00209 
00210       last = msg->data;
00211       ++my_count;
00212 
00213       //ROS_INFO_STREAM("Thread " << boost::this_thread::get_id() << " " << my_count);
00214     }
00215 
00216     if (my_count >= count)
00217     {
00218       break;
00219     }
00220 
00221     ros::WallDuration(0.0001).sleep();
00222   }
00223 
00224   if (getThreadAllocInfo().total_ops > 0UL)
00225   {
00226     ROS_ERROR_STREAM("Thread " << boost::this_thread::get_id() << " performed " << getThreadAllocInfo().total_ops << " memory operations (malloc/free/etc.)");
00227     failed = true;
00228   }
00229 }
00230 
00231 TEST(Subscriber, multipleThreadsSameTopic)
00232 {
00233   ros::NodeHandle nh;
00234   ros::Publisher pub = nh.advertise<std_msgs::UInt32>("test", 1);
00235   bool done = false;
00236   boost::thread t(boost::bind(publishThread, boost::ref(pub), boost::ref(done)));
00237   boost::thread_group tg;
00238 
00239   const size_t sub_count = 10;
00240   Subscriber<std_msgs::UInt32> subs[sub_count];
00241   bool failed[sub_count];
00242   for (size_t i = 0; i < sub_count; ++i)
00243   {
00244     subs[i].initialize(2);
00245     subs[i].subscribe(nh, "test");
00246     failed[i] = false;
00247 
00248     tg.create_thread(boost::bind(subscribeThread, boost::ref(subs[i]), boost::ref(failed[i])));
00249   }
00250 
00251   tg.join_all();
00252   done = true;
00253   t.join();
00254 
00255   for (size_t i = 0; i < sub_count; ++i)
00256   {
00257     EXPECT_FALSE(failed[i]);
00258   }
00259 }
00260 
00261 int main(int argc, char** argv)
00262 {
00263   ros::init(argc, argv, "test_rt_subscriber");
00264   testing::InitGoogleTest(&argc, argv);
00265 
00266   ros::NodeHandle nh;
00267   rosrt::init();
00268 
00269   return RUN_ALL_TESTS();
00270 }


rosrt
Author(s): Josh Faust
autogenerated on Sat Dec 28 2013 17:34:37