publication.cpp
Go to the documentation of this file.
00001 /*
00002  * Copyright (C) 2008, Morgan Quigley and Willow Garage, Inc.
00003  *
00004  * Redistribution and use in source and binary forms, with or without
00005  * modification, are permitted provided that the following conditions are met:
00006  *   * Redistributions of source code must retain the above copyright notice,
00007  *     this list of conditions and the following disclaimer.
00008  *   * Redistributions in binary form must reproduce the above copyright
00009  *     notice, this list of conditions and the following disclaimer in the
00010  *     documentation and/or other materials provided with the distribution.
00011  *   * Neither the names of Stanford University or Willow Garage, Inc. nor the names of its
00012  *     contributors may be used to endorse or promote products derived from
00013  *     this software without specific prior written permission.
00014  *
00015  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
00016  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
00017  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
00018  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
00019  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
00020  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
00021  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
00022  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
00023  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
00024  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00025  * POSSIBILITY OF SUCH DAMAGE.
00026  */
00027 
00028 #include "ros/publication.h"
00029 #include "ros/subscriber_link.h"
00030 #include "ros/connection.h"
00031 #include "ros/callback_queue_interface.h"
00032 #include "ros/single_subscriber_publisher.h"
00033 #include "ros/serialization.h"
00034 #include <std_msgs/Header.h>
00035 
00036 namespace ros
00037 {
00038 
00039 class PeerConnDisconnCallback : public CallbackInterface
00040 {
00041 public:
00042   PeerConnDisconnCallback(const SubscriberStatusCallback& callback, const SubscriberLinkPtr& sub_link, bool use_tracked_object, const VoidConstWPtr& tracked_object)
00043   : callback_(callback)
00044   , sub_link_(sub_link)
00045   , use_tracked_object_(use_tracked_object)
00046   , tracked_object_(tracked_object)
00047   {
00048   }
00049 
00050   virtual CallResult call()
00051   {
00052     VoidConstPtr tracker;
00053     if (use_tracked_object_)
00054     {
00055       tracker = tracked_object_.lock();
00056 
00057       if (!tracker)
00058       {
00059         return Invalid;
00060       }
00061     }
00062 
00063     SingleSubscriberPublisher pub(sub_link_);
00064     callback_(pub);
00065 
00066     return Success;
00067   }
00068 
00069 private:
00070   SubscriberStatusCallback callback_;
00071   SubscriberLinkPtr sub_link_;
00072   bool use_tracked_object_;
00073   VoidConstWPtr tracked_object_;
00074 };
00075 
00076 Publication::Publication(const std::string &name,
00077                          const std::string &datatype,
00078                          const std::string &_md5sum,
00079                          const std::string& message_definition,
00080                          size_t max_queue,
00081                          bool latch,
00082                          bool has_header)
00083 : name_(name),
00084   datatype_(datatype),
00085   md5sum_(_md5sum),
00086   message_definition_(message_definition),
00087   max_queue_(max_queue),
00088   seq_(0),
00089   dropped_(false),
00090   latch_(latch),
00091   has_header_(has_header),
00092   intraprocess_subscriber_count_(0)
00093 {
00094 }
00095 
00096 Publication::~Publication()
00097 {
00098   drop();
00099 }
00100 
00101 void Publication::addCallbacks(const SubscriberCallbacksPtr& callbacks)
00102 {
00103   boost::mutex::scoped_lock lock(callbacks_mutex_);
00104 
00105   callbacks_.push_back(callbacks);
00106 
00107   // Add connect callbacks for all current subscriptions if this publisher wants them
00108   if (callbacks->connect_ && callbacks->callback_queue_)
00109   {
00110     boost::mutex::scoped_lock lock(subscriber_links_mutex_);
00111     V_SubscriberLink::iterator it = subscriber_links_.begin();
00112     V_SubscriberLink::iterator end = subscriber_links_.end();
00113     for (; it != end; ++it)
00114     {
00115       const SubscriberLinkPtr& sub_link = *it;
00116       CallbackInterfacePtr cb(new PeerConnDisconnCallback(callbacks->connect_, sub_link, callbacks->has_tracked_object_, callbacks->tracked_object_));
00117       callbacks->callback_queue_->addCallback(cb, (uint64_t)callbacks.get());
00118     }
00119   }
00120 }
00121 
00122 void Publication::removeCallbacks(const SubscriberCallbacksPtr& callbacks)
00123 {
00124   boost::mutex::scoped_lock lock(callbacks_mutex_);
00125 
00126   V_Callback::iterator it = std::find(callbacks_.begin(), callbacks_.end(), callbacks);
00127   if (it != callbacks_.end())
00128   {
00129     const SubscriberCallbacksPtr& cb = *it;
00130     if (cb->callback_queue_)
00131     {
00132       cb->callback_queue_->removeByID((uint64_t)cb.get());
00133     }
00134     callbacks_.erase(it);
00135   }
00136 }
00137 
00138 void Publication::drop()
00139 {
00140   // grab a lock here, to ensure that no subscription callback will
00141   // be invoked after we return
00142   {
00143     boost::mutex::scoped_lock lock(publish_queue_mutex_);
00144     boost::mutex::scoped_lock lock2(subscriber_links_mutex_);
00145 
00146     if (dropped_)
00147     {
00148       return;
00149     }
00150 
00151     dropped_ = true;
00152   }
00153 
00154   dropAllConnections();
00155 }
00156 
00157 bool Publication::enqueueMessage(const SerializedMessage& m)
00158 {
00159   boost::mutex::scoped_lock lock(subscriber_links_mutex_);
00160   if (dropped_)
00161   {
00162     return false;
00163   }
00164 
00165   ROS_ASSERT(m.buf);
00166 
00167   uint32_t seq = incrementSequence();
00168   if (has_header_)
00169   {
00170     // If we have a header, we know it's immediately after the message length
00171     // Deserialize it, write the sequence, and then serialize it again.
00172     namespace ser = ros::serialization;
00173     std_msgs::Header header;
00174     ser::IStream istream(m.buf.get() + 4, m.num_bytes - 4);
00175     ser::deserialize(istream, header);
00176     header.seq = seq;
00177     ser::OStream ostream(m.buf.get() + 4, m.num_bytes - 4);
00178     ser::serialize(ostream, header);
00179   }
00180 
00181   for(V_SubscriberLink::iterator i = subscriber_links_.begin();
00182       i != subscriber_links_.end(); ++i)
00183   {
00184     const SubscriberLinkPtr& sub_link = (*i);
00185     sub_link->enqueueMessage(m, true, false);
00186   }
00187 
00188   if (latch_)
00189   {
00190     last_message_ = m;
00191   }
00192 
00193   return true;
00194 }
00195 
00196 void Publication::addSubscriberLink(const SubscriberLinkPtr& sub_link)
00197 {
00198   {
00199     boost::mutex::scoped_lock lock(subscriber_links_mutex_);
00200 
00201     if (dropped_)
00202     {
00203       return;
00204     }
00205 
00206     subscriber_links_.push_back(sub_link);
00207 
00208     if (sub_link->isIntraprocess())
00209     {
00210       ++intraprocess_subscriber_count_;
00211     }
00212   }
00213 
00214   if (latch_ && last_message_.buf)
00215   {
00216     sub_link->enqueueMessage(last_message_, true, true);
00217   }
00218 
00219   // This call invokes the subscribe callback if there is one.
00220   // This must happen *after* the push_back above, in case the
00221   // callback uses publish().
00222   peerConnect(sub_link);
00223 }
00224 
00225 void Publication::removeSubscriberLink(const SubscriberLinkPtr& sub_link)
00226 {
00227   SubscriberLinkPtr link;
00228   {
00229     boost::mutex::scoped_lock lock(subscriber_links_mutex_);
00230 
00231     if (dropped_)
00232     {
00233       return;
00234     }
00235 
00236     if (sub_link->isIntraprocess())
00237     {
00238       --intraprocess_subscriber_count_;
00239     }
00240 
00241     V_SubscriberLink::iterator it = std::find(subscriber_links_.begin(), subscriber_links_.end(), sub_link);
00242     if (it != subscriber_links_.end())
00243     {
00244       link = *it;
00245       subscriber_links_.erase(it);
00246     }
00247   }
00248 
00249   if (link)
00250   {
00251     peerDisconnect(link);
00252   }
00253 }
00254 
00255 XmlRpc::XmlRpcValue Publication::getStats()
00256 {
00257   XmlRpc::XmlRpcValue stats;
00258   stats[0] = name_;
00259   XmlRpc::XmlRpcValue conn_data;
00260   conn_data.setSize(0); // force to be an array, even if it's empty
00261 
00262   boost::mutex::scoped_lock lock(subscriber_links_mutex_);
00263 
00264   uint32_t cidx = 0;
00265   for (V_SubscriberLink::iterator c = subscriber_links_.begin();
00266        c != subscriber_links_.end(); ++c, cidx++)
00267   {
00268     const SubscriberLink::Stats& s = (*c)->getStats();
00269     conn_data[cidx][0] = (*c)->getConnectionID();
00270     // todo: figure out what to do here... the bytes_sent will wrap around
00271     // on some flows within a reasonable amount of time. xmlrpc++ doesn't
00272     // seem to give me a nice way to do 64-bit ints, perhaps that's a
00273     // limitation of xml-rpc, not sure. alternatively we could send the number
00274     // of KB transmitted to gain a few order of magnitude.
00275     conn_data[cidx][1] = (int)s.bytes_sent_;
00276     conn_data[cidx][2] = (int)s.message_data_sent_;
00277     conn_data[cidx][3] = (int)s.messages_sent_;
00278     conn_data[cidx][4] = 0; // not sure what is meant by connected
00279   }
00280 
00281   stats[1] = conn_data;
00282   return stats;
00283 }
00284 
00285 // rospy returns values like this:
00286 // [(1, "('127.0.0.1', 62334)", 'o', 'TCPROS', '/chatter')]
00287 //
00288 // We're outputting something like this:
00289 // (0, (127.0.0.1, 62707), o, TCPROS, /chatter)
00290 void Publication::getInfo(XmlRpc::XmlRpcValue& info)
00291 {
00292   boost::mutex::scoped_lock lock(subscriber_links_mutex_);
00293 
00294   for (V_SubscriberLink::iterator c = subscriber_links_.begin();
00295        c != subscriber_links_.end(); ++c)
00296   {
00297     XmlRpc::XmlRpcValue curr_info;
00298     curr_info[0] = (int)(*c)->getConnectionID();
00299     curr_info[1] = (*c)->getDestinationCallerID();
00300     curr_info[2] = "o";
00301     curr_info[3] = (*c)->getTransportType();
00302     curr_info[4] = name_;
00303     info[info.size()] = curr_info;
00304   }
00305 }
00306 
00307 void Publication::dropAllConnections()
00308 {
00309   // Swap our publishers list with a local one so we can only lock for a short period of time, because a
00310   // side effect of our calling drop() on connections can be re-locking the publishers mutex
00311   V_SubscriberLink local_publishers;
00312 
00313   {
00314     boost::mutex::scoped_lock lock(subscriber_links_mutex_);
00315 
00316     local_publishers.swap(subscriber_links_);
00317   }
00318 
00319   for (V_SubscriberLink::iterator i = local_publishers.begin();
00320            i != local_publishers.end(); ++i)
00321   {
00322     (*i)->drop();
00323   }
00324 }
00325 
00326 void Publication::peerConnect(const SubscriberLinkPtr& sub_link)
00327 {
00328   V_Callback::iterator it = callbacks_.begin();
00329   V_Callback::iterator end = callbacks_.end();
00330   for (; it != end; ++it)
00331   {
00332     const SubscriberCallbacksPtr& cbs = *it;
00333     if (cbs->connect_ && cbs->callback_queue_)
00334     {
00335       CallbackInterfacePtr cb(new PeerConnDisconnCallback(cbs->connect_, sub_link, cbs->has_tracked_object_, cbs->tracked_object_));
00336       cbs->callback_queue_->addCallback(cb, (uint64_t)cbs.get());
00337     }
00338   }
00339 }
00340 
00341 void Publication::peerDisconnect(const SubscriberLinkPtr& sub_link)
00342 {
00343   V_Callback::iterator it = callbacks_.begin();
00344   V_Callback::iterator end = callbacks_.end();
00345   for (; it != end; ++it)
00346   {
00347     const SubscriberCallbacksPtr& cbs = *it;
00348     if (cbs->disconnect_ && cbs->callback_queue_)
00349     {
00350       CallbackInterfacePtr cb(new PeerConnDisconnCallback(cbs->disconnect_, sub_link, cbs->has_tracked_object_, cbs->tracked_object_));
00351       cbs->callback_queue_->addCallback(cb, (uint64_t)cbs.get());
00352     }
00353   }
00354 }
00355 
00356 size_t Publication::getNumCallbacks()
00357 {
00358   boost::mutex::scoped_lock lock(callbacks_mutex_);
00359   return callbacks_.size();
00360 }
00361 
00362 uint32_t Publication::incrementSequence()
00363 {
00364   boost::mutex::scoped_lock lock(seq_mutex_);
00365   uint32_t old_seq = seq_;
00366   ++seq_;
00367 
00368   return old_seq;
00369 }
00370 
00371 uint32_t Publication::getNumSubscribers()
00372 {
00373   boost::mutex::scoped_lock lock(subscriber_links_mutex_);
00374   return (uint32_t)subscriber_links_.size();
00375 }
00376 
00377 void Publication::getPublishTypes(bool& serialize, bool& nocopy, const std::type_info& ti)
00378 {
00379   boost::mutex::scoped_lock lock(subscriber_links_mutex_);
00380   V_SubscriberLink::const_iterator it = subscriber_links_.begin();
00381   V_SubscriberLink::const_iterator end = subscriber_links_.end();
00382   for (; it != end; ++it)
00383   {
00384     const SubscriberLinkPtr& sub = *it;
00385     bool s = false;
00386     bool n = false;
00387     sub->getPublishTypes(s, n, ti);
00388     serialize = serialize || s;
00389     nocopy = nocopy || n;
00390 
00391     if (serialize && nocopy)
00392     {
00393       break;
00394     }
00395   }
00396 }
00397 
00398 bool Publication::hasSubscribers()
00399 {
00400   boost::mutex::scoped_lock lock(subscriber_links_mutex_);
00401   return !subscriber_links_.empty();
00402 }
00403 
00404 void Publication::publish(SerializedMessage& m)
00405 {
00406   if (m.message)
00407   {
00408     boost::mutex::scoped_lock lock(subscriber_links_mutex_);
00409     V_SubscriberLink::const_iterator it = subscriber_links_.begin();
00410     V_SubscriberLink::const_iterator end = subscriber_links_.end();
00411     for (; it != end; ++it)
00412     {
00413       const SubscriberLinkPtr& sub = *it;
00414       if (sub->isIntraprocess())
00415       {
00416         sub->enqueueMessage(m, false, true);
00417       }
00418     }
00419 
00420     m.message.reset();
00421   }
00422 
00423   if (m.buf)
00424   {
00425     boost::mutex::scoped_lock lock(publish_queue_mutex_);
00426     publish_queue_.push_back(m);
00427   }
00428 }
00429 
00430 void Publication::processPublishQueue()
00431 {
00432   V_SerializedMessage queue;
00433   {
00434     boost::mutex::scoped_lock lock(publish_queue_mutex_);
00435 
00436     if (dropped_)
00437     {
00438       return;
00439     }
00440 
00441     queue.insert(queue.end(), publish_queue_.begin(), publish_queue_.end());
00442     publish_queue_.clear();
00443   }
00444 
00445   if (queue.empty())
00446   {
00447     return;
00448   }
00449 
00450   V_SerializedMessage::iterator it = queue.begin();
00451   V_SerializedMessage::iterator end = queue.end();
00452   for (; it != end; ++it)
00453   {
00454     enqueueMessage(*it);
00455   }
00456 }
00457 
00458 bool Publication::validateHeader(const Header& header, std::string& error_msg)
00459 {
00460   std::string md5sum, topic, client_callerid;
00461   if (!header.getValue("md5sum", md5sum)
00462    || !header.getValue("topic", topic)
00463    || !header.getValue("callerid", client_callerid))
00464   {
00465     std::string msg("Header from subscriber did not have the required elements: md5sum, topic, callerid");
00466 
00467     ROS_ERROR("%s", msg.c_str());
00468     error_msg = msg;
00469 
00470     return false;
00471   }
00472 
00473   // Check whether the topic has been deleted from
00474   // advertised_topics through a call to unadvertise(), which could
00475   // have happened while we were waiting for the subscriber to
00476   // provide the md5sum.
00477   if(isDropped())
00478   {
00479     std::string msg = std::string("received a tcpros connection for a nonexistent topic [") +
00480                 topic + std::string("] from [" + client_callerid +"].");
00481 
00482     ROS_ERROR("%s", msg.c_str());
00483     error_msg = msg;
00484 
00485     return false;
00486   }
00487 
00488   if (getMD5Sum() != md5sum &&
00489       (md5sum != std::string("*") && getMD5Sum() != std::string("*")))
00490   {
00491     std::string datatype;
00492     header.getValue("type", datatype);
00493 
00494     std::string msg = std::string("Client [") + client_callerid + std::string("] wants topic ") + topic +
00495                       std::string(" to have datatype/md5sum [") + datatype + "/" + md5sum +
00496                       std::string("], but our version has [") + getDataType() + "/" + getMD5Sum() +
00497                       std::string("]. Dropping connection.");
00498 
00499     ROS_ERROR("%s", msg.c_str());
00500     error_msg = msg;
00501 
00502     return false;
00503   }
00504 
00505   return true;
00506 }
00507 
00508 } // namespace ros


roscpp
Author(s): Morgan Quigley mquigley@cs.stanford.edu, Josh Faust jfaust@willowgarage.com, Brian Gerkey gerkey@willowgarage.com, Troy Straszheim straszheim@willowgarage.com
autogenerated on Sat Dec 28 2013 17:35:52