00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035 #include "ros/intraprocess_publisher_link.h"
00036 #include "ros/intraprocess_subscriber_link.h"
00037 #include "ros/subscription.h"
00038 #include "ros/header.h"
00039 #include "ros/connection.h"
00040 #include "ros/transport/transport.h"
00041 #include "ros/this_node.h"
00042 #include "ros/connection_manager.h"
00043 #include "ros/file_log.h"
00044
00045 #include <boost/bind.hpp>
00046
00047 #include <sstream>
00048
00049 namespace ros
00050 {
00051
00052 IntraProcessPublisherLink::IntraProcessPublisherLink(const SubscriptionPtr& parent, const std::string& xmlrpc_uri, const TransportHints& transport_hints)
00053 : PublisherLink(parent, xmlrpc_uri, transport_hints)
00054 , dropped_(false)
00055 {
00056 }
00057
00058 IntraProcessPublisherLink::~IntraProcessPublisherLink()
00059 {
00060 }
00061
00062 void IntraProcessPublisherLink::setPublisher(const IntraProcessSubscriberLinkPtr& publisher)
00063 {
00064 publisher_ = publisher;
00065
00066 SubscriptionPtr parent = parent_.lock();
00067 ROS_ASSERT(parent);
00068
00069 Header header;
00070 M_stringPtr values = header.getValues();
00071 (*values)["callerid"] = this_node::getName();
00072 (*values)["topic"] = parent->getName();
00073 (*values)["type"] = publisher->getDataType();
00074 (*values)["md5sum"] = publisher->getMD5Sum();
00075 (*values)["message_definition"] = publisher->getMessageDefinition();
00076 (*values)["latching"] = publisher->isLatching() ? "1" : "0";
00077 setHeader(header);
00078 }
00079
00080 void IntraProcessPublisherLink::drop()
00081 {
00082 {
00083 boost::recursive_mutex::scoped_lock lock(drop_mutex_);
00084 if (dropped_)
00085 {
00086 return;
00087 }
00088
00089 dropped_ = true;
00090 }
00091
00092 if (publisher_)
00093 {
00094 publisher_->drop();
00095 publisher_.reset();
00096 }
00097
00098 if (SubscriptionPtr parent = parent_.lock())
00099 {
00100 ROSCPP_LOG_DEBUG("Connection to local publisher on topic [%s] dropped", parent->getName().c_str());
00101
00102 parent->removePublisherLink(shared_from_this());
00103 }
00104 }
00105
00106 void IntraProcessPublisherLink::handleMessage(const SerializedMessage& m, bool ser, bool nocopy)
00107 {
00108 boost::recursive_mutex::scoped_lock lock(drop_mutex_);
00109 if (dropped_)
00110 {
00111 return;
00112 }
00113
00114 stats_.bytes_received_ += m.num_bytes;
00115 stats_.messages_received_++;
00116
00117 SubscriptionPtr parent = parent_.lock();
00118
00119 if (parent)
00120 {
00121 stats_.drops_ += parent->handleMessage(m, ser, nocopy, header_.getValues(), shared_from_this());
00122 }
00123 }
00124
00125 std::string IntraProcessPublisherLink::getTransportType()
00126 {
00127 return std::string("INTRAPROCESS");
00128 }
00129
00130 void IntraProcessPublisherLink::getPublishTypes(bool& ser, bool& nocopy, const std::type_info& ti)
00131 {
00132 boost::recursive_mutex::scoped_lock lock(drop_mutex_);
00133 if (dropped_)
00134 {
00135 ser = false;
00136 nocopy = false;
00137 return;
00138 }
00139
00140 SubscriptionPtr parent = parent_.lock();
00141 if (parent)
00142 {
00143 parent->getPublishTypes(ser, nocopy, ti);
00144 }
00145 else
00146 {
00147 ser = true;
00148 nocopy = false;
00149 }
00150 }
00151
00152 }
00153