test_subscriber.cpp
Go to the documentation of this file.
1 /*********************************************************************
2 * Software License Agreement (BSD License)
3 *
4 * Copyright (c) 2010, Willow Garage, Inc.
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions
9 * are met:
10 *
11 * * Redistributions of source code must retain the above copyright
12 * notice, this list of conditions and the following disclaimer.
13 * * Redistributions in binary form must reproduce the above
14 * copyright notice, this list of conditions and the following
15 * disclaimer in the documentation and/or other materials provided
16 * with the distribution.
17 * * Neither the name of the Willow Garage nor the names of its
18 * contributors may be used to endorse or promote products derived
19 * from this software without specific prior written permission.
20 *
21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24 * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
25 * COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
27 * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
29 * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
30 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
31 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
32 * POSSIBILITY OF SUCH DAMAGE.
33 *********************************************************************/
34 
35 #include <gtest/gtest.h>
36 
37 #include "rosrt/rosrt.h"
38 
39 #include <ros/ros.h>
40 
41 #include <std_msgs/UInt32.h>
42 
43 #include <boost/thread.hpp>
44 
45 using namespace rosrt;
46 
47 void publishThread(ros::Publisher& pub, bool& done)
48 {
49  uint32_t i = 0;
50  std_msgs::UInt32 msg;
51  while (!done)
52  {
53  msg.data = i;
54  pub.publish(msg);
55  ros::WallDuration(0.0001).sleep();
56  ++i;
57  }
58 }
59 
60 TEST(Subscriber, singleSubscriber)
61 {
62  ros::NodeHandle nh;
63  ros::Publisher pub = nh.advertise<std_msgs::UInt32>("test", 1);
64  bool done = false;
65  boost::thread t(boost::bind(publishThread, boost::ref(pub), boost::ref(done)));
66 
67  Subscriber<std_msgs::UInt32> sub(2, nh, "test");
68 
70 
71  uint32_t count = 0;
72  int32_t last = -1;
73  while (count < 10000)
74  {
75  std_msgs::UInt32ConstPtr msg = sub.poll();
76  if (msg)
77  {
78  ASSERT_GT((int32_t)msg->data, last);
79  last = msg->data;
80  ++count;
81  }
82  }
83 
84  ASSERT_EQ(getThreadAllocInfo().total_ops, 0UL);
85 
86  done = true;
87  t.join();
88 }
89 
90 TEST(Subscriber, multipleSubscribersSameTopic)
91 {
92  ros::NodeHandle nh;
93  ros::Publisher pub = nh.advertise<std_msgs::UInt32>("test", 1);
94  bool done = false;
95  boost::thread t(boost::bind(publishThread, boost::ref(pub), boost::ref(done)));
96 
97  const size_t sub_count = 10;
98  Subscriber<std_msgs::UInt32> subs[sub_count];
99  uint32_t counts[sub_count];
100  int32_t lasts[sub_count];
101  for (size_t i = 0; i < sub_count; ++i)
102  {
103  subs[i].initialize(2);
104  subs[i].subscribe(nh, "test");
105  counts[i] = 0;
106  lasts[i] = -1;
107  }
108 
110 
111  bool all_done = false;
112  while (!all_done)
113  {
114  all_done = true;
115 
116  for (size_t i = 0; i < sub_count; ++i)
117  {
118  std_msgs::UInt32ConstPtr msg = subs[i].poll();
119  if (msg)
120  {
121  ASSERT_GT((int32_t)msg->data, lasts[i]);
122  lasts[i] = msg->data;
123  ++counts[i];
124  }
125 
126  if (counts[i] < 10000)
127  {
128  all_done = false;
129  }
130  }
131  }
132 
133  ASSERT_EQ(getThreadAllocInfo().total_ops, 0UL);
134 
135  done = true;
136  t.join();
137 }
138 
139 TEST(Subscriber, multipleSubscribersMultipleTopics)
140 {
141  const size_t sub_count = 10;
142 
143  ros::NodeHandle nh;
144  bool done = false;
145  boost::thread_group tg;
146 
147  ros::Publisher pubs[sub_count];
148  Subscriber<std_msgs::UInt32> subs[sub_count];
149  uint32_t counts[sub_count];
150  int32_t lasts[sub_count];
151  for (size_t i = 0; i < sub_count; ++i)
152  {
153  std::stringstream topic;
154  topic << "test" << i;
155  pubs[i] = nh.advertise<std_msgs::UInt32>(topic.str(), 1);
156  tg.create_thread(boost::bind(publishThread, boost::ref(pubs[i]), boost::ref(done)));
157 
158  subs[i].initialize(2);
159  subs[i].subscribe(nh, topic.str());
160  counts[i] = 0;
161  lasts[i] = -1;
162  }
163 
164 
165  bool all_done = false;
166  while (!all_done)
167  {
168  all_done = true;
169 
170  for (size_t i = 0; i < sub_count; ++i)
171  {
172  std_msgs::UInt32ConstPtr msg = subs[i].poll();
173  if (msg)
174  {
175  ASSERT_GT((int32_t)msg->data, lasts[i]);
176  lasts[i] = msg->data;
177  ++counts[i];
178  }
179 
180  if (counts[i] < 10000)
181  {
182  all_done = false;
183  }
184  }
185  }
186 
187  done = true;
188  tg.join_all();
189 }
190 
192 {
194 
195  const int count = 10000;
196  int my_count = 0;
197  int32_t last = -1;
198  while (true)
199  {
200  std_msgs::UInt32ConstPtr msg = sub.poll();
201  if (msg)
202  {
203  if (last >= (int32_t)msg->data)
204  {
205  ROS_ERROR_STREAM("Thread " << boost::this_thread::get_id() << " last is greater than msg " << last << " >= " << msg->data);
206  failed = true;
207  break;
208  }
209 
210  last = msg->data;
211  ++my_count;
212 
213  //ROS_INFO_STREAM("Thread " << boost::this_thread::get_id() << " " << my_count);
214  }
215 
216  if (my_count >= count)
217  {
218  break;
219  }
220 
221  ros::WallDuration(0.0001).sleep();
222  }
223 
224  if (getThreadAllocInfo().total_ops > 0UL)
225  {
226  ROS_ERROR_STREAM("Thread " << boost::this_thread::get_id() << " performed " << getThreadAllocInfo().total_ops << " memory operations (malloc/free/etc.)");
227  failed = true;
228  }
229 }
230 
231 TEST(Subscriber, multipleThreadsSameTopic)
232 {
233  ros::NodeHandle nh;
234  ros::Publisher pub = nh.advertise<std_msgs::UInt32>("test", 1);
235  bool done = false;
236  boost::thread t(boost::bind(publishThread, boost::ref(pub), boost::ref(done)));
237  boost::thread_group tg;
238 
239  const size_t sub_count = 10;
240  Subscriber<std_msgs::UInt32> subs[sub_count];
241  bool failed[sub_count];
242  for (size_t i = 0; i < sub_count; ++i)
243  {
244  subs[i].initialize(2);
245  subs[i].subscribe(nh, "test");
246  failed[i] = false;
247 
248  tg.create_thread(boost::bind(subscribeThread, boost::ref(subs[i]), boost::ref(failed[i])));
249  }
250 
251  tg.join_all();
252  done = true;
253  t.join();
254 
255  for (size_t i = 0; i < sub_count; ++i)
256  {
257  EXPECT_FALSE(failed[i]);
258  }
259 }
260 
261 int main(int argc, char** argv)
262 {
263  ros::init(argc, argv, "test_rt_subscriber");
264  testing::InitGoogleTest(&argc, argv);
265 
266  ros::NodeHandle nh;
267  rosrt::init();
268 
269  return RUN_ALL_TESTS();
270 }
ros::WallDuration::sleep
bool sleep() const
ros::Publisher
ROS_ERROR_STREAM
#define ROS_ERROR_STREAM(args)
ros::init
ROSCPP_DECL void init(const M_string &remappings, const std::string &name, uint32_t options=0)
rosrt
Definition: managers.h:38
ros.h
publishThread
void publishThread(ros::Publisher &pub, bool &done)
Definition: test_subscriber.cpp:47
main
int main(int argc, char **argv)
Definition: test_subscriber.cpp:261
rosrt::init
void init(const InitOptions &ops=InitOptions())
Definition: init.cpp:135
ros::Publisher::publish
void publish(const boost::shared_ptr< M > &message) const
ros::NodeHandle::advertise
Publisher advertise(AdvertiseOptions &ops)
rosrt::Subscriber::poll
boost::shared_ptr< M const > poll()
Definition: subscriber.h:182
rosrt::resetThreadAllocInfo
void resetThreadAllocInfo()
Definition: malloc.cpp:165
subscribeThread
void subscribeThread(Subscriber< std_msgs::UInt32 > &sub, bool &failed)
Definition: test_subscriber.cpp:191
rosrt::Subscriber::initialize
void initialize(uint32_t message_pool_size)
Initialize this subscribe. Only use with the default constructor.
Definition: subscriber.h:128
rosrt::getThreadAllocInfo
AllocInfo getThreadAllocInfo()
Definition: malloc.cpp:142
rosrt.h
ros::WallDuration
rosrt::Subscriber::subscribe
bool subscribe(ros::NodeHandle &nh, const std::string &topic, const ros::TransportHints &transport_hints=ros::TransportHints())
Subscribe to a topic.
Definition: subscriber.h:159
rosrt::Subscriber
A lock-free subscriber. Allows you to receive ROS messages inside a realtime thread.
Definition: subscriber.h:75
ros::NodeHandle
TEST
TEST(Subscriber, singleSubscriber)
Definition: test_subscriber.cpp:60


rosrt
Author(s): Josh Faust
autogenerated on Wed Mar 2 2022 00:54:17