intraprocess_publisher_link.cpp
Go to the documentation of this file.
00001 /*
00002  * Software License Agreement (BSD License)
00003  *
00004  *  Copyright (c) 2008, Willow Garage, Inc.
00005  *  All rights reserved.
00006  *
00007  *  Redistribution and use in source and binary forms, with or without
00008  *  modification, are permitted provided that the following conditions
00009  *  are met:
00010  *
00011  *   * Redistributions of source code must retain the above copyright
00012  *     notice, this list of conditions and the following disclaimer.
00013  *   * Redistributions in binary form must reproduce the above
00014  *     copyright notice, this list of conditions and the following
00015  *     disclaimer in the documentation and/or other materials provided
00016  *     with the distribution.
00017  *   * Neither the name of Willow Garage, Inc. nor the names of its
00018  *     contributors may be used to endorse or promote products derived
00019  *     from this software without specific prior written permission.
00020  *
00021  *  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
00022  *  "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
00023  *  LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
00024  *  FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
00025  *  COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
00026  *  INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
00027  *  BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
00028  *  LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
00029  *  CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
00030  *  LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
00031  *  ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00032  *  POSSIBILITY OF SUCH DAMAGE.
00033  */
00034 
00035 #include "ros/intraprocess_publisher_link.h"
00036 #include "ros/intraprocess_subscriber_link.h"
00037 #include "ros/subscription.h"
00038 #include "ros/header.h"
00039 #include "ros/connection.h"
00040 #include "ros/transport/transport.h"
00041 #include "ros/this_node.h"
00042 #include "ros/connection_manager.h"
00043 #include "ros/file_log.h"
00044 
00045 #include <boost/bind.hpp>
00046 
00047 #include <sstream>
00048 
00049 namespace ros
00050 {
00051 
00052 IntraProcessPublisherLink::IntraProcessPublisherLink(const SubscriptionPtr& parent, const std::string& xmlrpc_uri, const TransportHints& transport_hints)
00053 : PublisherLink(parent, xmlrpc_uri, transport_hints)
00054 , dropped_(false)
00055 {
00056 }
00057 
00058 IntraProcessPublisherLink::~IntraProcessPublisherLink()
00059 {
00060 }
00061 
00062 void IntraProcessPublisherLink::setPublisher(const IntraProcessSubscriberLinkPtr& publisher)
00063 {
00064   publisher_ = publisher;
00065 
00066   SubscriptionPtr parent = parent_.lock();
00067   ROS_ASSERT(parent);
00068 
00069   Header header;
00070   M_stringPtr values = header.getValues();
00071   (*values)["callerid"] = this_node::getName();
00072   (*values)["topic"] = parent->getName();
00073   (*values)["type"] = publisher->getDataType();
00074   (*values)["md5sum"] = publisher->getMD5Sum();
00075   (*values)["message_definition"] = publisher->getMessageDefinition();
00076   (*values)["latching"] = publisher->isLatching() ? "1" : "0";
00077   setHeader(header);
00078 }
00079 
00080 void IntraProcessPublisherLink::drop()
00081 {
00082   {
00083     boost::recursive_mutex::scoped_lock lock(drop_mutex_);
00084     if (dropped_)
00085     {
00086       return;
00087     }
00088 
00089     dropped_ = true;
00090   }
00091 
00092   if (publisher_)
00093   {
00094     publisher_->drop();
00095     publisher_.reset();
00096   }
00097 
00098   if (SubscriptionPtr parent = parent_.lock())
00099   {
00100     ROSCPP_LOG_DEBUG("Connection to local publisher on topic [%s] dropped", parent->getName().c_str());
00101 
00102     parent->removePublisherLink(shared_from_this());
00103   }
00104 }
00105 
00106 void IntraProcessPublisherLink::handleMessage(const SerializedMessage& m, bool ser, bool nocopy)
00107 {
00108   boost::recursive_mutex::scoped_lock lock(drop_mutex_);
00109   if (dropped_)
00110   {
00111     return;
00112   }
00113 
00114   stats_.bytes_received_ += m.num_bytes;
00115   stats_.messages_received_++;
00116 
00117   SubscriptionPtr parent = parent_.lock();
00118 
00119   if (parent)
00120   {
00121     stats_.drops_ += parent->handleMessage(m, ser, nocopy, header_.getValues(), shared_from_this());
00122   }
00123 }
00124 
00125 std::string IntraProcessPublisherLink::getTransportType()
00126 {
00127   return std::string("INTRAPROCESS");
00128 }
00129 
00130 void IntraProcessPublisherLink::getPublishTypes(bool& ser, bool& nocopy, const std::type_info& ti)
00131 {
00132   boost::recursive_mutex::scoped_lock lock(drop_mutex_);
00133   if (dropped_)
00134   {
00135     ser = false;
00136     nocopy = false;
00137     return;
00138   }
00139 
00140   SubscriptionPtr parent = parent_.lock();
00141   if (parent)
00142   {
00143     parent->getPublishTypes(ser, nocopy, ti);
00144   }
00145   else
00146   {
00147     ser = true;
00148     nocopy = false;
00149   }
00150 }
00151 
00152 } // namespace ros
00153 


roscpp
Author(s): Morgan Quigley mquigley@cs.stanford.edu, Josh Faust jfaust@willowgarage.com, Brian Gerkey gerkey@willowgarage.com, Troy Straszheim straszheim@willowgarage.com
autogenerated on Sat Dec 28 2013 17:35:52