$search
00001 /* 00002 * Copyright (C) 2008, Morgan Quigley and Willow Garage, Inc. 00003 * 00004 * Redistribution and use in source and binary forms, with or without 00005 * modification, are permitted provided that the following conditions are met: 00006 * * Redistributions of source code must retain the above copyright notice, 00007 * this list of conditions and the following disclaimer. 00008 * * Redistributions in binary form must reproduce the above copyright 00009 * notice, this list of conditions and the following disclaimer in the 00010 * documentation and/or other materials provided with the distribution. 00011 * * Neither the names of Stanford University or Willow Garage, Inc. nor the names of its 00012 * contributors may be used to endorse or promote products derived from 00013 * this software without specific prior written permission. 00014 * 00015 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 00016 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 00017 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 00018 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 00019 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 00020 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 00021 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 00022 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 00023 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 00024 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 00025 * POSSIBILITY OF SUCH DAMAGE. 00026 */ 00027 00028 #include "ros/publication.h" 00029 #include "ros/subscriber_link.h" 00030 #include "ros/connection.h" 00031 #include "ros/callback_queue_interface.h" 00032 #include "ros/single_subscriber_publisher.h" 00033 #include "ros/serialization.h" 00034 #include <std_msgs/Header.h> 00035 00036 namespace ros 00037 { 00038 00039 class PeerConnDisconnCallback : public CallbackInterface 00040 { 00041 public: 00042 PeerConnDisconnCallback(const SubscriberStatusCallback& callback, const SubscriberLinkPtr& sub_link, bool use_tracked_object, const VoidConstWPtr& tracked_object) 00043 : callback_(callback) 00044 , sub_link_(sub_link) 00045 , use_tracked_object_(use_tracked_object) 00046 , tracked_object_(tracked_object) 00047 { 00048 } 00049 00050 virtual CallResult call() 00051 { 00052 VoidConstPtr tracker; 00053 if (use_tracked_object_) 00054 { 00055 tracker = tracked_object_.lock(); 00056 00057 if (!tracker) 00058 { 00059 return Invalid; 00060 } 00061 } 00062 00063 SingleSubscriberPublisher pub(sub_link_); 00064 callback_(pub); 00065 00066 return Success; 00067 } 00068 00069 private: 00070 SubscriberStatusCallback callback_; 00071 SubscriberLinkPtr sub_link_; 00072 bool use_tracked_object_; 00073 VoidConstWPtr tracked_object_; 00074 }; 00075 00076 Publication::Publication(const std::string &name, 00077 const std::string &datatype, 00078 const std::string &_md5sum, 00079 const std::string& message_definition, 00080 size_t max_queue, 00081 bool latch, 00082 bool has_header) 00083 : name_(name), 00084 datatype_(datatype), 00085 md5sum_(_md5sum), 00086 message_definition_(message_definition), 00087 max_queue_(max_queue), 00088 seq_(0), 00089 dropped_(false), 00090 latch_(latch), 00091 has_header_(has_header), 00092 intraprocess_subscriber_count_(0) 00093 { 00094 } 00095 00096 Publication::~Publication() 00097 { 00098 drop(); 00099 } 00100 00101 void Publication::addCallbacks(const SubscriberCallbacksPtr& callbacks) 00102 { 00103 boost::mutex::scoped_lock lock(callbacks_mutex_); 00104 00105 callbacks_.push_back(callbacks); 00106 00107 // Add connect callbacks for all current subscriptions if this publisher wants them 00108 if (callbacks->connect_ && callbacks->callback_queue_) 00109 { 00110 boost::mutex::scoped_lock lock(subscriber_links_mutex_); 00111 V_SubscriberLink::iterator it = subscriber_links_.begin(); 00112 V_SubscriberLink::iterator end = subscriber_links_.end(); 00113 for (; it != end; ++it) 00114 { 00115 const SubscriberLinkPtr& sub_link = *it; 00116 CallbackInterfacePtr cb(new PeerConnDisconnCallback(callbacks->connect_, sub_link, callbacks->has_tracked_object_, callbacks->tracked_object_)); 00117 callbacks->callback_queue_->addCallback(cb, (uint64_t)callbacks.get()); 00118 } 00119 } 00120 } 00121 00122 void Publication::removeCallbacks(const SubscriberCallbacksPtr& callbacks) 00123 { 00124 boost::mutex::scoped_lock lock(callbacks_mutex_); 00125 00126 V_Callback::iterator it = std::find(callbacks_.begin(), callbacks_.end(), callbacks); 00127 if (it != callbacks_.end()) 00128 { 00129 const SubscriberCallbacksPtr& cb = *it; 00130 if (cb->callback_queue_) 00131 { 00132 cb->callback_queue_->removeByID((uint64_t)cb.get()); 00133 } 00134 callbacks_.erase(it); 00135 } 00136 } 00137 00138 void Publication::drop() 00139 { 00140 // grab a lock here, to ensure that no subscription callback will 00141 // be invoked after we return 00142 { 00143 boost::mutex::scoped_lock lock(publish_queue_mutex_); 00144 boost::mutex::scoped_lock lock2(subscriber_links_mutex_); 00145 00146 if (dropped_) 00147 { 00148 return; 00149 } 00150 00151 dropped_ = true; 00152 } 00153 00154 dropAllConnections(); 00155 } 00156 00157 bool Publication::enqueueMessage(const SerializedMessage& m) 00158 { 00159 boost::mutex::scoped_lock lock(subscriber_links_mutex_); 00160 if (dropped_) 00161 { 00162 return false; 00163 } 00164 00165 ROS_ASSERT(m.buf); 00166 00167 uint32_t seq = incrementSequence(); 00168 if (has_header_) 00169 { 00170 // If we have a header, we know it's immediately after the message length 00171 // Deserialize it, write the sequence, and then serialize it again. 00172 namespace ser = ros::serialization; 00173 std_msgs::Header header; 00174 ser::IStream istream(m.buf.get() + 4, m.num_bytes - 4); 00175 ser::deserialize(istream, header); 00176 header.seq = seq; 00177 ser::OStream ostream(m.buf.get() + 4, m.num_bytes - 4); 00178 ser::serialize(ostream, header); 00179 } 00180 00181 for(V_SubscriberLink::iterator i = subscriber_links_.begin(); 00182 i != subscriber_links_.end(); ++i) 00183 { 00184 const SubscriberLinkPtr& sub_link = (*i); 00185 sub_link->enqueueMessage(m, true, false); 00186 } 00187 00188 if (latch_) 00189 { 00190 last_message_ = m; 00191 } 00192 00193 return true; 00194 } 00195 00196 void Publication::addSubscriberLink(const SubscriberLinkPtr& sub_link) 00197 { 00198 { 00199 boost::mutex::scoped_lock lock(subscriber_links_mutex_); 00200 00201 if (dropped_) 00202 { 00203 return; 00204 } 00205 00206 subscriber_links_.push_back(sub_link); 00207 00208 if (sub_link->isIntraprocess()) 00209 { 00210 ++intraprocess_subscriber_count_; 00211 } 00212 } 00213 00214 if (latch_ && last_message_.buf) 00215 { 00216 sub_link->enqueueMessage(last_message_, true, true); 00217 } 00218 00219 // This call invokes the subscribe callback if there is one. 00220 // This must happen *after* the push_back above, in case the 00221 // callback uses publish(). 00222 peerConnect(sub_link); 00223 } 00224 00225 void Publication::removeSubscriberLink(const SubscriberLinkPtr& sub_link) 00226 { 00227 SubscriberLinkPtr link; 00228 { 00229 boost::mutex::scoped_lock lock(subscriber_links_mutex_); 00230 00231 if (dropped_) 00232 { 00233 return; 00234 } 00235 00236 if (sub_link->isIntraprocess()) 00237 { 00238 --intraprocess_subscriber_count_; 00239 } 00240 00241 V_SubscriberLink::iterator it = std::find(subscriber_links_.begin(), subscriber_links_.end(), sub_link); 00242 if (it != subscriber_links_.end()) 00243 { 00244 link = *it; 00245 subscriber_links_.erase(it); 00246 } 00247 } 00248 00249 if (link) 00250 { 00251 peerDisconnect(link); 00252 } 00253 } 00254 00255 XmlRpc::XmlRpcValue Publication::getStats() 00256 { 00257 XmlRpc::XmlRpcValue stats; 00258 stats[0] = name_; 00259 XmlRpc::XmlRpcValue conn_data; 00260 conn_data.setSize(0); // force to be an array, even if it's empty 00261 00262 boost::mutex::scoped_lock lock(subscriber_links_mutex_); 00263 00264 uint32_t cidx = 0; 00265 for (V_SubscriberLink::iterator c = subscriber_links_.begin(); 00266 c != subscriber_links_.end(); ++c, cidx++) 00267 { 00268 const SubscriberLink::Stats& s = (*c)->getStats(); 00269 conn_data[cidx][0] = (*c)->getConnectionID(); 00270 // todo: figure out what to do here... the bytes_sent will wrap around 00271 // on some flows within a reasonable amount of time. xmlrpc++ doesn't 00272 // seem to give me a nice way to do 64-bit ints, perhaps that's a 00273 // limitation of xml-rpc, not sure. alternatively we could send the number 00274 // of KB transmitted to gain a few order of magnitude. 00275 conn_data[cidx][1] = (int)s.bytes_sent_; 00276 conn_data[cidx][2] = (int)s.message_data_sent_; 00277 conn_data[cidx][3] = (int)s.messages_sent_; 00278 conn_data[cidx][4] = 0; // not sure what is meant by connected 00279 } 00280 00281 stats[1] = conn_data; 00282 return stats; 00283 } 00284 00285 // rospy returns values like this: 00286 // [(1, "('127.0.0.1', 62334)", 'o', 'TCPROS', '/chatter')] 00287 // 00288 // We're outputting something like this: 00289 // (0, (127.0.0.1, 62707), o, TCPROS, /chatter) 00290 void Publication::getInfo(XmlRpc::XmlRpcValue& info) 00291 { 00292 boost::mutex::scoped_lock lock(subscriber_links_mutex_); 00293 00294 for (V_SubscriberLink::iterator c = subscriber_links_.begin(); 00295 c != subscriber_links_.end(); ++c) 00296 { 00297 XmlRpc::XmlRpcValue curr_info; 00298 curr_info[0] = (int)(*c)->getConnectionID(); 00299 curr_info[1] = (*c)->getDestinationCallerID(); 00300 curr_info[2] = "o"; 00301 curr_info[3] = (*c)->getTransportType(); 00302 curr_info[4] = name_; 00303 info[info.size()] = curr_info; 00304 } 00305 } 00306 00307 void Publication::dropAllConnections() 00308 { 00309 // Swap our publishers list with a local one so we can only lock for a short period of time, because a 00310 // side effect of our calling drop() on connections can be re-locking the publishers mutex 00311 V_SubscriberLink local_publishers; 00312 00313 { 00314 boost::mutex::scoped_lock lock(subscriber_links_mutex_); 00315 00316 local_publishers.swap(subscriber_links_); 00317 } 00318 00319 for (V_SubscriberLink::iterator i = local_publishers.begin(); 00320 i != local_publishers.end(); ++i) 00321 { 00322 (*i)->drop(); 00323 } 00324 } 00325 00326 void Publication::peerConnect(const SubscriberLinkPtr& sub_link) 00327 { 00328 V_Callback::iterator it = callbacks_.begin(); 00329 V_Callback::iterator end = callbacks_.end(); 00330 for (; it != end; ++it) 00331 { 00332 const SubscriberCallbacksPtr& cbs = *it; 00333 if (cbs->connect_ && cbs->callback_queue_) 00334 { 00335 CallbackInterfacePtr cb(new PeerConnDisconnCallback(cbs->connect_, sub_link, cbs->has_tracked_object_, cbs->tracked_object_)); 00336 cbs->callback_queue_->addCallback(cb, (uint64_t)cbs.get()); 00337 } 00338 } 00339 } 00340 00341 void Publication::peerDisconnect(const SubscriberLinkPtr& sub_link) 00342 { 00343 V_Callback::iterator it = callbacks_.begin(); 00344 V_Callback::iterator end = callbacks_.end(); 00345 for (; it != end; ++it) 00346 { 00347 const SubscriberCallbacksPtr& cbs = *it; 00348 if (cbs->disconnect_ && cbs->callback_queue_) 00349 { 00350 CallbackInterfacePtr cb(new PeerConnDisconnCallback(cbs->disconnect_, sub_link, cbs->has_tracked_object_, cbs->tracked_object_)); 00351 cbs->callback_queue_->addCallback(cb, (uint64_t)cbs.get()); 00352 } 00353 } 00354 } 00355 00356 size_t Publication::getNumCallbacks() 00357 { 00358 boost::mutex::scoped_lock lock(callbacks_mutex_); 00359 return callbacks_.size(); 00360 } 00361 00362 uint32_t Publication::incrementSequence() 00363 { 00364 boost::mutex::scoped_lock lock(seq_mutex_); 00365 uint32_t old_seq = seq_; 00366 ++seq_; 00367 00368 return old_seq; 00369 } 00370 00371 uint32_t Publication::getNumSubscribers() 00372 { 00373 boost::mutex::scoped_lock lock(subscriber_links_mutex_); 00374 return (uint32_t)subscriber_links_.size(); 00375 } 00376 00377 void Publication::getPublishTypes(bool& serialize, bool& nocopy, const std::type_info& ti) 00378 { 00379 boost::mutex::scoped_lock lock(subscriber_links_mutex_); 00380 V_SubscriberLink::const_iterator it = subscriber_links_.begin(); 00381 V_SubscriberLink::const_iterator end = subscriber_links_.end(); 00382 for (; it != end; ++it) 00383 { 00384 const SubscriberLinkPtr& sub = *it; 00385 bool s = false; 00386 bool n = false; 00387 sub->getPublishTypes(s, n, ti); 00388 serialize = serialize || s; 00389 nocopy = nocopy || n; 00390 00391 if (serialize && nocopy) 00392 { 00393 break; 00394 } 00395 } 00396 } 00397 00398 bool Publication::hasSubscribers() 00399 { 00400 boost::mutex::scoped_lock lock(subscriber_links_mutex_); 00401 return !subscriber_links_.empty(); 00402 } 00403 00404 void Publication::publish(SerializedMessage& m) 00405 { 00406 if (m.message) 00407 { 00408 boost::mutex::scoped_lock lock(subscriber_links_mutex_); 00409 V_SubscriberLink::const_iterator it = subscriber_links_.begin(); 00410 V_SubscriberLink::const_iterator end = subscriber_links_.end(); 00411 for (; it != end; ++it) 00412 { 00413 const SubscriberLinkPtr& sub = *it; 00414 if (sub->isIntraprocess()) 00415 { 00416 sub->enqueueMessage(m, false, true); 00417 } 00418 } 00419 00420 m.message.reset(); 00421 } 00422 00423 if (m.buf) 00424 { 00425 boost::mutex::scoped_lock lock(publish_queue_mutex_); 00426 publish_queue_.push_back(m); 00427 } 00428 } 00429 00430 void Publication::processPublishQueue() 00431 { 00432 V_SerializedMessage queue; 00433 { 00434 boost::mutex::scoped_lock lock(publish_queue_mutex_); 00435 00436 if (dropped_) 00437 { 00438 return; 00439 } 00440 00441 queue.insert(queue.end(), publish_queue_.begin(), publish_queue_.end()); 00442 publish_queue_.clear(); 00443 } 00444 00445 if (queue.empty()) 00446 { 00447 return; 00448 } 00449 00450 V_SerializedMessage::iterator it = queue.begin(); 00451 V_SerializedMessage::iterator end = queue.end(); 00452 for (; it != end; ++it) 00453 { 00454 enqueueMessage(*it); 00455 } 00456 } 00457 00458 bool Publication::validateHeader(const Header& header, std::string& error_msg) 00459 { 00460 std::string md5sum, topic, client_callerid; 00461 if (!header.getValue("md5sum", md5sum) 00462 || !header.getValue("topic", topic) 00463 || !header.getValue("callerid", client_callerid)) 00464 { 00465 std::string msg("Header from subscriber did not have the required elements: md5sum, topic, callerid"); 00466 00467 ROS_ERROR("%s", msg.c_str()); 00468 error_msg = msg; 00469 00470 return false; 00471 } 00472 00473 // Check whether the topic has been deleted from 00474 // advertised_topics through a call to unadvertise(), which could 00475 // have happened while we were waiting for the subscriber to 00476 // provide the md5sum. 00477 if(isDropped()) 00478 { 00479 std::string msg = std::string("received a tcpros connection for a nonexistent topic [") + 00480 topic + std::string("] from [" + client_callerid +"]."); 00481 00482 ROS_ERROR("%s", msg.c_str()); 00483 error_msg = msg; 00484 00485 return false; 00486 } 00487 00488 if (getMD5Sum() != md5sum && 00489 (md5sum != std::string("*") && getMD5Sum() != std::string("*"))) 00490 { 00491 std::string datatype; 00492 header.getValue("type", datatype); 00493 00494 std::string msg = std::string("Client [") + client_callerid + std::string("] wants topic ") + topic + 00495 std::string(" to have datatype/md5sum [") + datatype + "/" + md5sum + 00496 std::string("], but our version has [") + getDataType() + "/" + getMD5Sum() + 00497 std::string("]. Dropping connection."); 00498 00499 ROS_ERROR("%s", msg.c_str()); 00500 error_msg = msg; 00501 00502 return false; 00503 } 00504 00505 return true; 00506 } 00507 00508 } // namespace ros