Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029 #ifndef SWRI_ROSCPP_SUBSCRIBER_IMPL_H_
00030 #define SWRI_ROSCPP_SUBSCRIBER_IMPL_H_
00031
00032 #include <std_msgs/Header.h>
00033 #include <ros/subscriber.h>
00034
00035 namespace swri
00036 {
00037 class Subscriber;
00038 class SubscriberImpl
00039 {
00040 protected:
00041 ros::Subscriber sub_;
00042 std::string unmapped_topic_;
00043 std::string mapped_topic_;
00044
00045 int message_count_;
00046
00047 ros::Time last_header_stamp_;
00048 ros::Time last_receive_time_;
00049
00050 ros::Duration total_latency_;
00051 ros::Duration min_latency_;
00052 ros::Duration max_latency_;
00053
00054 ros::Duration total_periods_;
00055 ros::Duration min_period_;
00056 ros::Duration max_period_;
00057
00058 ros::Duration timeout_;
00059 bool in_timeout_;
00060 int timeout_count_;
00061 bool blocking_timeout_;
00062
00063 void processHeader(const ros::Time &stamp)
00064 {
00065 ros::Time now = ros::Time::now();
00066
00067
00068
00069 checkTimeout(now);
00070
00071 message_count_++;
00072
00073 ros::Duration latency = now - stamp;
00074 if (message_count_ == 1) {
00075 min_latency_ = latency;
00076 max_latency_ = latency;
00077 total_latency_ = latency;
00078 } else {
00079 min_latency_ = std::min(min_latency_, latency);
00080 max_latency_ = std::max(max_latency_, latency);
00081 total_latency_ += latency;
00082 }
00083
00084 if (message_count_ > 1) {
00085 ros::Duration period = now - last_receive_time_;
00086 if (message_count_ == 2) {
00087 min_period_ = period;
00088 max_period_ = period;
00089 total_periods_ = period;
00090 } else if (message_count_ > 2) {
00091 min_period_ = std::min(min_period_, period);
00092 max_period_ = std::max(max_period_, period);
00093 total_periods_ += period;
00094 }
00095 }
00096
00097
00098 in_timeout_ = false;
00099
00100 last_receive_time_ = now;
00101 last_header_stamp_ = stamp;
00102 }
00103
00104 void checkTimeout(const ros::Time &now)
00105 {
00106 if (blocking_timeout_) {
00107 return;
00108 }
00109
00110 if (in_timeout_ || timeout_ <= ros::Duration(0.0)) {
00111 return;
00112 }
00113
00114 if (message_count_ == 0) {
00115 return;
00116 }
00117
00118 if (age(now) > timeout_) {
00119 in_timeout_ = true;
00120 timeout_count_++;
00121 }
00122 }
00123
00124
00125 public:
00126 SubscriberImpl()
00127 {
00128 unmapped_topic_ = "N/A";
00129 mapped_topic_ = "N/A";
00130 timeout_ = ros::Duration(-1.0);
00131 blocking_timeout_ = false;
00132 resetStatistics();
00133 }
00134
00135 const std::string& mappedTopic() const
00136 {
00137 return mapped_topic_;
00138 }
00139
00140 const std::string& unmappedTopic() const
00141 {
00142 return unmapped_topic_;
00143 }
00144
00145 int numPublishers() const
00146 {
00147 return sub_.getNumPublishers();
00148 }
00149
00150 void resetStatistics()
00151 {
00152 message_count_ = 0;
00153 in_timeout_ = false;
00154 timeout_count_ = 0;
00155 }
00156
00157 int messageCount() const
00158 {
00159 return message_count_;
00160 }
00161
00162 ros::Duration age(const ros::Time &now) const
00163 {
00164 if (message_count_ < 1) {
00165 return ros::DURATION_MAX;
00166 } else {
00167 return now - last_header_stamp_;
00168 }
00169 }
00170
00171 ros::Duration meanLatency() const
00172 {
00173 if (message_count_ < 1) {
00174 return ros::DURATION_MAX;
00175 } else {
00176 return ros::Duration(total_latency_.toSec() / message_count_);
00177 }
00178 }
00179
00180 ros::Duration minLatency() const
00181 {
00182 if (message_count_ < 1) {
00183 return ros::DURATION_MAX;
00184 } else {
00185 return min_latency_;
00186 }
00187 }
00188
00189 ros::Duration maxLatency() const
00190 {
00191 if (message_count_ < 1) {
00192 return ros::DURATION_MAX;
00193 } else {
00194 return max_latency_;
00195 }
00196 }
00197
00198 double meanFrequencyHz() const
00199 {
00200 if (message_count_ < 2) {
00201 return 0.0;
00202 } else {
00203 return 1e9 / meanPeriod().toNSec();
00204 }
00205 }
00206
00207 ros::Duration meanPeriod() const
00208 {
00209 if (message_count_ < 2) {
00210 return ros::DURATION_MAX;
00211 } else {
00212 return ros::Duration(total_periods_.toSec() / (message_count_ - 1));
00213 }
00214 }
00215
00216 ros::Duration minPeriod() const
00217 {
00218 if (message_count_ < 2) {
00219 return ros::DURATION_MAX;
00220 } else {
00221 return min_period_;
00222 }
00223 }
00224
00225 ros::Duration maxPeriod() const
00226 {
00227 if (message_count_ < 2) {
00228 return ros::DURATION_MAX;
00229 } else {
00230 return max_period_;
00231 }
00232 }
00233
00234 void setTimeout(const ros::Duration &time_out)
00235 {
00236 timeout_ = time_out;
00237 in_timeout_ = false;
00238 timeout_count_ = 0;
00239 }
00240
00241 bool blockTimeouts(bool block) {
00242 if (block) {
00243 in_timeout_ = false;
00244 }
00245
00246 bool old_block = blocking_timeout_;
00247 blocking_timeout_ = block;
00248 return old_block;
00249 }
00250
00251 bool timeoutsBlocked() const {
00252 return blocking_timeout_;
00253 }
00254
00255 ros::Duration timeout() const
00256 {
00257 return timeout_;
00258 }
00259
00260 bool timeoutEnabled() const
00261 {
00262 return timeout_ > ros::Duration(0.0);
00263 }
00264
00265 bool inTimeout()
00266 {
00267 checkTimeout(ros::Time::now());
00268 return in_timeout_;
00269 }
00270
00271 int timeoutCount()
00272 {
00273 checkTimeout(ros::Time::now());
00274 return timeout_count_;
00275 }
00276 };
00277
00278 struct TrueType
00279 {
00280 static const bool value = true;
00281 };
00282
00283 template<class M , class T>
00284 class TypedSubscriberImpl : public SubscriberImpl
00285 {
00286 T *obj_;
00287 void (T::*callback_)(const boost::shared_ptr< M const > &);
00288
00289 public:
00290 TypedSubscriberImpl(
00291 ros::NodeHandle &nh,
00292 const std::string &topic,
00293 uint32_t queue_size,
00294 void(T::*fp)(const boost::shared_ptr< M const > &),
00295 T *obj,
00296 const ros::TransportHints &transport_hints)
00297 {
00298 unmapped_topic_ = topic;
00299 mapped_topic_ = nh.resolveName(topic);
00300
00301 if (unmapped_topic_ == mapped_topic_) {
00302 ROS_INFO("Subscribing to '%s'.", mapped_topic_.c_str());
00303 } else {
00304 ROS_INFO("Subscribing to '%s' at '%s'.",
00305 unmapped_topic_.c_str(),
00306 mapped_topic_.c_str());
00307 }
00308
00309 callback_ = fp;
00310 obj_ = obj;
00311
00312 sub_ = nh.subscribe(mapped_topic_, queue_size,
00313 &TypedSubscriberImpl::handleMessage<M>,
00314 this,
00315 transport_hints);
00316 }
00317
00318
00319 template <class M2>
00320 typename boost::enable_if< ros::message_traits::HasHeader<M2>, void>::type
00321 handleMessage(const boost::shared_ptr< M const> &msg)
00322 {
00323 processHeader(msg->header.stamp);
00324 (obj_->*callback_)(msg);
00325 }
00326
00327
00328 template <class M2>
00329 typename boost::disable_if< ros::message_traits::HasHeader<M2>, void>::type
00330 handleMessage(const boost::shared_ptr< M const> &msg)
00331 {
00332 processHeader(ros::Time::now());
00333 (obj_->*callback_)(msg);
00334 }
00335 };
00336
00337 template<class M>
00338 class BindSubscriberImpl : public SubscriberImpl
00339 {
00340 boost::function<void(const boost::shared_ptr< M const> &)> callback_;
00341
00342
00343 public:
00344 BindSubscriberImpl(
00345 ros::NodeHandle &nh,
00346 const std::string &topic,
00347 uint32_t queue_size,
00348 const boost::function<void(const boost::shared_ptr< M const> &)> &callback,
00349 const ros::TransportHints &transport_hints)
00350 {
00351 unmapped_topic_ = topic;
00352 mapped_topic_ = nh.resolveName(topic);
00353
00354 if (unmapped_topic_ == mapped_topic_) {
00355 ROS_INFO("Subscribing to '%s'.", mapped_topic_.c_str());
00356 } else {
00357 ROS_INFO("Subscribing to '%s' at '%s'.",
00358 unmapped_topic_.c_str(),
00359 mapped_topic_.c_str());
00360 }
00361
00362 callback_ = callback;
00363
00364 sub_ = nh.subscribe(mapped_topic_, queue_size,
00365 &BindSubscriberImpl::handleMessage<M>,
00366 this,
00367 transport_hints);
00368 }
00369
00370
00371 template <class M2>
00372 typename boost::enable_if< ros::message_traits::HasHeader<M2>, void>::type
00373 handleMessage(const boost::shared_ptr< M const> &msg)
00374 {
00375 processHeader(msg->header.stamp);
00376 callback_(msg);
00377 }
00378
00379
00380 template <class M2>
00381 typename boost::disable_if< ros::message_traits::HasHeader<M2>, void>::type
00382 handleMessage(const boost::shared_ptr< M const> &msg)
00383 {
00384 processHeader(ros::Time::now());
00385 callback_(msg);
00386 }
00387 };
00388
00389 template<class M>
00390 class StorageSubscriberImpl : public SubscriberImpl
00391 {
00392 boost::shared_ptr< M const > *dest_;
00393
00394 public:
00395 StorageSubscriberImpl(
00396 ros::NodeHandle &nh,
00397 const std::string &topic,
00398 boost::shared_ptr< M const > *dest,
00399 const ros::TransportHints &transport_hints)
00400 {
00401 unmapped_topic_ = topic;
00402 mapped_topic_ = nh.resolveName(topic);
00403
00404 if (unmapped_topic_ == mapped_topic_) {
00405 ROS_INFO("Subscribing to '%s'.", mapped_topic_.c_str());
00406 } else {
00407 ROS_INFO("Subscribing to '%s' at '%s'.",
00408 unmapped_topic_.c_str(),
00409 mapped_topic_.c_str());
00410 }
00411
00412 dest_ = dest;
00413
00414 sub_ = nh.subscribe(mapped_topic_, 2,
00415 &StorageSubscriberImpl::handleMessage<M>,
00416 this,
00417 transport_hints);
00418 }
00419
00420
00421 template <class M2>
00422 typename boost::enable_if< ros::message_traits::HasHeader<M2>, void>::type
00423 handleMessage(const boost::shared_ptr< M const> &msg)
00424 {
00425 processHeader(msg->header.stamp);
00426 *dest_ = msg;
00427 }
00428
00429
00430 template <class M2>
00431 typename boost::disable_if< ros::message_traits::HasHeader<M2>, void>::type
00432 handleMessage(const boost::shared_ptr< M const> &msg)
00433 {
00434 processHeader(ros::Time::now());
00435 *dest_ = msg;
00436 }
00437 };
00438 }
00439 #endif // SWRI_ROSCPP_SUBSCRIBER_IMPL_H_