test_publish_subscribe.cpp
Go to the documentation of this file.
1 /****************************************************************************
2  *
3  * fkie_message_filters
4  * Copyright © 2018-2020 Fraunhofer FKIE
5  * Author: Timo Röhling
6  *
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  * http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  *
19  ****************************************************************************/
20 
21 #include <ros/ros.h>
22 #include <gtest/gtest.h>
23 #include <std_msgs/Empty.h>
28 
29 namespace mf = fkie_message_filters;
30 
31 namespace {
32  void process_pending_events()
33  {
34  using namespace std::chrono_literals;
35  std::this_thread::sleep_for(50ms); // Apparently, we need to allow for some time to complete the message round trip
36  ros::spinOnce();
37  }
38 }
39 
40 TEST(fkie_message_filters_ros_integration, PublisherMessage)
41 {
42  std::size_t received_msgs = 0;
43  ros::NodeHandle nh("~");
44  ros::Subscriber sub = nh.subscribe<std_msgs::Empty>("publisher_test", 1, [&received_msgs](const std_msgs::EmptyConstPtr&) { ++received_msgs; });
47  src.connect_to_sink(pub);
48  ASSERT_EQ(0, sub.getNumPublishers());
49  pub.advertise(nh, "publisher_test", 1);
50  process_pending_events();
51  ASSERT_EQ(1, sub.getNumPublishers());
52  ASSERT_EQ(0, received_msgs);
53  src(std_msgs::Empty());
54  process_pending_events();
55  ASSERT_EQ(1, received_msgs);
56 }
57 
58 TEST(fkie_message_filters_ros_integration, PublisherMessageConstPtr)
59 {
60  std::size_t received_msgs = 0;
61  ros::NodeHandle nh("~");
62  ros::Subscriber sub = nh.subscribe<std_msgs::Empty>("publisher_test", 1, [&received_msgs](const std_msgs::EmptyConstPtr&) { ++received_msgs; });
65  src.connect_to_sink(pub);
66  ASSERT_EQ(0, sub.getNumPublishers());
67  pub.advertise(nh, "publisher_test", 1);
68  process_pending_events();
69  ASSERT_EQ(1, sub.getNumPublishers());
70  ASSERT_EQ(0, received_msgs);
71  src(std_msgs::Empty::ConstPtr(new std_msgs::Empty()));
72  process_pending_events();
73  ASSERT_EQ(1, received_msgs);
74 }
75 
76 TEST(fkie_message_filters_ros_integration, PublisherMessageStdSharedPtr)
77 {
78  std::size_t received_msgs = 0;
79  ros::NodeHandle nh("~");
80  ros::Subscriber sub = nh.subscribe<std_msgs::Empty>("publisher_test", 1, [&received_msgs](const std_msgs::EmptyConstPtr&) { ++received_msgs; });
83  src.connect_to_sink(pub);
84  ASSERT_EQ(0, sub.getNumPublishers());
85  pub.advertise(nh, "publisher_test", 1);
86  process_pending_events();
87  ASSERT_EQ(1, sub.getNumPublishers());
88  ASSERT_EQ(0, received_msgs);
89  src(std::shared_ptr<const std_msgs::Empty>(new std_msgs::Empty()));
90  process_pending_events();
91  ASSERT_EQ(1, received_msgs);
92 }
93 
94 TEST(fkie_message_filters_ros_integration, PublisherMessageEvent)
95 {
96  std::size_t received_msgs = 0;
97  ros::NodeHandle nh("~");
98  ros::Subscriber sub = nh.subscribe<std_msgs::Empty>("publisher_test", 1, [&received_msgs](const std_msgs::EmptyConstPtr&) { ++received_msgs; });
101  src.connect_to_sink(pub);
102  ASSERT_EQ(0, sub.getNumPublishers());
103  pub.advertise(nh, "publisher_test", 1);
104  process_pending_events();
105  ASSERT_EQ(1, sub.getNumPublishers());
106  ASSERT_EQ(0, received_msgs);
107  src(ros::MessageEvent<const std_msgs::Empty>(std_msgs::Empty::ConstPtr(new std_msgs::Empty())));
108  process_pending_events();
109  ASSERT_EQ(1, received_msgs);
110 }
111 
112 TEST(fkie_message_filters_ros_integration, PublisherWithSubscriberCB)
113 {
114  std::size_t received_msgs = 0, connected_cbs = 0, disconnected_cbs = 0;
115  ros::NodeHandle nh("~");
118  src.connect_to_sink(pub);
119  pub.advertise(nh, "publisher_test", 1,
120  [&connected_cbs](const ros::SingleSubscriberPublisher&) { ++connected_cbs; },
121  [&disconnected_cbs](const ros::SingleSubscriberPublisher&) { ++disconnected_cbs; }
122  );
123  process_pending_events();
124  ASSERT_EQ(false, pub.is_active());
125  ASSERT_EQ(0, connected_cbs);
126  ASSERT_EQ(0, disconnected_cbs);
127  ros::Subscriber sub = nh.subscribe<std_msgs::Empty>("publisher_test", 1, [&received_msgs](const std_msgs::EmptyConstPtr&) { ++received_msgs; });
128  process_pending_events();
129  ASSERT_EQ(1, sub.getNumPublishers());
130  ASSERT_EQ(0, received_msgs);
131  ASSERT_EQ(1, connected_cbs);
132  ASSERT_EQ(0, disconnected_cbs);
133  src(std_msgs::Empty());
134  process_pending_events();
135  ASSERT_EQ(1, received_msgs);
136  sub.shutdown();
137  process_pending_events();
138  ASSERT_EQ(1, connected_cbs);
139  ASSERT_EQ(1, disconnected_cbs);
140 }
141 
142 TEST(fkie_message_filters_ros_integration, SubscriberMessageEvent)
143 {
144  std::size_t received_msgs = 0;
145  ros::NodeHandle nh("~");
146  ros::Publisher pub = nh.advertise<std_msgs::Empty>("subscriber_test", 1);
149  sub.connect_to_sink(flt);
150  flt.set_processing_function([&received_msgs](const ros::MessageEvent<const std_msgs::Empty>&) -> bool { ++received_msgs; return true; });
151  ASSERT_EQ(0, pub.getNumSubscribers());
152  sub.subscribe(nh, "subscriber_test", 1);
153  process_pending_events();
154  ASSERT_EQ(1, pub.getNumSubscribers());
155  ASSERT_EQ(0, received_msgs);
156  pub.publish<std_msgs::Empty>(std_msgs::Empty());
157  process_pending_events();
158  ASSERT_EQ(1, received_msgs);
159 }
160 
161 TEST(fkie_message_filters_ros_integration, SubscriberMessageConstPtr)
162 {
163  std::size_t received_msgs = 0;
164  ros::NodeHandle nh("~");
165  ros::Publisher pub = nh.advertise<std_msgs::Empty>("subscriber_test", 1);
168  sub.connect_to_sink(flt);
169  flt.set_processing_function([&received_msgs](const std_msgs::Empty::ConstPtr&) -> bool { ++received_msgs; return true; });
170  ASSERT_EQ(0, pub.getNumSubscribers());
171  sub.subscribe(nh, "subscriber_test", 1);
172  process_pending_events();
173  ASSERT_EQ(1, pub.getNumSubscribers());
174  ASSERT_EQ(0, received_msgs);
175  pub.publish<std_msgs::Empty>(std_msgs::Empty());
176  process_pending_events();
177  ASSERT_EQ(1, received_msgs);
178 }
179 
180 TEST(fkie_message_filters_ros_integration, SubscriberMessageStdSharedPtr)
181 {
182  std::size_t received_msgs = 0;
183  ros::NodeHandle nh("~");
184  ros::Publisher pub = nh.advertise<std_msgs::Empty>("subscriber_test", 1);
187  sub.connect_to_sink(flt);
188  flt.set_processing_function([&received_msgs](const std::shared_ptr<const std_msgs::Empty>&) -> bool { ++received_msgs; return true; });
189  ASSERT_EQ(0, pub.getNumSubscribers());
190  sub.subscribe(nh, "subscriber_test", 1);
191  process_pending_events();
192  ASSERT_EQ(1, pub.getNumSubscribers());
193  ASSERT_EQ(0, received_msgs);
194  pub.publish<std_msgs::Empty>(std_msgs::Empty());
195  process_pending_events();
196  ASSERT_EQ(1, received_msgs);
197 }
198 
199 TEST(fkie_message_filters_ros_integration, SubscriberMessage)
200 {
201  std::size_t received_msgs = 0;
202  ros::NodeHandle nh("~");
203  ros::Publisher pub = nh.advertise<std_msgs::Empty>("subscriber_test", 1);
206  sub.connect_to_sink(flt);
207  flt.set_processing_function([&received_msgs](const std_msgs::Empty&) -> bool { ++received_msgs; return true; });
208  ASSERT_EQ(0, pub.getNumSubscribers());
209  sub.subscribe(nh, "subscriber_test", 1);
210  process_pending_events();
211  ASSERT_EQ(1, pub.getNumSubscribers());
212  ASSERT_EQ(0, received_msgs);
213  pub.publish<std_msgs::Empty>(std_msgs::Empty());
214  process_pending_events();
215  ASSERT_EQ(1, received_msgs);
216 }
217 
218 int main (int argc, char** argv)
219 {
220  testing::InitGoogleTest(&argc, argv);
221  ros::init (argc, argv, "test_publish_subscribe");
222  ros::NodeHandle nh;
223  return RUN_ALL_TESTS();
224 }
Subscriber subscribe(const std::string &topic, uint32_t queue_size, void(T::*fp)(M), T *obj, const TransportHints &transport_hints=TransportHints())
void set_processing_function(const ProcessingFunction &f) noexcept
Set the user-defined processing function.
ROSCPP_DECL void init(int &argc, char **argv, const std::string &name, uint32_t options=0)
uint32_t getNumPublishers() const
Publish consumed data on a ROS topic.
Definition: publisher.h:51
Subscribe to a ROS topic as data provider.
Definition: subscriber.h:54
void advertise(ros::NodeHandle &nh, const std::string &topic, uint32_t queue_size, bool latch=false, ros::CallbackQueueInterface *callback_queue=nullptr) noexcept
Advertise ROS topic.
int main(int argc, char **argv)
void publish(const boost::shared_ptr< M > &message) const
Connection connect_to_sink(Sink< Outputs... > &dst) noexcept
Connect this source to a sink.
Definition: source_impl.h:30
TEST(fkie_message_filters_ros_integration, PublisherMessage)
Primary namespace.
Definition: buffer.h:33
Publisher advertise(const std::string &topic, uint32_t queue_size, bool latch=false)
Manually operated data source.
Definition: user_source.h:37
uint32_t getNumSubscribers() const
Simplified filter with user-defined callback function.
ROSCPP_DECL void spinOnce()
virtual bool is_active() const noexcept override
Check if the ROS publisher has at least one subscriber.


fkie_message_filters
Author(s): Timo Röhling
autogenerated on Mon Feb 28 2022 22:21:43