00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035 #include "ros/service_client_link.h"
00036 #include "ros/service_publication.h"
00037 #include "ros/header.h"
00038 #include "ros/connection.h"
00039 #include "ros/service_manager.h"
00040 #include "ros/transport/transport.h"
00041 #include "ros/this_node.h"
00042 #include "ros/file_log.h"
00043
00044 #include <boost/bind.hpp>
00045
00046 namespace ros
00047 {
00048
00049 ServiceClientLink::ServiceClientLink()
00050 : persistent_(false)
00051 {
00052 }
00053
00054 ServiceClientLink::~ServiceClientLink()
00055 {
00056 if (connection_)
00057 {
00058 if (connection_->isSendingHeaderError())
00059 {
00060 connection_->removeDropListener(dropped_conn_);
00061 }
00062 else
00063 {
00064 connection_->drop(Connection::Destructing);
00065 }
00066 }
00067 }
00068
00069 bool ServiceClientLink::initialize(const ConnectionPtr& connection)
00070 {
00071 connection_ = connection;
00072 dropped_conn_ = connection_->addDropListener(boost::bind(&ServiceClientLink::onConnectionDropped, this, _1));
00073
00074 return true;
00075 }
00076
00077 bool ServiceClientLink::handleHeader(const Header& header)
00078 {
00079 std::string md5sum, service, client_callerid;
00080 if (!header.getValue("md5sum", md5sum)
00081 || !header.getValue("service", service)
00082 || !header.getValue("callerid", client_callerid))
00083 {
00084 std::string msg("bogus tcpros header. did not have the "
00085 "required elements: md5sum, service, callerid");
00086
00087 ROS_ERROR("%s", msg.c_str());
00088 connection_->sendHeaderError(msg);
00089
00090 return false;
00091 }
00092
00093 std::string persistent;
00094 if (header.getValue("persistent", persistent))
00095 {
00096 if (persistent == "1" || persistent == "true")
00097 {
00098 persistent_ = true;
00099 }
00100 }
00101
00102 ROSCPP_LOG_DEBUG("Service client [%s] wants service [%s] with md5sum [%s]", client_callerid.c_str(), service.c_str(), md5sum.c_str());
00103 ServicePublicationPtr ss = ServiceManager::instance()->lookupServicePublication(service);
00104 if (!ss)
00105 {
00106 std::string msg = std::string("received a tcpros connection for a "
00107 "nonexistent service [") +
00108 service + std::string("].");
00109
00110 ROS_ERROR("%s", msg.c_str());
00111 connection_->sendHeaderError(msg);
00112
00113 return false;
00114 }
00115 if (ss->getMD5Sum() != md5sum &&
00116 (md5sum != std::string("*") && ss->getMD5Sum() != std::string("*")))
00117 {
00118 std::string msg = std::string("client wants service ") + service +
00119 std::string(" to have md5sum ") + md5sum +
00120 std::string(", but it has ") + ss->getMD5Sum() +
00121 std::string(". Dropping connection.");
00122
00123 ROS_ERROR("%s", msg.c_str());
00124 connection_->sendHeaderError(msg);
00125
00126 return false;
00127 }
00128
00129
00130
00131
00132
00133 if(ss->isDropped())
00134 {
00135 std::string msg = std::string("received a tcpros connection for a "
00136 "nonexistent service [") +
00137 service + std::string("].");
00138
00139 ROS_ERROR("%s", msg.c_str());
00140 connection_->sendHeaderError(msg);
00141
00142 return false;
00143 }
00144 else
00145 {
00146 parent_ = ServicePublicationWPtr(ss);
00147
00148
00149 M_string m;
00150 m["request_type"] = ss->getRequestDataType();
00151 m["response_type"] = ss->getResponseDataType();
00152 m["type"] = ss->getDataType();
00153 m["md5sum"] = ss->getMD5Sum();
00154 m["callerid"] = this_node::getName();
00155 connection_->writeHeader(m, boost::bind(&ServiceClientLink::onHeaderWritten, this, _1));
00156
00157 ss->addServiceClientLink(shared_from_this());
00158 }
00159
00160 return true;
00161 }
00162
00163 void ServiceClientLink::onConnectionDropped(const ConnectionPtr& conn)
00164 {
00165 ROS_ASSERT(conn == connection_);
00166
00167 if (ServicePublicationPtr parent = parent_.lock())
00168 {
00169 parent->removeServiceClientLink(shared_from_this());
00170 }
00171 }
00172
00173 void ServiceClientLink::onHeaderWritten(const ConnectionPtr& conn)
00174 {
00175 (void)conn;
00176 connection_->read(4, boost::bind(&ServiceClientLink::onRequestLength, this, _1, _2, _3, _4));
00177 }
00178
00179 void ServiceClientLink::onRequestLength(const ConnectionPtr& conn, const boost::shared_array<uint8_t>& buffer, uint32_t size, bool success)
00180 {
00181 if (!success)
00182 return;
00183
00184 ROS_ASSERT(conn == connection_);
00185 ROS_ASSERT(size == 4);
00186
00187 uint32_t len = *((uint32_t*)buffer.get());
00188
00189 if (len > 1000000000)
00190 {
00191 ROS_ERROR("a message of over a gigabyte was " \
00192 "predicted in tcpros. that seems highly " \
00193 "unlikely, so I'll assume protocol " \
00194 "synchronization is lost.");
00195 conn->drop(Connection::Destructing);
00196 return;
00197 }
00198
00199 connection_->read(len, boost::bind(&ServiceClientLink::onRequest, this, _1, _2, _3, _4));
00200 }
00201
00202 void ServiceClientLink::onRequest(const ConnectionPtr& conn, const boost::shared_array<uint8_t>& buffer, uint32_t size, bool success)
00203 {
00204 if (!success)
00205 return;
00206
00207 ROS_ASSERT(conn == connection_);
00208
00209 if (ServicePublicationPtr parent = parent_.lock())
00210 {
00211 parent->processRequest(buffer, size, shared_from_this());
00212 }
00213 else
00214 {
00215 ROS_BREAK();
00216 }
00217 }
00218
00219 void ServiceClientLink::onResponseWritten(const ConnectionPtr& conn)
00220 {
00221 ROS_ASSERT(conn == connection_);
00222
00223 if (persistent_)
00224 {
00225 connection_->read(4, boost::bind(&ServiceClientLink::onRequestLength, this, _1, _2, _3, _4));
00226 }
00227 else
00228 {
00229 connection_->drop(Connection::Destructing);
00230 }
00231 }
00232
00233 void ServiceClientLink::processResponse(bool ok, const SerializedMessage& res)
00234 {
00235 (void)ok;
00236 connection_->write(res.buf, res.num_bytes, boost::bind(&ServiceClientLink::onResponseWritten, this, _1));
00237 }
00238
00239
00240 }
00241