test_lazy_subscriber.cpp
Go to the documentation of this file.
1 
9 #include "gtest/gtest.h"
10 
11 // HACK: we need to access TopicManager::subscriptions_
12 #define private public
13 #include <ros/topic_manager.h>
14 #undef private
15 
16 #include <memory>
17 #include <string>
18 #include <utility>
19 
20 #include <ros/ros.h>
21 #include <ros/subscription.h>
22 #include <std_msgs/Header.h>
23 #include <topic_tools/shape_shifter.h>
24 
26 
27 void spin(double duration, double wait)
28 {
29  double waited = 0;
30  while (waited < duration)
31  {
32  ros::spinOnce();
34  waited += wait;
35  }
36 }
37 
38 void spin(double duration)
39 {
40  spin(duration, duration / 1000.0);
41 }
42 
43 size_t getNumSubscriptions(const std::string& topic)
44 {
45  size_t num = 0;
46  auto subs = ros::TopicManager::instance()->subscriptions_;
47  for (const auto& sub : subs)
48  {
49  if (sub->getName() == topic)
50  {
51  num += sub->getNumCallbacks();
52  }
53  }
54  return num;
55 }
56 
57 template<typename M>
59 {
60 public:
61  TestLazySubscriber(ros::NodeHandle publisherNh, const std::string& publisherTopic,
64  const cras::LogHelperPtr& logHelper = std::make_shared<cras::NodeLogHelper>()) :
66  std::move(publisherNh), publisherTopic, std::move(connectFn), std::move(disconnectFn), logHelper)
67  {
68  }
70 };
71 
72 TEST(LazySubscriberBase, Test) // NOLINT
73 {
74  ros::NodeHandle nh;
75  ros::NodeHandle pnh({"tmp"}, "my");
76  auto outPub = pnh.advertise<std_msgs::Header>("out", 10);
77  auto inPub = nh.advertise<std_msgs::Header>("in", 10);
78 
79  size_t numOutSubscribes = 0;
80  size_t numInReceived = 0;
81  size_t numOutReceived = 0;
82  boost::function<void(const std_msgs::Header&)> relayCb = [&](const std_msgs::Header& h)
83  {
84  ++numInReceived;
85  outPub.publish(h);
86  };
87  boost::function<void(const std_msgs::Header&)> outCb = [&](const std_msgs::Header& h)
88  {
89  ++numOutReceived;
90  };
91 
92  auto lazySub = std::make_unique<TestLazySubscriber<std_msgs::Header>>(
93  pnh, outPub.getTopic(), [&](ros::Subscriber& sub)
94  {
95  ++numOutSubscribes;
96  sub = nh.subscribe<std_msgs::Header>("in", 10, relayCb);
97  });
98 
99  spin(0.1);
100  EXPECT_EQ(0, numOutSubscribes);
101  EXPECT_EQ(0, numInReceived);
102  EXPECT_EQ(0, outPub.getNumSubscribers());
103  EXPECT_EQ(0, inPub.getNumSubscribers());
104  EXPECT_EQ(0, getNumSubscriptions("/in"));
105  EXPECT_EQ(false, lazySub->isSubscribed());
106 
107  {
108  auto outSub = pnh.subscribe<std_msgs::Header>("out", 10, outCb);
109 
110  spin(0.1);
111  EXPECT_EQ(1, numOutSubscribes);
112  EXPECT_EQ(0, numInReceived);
113  EXPECT_EQ(0, numOutReceived);
114  EXPECT_EQ(1, outPub.getNumSubscribers());
115  EXPECT_EQ(1, inPub.getNumSubscribers());
116  EXPECT_EQ(1, getNumSubscriptions("/in"));
117  EXPECT_EQ(1, getNumSubscriptions("/tmp/my/out"));
118  EXPECT_EQ(true, lazySub->isSubscribed());
119 
120  inPub.publish(std_msgs::Header());
121 
122  spin(0.1);
123  EXPECT_EQ(1, numOutSubscribes);
124  EXPECT_EQ(1, numInReceived);
125  EXPECT_EQ(1, numOutReceived);
126  EXPECT_EQ(1, outPub.getNumSubscribers());
127  EXPECT_EQ(1, inPub.getNumSubscribers());
128  EXPECT_EQ(1, getNumSubscriptions("/in"));
129  EXPECT_EQ(1, getNumSubscriptions("/tmp/my/out"));
130  EXPECT_EQ(true, lazySub->isSubscribed());
131  }
132 
133  spin(0.1);
134  EXPECT_EQ(1, numOutSubscribes);
135  EXPECT_EQ(1, numInReceived);
136  EXPECT_EQ(1, numOutReceived);
137  EXPECT_EQ(0, outPub.getNumSubscribers());
138  EXPECT_EQ(0, inPub.getNumSubscribers());
139  EXPECT_EQ(0, getNumSubscriptions("/in"));
140  EXPECT_EQ(0, getNumSubscriptions("/tmp/my/out"));
141  EXPECT_EQ(false, lazySub->isSubscribed());
142 
143  inPub.publish(std_msgs::Header());
144 
145  spin(0.1);
146  EXPECT_EQ(1, numOutSubscribes);
147  EXPECT_EQ(1, numInReceived);
148  EXPECT_EQ(1, numOutReceived);
149  EXPECT_EQ(0, outPub.getNumSubscribers());
150  EXPECT_EQ(0, inPub.getNumSubscribers());
151  EXPECT_EQ(0, getNumSubscriptions("/in"));
152  EXPECT_EQ(0, getNumSubscriptions("/tmp/my/out"));
153  EXPECT_EQ(false, lazySub->isSubscribed());
154 
155  {
156  auto outSub = pnh.subscribe<std_msgs::Header>("out", 10, outCb);
157  auto outSub2 = pnh.subscribe<std_msgs::Header>("out", 10, outCb);
158 
159  spin(0.1);
160  EXPECT_EQ(2, numOutSubscribes);
161  EXPECT_EQ(1, numInReceived);
162  EXPECT_EQ(1, numOutReceived);
163  EXPECT_EQ(1, outPub.getNumSubscribers());
164  EXPECT_EQ(1, inPub.getNumSubscribers());
165  EXPECT_EQ(1, getNumSubscriptions("/in"));
166  EXPECT_EQ(2, getNumSubscriptions("/tmp/my/out"));
167  EXPECT_EQ(true, lazySub->isSubscribed());
168 
169  // Simulate connection from a second node; this can't be done from within a single executable.
170  // This should not have any effect as we're only interested in the first or last subscriber.
171  lazySub->connectCb({{}});
172 
173  spin(0.1);
174  EXPECT_EQ(2, numOutSubscribes);
175  EXPECT_EQ(1, numInReceived);
176  EXPECT_EQ(1, numOutReceived);
177  EXPECT_EQ(1, outPub.getNumSubscribers());
178  EXPECT_EQ(1, inPub.getNumSubscribers());
179  EXPECT_EQ(1, getNumSubscriptions("/in"));
180  EXPECT_EQ(2, getNumSubscriptions("/tmp/my/out"));
181  EXPECT_EQ(true, lazySub->isSubscribed());
182 
183  inPub.publish(std_msgs::Header());
184  inPub.publish(std_msgs::Header());
185 
186  spin(0.1);
187  EXPECT_EQ(2, numOutSubscribes);
188  EXPECT_EQ(3, numInReceived);
189  EXPECT_EQ(5, numOutReceived);
190  EXPECT_EQ(1, outPub.getNumSubscribers());
191  EXPECT_EQ(1, inPub.getNumSubscribers());
192  EXPECT_EQ(1, getNumSubscriptions("/in"));
193  EXPECT_EQ(2, getNumSubscriptions("/tmp/my/out"));
194  EXPECT_EQ(true, lazySub->isSubscribed());
195  }
196 
197  spin(0.1);
198  EXPECT_EQ(2, numOutSubscribes);
199  EXPECT_EQ(3, numInReceived);
200  EXPECT_EQ(5, numOutReceived);
201  EXPECT_EQ(0, outPub.getNumSubscribers());
202  EXPECT_EQ(0, inPub.getNumSubscribers());
203  EXPECT_EQ(0, getNumSubscriptions("/in"));
204  EXPECT_EQ(0, getNumSubscriptions("/tmp/my/out"));
205  EXPECT_EQ(false, lazySub->isSubscribed());
206 
207  inPub.publish(std_msgs::Header());
208  inPub.publish(std_msgs::Header());
209  inPub.publish(std_msgs::Header());
210 
211  spin(0.1);
212  EXPECT_EQ(2, numOutSubscribes);
213  EXPECT_EQ(3, numInReceived);
214  EXPECT_EQ(5, numOutReceived);
215  EXPECT_EQ(0, outPub.getNumSubscribers());
216  EXPECT_EQ(0, inPub.getNumSubscribers());
217  EXPECT_EQ(0, getNumSubscriptions("/in"));
218  EXPECT_EQ(0, getNumSubscriptions("/tmp/my/out"));
219  EXPECT_EQ(false, lazySub->isSubscribed());
220 
221  {
222  auto outSub = pnh.subscribe<std_msgs::Header>("out", 10, outCb);
223 
224  spin(0.1);
225  EXPECT_EQ(3, numOutSubscribes);
226  EXPECT_EQ(3, numInReceived);
227  EXPECT_EQ(5, numOutReceived);
228  EXPECT_EQ(1, outPub.getNumSubscribers());
229  EXPECT_EQ(1, inPub.getNumSubscribers());
230  EXPECT_EQ(1, getNumSubscriptions("/in"));
231  EXPECT_EQ(1, getNumSubscriptions("/tmp/my/out"));
232  EXPECT_EQ(true, lazySub->isSubscribed());
233 
234  // Delete the pubsub object - it should disconnect the subscriber.
235  lazySub.reset();
236 
237  spin(0.1);
238  EXPECT_EQ(3, numOutSubscribes);
239  EXPECT_EQ(3, numInReceived);
240  EXPECT_EQ(5, numOutReceived);
241  EXPECT_EQ(1, outPub.getNumSubscribers());
242  EXPECT_EQ(0, inPub.getNumSubscribers());
243  EXPECT_EQ(0, getNumSubscriptions("/in"));
244  EXPECT_EQ(1, getNumSubscriptions("/tmp/my/out"));
245  }
246 }
247 
248 TEST(LazySubscriberBase, SetLazy) // NOLINT
249 {
250  ros::NodeHandle nh;
251  ros::NodeHandle pnh({"tmp"}, "my");
252  auto outPub = pnh.advertise<std_msgs::Header>("out", 10);
253  auto inPub = nh.advertise<std_msgs::Header>("in", 10);
254 
255  size_t numOutSubscribes = 0;
256  size_t numInReceived = 0;
257  size_t numOutReceived = 0;
258  boost::function<void(const std_msgs::Header&)> relayCb = [&](const std_msgs::Header& h)
259  {
260  ++numInReceived;
261  outPub.publish(h);
262  };
263  boost::function<void(const std_msgs::Header&)> outCb = [&](const std_msgs::Header& h)
264  {
265  ++numOutReceived;
266  };
267 
268  auto lazySub = std::make_unique<TestLazySubscriber<std_msgs::Header>>(
269  pnh, outPub.getTopic(), [&](ros::Subscriber& sub)
270  {
271  ++numOutSubscribes;
272  sub = nh.subscribe<std_msgs::Header>("in", 10, relayCb);
273  });
274 
275  spin(0.1);
276  EXPECT_EQ(0, numOutSubscribes);
277  EXPECT_EQ(0, numInReceived);
278  EXPECT_EQ(0, numOutReceived);
279  EXPECT_EQ(0, outPub.getNumSubscribers());
280  EXPECT_EQ(0, inPub.getNumSubscribers());
281  EXPECT_EQ(0, getNumSubscriptions("/in"));
282  EXPECT_EQ(0, getNumSubscriptions("/tmp/my/out"));
283  EXPECT_EQ(false, lazySub->isSubscribed());
284  EXPECT_EQ(true, lazySub->isLazy());
285 
286  lazySub->setLazy(false);
287 
288  spin(0.1);
289  EXPECT_EQ(1, numOutSubscribes);
290  EXPECT_EQ(0, numInReceived);
291  EXPECT_EQ(0, numOutReceived);
292  EXPECT_EQ(0, outPub.getNumSubscribers());
293  EXPECT_EQ(1, inPub.getNumSubscribers());
294  EXPECT_EQ(1, getNumSubscriptions("/in"));
295  EXPECT_EQ(0, getNumSubscriptions("/tmp/my/out"));
296  EXPECT_EQ(true, lazySub->isSubscribed());
297  EXPECT_EQ(false, lazySub->isLazy());
298 
299  inPub.publish(std_msgs::Header());
300 
301  spin(0.1);
302  EXPECT_EQ(1, numOutSubscribes);
303  EXPECT_EQ(1, numInReceived);
304  EXPECT_EQ(0, numOutReceived);
305  EXPECT_EQ(0, outPub.getNumSubscribers());
306  EXPECT_EQ(1, inPub.getNumSubscribers());
307  EXPECT_EQ(1, getNumSubscriptions("/in"));
308  EXPECT_EQ(0, getNumSubscriptions("/tmp/my/out"));
309  EXPECT_EQ(true, lazySub->isSubscribed());
310  EXPECT_EQ(false, lazySub->isLazy());
311 
312  {
313  auto outSub = pnh.subscribe<std_msgs::Header>("out", 10, outCb);
314 
315  spin(0.1);
316  EXPECT_EQ(1, numOutSubscribes);
317  EXPECT_EQ(1, numInReceived);
318  EXPECT_EQ(0, numOutReceived);
319  EXPECT_EQ(1, outPub.getNumSubscribers());
320  EXPECT_EQ(1, inPub.getNumSubscribers());
321  EXPECT_EQ(1, getNumSubscriptions("/in"));
322  EXPECT_EQ(1, getNumSubscriptions("/tmp/my/out"));
323  EXPECT_EQ(true, lazySub->isSubscribed());
324  EXPECT_EQ(false, lazySub->isLazy());
325 
326  inPub.publish(std_msgs::Header());
327 
328  spin(0.1);
329  EXPECT_EQ(1, numOutSubscribes);
330  EXPECT_EQ(2, numInReceived);
331  EXPECT_EQ(1, numOutReceived);
332  EXPECT_EQ(1, outPub.getNumSubscribers());
333  EXPECT_EQ(1, inPub.getNumSubscribers());
334  EXPECT_EQ(1, getNumSubscriptions("/in"));
335  EXPECT_EQ(1, getNumSubscriptions("/tmp/my/out"));
336  EXPECT_EQ(true, lazySub->isSubscribed());
337  EXPECT_EQ(false, lazySub->isLazy());
338  }
339 
340  spin(0.1);
341  EXPECT_EQ(1, numOutSubscribes);
342  EXPECT_EQ(2, numInReceived);
343  EXPECT_EQ(1, numOutReceived);
344  EXPECT_EQ(0, outPub.getNumSubscribers());
345  EXPECT_EQ(1, inPub.getNumSubscribers());
346  EXPECT_EQ(1, getNumSubscriptions("/in"));
347  EXPECT_EQ(0, getNumSubscriptions("/tmp/my/out"));
348  EXPECT_EQ(true, lazySub->isSubscribed());
349  EXPECT_EQ(false, lazySub->isLazy());
350 
351  lazySub->setLazy(true);
352 
353  spin(0.1);
354  EXPECT_EQ(1, numOutSubscribes);
355  EXPECT_EQ(2, numInReceived);
356  EXPECT_EQ(1, numOutReceived);
357  EXPECT_EQ(0, outPub.getNumSubscribers());
358  EXPECT_EQ(0, inPub.getNumSubscribers());
359  EXPECT_EQ(0, getNumSubscriptions("/in"));
360  EXPECT_EQ(0, getNumSubscriptions("/tmp/my/out"));
361  EXPECT_EQ(false, lazySub->isSubscribed());
362  EXPECT_EQ(true, lazySub->isLazy());
363 
364  {
365  auto outSub = pnh.subscribe<std_msgs::Header>("out", 10, outCb);
366 
367  spin(0.1);
368  EXPECT_EQ(2, numOutSubscribes);
369  EXPECT_EQ(2, numInReceived);
370  EXPECT_EQ(1, numOutReceived);
371  EXPECT_EQ(1, outPub.getNumSubscribers());
372  EXPECT_EQ(1, inPub.getNumSubscribers());
373  EXPECT_EQ(1, getNumSubscriptions("/in"));
374  EXPECT_EQ(1, getNumSubscriptions("/tmp/my/out"));
375  EXPECT_EQ(true, lazySub->isSubscribed());
376  EXPECT_EQ(true, lazySub->isLazy());
377 
378  inPub.publish(std_msgs::Header());
379 
380  spin(0.1);
381  EXPECT_EQ(2, numOutSubscribes);
382  EXPECT_EQ(3, numInReceived);
383  EXPECT_EQ(2, numOutReceived);
384  EXPECT_EQ(1, outPub.getNumSubscribers());
385  EXPECT_EQ(1, inPub.getNumSubscribers());
386  EXPECT_EQ(1, getNumSubscriptions("/in"));
387  EXPECT_EQ(1, getNumSubscriptions("/tmp/my/out"));
388  EXPECT_EQ(true, lazySub->isSubscribed());
389  EXPECT_EQ(true, lazySub->isLazy());
390  }
391 
392  spin(0.1);
393  EXPECT_EQ(2, numOutSubscribes);
394  EXPECT_EQ(3, numInReceived);
395  EXPECT_EQ(2, numOutReceived);
396  EXPECT_EQ(0, outPub.getNumSubscribers());
397  EXPECT_EQ(0, inPub.getNumSubscribers());
398  EXPECT_EQ(0, getNumSubscriptions("/in"));
399  EXPECT_EQ(0, getNumSubscriptions("/tmp/my/out"));
400  EXPECT_EQ(false, lazySub->isSubscribed());
401  EXPECT_EQ(true, lazySub->isLazy());
402 }
403 
404 TEST(LazySubscriberBase, Chain) // NOLINT
405 {
406  ros::NodeHandle nh;
407  ros::NodeHandle pnh({"tmp"}, "my");
408  auto outPub = pnh.advertise<std_msgs::Header>("out", 10);
409  auto inPub = nh.advertise<std_msgs::Header>("in", 10);
410  auto inter1Pub = nh.advertise<std_msgs::Header>("inter1", 10);
411  auto inter2Pub = nh.advertise<std_msgs::Header>("inter2", 10);
412 
413  size_t numInter1Subscribes = 0;
414  size_t numInter2Subscribes = 0;
415  size_t numOutSubscribes = 0;
416  size_t numInReceived = 0;
417  size_t numInter1Received = 0;
418  size_t numInter2Received = 0;
419  size_t numOutReceived = 0;
420  boost::function<void(const std_msgs::Header&)> relayToInter1Cb = [&](const std_msgs::Header& h)
421  {
422  ++numInReceived;
423  inter1Pub.publish(h);
424  };
425  boost::function<void(const std_msgs::Header&)> relayToInter2Cb = [&](const std_msgs::Header& h)
426  {
427  ++numInter1Received;
428  inter2Pub.publish(h);
429  };
430  boost::function<void(const std_msgs::Header&)> relayToOutCb = [&](const std_msgs::Header& h)
431  {
432  ++numInter2Received;
433  outPub.publish(h);
434  };
435  boost::function<void(const std_msgs::Header&)> outCb = [&](const std_msgs::Header& h)
436  {
437  ++numOutReceived;
438  };
439  TestLazySubscriber<std_msgs::Header> lazySub1(nh, inter1Pub.getTopic(), [&](ros::Subscriber& sub)
440  {
441  ++numInter1Subscribes;
442  sub = nh.subscribe<std_msgs::Header>("in", 10, relayToInter1Cb);
443  });
444  TestLazySubscriber<std_msgs::Header> lazySub2(nh, inter2Pub.getTopic(), [&](ros::Subscriber& sub)
445  {
446  ++numInter2Subscribes;
447  sub = nh.subscribe<std_msgs::Header>("inter1", 10, relayToInter2Cb);
448  });
449  TestLazySubscriber<std_msgs::Header> lazySub3(nh, outPub.getTopic(), [&](ros::Subscriber& sub)
450  {
451  ++numOutSubscribes;
452  sub = nh.subscribe<std_msgs::Header>("inter2", 10, relayToOutCb);
453  });
454 
455  spin(0.1);
456  EXPECT_EQ(0, numInter1Subscribes);
457  EXPECT_EQ(0, numInter2Subscribes);
458  EXPECT_EQ(0, numOutSubscribes);
459  EXPECT_EQ(0, numInReceived);
460  EXPECT_EQ(0, numInter1Received);
461  EXPECT_EQ(0, numInter2Received);
462  EXPECT_EQ(0, numOutReceived);
463  EXPECT_EQ(0, inPub.getNumSubscribers());
464  EXPECT_EQ(0, inter1Pub.getNumSubscribers());
465  EXPECT_EQ(0, inter2Pub.getNumSubscribers());
466  EXPECT_EQ(0, outPub.getNumSubscribers());
467  EXPECT_EQ(false, lazySub1.isSubscribed());
468  EXPECT_EQ(false, lazySub2.isSubscribed());
469  EXPECT_EQ(false, lazySub3.isSubscribed());
470 
471  {
472  auto outSub = pnh.subscribe<std_msgs::Header>("out", 10, outCb);
473 
474  spin(0.1);
475  EXPECT_EQ(1, numInter1Subscribes);
476  EXPECT_EQ(1, numInter2Subscribes);
477  EXPECT_EQ(1, numOutSubscribes);
478  EXPECT_EQ(0, numInReceived);
479  EXPECT_EQ(0, numInter1Received);
480  EXPECT_EQ(0, numInter2Received);
481  EXPECT_EQ(0, numOutReceived);
482  EXPECT_EQ(1, inPub.getNumSubscribers());
483  EXPECT_EQ(1, inter1Pub.getNumSubscribers());
484  EXPECT_EQ(1, inter2Pub.getNumSubscribers());
485  EXPECT_EQ(1, outPub.getNumSubscribers());
486  EXPECT_EQ(true, lazySub1.isSubscribed());
487  EXPECT_EQ(true, lazySub2.isSubscribed());
488  EXPECT_EQ(true, lazySub3.isSubscribed());
489 
490  inPub.publish(std_msgs::Header());
491 
492  spin(0.1);
493  EXPECT_EQ(1, numInter1Subscribes);
494  EXPECT_EQ(1, numInter2Subscribes);
495  EXPECT_EQ(1, numOutSubscribes);
496  EXPECT_EQ(1, numInReceived);
497  EXPECT_EQ(1, numInter1Received);
498  EXPECT_EQ(1, numInter2Received);
499  EXPECT_EQ(1, numOutReceived);
500  EXPECT_EQ(1, inPub.getNumSubscribers());
501  EXPECT_EQ(1, inter1Pub.getNumSubscribers());
502  EXPECT_EQ(1, inter2Pub.getNumSubscribers());
503  EXPECT_EQ(1, outPub.getNumSubscribers());
504  EXPECT_EQ(true, lazySub1.isSubscribed());
505  EXPECT_EQ(true, lazySub2.isSubscribed());
506  EXPECT_EQ(true, lazySub3.isSubscribed());
507 
508  inPub.publish(std_msgs::Header());
509 
510  spin(0.1);
511  EXPECT_EQ(1, numInter1Subscribes);
512  EXPECT_EQ(1, numInter2Subscribes);
513  EXPECT_EQ(1, numOutSubscribes);
514  EXPECT_EQ(2, numInReceived);
515  EXPECT_EQ(2, numInter1Received);
516  EXPECT_EQ(2, numInter2Received);
517  EXPECT_EQ(2, numOutReceived);
518  EXPECT_EQ(1, inPub.getNumSubscribers());
519  EXPECT_EQ(1, inter1Pub.getNumSubscribers());
520  EXPECT_EQ(1, inter2Pub.getNumSubscribers());
521  EXPECT_EQ(1, outPub.getNumSubscribers());
522  EXPECT_EQ(true, lazySub1.isSubscribed());
523  EXPECT_EQ(true, lazySub2.isSubscribed());
524  EXPECT_EQ(true, lazySub3.isSubscribed());
525  }
526 
527  spin(0.1);
528  EXPECT_EQ(1, numInter1Subscribes);
529  EXPECT_EQ(1, numInter2Subscribes);
530  EXPECT_EQ(1, numOutSubscribes);
531  EXPECT_EQ(2, numInReceived);
532  EXPECT_EQ(2, numInter1Received);
533  EXPECT_EQ(2, numInter2Received);
534  EXPECT_EQ(2, numOutReceived);
535  EXPECT_EQ(0, inPub.getNumSubscribers());
536  EXPECT_EQ(0, inter1Pub.getNumSubscribers());
537  EXPECT_EQ(0, inter2Pub.getNumSubscribers());
538  EXPECT_EQ(0, outPub.getNumSubscribers());
539  EXPECT_EQ(false, lazySub1.isSubscribed());
540  EXPECT_EQ(false, lazySub2.isSubscribed());
541  EXPECT_EQ(false, lazySub3.isSubscribed());
542 
543  inPub.publish(std_msgs::Header());
544 
545  spin(0.1);
546  EXPECT_EQ(1, numInter1Subscribes);
547  EXPECT_EQ(1, numInter2Subscribes);
548  EXPECT_EQ(1, numOutSubscribes);
549  EXPECT_EQ(2, numInReceived);
550  EXPECT_EQ(2, numInter1Received);
551  EXPECT_EQ(2, numInter2Received);
552  EXPECT_EQ(2, numOutReceived);
553  EXPECT_EQ(0, inPub.getNumSubscribers());
554  EXPECT_EQ(0, inter1Pub.getNumSubscribers());
555  EXPECT_EQ(0, inter2Pub.getNumSubscribers());
556  EXPECT_EQ(0, outPub.getNumSubscribers());
557  EXPECT_EQ(false, lazySub1.isSubscribed());
558  EXPECT_EQ(false, lazySub2.isSubscribed());
559  EXPECT_EQ(false, lazySub3.isSubscribed());
560 }
561 
562 TEST(LazySubscriber, Test) // NOLINT
563 {
564  ros::NodeHandle nh;
565  ros::NodeHandle pnh({"tmp"}, "my");
566  auto outPub = pnh.advertise<std_msgs::Header>("out", 10);
567  auto inPub = nh.advertise<std_msgs::Header>("in", 10);
568 
569  size_t numInReceived = 0;
570  size_t numOutReceived = 0;
571  boost::function<void(const std_msgs::Header&)> relayCb = [&](const std_msgs::Header& h)
572  {
573  ++numInReceived;
574  outPub.publish(h);
575  };
576  boost::function<void(const std_msgs::Header&)> outCb = [&](const std_msgs::Header& h)
577  {
578  ++numOutReceived;
579  };
580 
581  auto lazySub = std::make_unique<cras::LazySubscriber<std_msgs::Header>>(
582  pnh, outPub.getTopic(), nh, "in", 10, relayCb);
583 
584  spin(0.1);
585  EXPECT_EQ(0, numInReceived);
586  EXPECT_EQ(0, outPub.getNumSubscribers());
587  EXPECT_EQ(0, inPub.getNumSubscribers());
588  EXPECT_EQ(0, getNumSubscriptions("/in"));
589  EXPECT_EQ(false, lazySub->isSubscribed());
590 
591  {
592  auto outSub = pnh.subscribe<std_msgs::Header>("out", 10, outCb);
593 
594  spin(0.1);
595  EXPECT_EQ(0, numInReceived);
596  EXPECT_EQ(0, numOutReceived);
597  EXPECT_EQ(1, outPub.getNumSubscribers());
598  EXPECT_EQ(1, inPub.getNumSubscribers());
599  EXPECT_EQ(1, getNumSubscriptions("/in"));
600  EXPECT_EQ(1, getNumSubscriptions("/tmp/my/out"));
601  EXPECT_EQ(true, lazySub->isSubscribed());
602 
603  inPub.publish(std_msgs::Header());
604 
605  spin(0.1);
606  EXPECT_EQ(1, numInReceived);
607  EXPECT_EQ(1, numOutReceived);
608  EXPECT_EQ(1, outPub.getNumSubscribers());
609  EXPECT_EQ(1, inPub.getNumSubscribers());
610  EXPECT_EQ(1, getNumSubscriptions("/in"));
611  EXPECT_EQ(1, getNumSubscriptions("/tmp/my/out"));
612  EXPECT_EQ(true, lazySub->isSubscribed());
613  }
614 
615  spin(0.1);
616  EXPECT_EQ(1, numInReceived);
617  EXPECT_EQ(1, numOutReceived);
618  EXPECT_EQ(0, outPub.getNumSubscribers());
619  EXPECT_EQ(0, inPub.getNumSubscribers());
620  EXPECT_EQ(0, getNumSubscriptions("/in"));
621  EXPECT_EQ(0, getNumSubscriptions("/tmp/my/out"));
622  EXPECT_EQ(false, lazySub->isSubscribed());
623 
624  inPub.publish(std_msgs::Header());
625 
626  spin(0.1);
627  EXPECT_EQ(1, numInReceived);
628  EXPECT_EQ(1, numOutReceived);
629  EXPECT_EQ(0, outPub.getNumSubscribers());
630  EXPECT_EQ(0, inPub.getNumSubscribers());
631  EXPECT_EQ(0, getNumSubscriptions("/in"));
632  EXPECT_EQ(0, getNumSubscriptions("/tmp/my/out"));
633  EXPECT_EQ(false, lazySub->isSubscribed());
634 
635  {
636  auto outSub = pnh.subscribe<std_msgs::Header>("out", 10, outCb);
637  auto outSub2 = pnh.subscribe<std_msgs::Header>("out", 10, outCb);
638 
639  spin(0.1);
640  EXPECT_EQ(1, numInReceived);
641  EXPECT_EQ(1, numOutReceived);
642  EXPECT_EQ(1, outPub.getNumSubscribers());
643  EXPECT_EQ(1, inPub.getNumSubscribers());
644  EXPECT_EQ(1, getNumSubscriptions("/in"));
645  EXPECT_EQ(2, getNumSubscriptions("/tmp/my/out"));
646  EXPECT_EQ(true, lazySub->isSubscribed());
647 
648  inPub.publish(std_msgs::Header());
649  inPub.publish(std_msgs::Header());
650 
651  spin(0.1);
652  EXPECT_EQ(3, numInReceived);
653  EXPECT_EQ(5, numOutReceived);
654  EXPECT_EQ(1, outPub.getNumSubscribers());
655  EXPECT_EQ(1, inPub.getNumSubscribers());
656  EXPECT_EQ(1, getNumSubscriptions("/in"));
657  EXPECT_EQ(2, getNumSubscriptions("/tmp/my/out"));
658  EXPECT_EQ(true, lazySub->isSubscribed());
659  }
660 
661  spin(0.1);
662  EXPECT_EQ(3, numInReceived);
663  EXPECT_EQ(5, numOutReceived);
664  EXPECT_EQ(0, outPub.getNumSubscribers());
665  EXPECT_EQ(0, inPub.getNumSubscribers());
666  EXPECT_EQ(0, getNumSubscriptions("/in"));
667  EXPECT_EQ(0, getNumSubscriptions("/tmp/my/out"));
668  EXPECT_EQ(false, lazySub->isSubscribed());
669 
670  inPub.publish(std_msgs::Header());
671  inPub.publish(std_msgs::Header());
672  inPub.publish(std_msgs::Header());
673 
674  spin(0.1);
675  EXPECT_EQ(3, numInReceived);
676  EXPECT_EQ(5, numOutReceived);
677  EXPECT_EQ(0, outPub.getNumSubscribers());
678  EXPECT_EQ(0, inPub.getNumSubscribers());
679  EXPECT_EQ(0, getNumSubscriptions("/in"));
680  EXPECT_EQ(0, getNumSubscriptions("/tmp/my/out"));
681  EXPECT_EQ(false, lazySub->isSubscribed());
682 
683  {
684  auto outSub = pnh.subscribe<std_msgs::Header>("out", 10, outCb);
685 
686  spin(0.1);
687  EXPECT_EQ(3, numInReceived);
688  EXPECT_EQ(5, numOutReceived);
689  EXPECT_EQ(1, outPub.getNumSubscribers());
690  EXPECT_EQ(1, inPub.getNumSubscribers());
691  EXPECT_EQ(1, getNumSubscriptions("/in"));
692  EXPECT_EQ(1, getNumSubscriptions("/tmp/my/out"));
693  EXPECT_EQ(true, lazySub->isSubscribed());
694 
695  // Delete the pubsub object - it should disconnect the subscriber.
696  lazySub.reset();
697 
698  spin(0.1);
699  EXPECT_EQ(3, numInReceived);
700  EXPECT_EQ(5, numOutReceived);
701  EXPECT_EQ(1, outPub.getNumSubscribers());
702  EXPECT_EQ(0, inPub.getNumSubscribers());
703  EXPECT_EQ(0, getNumSubscriptions("/in"));
704  EXPECT_EQ(1, getNumSubscriptions("/tmp/my/out"));
705  }
706 }
707 
708 TEST(LazySubscriber, MessageEvent) // NOLINT
709 {
710  ros::NodeHandle nh;
711  ros::NodeHandle pnh({"tmp"}, "my");
712  auto outPub = pnh.advertise<std_msgs::Header>("out", 10);
713  auto inPub = nh.advertise<std_msgs::Header>("in", 10);
714 
715  size_t numInReceived = 0;
716  size_t numOutReceived = 0;
717  boost::function<void(const ros::MessageEvent<const topic_tools::ShapeShifter>&)> relayCb =
719  {
720  ++numInReceived;
721  outPub.publish(*h.getConstMessage());
722  };
723  boost::function<void(const std_msgs::Header&)> outCb = [&](const std_msgs::Header& h)
724  {
725  ++numOutReceived;
726  };
727 
728  auto lazySub = std::make_unique<
730  pnh, outPub.getTopic(), nh, "in", 10, relayCb);
731 
732  spin(0.1);
733  EXPECT_EQ(0, numInReceived);
734  EXPECT_EQ(0, outPub.getNumSubscribers());
735  EXPECT_EQ(0, inPub.getNumSubscribers());
736  EXPECT_EQ(0, getNumSubscriptions("/in"));
737  EXPECT_EQ(false, lazySub->isSubscribed());
738 
739  {
740  auto outSub = pnh.subscribe<std_msgs::Header>("out", 10, outCb);
741 
742  spin(0.1);
743  EXPECT_EQ(0, numInReceived);
744  EXPECT_EQ(0, numOutReceived);
745  EXPECT_EQ(1, outPub.getNumSubscribers());
746  EXPECT_EQ(1, inPub.getNumSubscribers());
747  EXPECT_EQ(1, getNumSubscriptions("/in"));
748  EXPECT_EQ(1, getNumSubscriptions("/tmp/my/out"));
749  EXPECT_EQ(true, lazySub->isSubscribed());
750 
751  inPub.publish(std_msgs::Header());
752 
753  spin(0.1);
754  EXPECT_EQ(1, numInReceived);
755  EXPECT_EQ(1, numOutReceived);
756  EXPECT_EQ(1, outPub.getNumSubscribers());
757  EXPECT_EQ(1, inPub.getNumSubscribers());
758  EXPECT_EQ(1, getNumSubscriptions("/in"));
759  EXPECT_EQ(1, getNumSubscriptions("/tmp/my/out"));
760  EXPECT_EQ(true, lazySub->isSubscribed());
761  }
762 
763  spin(0.1);
764  EXPECT_EQ(1, numInReceived);
765  EXPECT_EQ(1, numOutReceived);
766  EXPECT_EQ(0, outPub.getNumSubscribers());
767  EXPECT_EQ(0, inPub.getNumSubscribers());
768  EXPECT_EQ(0, getNumSubscriptions("/in"));
769  EXPECT_EQ(0, getNumSubscriptions("/tmp/my/out"));
770  EXPECT_EQ(false, lazySub->isSubscribed());
771 
772  inPub.publish(std_msgs::Header());
773 
774  spin(0.1);
775  EXPECT_EQ(1, numInReceived);
776  EXPECT_EQ(1, numOutReceived);
777  EXPECT_EQ(0, outPub.getNumSubscribers());
778  EXPECT_EQ(0, inPub.getNumSubscribers());
779  EXPECT_EQ(0, getNumSubscriptions("/in"));
780  EXPECT_EQ(0, getNumSubscriptions("/tmp/my/out"));
781  EXPECT_EQ(false, lazySub->isSubscribed());
782 
783  {
784  auto outSub = pnh.subscribe<std_msgs::Header>("out", 10, outCb);
785  auto outSub2 = pnh.subscribe<std_msgs::Header>("out", 10, outCb);
786 
787  spin(0.1);
788  EXPECT_EQ(1, numInReceived);
789  EXPECT_EQ(1, numOutReceived);
790  EXPECT_EQ(1, outPub.getNumSubscribers());
791  EXPECT_EQ(1, inPub.getNumSubscribers());
792  EXPECT_EQ(1, getNumSubscriptions("/in"));
793  EXPECT_EQ(2, getNumSubscriptions("/tmp/my/out"));
794  EXPECT_EQ(true, lazySub->isSubscribed());
795 
796  inPub.publish(std_msgs::Header());
797  inPub.publish(std_msgs::Header());
798 
799  spin(0.1);
800  EXPECT_EQ(3, numInReceived);
801  EXPECT_EQ(5, numOutReceived);
802  EXPECT_EQ(1, outPub.getNumSubscribers());
803  EXPECT_EQ(1, inPub.getNumSubscribers());
804  EXPECT_EQ(1, getNumSubscriptions("/in"));
805  EXPECT_EQ(2, getNumSubscriptions("/tmp/my/out"));
806  EXPECT_EQ(true, lazySub->isSubscribed());
807  }
808 
809  spin(0.1);
810  EXPECT_EQ(3, numInReceived);
811  EXPECT_EQ(5, numOutReceived);
812  EXPECT_EQ(0, outPub.getNumSubscribers());
813  EXPECT_EQ(0, inPub.getNumSubscribers());
814  EXPECT_EQ(0, getNumSubscriptions("/in"));
815  EXPECT_EQ(0, getNumSubscriptions("/tmp/my/out"));
816  EXPECT_EQ(false, lazySub->isSubscribed());
817 
818  inPub.publish(std_msgs::Header());
819  inPub.publish(std_msgs::Header());
820  inPub.publish(std_msgs::Header());
821 
822  spin(0.1);
823  EXPECT_EQ(3, numInReceived);
824  EXPECT_EQ(5, numOutReceived);
825  EXPECT_EQ(0, outPub.getNumSubscribers());
826  EXPECT_EQ(0, inPub.getNumSubscribers());
827  EXPECT_EQ(0, getNumSubscriptions("/in"));
828  EXPECT_EQ(0, getNumSubscriptions("/tmp/my/out"));
829  EXPECT_EQ(false, lazySub->isSubscribed());
830 
831  {
832  auto outSub = pnh.subscribe<std_msgs::Header>("out", 10, outCb);
833 
834  spin(0.1);
835  EXPECT_EQ(3, numInReceived);
836  EXPECT_EQ(5, numOutReceived);
837  EXPECT_EQ(1, outPub.getNumSubscribers());
838  EXPECT_EQ(1, inPub.getNumSubscribers());
839  EXPECT_EQ(1, getNumSubscriptions("/in"));
840  EXPECT_EQ(1, getNumSubscriptions("/tmp/my/out"));
841  EXPECT_EQ(true, lazySub->isSubscribed());
842 
843  // Delete the pubsub object - it should disconnect the subscriber.
844  lazySub.reset();
845 
846  spin(0.1);
847  EXPECT_EQ(3, numInReceived);
848  EXPECT_EQ(5, numOutReceived);
849  EXPECT_EQ(1, outPub.getNumSubscribers());
850  EXPECT_EQ(0, inPub.getNumSubscribers());
851  EXPECT_EQ(0, getNumSubscriptions("/in"));
852  EXPECT_EQ(1, getNumSubscriptions("/tmp/my/out"));
853  }
854 }
855 
856 int main(int argc, char **argv)
857 {
858  testing::InitGoogleTest(&argc, argv);
859  ros::init(argc, argv, "test_lazy_subscriber");
862  // This prevents the ROS background threads from shutting down after the first test's nodehandles disappear
863  ros::NodeHandle nh;
864  return RUN_ALL_TESTS();
865 }
getNumSubscriptions
size_t getNumSubscriptions(const std::string &topic)
Definition: test_lazy_subscriber.cpp:43
ros::WallDuration::sleep
bool sleep() const
topic_manager.h
ros::init
ROSCPP_DECL void init(const M_string &remappings, const std::string &name, uint32_t options=0)
ros.h
cras::ConditionalSubscriber::disconnectFn
DisconnectFn disconnectFn
The function used to close connection.
Definition: lazy_subscriber.hpp:138
ros::spinOnce
ROSCPP_DECL void spinOnce()
cras::ConditionalSubscriber::ConnectFn
::std::function< void(::ros::Subscriber &sub)> ConnectFn
Type of the function that connects the subscriber.
Definition: lazy_subscriber.hpp:38
ros::Subscriber::shutdown
void shutdown()
lazy_subscriber.hpp
Lazy subscriber that subscribes only when a paired publisher has subscribers.
wait
void wait(int seconds)
subscription.h
ros::console::set_logger_level
ROSCONSOLE_DECL bool set_logger_level(const std::string &name, levels::Level level)
ros::Publisher::publish
void publish(const boost::shared_ptr< M > &message) const
ros::NodeHandle::advertise
Publisher advertise(AdvertiseOptions &ops)
cras::ConditionalSubscriber::sub
::ros::Subscriber sub
The underlying subscriber (valid only when subscribed is true).
Definition: lazy_subscriber.hpp:126
TestLazySubscriber::TestLazySubscriber
TestLazySubscriber(ros::NodeHandle publisherNh, const std::string &publisherTopic, typename cras::ConditionalSubscriber::ConnectFn connectFn, typename cras::ConditionalSubscriber::DisconnectFn disconnectFn=[](ros::Subscriber &sub) { sub.shutdown();}, const cras::LogHelperPtr &logHelper=std::make_shared< cras::NodeLogHelper >())
Definition: test_lazy_subscriber.cpp:61
cras::ConditionalSubscriber::isSubscribed
bool isSubscribed() const
Whether the subscriber is currently subscribed to its topic or not.
Definition: lazy_subscriber.cpp:61
ros::console::levels::Debug
Debug
spin
void spin(double duration, double wait)
Definition: test_lazy_subscriber.cpp:27
main
int main(int argc, char **argv)
Definition: test_lazy_subscriber.cpp:856
ros::NodeHandle::subscribe
Subscriber subscribe(const std::string &topic, uint32_t queue_size, const boost::function< void(C)> &callback, const VoidConstPtr &tracked_object=VoidConstPtr(), const TransportHints &transport_hints=TransportHints())
cras::ConditionalSubscriber::DisconnectFn
::std::function< void(::ros::Subscriber &sub)> DisconnectFn
Type of the function that disconnects the subscriber.
Definition: lazy_subscriber.hpp:42
cras::LazySubscriber
Lazy subscriber that subscribes only when a paired publisher has subscribers.
Definition: lazy_subscriber.hpp:197
cras::LogHelperPtr
::cras::LogHelper::Ptr LogHelperPtr
ros::console::notifyLoggerLevelsChanged
ROSCONSOLE_DECL void notifyLoggerLevelsChanged()
ros::TopicManager::instance
static const TopicManagerPtr & instance()
cras::LazySubscriberBase
Base for lazy subscribers that subscribes only when a paired publisher has subscribers.
Definition: lazy_subscriber.hpp:149
TEST
TEST(LazySubscriberBase, Test)
Definition: test_lazy_subscriber.cpp:72
ROSCONSOLE_DEFAULT_NAME
#define ROSCONSOLE_DEFAULT_NAME
ros::WallDuration
TestLazySubscriber
Definition: test_lazy_subscriber.cpp:58
ros::NodeHandle
ros::MessageEvent
ros::Subscriber
cras::ConditionalSubscriber::connectFn
ConnectFn connectFn
The function used to establish connection.
Definition: lazy_subscriber.hpp:135


cras_topic_tools
Author(s): Martin Pecka
autogenerated on Sun Mar 2 2025 03:51:09