00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029 #ifndef SWRI_ROSCPP_SUBSCRIBER_IMPL_H_
00030 #define SWRI_ROSCPP_SUBSCRIBER_IMPL_H_
00031
00032 #include <std_msgs/Header.h>
00033 #include <ros/subscriber.h>
00034
00035 namespace swri
00036 {
00037 class Subscriber;
00038 class SubscriberImpl
00039 {
00040 protected:
00041 ros::Subscriber sub_;
00042 std::string unmapped_topic_;
00043 std::string mapped_topic_;
00044
00045 int message_count_;
00046
00047 ros::Time last_header_stamp_;
00048 ros::Time last_receive_time_;
00049
00050 ros::Duration total_latency_;
00051 ros::Duration min_latency_;
00052 ros::Duration max_latency_;
00053
00054 ros::Duration total_periods_;
00055 ros::Duration min_period_;
00056 ros::Duration max_period_;
00057
00058 ros::Duration timeout_;
00059 bool in_timeout_;
00060 int timeout_count_;
00061 bool blocking_timeout_;
00062
00063 void processHeader(const ros::Time &stamp)
00064 {
00065 ros::Time now = ros::Time::now();
00066
00067
00068
00069 checkTimeout(now);
00070
00071
00072 if (stamp >= last_header_stamp_) {
00073 message_count_++;
00074
00075 if (!stamp.isZero()) {
00076 ros::Duration latency = now - stamp;
00077 if (message_count_ == 1) {
00078 min_latency_ = latency;
00079 max_latency_ = latency;
00080 total_latency_ = latency;
00081 } else {
00082 min_latency_ = std::min(min_latency_, latency);
00083 max_latency_ = std::max(max_latency_, latency);
00084 total_latency_ += latency;
00085 }
00086 }
00087
00088 if (message_count_ > 1) {
00089 ros::Duration period = now - last_receive_time_;
00090 if (message_count_ == 2) {
00091 min_period_ = period;
00092 max_period_ = period;
00093 total_periods_ = period;
00094 } else if (message_count_ > 2) {
00095 min_period_ = std::min(min_period_, period);
00096 max_period_ = std::max(max_period_, period);
00097 total_periods_ += period;
00098 }
00099 }
00100 }
00101
00102 in_timeout_ = false;
00103
00104 last_receive_time_ = now;
00105 last_header_stamp_ = stamp;
00106 }
00107
00108 void checkTimeout(const ros::Time &now)
00109 {
00110 if (blocking_timeout_) {
00111 return;
00112 }
00113
00114 if (in_timeout_ || timeout_ <= ros::Duration(0.0)) {
00115 return;
00116 }
00117
00118 if (message_count_ == 0) {
00119 return;
00120 }
00121
00122 if (age(now) > timeout_) {
00123 in_timeout_ = true;
00124 timeout_count_++;
00125 }
00126 }
00127
00128
00129 public:
00130 SubscriberImpl() :
00131 unmapped_topic_("N/A"),
00132 mapped_topic_("N/A"),
00133 message_count_(0),
00134 timeout_(-1.0),
00135 in_timeout_(false),
00136 timeout_count_(0),
00137 blocking_timeout_(false)
00138 {
00139 resetStatistics();
00140 }
00141
00142 const std::string& mappedTopic() const
00143 {
00144 return mapped_topic_;
00145 }
00146
00147 const std::string& unmappedTopic() const
00148 {
00149 return unmapped_topic_;
00150 }
00151
00152 int numPublishers() const
00153 {
00154 return sub_.getNumPublishers();
00155 }
00156
00157 void resetStatistics()
00158 {
00159 message_count_ = 0;
00160 in_timeout_ = false;
00161 timeout_count_ = 0;
00162 }
00163
00164 int messageCount() const
00165 {
00166 return message_count_;
00167 }
00168
00169 ros::Duration age(const ros::Time &now) const
00170 {
00171 if (message_count_ < 1) {
00172 return ros::DURATION_MAX;
00173 } else if (last_header_stamp_.isValid()) {
00174 return now - last_header_stamp_;
00175 } else {
00176
00177
00178 return ros::Duration(0.0);
00179 }
00180 }
00181
00182 ros::Duration meanLatency() const
00183 {
00184 if (message_count_ < 1) {
00185 return ros::DURATION_MAX;
00186 } else {
00187 return ros::Duration(total_latency_.toSec() / message_count_);
00188 }
00189 }
00190
00191 ros::Duration minLatency() const
00192 {
00193 if (message_count_ < 1) {
00194 return ros::DURATION_MAX;
00195 } else {
00196 return min_latency_;
00197 }
00198 }
00199
00200 ros::Duration maxLatency() const
00201 {
00202 if (message_count_ < 1) {
00203 return ros::DURATION_MAX;
00204 } else {
00205 return max_latency_;
00206 }
00207 }
00208
00209 double meanFrequencyHz() const
00210 {
00211 if (message_count_ < 2) {
00212 return 0.0;
00213 } else {
00214 return 1e9 / meanPeriod().toNSec();
00215 }
00216 }
00217
00218 ros::Duration meanPeriod() const
00219 {
00220 if (message_count_ < 2) {
00221 return ros::DURATION_MAX;
00222 } else {
00223 return ros::Duration(total_periods_.toSec() / (message_count_ - 1));
00224 }
00225 }
00226
00227 ros::Duration minPeriod() const
00228 {
00229 if (message_count_ < 2) {
00230 return ros::DURATION_MAX;
00231 } else {
00232 return min_period_;
00233 }
00234 }
00235
00236 ros::Duration maxPeriod() const
00237 {
00238 if (message_count_ < 2) {
00239 return ros::DURATION_MAX;
00240 } else {
00241 return max_period_;
00242 }
00243 }
00244
00245 void setTimeout(const ros::Duration &time_out)
00246 {
00247 timeout_ = time_out;
00248 in_timeout_ = false;
00249 timeout_count_ = 0;
00250 }
00251
00252 bool blockTimeouts(bool block) {
00253 if (block) {
00254 in_timeout_ = false;
00255 }
00256
00257 bool old_block = blocking_timeout_;
00258 blocking_timeout_ = block;
00259 return old_block;
00260 }
00261
00262 bool timeoutsBlocked() const {
00263 return blocking_timeout_;
00264 }
00265
00266 ros::Duration timeout() const
00267 {
00268 return timeout_;
00269 }
00270
00271 bool timeoutEnabled() const
00272 {
00273 return timeout_ > ros::Duration(0.0);
00274 }
00275
00276 bool inTimeout()
00277 {
00278 checkTimeout(ros::Time::now());
00279 return in_timeout_;
00280 }
00281
00282 int timeoutCount()
00283 {
00284 checkTimeout(ros::Time::now());
00285 return timeout_count_;
00286 }
00287 };
00288
00289 struct TrueType
00290 {
00291 static const bool value = true;
00292 };
00293
00294 template<class M , class T>
00295 class TypedSubscriberImpl : public SubscriberImpl
00296 {
00297 T *obj_;
00298 void (T::*callback_)(const boost::shared_ptr< M const > &);
00299
00300 public:
00301 TypedSubscriberImpl(
00302 ros::NodeHandle &nh,
00303 const std::string &topic,
00304 uint32_t queue_size,
00305 void(T::*fp)(const boost::shared_ptr< M const > &),
00306 T *obj,
00307 const ros::TransportHints &transport_hints)
00308 {
00309 unmapped_topic_ = topic;
00310 mapped_topic_ = nh.resolveName(topic);
00311
00312 if (unmapped_topic_ == mapped_topic_) {
00313 ROS_INFO("Subscribing to '%s'.", mapped_topic_.c_str());
00314 } else {
00315 ROS_INFO("Subscribing to '%s' at '%s'.",
00316 unmapped_topic_.c_str(),
00317 mapped_topic_.c_str());
00318 }
00319
00320 callback_ = fp;
00321 obj_ = obj;
00322
00323 sub_ = nh.subscribe(mapped_topic_, queue_size,
00324 &TypedSubscriberImpl::handleMessage<M>,
00325 this,
00326 transport_hints);
00327 }
00328
00329
00330 template <class M2>
00331 typename boost::enable_if< ros::message_traits::HasHeader<M2>, void>::type
00332 handleMessage(const boost::shared_ptr< M const> &msg)
00333 {
00334 processHeader(msg->header.stamp);
00335 (obj_->*callback_)(msg);
00336 }
00337
00338
00339 template <class M2>
00340 typename boost::disable_if< ros::message_traits::HasHeader<M2>, void>::type
00341 handleMessage(const boost::shared_ptr< M const> &msg)
00342 {
00343 processHeader(ros::Time::now());
00344 (obj_->*callback_)(msg);
00345 }
00346 };
00347
00348 template<class M>
00349 class BindSubscriberImpl : public SubscriberImpl
00350 {
00351 boost::function<void(const boost::shared_ptr< M const> &)> callback_;
00352
00353
00354 public:
00355 BindSubscriberImpl(
00356 ros::NodeHandle &nh,
00357 const std::string &topic,
00358 uint32_t queue_size,
00359 const boost::function<void(const boost::shared_ptr< M const> &)> &callback,
00360 const ros::TransportHints &transport_hints)
00361 {
00362 unmapped_topic_ = topic;
00363 mapped_topic_ = nh.resolveName(topic);
00364
00365 if (unmapped_topic_ == mapped_topic_) {
00366 ROS_INFO("Subscribing to '%s'.", mapped_topic_.c_str());
00367 } else {
00368 ROS_INFO("Subscribing to '%s' at '%s'.",
00369 unmapped_topic_.c_str(),
00370 mapped_topic_.c_str());
00371 }
00372
00373 callback_ = callback;
00374
00375 sub_ = nh.subscribe(mapped_topic_, queue_size,
00376 &BindSubscriberImpl::handleMessage<M>,
00377 this,
00378 transport_hints);
00379 }
00380
00381
00382 template <class M2>
00383 typename boost::enable_if< ros::message_traits::HasHeader<M2>, void>::type
00384 handleMessage(const boost::shared_ptr< M const> &msg)
00385 {
00386 processHeader(msg->header.stamp);
00387 callback_(msg);
00388 }
00389
00390
00391 template <class M2>
00392 typename boost::disable_if< ros::message_traits::HasHeader<M2>, void>::type
00393 handleMessage(const boost::shared_ptr< M const> &msg)
00394 {
00395 processHeader(ros::Time::now());
00396 callback_(msg);
00397 }
00398 };
00399
00400 template<class M>
00401 class StorageSubscriberImpl : public SubscriberImpl
00402 {
00403 boost::shared_ptr< M const > *dest_;
00404
00405 public:
00406 StorageSubscriberImpl(
00407 ros::NodeHandle &nh,
00408 const std::string &topic,
00409 boost::shared_ptr< M const > *dest,
00410 const ros::TransportHints &transport_hints)
00411 {
00412 unmapped_topic_ = topic;
00413 mapped_topic_ = nh.resolveName(topic);
00414
00415 if (unmapped_topic_ == mapped_topic_) {
00416 ROS_INFO("Subscribing to '%s'.", mapped_topic_.c_str());
00417 } else {
00418 ROS_INFO("Subscribing to '%s' at '%s'.",
00419 unmapped_topic_.c_str(),
00420 mapped_topic_.c_str());
00421 }
00422
00423 dest_ = dest;
00424
00425 sub_ = nh.subscribe(mapped_topic_, 2,
00426 &StorageSubscriberImpl::handleMessage<M>,
00427 this,
00428 transport_hints);
00429 }
00430
00431
00432 template <class M2>
00433 typename boost::enable_if< ros::message_traits::HasHeader<M2>, void>::type
00434 handleMessage(const boost::shared_ptr< M const> &msg)
00435 {
00436 processHeader(msg->header.stamp);
00437 *dest_ = msg;
00438 }
00439
00440
00441 template <class M2>
00442 typename boost::disable_if< ros::message_traits::HasHeader<M2>, void>::type
00443 handleMessage(const boost::shared_ptr< M const> &msg)
00444 {
00445 processHeader(ros::Time::now());
00446 *dest_ = msg;
00447 }
00448 };
00449 }
00450 #endif // SWRI_ROSCPP_SUBSCRIBER_IMPL_H_