00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028 #include "ros/publication.h"
00029 #include "ros/subscriber_link.h"
00030 #include "ros/connection.h"
00031 #include "ros/callback_queue_interface.h"
00032 #include "ros/single_subscriber_publisher.h"
00033 #include "ros/serialization.h"
00034 #include <std_msgs/Header.h>
00035
00036 namespace ros
00037 {
00038
00039 class PeerConnDisconnCallback : public CallbackInterface
00040 {
00041 public:
00042 PeerConnDisconnCallback(const SubscriberStatusCallback& callback, const SubscriberLinkPtr& sub_link, bool use_tracked_object, const VoidConstWPtr& tracked_object)
00043 : callback_(callback)
00044 , sub_link_(sub_link)
00045 , use_tracked_object_(use_tracked_object)
00046 , tracked_object_(tracked_object)
00047 {
00048 }
00049
00050 virtual CallResult call()
00051 {
00052 VoidConstPtr tracker;
00053 if (use_tracked_object_)
00054 {
00055 tracker = tracked_object_.lock();
00056
00057 if (!tracker)
00058 {
00059 return Invalid;
00060 }
00061 }
00062
00063 SingleSubscriberPublisher pub(sub_link_);
00064 callback_(pub);
00065
00066 return Success;
00067 }
00068
00069 private:
00070 SubscriberStatusCallback callback_;
00071 SubscriberLinkPtr sub_link_;
00072 bool use_tracked_object_;
00073 VoidConstWPtr tracked_object_;
00074 };
00075
00076 Publication::Publication(const std::string &name,
00077 const std::string &datatype,
00078 const std::string &_md5sum,
00079 const std::string& message_definition,
00080 size_t max_queue,
00081 bool latch,
00082 bool has_header)
00083 : name_(name),
00084 datatype_(datatype),
00085 md5sum_(_md5sum),
00086 message_definition_(message_definition),
00087 max_queue_(max_queue),
00088 seq_(0),
00089 dropped_(false),
00090 latch_(latch),
00091 has_header_(has_header),
00092 intraprocess_subscriber_count_(0)
00093 {
00094 }
00095
00096 Publication::~Publication()
00097 {
00098 drop();
00099 }
00100
00101 void Publication::addCallbacks(const SubscriberCallbacksPtr& callbacks)
00102 {
00103 boost::mutex::scoped_lock lock(callbacks_mutex_);
00104
00105 callbacks_.push_back(callbacks);
00106
00107
00108 if (callbacks->connect_ && callbacks->callback_queue_)
00109 {
00110 boost::mutex::scoped_lock lock(subscriber_links_mutex_);
00111 V_SubscriberLink::iterator it = subscriber_links_.begin();
00112 V_SubscriberLink::iterator end = subscriber_links_.end();
00113 for (; it != end; ++it)
00114 {
00115 const SubscriberLinkPtr& sub_link = *it;
00116 CallbackInterfacePtr cb(boost::make_shared<PeerConnDisconnCallback>(callbacks->connect_, sub_link, callbacks->has_tracked_object_, callbacks->tracked_object_));
00117 callbacks->callback_queue_->addCallback(cb, (uint64_t)callbacks.get());
00118 }
00119 }
00120 }
00121
00122 void Publication::removeCallbacks(const SubscriberCallbacksPtr& callbacks)
00123 {
00124 boost::mutex::scoped_lock lock(callbacks_mutex_);
00125
00126 V_Callback::iterator it = std::find(callbacks_.begin(), callbacks_.end(), callbacks);
00127 if (it != callbacks_.end())
00128 {
00129 const SubscriberCallbacksPtr& cb = *it;
00130 if (cb->callback_queue_)
00131 {
00132 cb->callback_queue_->removeByID((uint64_t)cb.get());
00133 }
00134 callbacks_.erase(it);
00135 }
00136 }
00137
00138 void Publication::drop()
00139 {
00140
00141
00142 {
00143 boost::mutex::scoped_lock lock(publish_queue_mutex_);
00144 boost::mutex::scoped_lock lock2(subscriber_links_mutex_);
00145
00146 if (dropped_)
00147 {
00148 return;
00149 }
00150
00151 dropped_ = true;
00152 }
00153
00154 dropAllConnections();
00155 }
00156
00157 bool Publication::enqueueMessage(const SerializedMessage& m)
00158 {
00159 boost::mutex::scoped_lock lock(subscriber_links_mutex_);
00160 if (dropped_)
00161 {
00162 return false;
00163 }
00164
00165 ROS_ASSERT(m.buf);
00166
00167 uint32_t seq = incrementSequence();
00168 if (has_header_)
00169 {
00170
00171
00172 namespace ser = ros::serialization;
00173 std_msgs::Header header;
00174 ser::IStream istream(m.buf.get() + 4, m.num_bytes - 4);
00175 ser::deserialize(istream, header);
00176 header.seq = seq;
00177 ser::OStream ostream(m.buf.get() + 4, m.num_bytes - 4);
00178 ser::serialize(ostream, header);
00179 }
00180
00181 for(V_SubscriberLink::iterator i = subscriber_links_.begin();
00182 i != subscriber_links_.end(); ++i)
00183 {
00184 const SubscriberLinkPtr& sub_link = (*i);
00185 sub_link->enqueueMessage(m, true, false);
00186 }
00187
00188 if (latch_)
00189 {
00190 last_message_ = m;
00191 }
00192
00193 return true;
00194 }
00195
00196 void Publication::addSubscriberLink(const SubscriberLinkPtr& sub_link)
00197 {
00198 {
00199 boost::mutex::scoped_lock lock(subscriber_links_mutex_);
00200
00201 if (dropped_)
00202 {
00203 return;
00204 }
00205
00206 subscriber_links_.push_back(sub_link);
00207
00208 if (sub_link->isIntraprocess())
00209 {
00210 ++intraprocess_subscriber_count_;
00211 }
00212 }
00213
00214 if (latch_ && last_message_.buf)
00215 {
00216 sub_link->enqueueMessage(last_message_, true, true);
00217 }
00218
00219
00220
00221
00222 peerConnect(sub_link);
00223 }
00224
00225 void Publication::removeSubscriberLink(const SubscriberLinkPtr& sub_link)
00226 {
00227 SubscriberLinkPtr link;
00228 {
00229 boost::mutex::scoped_lock lock(subscriber_links_mutex_);
00230
00231 if (dropped_)
00232 {
00233 return;
00234 }
00235
00236 if (sub_link->isIntraprocess())
00237 {
00238 --intraprocess_subscriber_count_;
00239 }
00240
00241 V_SubscriberLink::iterator it = std::find(subscriber_links_.begin(), subscriber_links_.end(), sub_link);
00242 if (it != subscriber_links_.end())
00243 {
00244 link = *it;
00245 subscriber_links_.erase(it);
00246 }
00247 }
00248
00249 if (link)
00250 {
00251 peerDisconnect(link);
00252 }
00253 }
00254
00255 XmlRpc::XmlRpcValue Publication::getStats()
00256 {
00257 XmlRpc::XmlRpcValue stats;
00258 stats[0] = name_;
00259 XmlRpc::XmlRpcValue conn_data;
00260 conn_data.setSize(0);
00261
00262 boost::mutex::scoped_lock lock(subscriber_links_mutex_);
00263
00264 uint32_t cidx = 0;
00265 for (V_SubscriberLink::iterator c = subscriber_links_.begin();
00266 c != subscriber_links_.end(); ++c, cidx++)
00267 {
00268 const SubscriberLink::Stats& s = (*c)->getStats();
00269 conn_data[cidx][0] = (*c)->getConnectionID();
00270
00271
00272
00273
00274
00275 conn_data[cidx][1] = (int)s.bytes_sent_;
00276 conn_data[cidx][2] = (int)s.message_data_sent_;
00277 conn_data[cidx][3] = (int)s.messages_sent_;
00278 conn_data[cidx][4] = 0;
00279 }
00280
00281 stats[1] = conn_data;
00282 return stats;
00283 }
00284
00285
00286
00287 void Publication::getInfo(XmlRpc::XmlRpcValue& info)
00288 {
00289 boost::mutex::scoped_lock lock(subscriber_links_mutex_);
00290
00291 for (V_SubscriberLink::iterator c = subscriber_links_.begin();
00292 c != subscriber_links_.end(); ++c)
00293 {
00294 XmlRpc::XmlRpcValue curr_info;
00295 curr_info[0] = (int)(*c)->getConnectionID();
00296 curr_info[1] = (*c)->getDestinationCallerID();
00297 curr_info[2] = "o";
00298 curr_info[3] = (*c)->getTransportType();
00299 curr_info[4] = name_;
00300 curr_info[5] = true;
00301 curr_info[6] = (*c)->getTransportInfo();
00302 info[info.size()] = curr_info;
00303 }
00304 }
00305
00306 void Publication::dropAllConnections()
00307 {
00308
00309
00310 V_SubscriberLink local_publishers;
00311
00312 {
00313 boost::mutex::scoped_lock lock(subscriber_links_mutex_);
00314
00315 local_publishers.swap(subscriber_links_);
00316 }
00317
00318 for (V_SubscriberLink::iterator i = local_publishers.begin();
00319 i != local_publishers.end(); ++i)
00320 {
00321 (*i)->drop();
00322 }
00323 }
00324
00325 void Publication::peerConnect(const SubscriberLinkPtr& sub_link)
00326 {
00327 V_Callback::iterator it = callbacks_.begin();
00328 V_Callback::iterator end = callbacks_.end();
00329 for (; it != end; ++it)
00330 {
00331 const SubscriberCallbacksPtr& cbs = *it;
00332 if (cbs->connect_ && cbs->callback_queue_)
00333 {
00334 CallbackInterfacePtr cb(boost::make_shared<PeerConnDisconnCallback>(cbs->connect_, sub_link, cbs->has_tracked_object_, cbs->tracked_object_));
00335 cbs->callback_queue_->addCallback(cb, (uint64_t)cbs.get());
00336 }
00337 }
00338 }
00339
00340 void Publication::peerDisconnect(const SubscriberLinkPtr& sub_link)
00341 {
00342 V_Callback::iterator it = callbacks_.begin();
00343 V_Callback::iterator end = callbacks_.end();
00344 for (; it != end; ++it)
00345 {
00346 const SubscriberCallbacksPtr& cbs = *it;
00347 if (cbs->disconnect_ && cbs->callback_queue_)
00348 {
00349 CallbackInterfacePtr cb(boost::make_shared<PeerConnDisconnCallback>(cbs->disconnect_, sub_link, cbs->has_tracked_object_, cbs->tracked_object_));
00350 cbs->callback_queue_->addCallback(cb, (uint64_t)cbs.get());
00351 }
00352 }
00353 }
00354
00355 size_t Publication::getNumCallbacks()
00356 {
00357 boost::mutex::scoped_lock lock(callbacks_mutex_);
00358 return callbacks_.size();
00359 }
00360
00361 uint32_t Publication::incrementSequence()
00362 {
00363 boost::mutex::scoped_lock lock(seq_mutex_);
00364 uint32_t old_seq = seq_;
00365 ++seq_;
00366
00367 return old_seq;
00368 }
00369
00370 uint32_t Publication::getNumSubscribers()
00371 {
00372 boost::mutex::scoped_lock lock(subscriber_links_mutex_);
00373 return (uint32_t)subscriber_links_.size();
00374 }
00375
00376 void Publication::getPublishTypes(bool& serialize, bool& nocopy, const std::type_info& ti)
00377 {
00378 boost::mutex::scoped_lock lock(subscriber_links_mutex_);
00379 V_SubscriberLink::const_iterator it = subscriber_links_.begin();
00380 V_SubscriberLink::const_iterator end = subscriber_links_.end();
00381 for (; it != end; ++it)
00382 {
00383 const SubscriberLinkPtr& sub = *it;
00384 bool s = false;
00385 bool n = false;
00386 sub->getPublishTypes(s, n, ti);
00387 serialize = serialize || s;
00388 nocopy = nocopy || n;
00389
00390 if (serialize && nocopy)
00391 {
00392 break;
00393 }
00394 }
00395 }
00396
00397 bool Publication::hasSubscribers()
00398 {
00399 boost::mutex::scoped_lock lock(subscriber_links_mutex_);
00400 return !subscriber_links_.empty();
00401 }
00402
00403 void Publication::publish(SerializedMessage& m)
00404 {
00405 if (m.message)
00406 {
00407 boost::mutex::scoped_lock lock(subscriber_links_mutex_);
00408 V_SubscriberLink::const_iterator it = subscriber_links_.begin();
00409 V_SubscriberLink::const_iterator end = subscriber_links_.end();
00410 for (; it != end; ++it)
00411 {
00412 const SubscriberLinkPtr& sub = *it;
00413 if (sub->isIntraprocess())
00414 {
00415 sub->enqueueMessage(m, false, true);
00416 }
00417 }
00418
00419 m.message.reset();
00420 }
00421
00422 if (m.buf)
00423 {
00424 boost::mutex::scoped_lock lock(publish_queue_mutex_);
00425 publish_queue_.push_back(m);
00426 }
00427 }
00428
00429 void Publication::processPublishQueue()
00430 {
00431 V_SerializedMessage queue;
00432 {
00433 boost::mutex::scoped_lock lock(publish_queue_mutex_);
00434
00435 if (dropped_)
00436 {
00437 return;
00438 }
00439
00440 queue.insert(queue.end(), publish_queue_.begin(), publish_queue_.end());
00441 publish_queue_.clear();
00442 }
00443
00444 if (queue.empty())
00445 {
00446 return;
00447 }
00448
00449 V_SerializedMessage::iterator it = queue.begin();
00450 V_SerializedMessage::iterator end = queue.end();
00451 for (; it != end; ++it)
00452 {
00453 enqueueMessage(*it);
00454 }
00455 }
00456
00457 bool Publication::validateHeader(const Header& header, std::string& error_msg)
00458 {
00459 std::string md5sum, topic, client_callerid;
00460 if (!header.getValue("md5sum", md5sum)
00461 || !header.getValue("topic", topic)
00462 || !header.getValue("callerid", client_callerid))
00463 {
00464 std::string msg("Header from subscriber did not have the required elements: md5sum, topic, callerid");
00465
00466 ROS_ERROR("%s", msg.c_str());
00467 error_msg = msg;
00468
00469 return false;
00470 }
00471
00472
00473
00474
00475
00476 if(isDropped())
00477 {
00478 std::string msg = std::string("received a tcpros connection for a nonexistent topic [") +
00479 topic + std::string("] from [" + client_callerid +"].");
00480
00481 ROS_ERROR("%s", msg.c_str());
00482 error_msg = msg;
00483
00484 return false;
00485 }
00486
00487 if (getMD5Sum() != md5sum &&
00488 (md5sum != std::string("*") && getMD5Sum() != std::string("*")))
00489 {
00490 std::string datatype;
00491 header.getValue("type", datatype);
00492
00493 std::string msg = std::string("Client [") + client_callerid + std::string("] wants topic ") + topic +
00494 std::string(" to have datatype/md5sum [") + datatype + "/" + md5sum +
00495 std::string("], but our version has [") + getDataType() + "/" + getMD5Sum() +
00496 std::string("]. Dropping connection.");
00497
00498 ROS_ERROR("%s", msg.c_str());
00499 error_msg = msg;
00500
00501 return false;
00502 }
00503
00504 return true;
00505 }
00506
00507 }