00001 00002 /* 00003 * Copyright (C) 2008, Morgan Quigley and Willow Garage, Inc. 00004 * 00005 * Redistribution and use in source and binary forms, with or without 00006 * modification, are permitted provided that the following conditions are met: 00007 * * Redistributions of source code must retain the above copyright notice, 00008 * this list of conditions and the following disclaimer. 00009 * * Redistributions in binary form must reproduce the above copyright 00010 * notice, this list of conditions and the following disclaimer in the 00011 * documentation and/or other materials provided with the distribution. 00012 * * Neither the names of Stanford University or Willow Garage, Inc. nor the names of its 00013 * contributors may be used to endorse or promote products derived from 00014 * this software without specific prior written permission. 00015 * 00016 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 00017 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 00018 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 00019 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 00020 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 00021 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 00022 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 00023 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 00024 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 00025 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 00026 * POSSIBILITY OF SUCH DAMAGE. 00027 */ 00028 00029 #include "ros/intraprocess_subscriber_link.h" 00030 #include "ros/intraprocess_publisher_link.h" 00031 #include "ros/publication.h" 00032 #include "ros/header.h" 00033 #include "ros/connection.h" 00034 #include "ros/transport/transport.h" 00035 #include "ros/this_node.h" 00036 #include "ros/connection_manager.h" 00037 #include "ros/topic_manager.h" 00038 #include "ros/file_log.h" 00039 00040 #include <boost/bind.hpp> 00041 00042 namespace ros 00043 { 00044 00045 IntraProcessSubscriberLink::IntraProcessSubscriberLink(const PublicationPtr& parent) 00046 : dropped_(false) 00047 { 00048 ROS_ASSERT(parent); 00049 parent_ = parent; 00050 topic_ = parent->getName(); 00051 } 00052 00053 IntraProcessSubscriberLink::~IntraProcessSubscriberLink() 00054 { 00055 } 00056 00057 void IntraProcessSubscriberLink::setSubscriber(const IntraProcessPublisherLinkPtr& subscriber) 00058 { 00059 subscriber_ = subscriber; 00060 connection_id_ = ConnectionManager::instance()->getNewConnectionID(); 00061 destination_caller_id_ = this_node::getName(); 00062 } 00063 00064 bool IntraProcessSubscriberLink::isLatching() 00065 { 00066 if (PublicationPtr parent = parent_.lock()) 00067 { 00068 return parent->isLatching(); 00069 } 00070 00071 return false; 00072 } 00073 00074 void IntraProcessSubscriberLink::enqueueMessage(const SerializedMessage& m, bool ser, bool nocopy) 00075 { 00076 boost::recursive_mutex::scoped_lock lock(drop_mutex_); 00077 if (dropped_) 00078 { 00079 return; 00080 } 00081 00082 ROS_ASSERT(subscriber_); 00083 subscriber_->handleMessage(m, ser, nocopy); 00084 } 00085 00086 std::string IntraProcessSubscriberLink::getTransportType() 00087 { 00088 return std::string("INTRAPROCESS"); 00089 } 00090 00091 void IntraProcessSubscriberLink::drop() 00092 { 00093 { 00094 boost::recursive_mutex::scoped_lock lock(drop_mutex_); 00095 if (dropped_) 00096 { 00097 return; 00098 } 00099 00100 dropped_ = true; 00101 } 00102 00103 if (subscriber_) 00104 { 00105 subscriber_->drop(); 00106 subscriber_.reset(); 00107 } 00108 00109 if (PublicationPtr parent = parent_.lock()) 00110 { 00111 ROSCPP_LOG_DEBUG("Connection to local subscriber on topic [%s] dropped", topic_.c_str()); 00112 00113 parent->removeSubscriberLink(shared_from_this()); 00114 } 00115 } 00116 00117 void IntraProcessSubscriberLink::getPublishTypes(bool& ser, bool& nocopy, const std::type_info& ti) 00118 { 00119 boost::recursive_mutex::scoped_lock lock(drop_mutex_); 00120 if (dropped_) 00121 { 00122 return; 00123 } 00124 00125 subscriber_->getPublishTypes(ser, nocopy, ti); 00126 } 00127 00128 } // namespace ros