Go to the documentation of this file.00001 
00002 
00003 
00004 
00005 
00006 
00007 
00008 
00009 
00010 
00011 
00012 
00013 
00014 
00015 
00016 
00017 
00018 
00019 
00020 
00021 
00022 
00023 
00024 
00025 
00026 
00027 
00028 
00029 
00030 
00031 
00032 
00033 
00034 
00035 #include <gtest/gtest.h>
00036 
00037 #include "rosrt/rosrt.h"
00038 
00039 #include <ros/ros.h>
00040 
00041 #include <std_msgs/UInt32.h>
00042 
00043 #include <boost/thread.hpp>
00044 
00045 using namespace rosrt;
00046 
00047 void publishThread(ros::Publisher& pub, bool& done)
00048 {
00049   uint32_t i = 0;
00050   std_msgs::UInt32 msg;
00051   while (!done)
00052   {
00053     msg.data = i;
00054     pub.publish(msg);
00055     ros::WallDuration(0.0001).sleep();
00056     ++i;
00057   }
00058 }
00059 
00060 TEST(Subscriber, singleSubscriber)
00061 {
00062   ros::NodeHandle nh;
00063   ros::Publisher pub = nh.advertise<std_msgs::UInt32>("test", 1);
00064   bool done = false;
00065   boost::thread t(boost::bind(publishThread, boost::ref(pub), boost::ref(done)));
00066 
00067   Subscriber<std_msgs::UInt32> sub(2, nh, "test");
00068 
00069   resetThreadAllocInfo();
00070 
00071   uint32_t count = 0;
00072   int32_t last = -1;
00073   while (count < 10000)
00074   {
00075     std_msgs::UInt32ConstPtr msg = sub.poll();
00076     if (msg)
00077     {
00078       ASSERT_GT((int32_t)msg->data, last);
00079       last = msg->data;
00080       ++count;
00081     }
00082   }
00083 
00084   ASSERT_EQ(getThreadAllocInfo().total_ops, 0UL);
00085 
00086   done = true;
00087   t.join();
00088 }
00089 
00090 TEST(Subscriber, multipleSubscribersSameTopic)
00091 {
00092   ros::NodeHandle nh;
00093   ros::Publisher pub = nh.advertise<std_msgs::UInt32>("test", 1);
00094   bool done = false;
00095   boost::thread t(boost::bind(publishThread, boost::ref(pub), boost::ref(done)));
00096 
00097   const size_t sub_count = 10;
00098   Subscriber<std_msgs::UInt32> subs[sub_count];
00099   uint32_t counts[sub_count];
00100   int32_t lasts[sub_count];
00101   for (size_t i = 0; i < sub_count; ++i)
00102   {
00103     subs[i].initialize(2);
00104     subs[i].subscribe(nh, "test");
00105     counts[i] = 0;
00106     lasts[i] = -1;
00107   }
00108 
00109   resetThreadAllocInfo();
00110 
00111   bool all_done = false;
00112   while (!all_done)
00113   {
00114     all_done = true;
00115 
00116     for (size_t i = 0; i < sub_count; ++i)
00117     {
00118       std_msgs::UInt32ConstPtr msg = subs[i].poll();
00119       if (msg)
00120       {
00121         ASSERT_GT((int32_t)msg->data, lasts[i]);
00122         lasts[i] = msg->data;
00123         ++counts[i];
00124       }
00125 
00126       if (counts[i] < 10000)
00127       {
00128         all_done = false;
00129       }
00130     }
00131   }
00132 
00133   ASSERT_EQ(getThreadAllocInfo().total_ops, 0UL);
00134 
00135   done = true;
00136   t.join();
00137 }
00138 
00139 TEST(Subscriber, multipleSubscribersMultipleTopics)
00140 {
00141   const size_t sub_count = 10;
00142 
00143   ros::NodeHandle nh;
00144   bool done = false;
00145   boost::thread_group tg;
00146 
00147   ros::Publisher pubs[sub_count];
00148   Subscriber<std_msgs::UInt32> subs[sub_count];
00149   uint32_t counts[sub_count];
00150   int32_t lasts[sub_count];
00151   for (size_t i = 0; i < sub_count; ++i)
00152   {
00153     std::stringstream topic;
00154     topic << "test" << i;
00155     pubs[i] = nh.advertise<std_msgs::UInt32>(topic.str(), 1);
00156     tg.create_thread(boost::bind(publishThread, boost::ref(pubs[i]), boost::ref(done)));
00157 
00158     subs[i].initialize(2);
00159     subs[i].subscribe(nh, topic.str());
00160     counts[i] = 0;
00161     lasts[i] = -1;
00162   }
00163 
00164 
00165   bool all_done = false;
00166   while (!all_done)
00167   {
00168     all_done = true;
00169 
00170     for (size_t i = 0; i < sub_count; ++i)
00171     {
00172       std_msgs::UInt32ConstPtr msg = subs[i].poll();
00173       if (msg)
00174       {
00175         ASSERT_GT((int32_t)msg->data, lasts[i]);
00176         lasts[i] = msg->data;
00177         ++counts[i];
00178       }
00179 
00180       if (counts[i] < 10000)
00181       {
00182         all_done = false;
00183       }
00184     }
00185   }
00186 
00187   done = true;
00188   tg.join_all();
00189 }
00190 
00191 void subscribeThread(Subscriber<std_msgs::UInt32>& sub, bool& failed)
00192 {
00193   resetThreadAllocInfo();
00194 
00195   const int count = 10000;
00196   int my_count = 0;
00197   int32_t last = -1;
00198   while (true)
00199   {
00200     std_msgs::UInt32ConstPtr msg = sub.poll();
00201     if (msg)
00202     {
00203       if (last >= (int32_t)msg->data)
00204       {
00205         ROS_ERROR_STREAM("Thread " << boost::this_thread::get_id() << " last is greater than msg " << last << " >= " << msg->data);
00206         failed = true;
00207         break;
00208       }
00209 
00210       last = msg->data;
00211       ++my_count;
00212 
00213       
00214     }
00215 
00216     if (my_count >= count)
00217     {
00218       break;
00219     }
00220 
00221     ros::WallDuration(0.0001).sleep();
00222   }
00223 
00224   if (getThreadAllocInfo().total_ops > 0UL)
00225   {
00226     ROS_ERROR_STREAM("Thread " << boost::this_thread::get_id() << " performed " << getThreadAllocInfo().total_ops << " memory operations (malloc/free/etc.)");
00227     failed = true;
00228   }
00229 }
00230 
00231 TEST(Subscriber, multipleThreadsSameTopic)
00232 {
00233   ros::NodeHandle nh;
00234   ros::Publisher pub = nh.advertise<std_msgs::UInt32>("test", 1);
00235   bool done = false;
00236   boost::thread t(boost::bind(publishThread, boost::ref(pub), boost::ref(done)));
00237   boost::thread_group tg;
00238 
00239   const size_t sub_count = 10;
00240   Subscriber<std_msgs::UInt32> subs[sub_count];
00241   bool failed[sub_count];
00242   for (size_t i = 0; i < sub_count; ++i)
00243   {
00244     subs[i].initialize(2);
00245     subs[i].subscribe(nh, "test");
00246     failed[i] = false;
00247 
00248     tg.create_thread(boost::bind(subscribeThread, boost::ref(subs[i]), boost::ref(failed[i])));
00249   }
00250 
00251   tg.join_all();
00252   done = true;
00253   t.join();
00254 
00255   for (size_t i = 0; i < sub_count; ++i)
00256   {
00257     EXPECT_FALSE(failed[i]);
00258   }
00259 }
00260 
00261 int main(int argc, char** argv)
00262 {
00263   ros::init(argc, argv, "test_rt_subscriber");
00264   testing::InitGoogleTest(&argc, argv);
00265 
00266   ros::NodeHandle nh;
00267   rosrt::init();
00268 
00269   return RUN_ALL_TESTS();
00270 }