00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035 #include <gtest/gtest.h>
00036
00037 #include "rosrt/rosrt.h"
00038
00039 #include <ros/ros.h>
00040
00041 #include <std_msgs/UInt32.h>
00042
00043 #include <boost/thread.hpp>
00044
00045 using namespace rosrt;
00046
00047 void publishThread(ros::Publisher& pub, bool& done)
00048 {
00049 uint32_t i = 0;
00050 std_msgs::UInt32 msg;
00051 while (!done)
00052 {
00053 msg.data = i;
00054 pub.publish(msg);
00055 ros::WallDuration(0.0001).sleep();
00056 ++i;
00057 }
00058 }
00059
00060 TEST(Subscriber, singleSubscriber)
00061 {
00062 ros::NodeHandle nh;
00063 ros::Publisher pub = nh.advertise<std_msgs::UInt32>("test", 1);
00064 bool done = false;
00065 boost::thread t(boost::bind(publishThread, boost::ref(pub), boost::ref(done)));
00066
00067 Subscriber<std_msgs::UInt32> sub(2, nh, "test");
00068
00069 resetThreadAllocInfo();
00070
00071 uint32_t count = 0;
00072 int32_t last = -1;
00073 while (count < 10000)
00074 {
00075 std_msgs::UInt32ConstPtr msg = sub.poll();
00076 if (msg)
00077 {
00078 ASSERT_GT((int32_t)msg->data, last);
00079 last = msg->data;
00080 ++count;
00081 }
00082 }
00083
00084 ASSERT_EQ(getThreadAllocInfo().total_ops, 0UL);
00085
00086 done = true;
00087 t.join();
00088 }
00089
00090 TEST(Subscriber, multipleSubscribersSameTopic)
00091 {
00092 ros::NodeHandle nh;
00093 ros::Publisher pub = nh.advertise<std_msgs::UInt32>("test", 1);
00094 bool done = false;
00095 boost::thread t(boost::bind(publishThread, boost::ref(pub), boost::ref(done)));
00096
00097 const size_t sub_count = 10;
00098 Subscriber<std_msgs::UInt32> subs[sub_count];
00099 uint32_t counts[sub_count];
00100 int32_t lasts[sub_count];
00101 for (size_t i = 0; i < sub_count; ++i)
00102 {
00103 subs[i].initialize(2);
00104 subs[i].subscribe(nh, "test");
00105 counts[i] = 0;
00106 lasts[i] = -1;
00107 }
00108
00109 resetThreadAllocInfo();
00110
00111 bool all_done = false;
00112 while (!all_done)
00113 {
00114 all_done = true;
00115
00116 for (size_t i = 0; i < sub_count; ++i)
00117 {
00118 std_msgs::UInt32ConstPtr msg = subs[i].poll();
00119 if (msg)
00120 {
00121 ASSERT_GT((int32_t)msg->data, lasts[i]);
00122 lasts[i] = msg->data;
00123 ++counts[i];
00124 }
00125
00126 if (counts[i] < 10000)
00127 {
00128 all_done = false;
00129 }
00130 }
00131 }
00132
00133 ASSERT_EQ(getThreadAllocInfo().total_ops, 0UL);
00134
00135 done = true;
00136 t.join();
00137 }
00138
00139 TEST(Subscriber, multipleSubscribersMultipleTopics)
00140 {
00141 const size_t sub_count = 10;
00142
00143 ros::NodeHandle nh;
00144 bool done = false;
00145 boost::thread_group tg;
00146
00147 ros::Publisher pubs[sub_count];
00148 Subscriber<std_msgs::UInt32> subs[sub_count];
00149 uint32_t counts[sub_count];
00150 int32_t lasts[sub_count];
00151 for (size_t i = 0; i < sub_count; ++i)
00152 {
00153 std::stringstream topic;
00154 topic << "test" << i;
00155 pubs[i] = nh.advertise<std_msgs::UInt32>(topic.str(), 1);
00156 tg.create_thread(boost::bind(publishThread, boost::ref(pubs[i]), boost::ref(done)));
00157
00158 subs[i].initialize(2);
00159 subs[i].subscribe(nh, topic.str());
00160 counts[i] = 0;
00161 lasts[i] = -1;
00162 }
00163
00164
00165 bool all_done = false;
00166 while (!all_done)
00167 {
00168 all_done = true;
00169
00170 for (size_t i = 0; i < sub_count; ++i)
00171 {
00172 std_msgs::UInt32ConstPtr msg = subs[i].poll();
00173 if (msg)
00174 {
00175 ASSERT_GT((int32_t)msg->data, lasts[i]);
00176 lasts[i] = msg->data;
00177 ++counts[i];
00178 }
00179
00180 if (counts[i] < 10000)
00181 {
00182 all_done = false;
00183 }
00184 }
00185 }
00186
00187 done = true;
00188 tg.join_all();
00189 }
00190
00191 void subscribeThread(Subscriber<std_msgs::UInt32>& sub, bool& failed)
00192 {
00193 resetThreadAllocInfo();
00194
00195 const int count = 10000;
00196 int my_count = 0;
00197 int32_t last = -1;
00198 while (true)
00199 {
00200 std_msgs::UInt32ConstPtr msg = sub.poll();
00201 if (msg)
00202 {
00203 if (last >= (int32_t)msg->data)
00204 {
00205 ROS_ERROR_STREAM("Thread " << boost::this_thread::get_id() << " last is greater than msg " << last << " >= " << msg->data);
00206 failed = true;
00207 break;
00208 }
00209
00210 last = msg->data;
00211 ++my_count;
00212
00213
00214 }
00215
00216 if (my_count >= count)
00217 {
00218 break;
00219 }
00220
00221 ros::WallDuration(0.0001).sleep();
00222 }
00223
00224 if (getThreadAllocInfo().total_ops > 0UL)
00225 {
00226 ROS_ERROR_STREAM("Thread " << boost::this_thread::get_id() << " performed " << getThreadAllocInfo().total_ops << " memory operations (malloc/free/etc.)");
00227 failed = true;
00228 }
00229 }
00230
00231 TEST(Subscriber, multipleThreadsSameTopic)
00232 {
00233 ros::NodeHandle nh;
00234 ros::Publisher pub = nh.advertise<std_msgs::UInt32>("test", 1);
00235 bool done = false;
00236 boost::thread t(boost::bind(publishThread, boost::ref(pub), boost::ref(done)));
00237 boost::thread_group tg;
00238
00239 const size_t sub_count = 10;
00240 Subscriber<std_msgs::UInt32> subs[sub_count];
00241 bool failed[sub_count];
00242 for (size_t i = 0; i < sub_count; ++i)
00243 {
00244 subs[i].initialize(2);
00245 subs[i].subscribe(nh, "test");
00246 failed[i] = false;
00247
00248 tg.create_thread(boost::bind(subscribeThread, boost::ref(subs[i]), boost::ref(failed[i])));
00249 }
00250
00251 tg.join_all();
00252 done = true;
00253 t.join();
00254
00255 for (size_t i = 0; i < sub_count; ++i)
00256 {
00257 EXPECT_FALSE(failed[i]);
00258 }
00259 }
00260
00261 int main(int argc, char** argv)
00262 {
00263 ros::init(argc, argv, "test_rt_subscriber");
00264 testing::InitGoogleTest(&argc, argv);
00265
00266 ros::NodeHandle nh;
00267 rosrt::init();
00268
00269 return RUN_ALL_TESTS();
00270 }