$search
00001 /********************************************************************* 00002 * Software License Agreement (BSD License) 00003 * 00004 * Copyright (c) 2010, Willow Garage, Inc. 00005 * All rights reserved. 00006 * 00007 * Redistribution and use in source and binary forms, with or without 00008 * modification, are permitted provided that the following conditions 00009 * are met: 00010 * 00011 * * Redistributions of source code must retain the above copyright 00012 * notice, this list of conditions and the following disclaimer. 00013 * * Redistributions in binary form must reproduce the above 00014 * copyright notice, this list of conditions and the following 00015 * disclaimer in the documentation and/or other materials provided 00016 * with the distribution. 00017 * * Neither the name of the Willow Garage nor the names of its 00018 * contributors may be used to endorse or promote products derived 00019 * from this software without specific prior written permission. 00020 * 00021 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 00022 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 00023 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 00024 * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 00025 * COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 00026 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, 00027 * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 00028 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 00029 * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 00030 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN 00031 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 00032 * POSSIBILITY OF SUCH DAMAGE. 00033 *********************************************************************/ 00034 00035 #include <gtest/gtest.h> 00036 00037 #include "rosrt/rosrt.h" 00038 00039 #include <ros/ros.h> 00040 00041 #include <std_msgs/UInt32.h> 00042 00043 #include <boost/thread.hpp> 00044 00045 using namespace rosrt; 00046 00047 void publishThread(ros::Publisher& pub, bool& done) 00048 { 00049 uint32_t i = 0; 00050 std_msgs::UInt32 msg; 00051 while (!done) 00052 { 00053 msg.data = i; 00054 pub.publish(msg); 00055 ros::WallDuration(0.0001).sleep(); 00056 ++i; 00057 } 00058 } 00059 00060 TEST(Subscriber, singleSubscriber) 00061 { 00062 ros::NodeHandle nh; 00063 ros::Publisher pub = nh.advertise<std_msgs::UInt32>("test", 1); 00064 bool done = false; 00065 boost::thread t(boost::bind(publishThread, boost::ref(pub), boost::ref(done))); 00066 00067 Subscriber<std_msgs::UInt32> sub(2, nh, "test"); 00068 00069 resetThreadAllocInfo(); 00070 00071 uint32_t count = 0; 00072 int32_t last = -1; 00073 while (count < 10000) 00074 { 00075 std_msgs::UInt32ConstPtr msg = sub.poll(); 00076 if (msg) 00077 { 00078 ASSERT_GT((int32_t)msg->data, last); 00079 last = msg->data; 00080 ++count; 00081 } 00082 } 00083 00084 ASSERT_EQ(getThreadAllocInfo().total_ops, 0UL); 00085 00086 done = true; 00087 t.join(); 00088 } 00089 00090 TEST(Subscriber, multipleSubscribersSameTopic) 00091 { 00092 ros::NodeHandle nh; 00093 ros::Publisher pub = nh.advertise<std_msgs::UInt32>("test", 1); 00094 bool done = false; 00095 boost::thread t(boost::bind(publishThread, boost::ref(pub), boost::ref(done))); 00096 00097 const size_t sub_count = 10; 00098 Subscriber<std_msgs::UInt32> subs[sub_count]; 00099 uint32_t counts[sub_count]; 00100 int32_t lasts[sub_count]; 00101 for (size_t i = 0; i < sub_count; ++i) 00102 { 00103 subs[i].initialize(2); 00104 subs[i].subscribe(nh, "test"); 00105 counts[i] = 0; 00106 lasts[i] = -1; 00107 } 00108 00109 resetThreadAllocInfo(); 00110 00111 bool all_done = false; 00112 while (!all_done) 00113 { 00114 all_done = true; 00115 00116 for (size_t i = 0; i < sub_count; ++i) 00117 { 00118 std_msgs::UInt32ConstPtr msg = subs[i].poll(); 00119 if (msg) 00120 { 00121 ASSERT_GT((int32_t)msg->data, lasts[i]); 00122 lasts[i] = msg->data; 00123 ++counts[i]; 00124 } 00125 00126 if (counts[i] < 10000) 00127 { 00128 all_done = false; 00129 } 00130 } 00131 } 00132 00133 ASSERT_EQ(getThreadAllocInfo().total_ops, 0UL); 00134 00135 done = true; 00136 t.join(); 00137 } 00138 00139 TEST(Subscriber, multipleSubscribersMultipleTopics) 00140 { 00141 const size_t sub_count = 10; 00142 00143 ros::NodeHandle nh; 00144 bool done = false; 00145 boost::thread_group tg; 00146 00147 ros::Publisher pubs[sub_count]; 00148 Subscriber<std_msgs::UInt32> subs[sub_count]; 00149 uint32_t counts[sub_count]; 00150 int32_t lasts[sub_count]; 00151 for (size_t i = 0; i < sub_count; ++i) 00152 { 00153 std::stringstream topic; 00154 topic << "test" << i; 00155 pubs[i] = nh.advertise<std_msgs::UInt32>(topic.str(), 1); 00156 tg.create_thread(boost::bind(publishThread, boost::ref(pubs[i]), boost::ref(done))); 00157 00158 subs[i].initialize(2); 00159 subs[i].subscribe(nh, topic.str()); 00160 counts[i] = 0; 00161 lasts[i] = -1; 00162 } 00163 00164 00165 bool all_done = false; 00166 while (!all_done) 00167 { 00168 all_done = true; 00169 00170 for (size_t i = 0; i < sub_count; ++i) 00171 { 00172 std_msgs::UInt32ConstPtr msg = subs[i].poll(); 00173 if (msg) 00174 { 00175 ASSERT_GT((int32_t)msg->data, lasts[i]); 00176 lasts[i] = msg->data; 00177 ++counts[i]; 00178 } 00179 00180 if (counts[i] < 10000) 00181 { 00182 all_done = false; 00183 } 00184 } 00185 } 00186 00187 done = true; 00188 tg.join_all(); 00189 } 00190 00191 void subscribeThread(Subscriber<std_msgs::UInt32>& sub, bool& failed) 00192 { 00193 resetThreadAllocInfo(); 00194 00195 const int count = 10000; 00196 int my_count = 0; 00197 int32_t last = -1; 00198 while (true) 00199 { 00200 std_msgs::UInt32ConstPtr msg = sub.poll(); 00201 if (msg) 00202 { 00203 if (last >= (int32_t)msg->data) 00204 { 00205 ROS_ERROR_STREAM("Thread " << boost::this_thread::get_id() << " last is greater than msg " << last << " >= " << msg->data); 00206 failed = true; 00207 break; 00208 } 00209 00210 last = msg->data; 00211 ++my_count; 00212 00213 //ROS_INFO_STREAM("Thread " << boost::this_thread::get_id() << " " << my_count); 00214 } 00215 00216 if (my_count >= count) 00217 { 00218 break; 00219 } 00220 00221 ros::WallDuration(0.0001).sleep(); 00222 } 00223 00224 if (getThreadAllocInfo().total_ops > 0UL) 00225 { 00226 ROS_ERROR_STREAM("Thread " << boost::this_thread::get_id() << " performed " << getThreadAllocInfo().total_ops << " memory operations (malloc/free/etc.)"); 00227 failed = true; 00228 } 00229 } 00230 00231 TEST(Subscriber, multipleThreadsSameTopic) 00232 { 00233 ros::NodeHandle nh; 00234 ros::Publisher pub = nh.advertise<std_msgs::UInt32>("test", 1); 00235 bool done = false; 00236 boost::thread t(boost::bind(publishThread, boost::ref(pub), boost::ref(done))); 00237 boost::thread_group tg; 00238 00239 const size_t sub_count = 10; 00240 Subscriber<std_msgs::UInt32> subs[sub_count]; 00241 bool failed[sub_count]; 00242 for (size_t i = 0; i < sub_count; ++i) 00243 { 00244 subs[i].initialize(2); 00245 subs[i].subscribe(nh, "test"); 00246 failed[i] = false; 00247 00248 tg.create_thread(boost::bind(subscribeThread, boost::ref(subs[i]), boost::ref(failed[i]))); 00249 } 00250 00251 tg.join_all(); 00252 done = true; 00253 t.join(); 00254 00255 for (size_t i = 0; i < sub_count; ++i) 00256 { 00257 EXPECT_FALSE(failed[i]); 00258 } 00259 } 00260 00261 int main(int argc, char** argv) 00262 { 00263 ros::init(argc, argv, "test_rt_subscriber"); 00264 testing::InitGoogleTest(&argc, argv); 00265 00266 ros::NodeHandle nh; 00267 rosrt::init(); 00268 00269 return RUN_ALL_TESTS(); 00270 }