publication.cpp
Go to the documentation of this file.
00001 /*
00002  * Copyright (C) 2008, Morgan Quigley and Willow Garage, Inc.
00003  *
00004  * Redistribution and use in source and binary forms, with or without
00005  * modification, are permitted provided that the following conditions are met:
00006  *   * Redistributions of source code must retain the above copyright notice,
00007  *     this list of conditions and the following disclaimer.
00008  *   * Redistributions in binary form must reproduce the above copyright
00009  *     notice, this list of conditions and the following disclaimer in the
00010  *     documentation and/or other materials provided with the distribution.
00011  *   * Neither the names of Stanford University or Willow Garage, Inc. nor the names of its
00012  *     contributors may be used to endorse or promote products derived from
00013  *     this software without specific prior written permission.
00014  *
00015  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
00016  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
00017  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
00018  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
00019  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
00020  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
00021  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
00022  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
00023  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
00024  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00025  * POSSIBILITY OF SUCH DAMAGE.
00026  */
00027 
00028 #include "ros/publication.h"
00029 #include "ros/subscriber_link.h"
00030 #include "ros/connection.h"
00031 #include "ros/callback_queue_interface.h"
00032 #include "ros/single_subscriber_publisher.h"
00033 #include "ros/serialization.h"
00034 #include <std_msgs/Header.h>
00035 
00036 namespace ros
00037 {
00038 
00039 class PeerConnDisconnCallback : public CallbackInterface
00040 {
00041 public:
00042   PeerConnDisconnCallback(const SubscriberStatusCallback& callback, const SubscriberLinkPtr& sub_link, bool use_tracked_object, const VoidConstWPtr& tracked_object)
00043   : callback_(callback)
00044   , sub_link_(sub_link)
00045   , use_tracked_object_(use_tracked_object)
00046   , tracked_object_(tracked_object)
00047   {
00048   }
00049 
00050   virtual CallResult call()
00051   {
00052     VoidConstPtr tracker;
00053     if (use_tracked_object_)
00054     {
00055       tracker = tracked_object_.lock();
00056 
00057       if (!tracker)
00058       {
00059         return Invalid;
00060       }
00061     }
00062 
00063     SingleSubscriberPublisher pub(sub_link_);
00064     callback_(pub);
00065 
00066     return Success;
00067   }
00068 
00069 private:
00070   SubscriberStatusCallback callback_;
00071   SubscriberLinkPtr sub_link_;
00072   bool use_tracked_object_;
00073   VoidConstWPtr tracked_object_;
00074 };
00075 
00076 Publication::Publication(const std::string &name,
00077                          const std::string &datatype,
00078                          const std::string &_md5sum,
00079                          const std::string& message_definition,
00080                          size_t max_queue,
00081                          bool latch,
00082                          bool has_header)
00083 : name_(name),
00084   datatype_(datatype),
00085   md5sum_(_md5sum),
00086   message_definition_(message_definition),
00087   max_queue_(max_queue),
00088   seq_(0),
00089   dropped_(false),
00090   latch_(latch),
00091   has_header_(has_header),
00092   intraprocess_subscriber_count_(0)
00093 {
00094 }
00095 
00096 Publication::~Publication()
00097 {
00098   drop();
00099 }
00100 
00101 void Publication::addCallbacks(const SubscriberCallbacksPtr& callbacks)
00102 {
00103   boost::mutex::scoped_lock lock(callbacks_mutex_);
00104 
00105   callbacks_.push_back(callbacks);
00106 
00107   // Add connect callbacks for all current subscriptions if this publisher wants them
00108   if (callbacks->connect_ && callbacks->callback_queue_)
00109   {
00110     boost::mutex::scoped_lock lock(subscriber_links_mutex_);
00111     V_SubscriberLink::iterator it = subscriber_links_.begin();
00112     V_SubscriberLink::iterator end = subscriber_links_.end();
00113     for (; it != end; ++it)
00114     {
00115       const SubscriberLinkPtr& sub_link = *it;
00116       CallbackInterfacePtr cb(boost::make_shared<PeerConnDisconnCallback>(callbacks->connect_, sub_link, callbacks->has_tracked_object_, callbacks->tracked_object_));
00117       callbacks->callback_queue_->addCallback(cb, (uint64_t)callbacks.get());
00118     }
00119   }
00120 }
00121 
00122 void Publication::removeCallbacks(const SubscriberCallbacksPtr& callbacks)
00123 {
00124   boost::mutex::scoped_lock lock(callbacks_mutex_);
00125 
00126   V_Callback::iterator it = std::find(callbacks_.begin(), callbacks_.end(), callbacks);
00127   if (it != callbacks_.end())
00128   {
00129     const SubscriberCallbacksPtr& cb = *it;
00130     if (cb->callback_queue_)
00131     {
00132       cb->callback_queue_->removeByID((uint64_t)cb.get());
00133     }
00134     callbacks_.erase(it);
00135   }
00136 }
00137 
00138 void Publication::drop()
00139 {
00140   // grab a lock here, to ensure that no subscription callback will
00141   // be invoked after we return
00142   {
00143     boost::mutex::scoped_lock lock(publish_queue_mutex_);
00144     boost::mutex::scoped_lock lock2(subscriber_links_mutex_);
00145 
00146     if (dropped_)
00147     {
00148       return;
00149     }
00150 
00151     dropped_ = true;
00152   }
00153 
00154   dropAllConnections();
00155 }
00156 
00157 bool Publication::enqueueMessage(const SerializedMessage& m)
00158 {
00159   boost::mutex::scoped_lock lock(subscriber_links_mutex_);
00160   if (dropped_)
00161   {
00162     return false;
00163   }
00164 
00165   ROS_ASSERT(m.buf);
00166 
00167   uint32_t seq = incrementSequence();
00168   if (has_header_)
00169   {
00170     // If we have a header, we know it's immediately after the message length
00171     // Deserialize it, write the sequence, and then serialize it again.
00172     namespace ser = ros::serialization;
00173     std_msgs::Header header;
00174     ser::IStream istream(m.buf.get() + 4, m.num_bytes - 4);
00175     ser::deserialize(istream, header);
00176     header.seq = seq;
00177     ser::OStream ostream(m.buf.get() + 4, m.num_bytes - 4);
00178     ser::serialize(ostream, header);
00179   }
00180 
00181   for(V_SubscriberLink::iterator i = subscriber_links_.begin();
00182       i != subscriber_links_.end(); ++i)
00183   {
00184     const SubscriberLinkPtr& sub_link = (*i);
00185     sub_link->enqueueMessage(m, true, false);
00186   }
00187 
00188   if (latch_)
00189   {
00190     last_message_ = m;
00191   }
00192 
00193   return true;
00194 }
00195 
00196 void Publication::addSubscriberLink(const SubscriberLinkPtr& sub_link)
00197 {
00198   {
00199     boost::mutex::scoped_lock lock(subscriber_links_mutex_);
00200 
00201     if (dropped_)
00202     {
00203       return;
00204     }
00205 
00206     subscriber_links_.push_back(sub_link);
00207 
00208     if (sub_link->isIntraprocess())
00209     {
00210       ++intraprocess_subscriber_count_;
00211     }
00212   }
00213 
00214   if (latch_ && last_message_.buf)
00215   {
00216     sub_link->enqueueMessage(last_message_, true, true);
00217   }
00218 
00219   // This call invokes the subscribe callback if there is one.
00220   // This must happen *after* the push_back above, in case the
00221   // callback uses publish().
00222   peerConnect(sub_link);
00223 }
00224 
00225 void Publication::removeSubscriberLink(const SubscriberLinkPtr& sub_link)
00226 {
00227   SubscriberLinkPtr link;
00228   {
00229     boost::mutex::scoped_lock lock(subscriber_links_mutex_);
00230 
00231     if (dropped_)
00232     {
00233       return;
00234     }
00235 
00236     if (sub_link->isIntraprocess())
00237     {
00238       --intraprocess_subscriber_count_;
00239     }
00240 
00241     V_SubscriberLink::iterator it = std::find(subscriber_links_.begin(), subscriber_links_.end(), sub_link);
00242     if (it != subscriber_links_.end())
00243     {
00244       link = *it;
00245       subscriber_links_.erase(it);
00246     }
00247   }
00248 
00249   if (link)
00250   {
00251     peerDisconnect(link);
00252   }
00253 }
00254 
00255 XmlRpc::XmlRpcValue Publication::getStats()
00256 {
00257   XmlRpc::XmlRpcValue stats;
00258   stats[0] = name_;
00259   XmlRpc::XmlRpcValue conn_data;
00260   conn_data.setSize(0); // force to be an array, even if it's empty
00261 
00262   boost::mutex::scoped_lock lock(subscriber_links_mutex_);
00263 
00264   uint32_t cidx = 0;
00265   for (V_SubscriberLink::iterator c = subscriber_links_.begin();
00266        c != subscriber_links_.end(); ++c, cidx++)
00267   {
00268     const SubscriberLink::Stats& s = (*c)->getStats();
00269     conn_data[cidx][0] = (*c)->getConnectionID();
00270     // todo: figure out what to do here... the bytes_sent will wrap around
00271     // on some flows within a reasonable amount of time. xmlrpc++ doesn't
00272     // seem to give me a nice way to do 64-bit ints, perhaps that's a
00273     // limitation of xml-rpc, not sure. alternatively we could send the number
00274     // of KB transmitted to gain a few order of magnitude.
00275     conn_data[cidx][1] = (int)s.bytes_sent_;
00276     conn_data[cidx][2] = (int)s.message_data_sent_;
00277     conn_data[cidx][3] = (int)s.messages_sent_;
00278     conn_data[cidx][4] = 0; // not sure what is meant by connected
00279   }
00280 
00281   stats[1] = conn_data;
00282   return stats;
00283 }
00284 
00285 // Publisher : [(connection_id, destination_caller_id, direction, transport, topic_name, connected, connection_info_string)*]
00286 // e.g. [(2, '/listener', 'o', 'TCPROS', '/chatter', 1, 'TCPROS connection on port 55878 to [127.0.0.1:44273 on socket 7]')]
00287 void Publication::getInfo(XmlRpc::XmlRpcValue& info)
00288 {
00289   boost::mutex::scoped_lock lock(subscriber_links_mutex_);
00290 
00291   for (V_SubscriberLink::iterator c = subscriber_links_.begin();
00292        c != subscriber_links_.end(); ++c)
00293   {
00294     XmlRpc::XmlRpcValue curr_info;
00295     curr_info[0] = (int)(*c)->getConnectionID();
00296     curr_info[1] = (*c)->getDestinationCallerID();
00297     curr_info[2] = "o";
00298     curr_info[3] = (*c)->getTransportType();
00299     curr_info[4] = name_;
00300     curr_info[5] = true; // For length compatibility with rospy
00301     curr_info[6] = (*c)->getTransportInfo();
00302     info[info.size()] = curr_info;
00303   }
00304 }
00305 
00306 void Publication::dropAllConnections()
00307 {
00308   // Swap our publishers list with a local one so we can only lock for a short period of time, because a
00309   // side effect of our calling drop() on connections can be re-locking the publishers mutex
00310   V_SubscriberLink local_publishers;
00311 
00312   {
00313     boost::mutex::scoped_lock lock(subscriber_links_mutex_);
00314 
00315     local_publishers.swap(subscriber_links_);
00316   }
00317 
00318   for (V_SubscriberLink::iterator i = local_publishers.begin();
00319            i != local_publishers.end(); ++i)
00320   {
00321     (*i)->drop();
00322   }
00323 }
00324 
00325 void Publication::peerConnect(const SubscriberLinkPtr& sub_link)
00326 {
00327   V_Callback::iterator it = callbacks_.begin();
00328   V_Callback::iterator end = callbacks_.end();
00329   for (; it != end; ++it)
00330   {
00331     const SubscriberCallbacksPtr& cbs = *it;
00332     if (cbs->connect_ && cbs->callback_queue_)
00333     {
00334       CallbackInterfacePtr cb(boost::make_shared<PeerConnDisconnCallback>(cbs->connect_, sub_link, cbs->has_tracked_object_, cbs->tracked_object_));
00335       cbs->callback_queue_->addCallback(cb, (uint64_t)cbs.get());
00336     }
00337   }
00338 }
00339 
00340 void Publication::peerDisconnect(const SubscriberLinkPtr& sub_link)
00341 {
00342   V_Callback::iterator it = callbacks_.begin();
00343   V_Callback::iterator end = callbacks_.end();
00344   for (; it != end; ++it)
00345   {
00346     const SubscriberCallbacksPtr& cbs = *it;
00347     if (cbs->disconnect_ && cbs->callback_queue_)
00348     {
00349       CallbackInterfacePtr cb(boost::make_shared<PeerConnDisconnCallback>(cbs->disconnect_, sub_link, cbs->has_tracked_object_, cbs->tracked_object_));
00350       cbs->callback_queue_->addCallback(cb, (uint64_t)cbs.get());
00351     }
00352   }
00353 }
00354 
00355 size_t Publication::getNumCallbacks()
00356 {
00357   boost::mutex::scoped_lock lock(callbacks_mutex_);
00358   return callbacks_.size();
00359 }
00360 
00361 uint32_t Publication::incrementSequence()
00362 {
00363   boost::mutex::scoped_lock lock(seq_mutex_);
00364   uint32_t old_seq = seq_;
00365   ++seq_;
00366 
00367   return old_seq;
00368 }
00369 
00370 uint32_t Publication::getNumSubscribers()
00371 {
00372   boost::mutex::scoped_lock lock(subscriber_links_mutex_);
00373   return (uint32_t)subscriber_links_.size();
00374 }
00375 
00376 void Publication::getPublishTypes(bool& serialize, bool& nocopy, const std::type_info& ti)
00377 {
00378   boost::mutex::scoped_lock lock(subscriber_links_mutex_);
00379   V_SubscriberLink::const_iterator it = subscriber_links_.begin();
00380   V_SubscriberLink::const_iterator end = subscriber_links_.end();
00381   for (; it != end; ++it)
00382   {
00383     const SubscriberLinkPtr& sub = *it;
00384     bool s = false;
00385     bool n = false;
00386     sub->getPublishTypes(s, n, ti);
00387     serialize = serialize || s;
00388     nocopy = nocopy || n;
00389 
00390     if (serialize && nocopy)
00391     {
00392       break;
00393     }
00394   }
00395 }
00396 
00397 bool Publication::hasSubscribers()
00398 {
00399   boost::mutex::scoped_lock lock(subscriber_links_mutex_);
00400   return !subscriber_links_.empty();
00401 }
00402 
00403 void Publication::publish(SerializedMessage& m)
00404 {
00405   if (m.message)
00406   {
00407     boost::mutex::scoped_lock lock(subscriber_links_mutex_);
00408     V_SubscriberLink::const_iterator it = subscriber_links_.begin();
00409     V_SubscriberLink::const_iterator end = subscriber_links_.end();
00410     for (; it != end; ++it)
00411     {
00412       const SubscriberLinkPtr& sub = *it;
00413       if (sub->isIntraprocess())
00414       {
00415         sub->enqueueMessage(m, false, true);
00416       }
00417     }
00418 
00419     m.message.reset();
00420   }
00421 
00422   if (m.buf)
00423   {
00424     boost::mutex::scoped_lock lock(publish_queue_mutex_);
00425     publish_queue_.push_back(m);
00426   }
00427 }
00428 
00429 void Publication::processPublishQueue()
00430 {
00431   V_SerializedMessage queue;
00432   {
00433     boost::mutex::scoped_lock lock(publish_queue_mutex_);
00434 
00435     if (dropped_)
00436     {
00437       return;
00438     }
00439 
00440     queue.insert(queue.end(), publish_queue_.begin(), publish_queue_.end());
00441     publish_queue_.clear();
00442   }
00443 
00444   if (queue.empty())
00445   {
00446     return;
00447   }
00448 
00449   V_SerializedMessage::iterator it = queue.begin();
00450   V_SerializedMessage::iterator end = queue.end();
00451   for (; it != end; ++it)
00452   {
00453     enqueueMessage(*it);
00454   }
00455 }
00456 
00457 bool Publication::validateHeader(const Header& header, std::string& error_msg)
00458 {
00459   std::string md5sum, topic, client_callerid;
00460   if (!header.getValue("md5sum", md5sum)
00461    || !header.getValue("topic", topic)
00462    || !header.getValue("callerid", client_callerid))
00463   {
00464     std::string msg("Header from subscriber did not have the required elements: md5sum, topic, callerid");
00465 
00466     ROS_ERROR("%s", msg.c_str());
00467     error_msg = msg;
00468 
00469     return false;
00470   }
00471 
00472   // Check whether the topic has been deleted from
00473   // advertised_topics through a call to unadvertise(), which could
00474   // have happened while we were waiting for the subscriber to
00475   // provide the md5sum.
00476   if(isDropped())
00477   {
00478     std::string msg = std::string("received a tcpros connection for a nonexistent topic [") +
00479                 topic + std::string("] from [" + client_callerid +"].");
00480 
00481     ROS_ERROR("%s", msg.c_str());
00482     error_msg = msg;
00483 
00484     return false;
00485   }
00486 
00487   if (getMD5Sum() != md5sum &&
00488       (md5sum != std::string("*") && getMD5Sum() != std::string("*")))
00489   {
00490     std::string datatype;
00491     header.getValue("type", datatype);
00492 
00493     std::string msg = std::string("Client [") + client_callerid + std::string("] wants topic ") + topic +
00494                       std::string(" to have datatype/md5sum [") + datatype + "/" + md5sum +
00495                       std::string("], but our version has [") + getDataType() + "/" + getMD5Sum() +
00496                       std::string("]. Dropping connection.");
00497 
00498     ROS_ERROR("%s", msg.c_str());
00499     error_msg = msg;
00500 
00501     return false;
00502   }
00503 
00504   return true;
00505 }
00506 
00507 } // namespace ros


roscpp
Author(s): Morgan Quigley, Josh Faust, Brian Gerkey, Troy Straszheim
autogenerated on Tue Mar 7 2017 03:44:47