00001 
00002 
00003 
00004 
00005 
00006 
00007 
00008 
00009 
00010 
00011 
00012 
00013 
00014 
00015 
00016 
00017 
00018 
00019 
00020 
00021 
00022 
00023 
00024 
00025 
00026 
00027 
00028 
00029 
00030 
00031 
00032 
00033 
00034 
00035 #include "ros/service_client_link.h"
00036 #include "ros/service_publication.h"
00037 #include "ros/header.h"
00038 #include "ros/connection.h"
00039 #include "ros/service_manager.h"
00040 #include "ros/transport/transport.h"
00041 #include "ros/this_node.h"
00042 #include "ros/file_log.h"
00043 
00044 #include <boost/bind.hpp>
00045 
00046 namespace ros
00047 {
00048 
00049 ServiceClientLink::ServiceClientLink()
00050 : persistent_(false)
00051 {
00052 }
00053 
00054 ServiceClientLink::~ServiceClientLink()
00055 {
00056   if (connection_)
00057   {
00058     if (connection_->isSendingHeaderError())
00059     {
00060       connection_->removeDropListener(dropped_conn_);
00061     }
00062     else
00063     {
00064       connection_->drop(Connection::Destructing);
00065     }
00066   }
00067 }
00068 
00069 bool ServiceClientLink::initialize(const ConnectionPtr& connection)
00070 {
00071   connection_ = connection;
00072   dropped_conn_ = connection_->addDropListener(boost::bind(&ServiceClientLink::onConnectionDropped, this, _1));
00073 
00074   return true;
00075 }
00076 
00077 bool ServiceClientLink::handleHeader(const Header& header)
00078 {
00079   std::string md5sum, service, client_callerid;
00080   if (!header.getValue("md5sum", md5sum)
00081    || !header.getValue("service", service)
00082    || !header.getValue("callerid", client_callerid))
00083   {
00084     std::string msg("bogus tcpros header. did not have the "
00085                           "required elements: md5sum, service, callerid");
00086 
00087     ROS_ERROR("%s", msg.c_str());
00088     connection_->sendHeaderError(msg);
00089 
00090     return false;
00091   }
00092 
00093   std::string persistent;
00094   if (header.getValue("persistent", persistent))
00095   {
00096     if (persistent == "1" || persistent == "true")
00097     {
00098       persistent_ = true;
00099     }
00100   }
00101 
00102   ROSCPP_LOG_DEBUG("Service client [%s] wants service [%s] with md5sum [%s]", client_callerid.c_str(), service.c_str(), md5sum.c_str());
00103   ServicePublicationPtr ss = ServiceManager::instance()->lookupServicePublication(service);
00104   if (!ss)
00105   {
00106     std::string msg = std::string("received a tcpros connection for a "
00107                              "nonexistent service [") +
00108             service + std::string("].");
00109 
00110     ROS_ERROR("%s", msg.c_str());
00111     connection_->sendHeaderError(msg);
00112 
00113     return false;
00114   }
00115   if (ss->getMD5Sum() != md5sum &&
00116       (md5sum != std::string("*") && ss->getMD5Sum() != std::string("*")))
00117   {
00118     std::string msg = std::string("client wants service ") + service +
00119             std::string(" to have md5sum ") + md5sum +
00120             std::string(", but it has ") + ss->getMD5Sum() +
00121             std::string(". Dropping connection.");
00122 
00123     ROS_ERROR("%s", msg.c_str());
00124     connection_->sendHeaderError(msg);
00125 
00126     return false;
00127   }
00128 
00129   
00130   
00131   
00132   
00133   if(ss->isDropped())
00134   {
00135     std::string msg = std::string("received a tcpros connection for a "
00136                              "nonexistent service [") +
00137             service + std::string("].");
00138 
00139     ROS_ERROR("%s", msg.c_str());
00140     connection_->sendHeaderError(msg);
00141 
00142     return false;
00143   }
00144   else
00145   {
00146     parent_ = ServicePublicationWPtr(ss);
00147 
00148     
00149     M_string m;
00150     m["request_type"] = ss->getRequestDataType();
00151     m["response_type"] = ss->getResponseDataType();
00152     m["type"] = ss->getDataType();
00153     m["md5sum"] = ss->getMD5Sum();
00154     m["callerid"] = this_node::getName();
00155     connection_->writeHeader(m, boost::bind(&ServiceClientLink::onHeaderWritten, this, _1));
00156 
00157     ss->addServiceClientLink(shared_from_this());
00158   }
00159 
00160   return true;
00161 }
00162 
00163 void ServiceClientLink::onConnectionDropped(const ConnectionPtr& conn)
00164 {
00165   ROS_ASSERT(conn == connection_);
00166 
00167   if (ServicePublicationPtr parent = parent_.lock())
00168   {
00169     parent->removeServiceClientLink(shared_from_this());
00170   }
00171 }
00172 
00173 void ServiceClientLink::onHeaderWritten(const ConnectionPtr& conn)
00174 {
00175   (void)conn;
00176   connection_->read(4, boost::bind(&ServiceClientLink::onRequestLength, this, _1, _2, _3, _4));
00177 }
00178 
00179 void ServiceClientLink::onRequestLength(const ConnectionPtr& conn, const boost::shared_array<uint8_t>& buffer, uint32_t size, bool success)
00180 {
00181   if (!success)
00182     return;
00183 
00184   ROS_ASSERT(conn == connection_);
00185   ROS_ASSERT(size == 4);
00186 
00187   uint32_t len = *((uint32_t*)buffer.get());
00188 
00189   if (len > 1000000000)
00190   {
00191     ROS_ERROR("a message of over a gigabyte was " \
00192                 "predicted in tcpros. that seems highly " \
00193                 "unlikely, so I'll assume protocol " \
00194                 "synchronization is lost.");
00195     conn->drop(Connection::Destructing);
00196     return;
00197   }
00198 
00199   connection_->read(len, boost::bind(&ServiceClientLink::onRequest, this, _1, _2, _3, _4));
00200 }
00201 
00202 void ServiceClientLink::onRequest(const ConnectionPtr& conn, const boost::shared_array<uint8_t>& buffer, uint32_t size, bool success)
00203 {
00204   if (!success)
00205     return;
00206 
00207   ROS_ASSERT(conn == connection_);
00208 
00209   if (ServicePublicationPtr parent = parent_.lock())
00210   {
00211     parent->processRequest(buffer, size, shared_from_this());
00212   }
00213   else
00214   {
00215     ROS_BREAK();
00216   }
00217 }
00218 
00219 void ServiceClientLink::onResponseWritten(const ConnectionPtr& conn)
00220 {
00221   ROS_ASSERT(conn == connection_);
00222 
00223   if (persistent_)
00224   {
00225     connection_->read(4, boost::bind(&ServiceClientLink::onRequestLength, this, _1, _2, _3, _4));
00226   }
00227   else
00228   {
00229     connection_->drop(Connection::Destructing);
00230   }
00231 }
00232 
00233 void ServiceClientLink::processResponse(bool ok, const SerializedMessage& res)
00234 {
00235   (void)ok;
00236   connection_->write(res.buf, res.num_bytes, boost::bind(&ServiceClientLink::onResponseWritten, this, _1));
00237 }
00238 
00239 
00240 } 
00241