00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028 #include "ros/publication.h"
00029 #include "ros/subscriber_link.h"
00030 #include "ros/connection.h"
00031 #include "ros/callback_queue_interface.h"
00032 #include "ros/single_subscriber_publisher.h"
00033 #include "ros/serialization.h"
00034 #include <std_msgs/Header.h>
00035
00036 namespace ros
00037 {
00038
00039 class PeerConnDisconnCallback : public CallbackInterface
00040 {
00041 public:
00042 PeerConnDisconnCallback(const SubscriberStatusCallback& callback, const SubscriberLinkPtr& sub_link, bool use_tracked_object, const VoidConstWPtr& tracked_object)
00043 : callback_(callback)
00044 , sub_link_(sub_link)
00045 , use_tracked_object_(use_tracked_object)
00046 , tracked_object_(tracked_object)
00047 {
00048 }
00049
00050 virtual CallResult call()
00051 {
00052 VoidConstPtr tracker;
00053 if (use_tracked_object_)
00054 {
00055 tracker = tracked_object_.lock();
00056
00057 if (!tracker)
00058 {
00059 return Invalid;
00060 }
00061 }
00062
00063 SingleSubscriberPublisher pub(sub_link_);
00064 callback_(pub);
00065
00066 return Success;
00067 }
00068
00069 private:
00070 SubscriberStatusCallback callback_;
00071 SubscriberLinkPtr sub_link_;
00072 bool use_tracked_object_;
00073 VoidConstWPtr tracked_object_;
00074 };
00075
00076 Publication::Publication(const std::string &name,
00077 const std::string &datatype,
00078 const std::string &_md5sum,
00079 const std::string& message_definition,
00080 size_t max_queue,
00081 bool latch,
00082 bool has_header)
00083 : name_(name),
00084 datatype_(datatype),
00085 md5sum_(_md5sum),
00086 message_definition_(message_definition),
00087 max_queue_(max_queue),
00088 seq_(0),
00089 dropped_(false),
00090 latch_(latch),
00091 has_header_(has_header),
00092 intraprocess_subscriber_count_(0)
00093 {
00094 }
00095
00096 Publication::~Publication()
00097 {
00098 drop();
00099 }
00100
00101 void Publication::addCallbacks(const SubscriberCallbacksPtr& callbacks)
00102 {
00103 boost::mutex::scoped_lock lock(callbacks_mutex_);
00104
00105 callbacks_.push_back(callbacks);
00106
00107
00108 if (callbacks->connect_ && callbacks->callback_queue_)
00109 {
00110 boost::mutex::scoped_lock lock(subscriber_links_mutex_);
00111 V_SubscriberLink::iterator it = subscriber_links_.begin();
00112 V_SubscriberLink::iterator end = subscriber_links_.end();
00113 for (; it != end; ++it)
00114 {
00115 const SubscriberLinkPtr& sub_link = *it;
00116 CallbackInterfacePtr cb(new PeerConnDisconnCallback(callbacks->connect_, sub_link, callbacks->has_tracked_object_, callbacks->tracked_object_));
00117 callbacks->callback_queue_->addCallback(cb, (uint64_t)callbacks.get());
00118 }
00119 }
00120 }
00121
00122 void Publication::removeCallbacks(const SubscriberCallbacksPtr& callbacks)
00123 {
00124 boost::mutex::scoped_lock lock(callbacks_mutex_);
00125
00126 V_Callback::iterator it = std::find(callbacks_.begin(), callbacks_.end(), callbacks);
00127 if (it != callbacks_.end())
00128 {
00129 const SubscriberCallbacksPtr& cb = *it;
00130 if (cb->callback_queue_)
00131 {
00132 cb->callback_queue_->removeByID((uint64_t)cb.get());
00133 }
00134 callbacks_.erase(it);
00135 }
00136 }
00137
00138 void Publication::drop()
00139 {
00140
00141
00142 {
00143 boost::mutex::scoped_lock lock(publish_queue_mutex_);
00144 boost::mutex::scoped_lock lock2(subscriber_links_mutex_);
00145
00146 if (dropped_)
00147 {
00148 return;
00149 }
00150
00151 dropped_ = true;
00152 }
00153
00154 dropAllConnections();
00155 }
00156
00157 bool Publication::enqueueMessage(const SerializedMessage& m)
00158 {
00159 boost::mutex::scoped_lock lock(subscriber_links_mutex_);
00160 if (dropped_)
00161 {
00162 return false;
00163 }
00164
00165 ROS_ASSERT(m.buf);
00166
00167 uint32_t seq = incrementSequence();
00168 if (has_header_)
00169 {
00170
00171
00172 namespace ser = ros::serialization;
00173 std_msgs::Header header;
00174 ser::IStream istream(m.buf.get() + 4, m.num_bytes - 4);
00175 ser::deserialize(istream, header);
00176 header.seq = seq;
00177 ser::OStream ostream(m.buf.get() + 4, m.num_bytes - 4);
00178 ser::serialize(ostream, header);
00179 }
00180
00181 for(V_SubscriberLink::iterator i = subscriber_links_.begin();
00182 i != subscriber_links_.end(); ++i)
00183 {
00184 const SubscriberLinkPtr& sub_link = (*i);
00185 sub_link->enqueueMessage(m, true, false);
00186 }
00187
00188 if (latch_)
00189 {
00190 last_message_ = m;
00191 }
00192
00193 return true;
00194 }
00195
00196 void Publication::addSubscriberLink(const SubscriberLinkPtr& sub_link)
00197 {
00198 {
00199 boost::mutex::scoped_lock lock(subscriber_links_mutex_);
00200
00201 if (dropped_)
00202 {
00203 return;
00204 }
00205
00206 subscriber_links_.push_back(sub_link);
00207
00208 if (sub_link->isIntraprocess())
00209 {
00210 ++intraprocess_subscriber_count_;
00211 }
00212 }
00213
00214 if (latch_ && last_message_.buf)
00215 {
00216 sub_link->enqueueMessage(last_message_, true, true);
00217 }
00218
00219
00220
00221
00222 peerConnect(sub_link);
00223 }
00224
00225 void Publication::removeSubscriberLink(const SubscriberLinkPtr& sub_link)
00226 {
00227 SubscriberLinkPtr link;
00228 {
00229 boost::mutex::scoped_lock lock(subscriber_links_mutex_);
00230
00231 if (dropped_)
00232 {
00233 return;
00234 }
00235
00236 if (sub_link->isIntraprocess())
00237 {
00238 --intraprocess_subscriber_count_;
00239 }
00240
00241 V_SubscriberLink::iterator it = std::find(subscriber_links_.begin(), subscriber_links_.end(), sub_link);
00242 if (it != subscriber_links_.end())
00243 {
00244 link = *it;
00245 subscriber_links_.erase(it);
00246 }
00247 }
00248
00249 if (link)
00250 {
00251 peerDisconnect(link);
00252 }
00253 }
00254
00255 XmlRpc::XmlRpcValue Publication::getStats()
00256 {
00257 XmlRpc::XmlRpcValue stats;
00258 stats[0] = name_;
00259 XmlRpc::XmlRpcValue conn_data;
00260 conn_data.setSize(0);
00261
00262 boost::mutex::scoped_lock lock(subscriber_links_mutex_);
00263
00264 uint32_t cidx = 0;
00265 for (V_SubscriberLink::iterator c = subscriber_links_.begin();
00266 c != subscriber_links_.end(); ++c, cidx++)
00267 {
00268 const SubscriberLink::Stats& s = (*c)->getStats();
00269 conn_data[cidx][0] = (*c)->getConnectionID();
00270
00271
00272
00273
00274
00275 conn_data[cidx][1] = (int)s.bytes_sent_;
00276 conn_data[cidx][2] = (int)s.message_data_sent_;
00277 conn_data[cidx][3] = (int)s.messages_sent_;
00278 conn_data[cidx][4] = 0;
00279 }
00280
00281 stats[1] = conn_data;
00282 return stats;
00283 }
00284
00285
00286
00287
00288
00289
00290 void Publication::getInfo(XmlRpc::XmlRpcValue& info)
00291 {
00292 boost::mutex::scoped_lock lock(subscriber_links_mutex_);
00293
00294 for (V_SubscriberLink::iterator c = subscriber_links_.begin();
00295 c != subscriber_links_.end(); ++c)
00296 {
00297 XmlRpc::XmlRpcValue curr_info;
00298 curr_info[0] = (int)(*c)->getConnectionID();
00299 curr_info[1] = (*c)->getDestinationCallerID();
00300 curr_info[2] = "o";
00301 curr_info[3] = (*c)->getTransportType();
00302 curr_info[4] = name_;
00303 info[info.size()] = curr_info;
00304 }
00305 }
00306
00307 void Publication::dropAllConnections()
00308 {
00309
00310
00311 V_SubscriberLink local_publishers;
00312
00313 {
00314 boost::mutex::scoped_lock lock(subscriber_links_mutex_);
00315
00316 local_publishers.swap(subscriber_links_);
00317 }
00318
00319 for (V_SubscriberLink::iterator i = local_publishers.begin();
00320 i != local_publishers.end(); ++i)
00321 {
00322 (*i)->drop();
00323 }
00324 }
00325
00326 void Publication::peerConnect(const SubscriberLinkPtr& sub_link)
00327 {
00328 V_Callback::iterator it = callbacks_.begin();
00329 V_Callback::iterator end = callbacks_.end();
00330 for (; it != end; ++it)
00331 {
00332 const SubscriberCallbacksPtr& cbs = *it;
00333 if (cbs->connect_ && cbs->callback_queue_)
00334 {
00335 CallbackInterfacePtr cb(new PeerConnDisconnCallback(cbs->connect_, sub_link, cbs->has_tracked_object_, cbs->tracked_object_));
00336 cbs->callback_queue_->addCallback(cb, (uint64_t)cbs.get());
00337 }
00338 }
00339 }
00340
00341 void Publication::peerDisconnect(const SubscriberLinkPtr& sub_link)
00342 {
00343 V_Callback::iterator it = callbacks_.begin();
00344 V_Callback::iterator end = callbacks_.end();
00345 for (; it != end; ++it)
00346 {
00347 const SubscriberCallbacksPtr& cbs = *it;
00348 if (cbs->disconnect_ && cbs->callback_queue_)
00349 {
00350 CallbackInterfacePtr cb(new PeerConnDisconnCallback(cbs->disconnect_, sub_link, cbs->has_tracked_object_, cbs->tracked_object_));
00351 cbs->callback_queue_->addCallback(cb, (uint64_t)cbs.get());
00352 }
00353 }
00354 }
00355
00356 size_t Publication::getNumCallbacks()
00357 {
00358 boost::mutex::scoped_lock lock(callbacks_mutex_);
00359 return callbacks_.size();
00360 }
00361
00362 uint32_t Publication::incrementSequence()
00363 {
00364 boost::mutex::scoped_lock lock(seq_mutex_);
00365 uint32_t old_seq = seq_;
00366 ++seq_;
00367
00368 return old_seq;
00369 }
00370
00371 uint32_t Publication::getNumSubscribers()
00372 {
00373 boost::mutex::scoped_lock lock(subscriber_links_mutex_);
00374 return (uint32_t)subscriber_links_.size();
00375 }
00376
00377 void Publication::getPublishTypes(bool& serialize, bool& nocopy, const std::type_info& ti)
00378 {
00379 boost::mutex::scoped_lock lock(subscriber_links_mutex_);
00380 V_SubscriberLink::const_iterator it = subscriber_links_.begin();
00381 V_SubscriberLink::const_iterator end = subscriber_links_.end();
00382 for (; it != end; ++it)
00383 {
00384 const SubscriberLinkPtr& sub = *it;
00385 bool s = false;
00386 bool n = false;
00387 sub->getPublishTypes(s, n, ti);
00388 serialize = serialize || s;
00389 nocopy = nocopy || n;
00390
00391 if (serialize && nocopy)
00392 {
00393 break;
00394 }
00395 }
00396 }
00397
00398 bool Publication::hasSubscribers()
00399 {
00400 boost::mutex::scoped_lock lock(subscriber_links_mutex_);
00401 return !subscriber_links_.empty();
00402 }
00403
00404 void Publication::publish(SerializedMessage& m)
00405 {
00406 if (m.message)
00407 {
00408 boost::mutex::scoped_lock lock(subscriber_links_mutex_);
00409 V_SubscriberLink::const_iterator it = subscriber_links_.begin();
00410 V_SubscriberLink::const_iterator end = subscriber_links_.end();
00411 for (; it != end; ++it)
00412 {
00413 const SubscriberLinkPtr& sub = *it;
00414 if (sub->isIntraprocess())
00415 {
00416 sub->enqueueMessage(m, false, true);
00417 }
00418 }
00419
00420 m.message.reset();
00421 }
00422
00423 if (m.buf)
00424 {
00425 boost::mutex::scoped_lock lock(publish_queue_mutex_);
00426 publish_queue_.push_back(m);
00427 }
00428 }
00429
00430 void Publication::processPublishQueue()
00431 {
00432 V_SerializedMessage queue;
00433 {
00434 boost::mutex::scoped_lock lock(publish_queue_mutex_);
00435
00436 if (dropped_)
00437 {
00438 return;
00439 }
00440
00441 queue.insert(queue.end(), publish_queue_.begin(), publish_queue_.end());
00442 publish_queue_.clear();
00443 }
00444
00445 if (queue.empty())
00446 {
00447 return;
00448 }
00449
00450 V_SerializedMessage::iterator it = queue.begin();
00451 V_SerializedMessage::iterator end = queue.end();
00452 for (; it != end; ++it)
00453 {
00454 enqueueMessage(*it);
00455 }
00456 }
00457
00458 bool Publication::validateHeader(const Header& header, std::string& error_msg)
00459 {
00460 std::string md5sum, topic, client_callerid;
00461 if (!header.getValue("md5sum", md5sum)
00462 || !header.getValue("topic", topic)
00463 || !header.getValue("callerid", client_callerid))
00464 {
00465 std::string msg("Header from subscriber did not have the required elements: md5sum, topic, callerid");
00466
00467 ROS_ERROR("%s", msg.c_str());
00468 error_msg = msg;
00469
00470 return false;
00471 }
00472
00473
00474
00475
00476
00477 if(isDropped())
00478 {
00479 std::string msg = std::string("received a tcpros connection for a nonexistent topic [") +
00480 topic + std::string("] from [" + client_callerid +"].");
00481
00482 ROS_ERROR("%s", msg.c_str());
00483 error_msg = msg;
00484
00485 return false;
00486 }
00487
00488 if (getMD5Sum() != md5sum &&
00489 (md5sum != std::string("*") && getMD5Sum() != std::string("*")))
00490 {
00491 std::string datatype;
00492 header.getValue("type", datatype);
00493
00494 std::string msg = std::string("Client [") + client_callerid + std::string("] wants topic ") + topic +
00495 std::string(" to have datatype/md5sum [") + datatype + "/" + md5sum +
00496 std::string("], but our version has [") + getDataType() + "/" + getMD5Sum() +
00497 std::string("]. Dropping connection.");
00498
00499 ROS_ERROR("%s", msg.c_str());
00500 error_msg = msg;
00501
00502 return false;
00503 }
00504
00505 return true;
00506 }
00507
00508 }