00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035 #include "ros/service_client_link.h"
00036 #include "ros/service_publication.h"
00037 #include "ros/header.h"
00038 #include "ros/connection.h"
00039 #include "ros/service_manager.h"
00040 #include "ros/transport/transport.h"
00041 #include "ros/this_node.h"
00042 #include "ros/file_log.h"
00043
00044 #include <boost/bind.hpp>
00045
00046 namespace ros
00047 {
00048
00049 ServiceClientLink::ServiceClientLink()
00050 : persistent_(false)
00051 {
00052 }
00053
00054 ServiceClientLink::~ServiceClientLink()
00055 {
00056 if (connection_)
00057 {
00058 if (connection_->isSendingHeaderError())
00059 {
00060 connection_->removeDropListener(dropped_conn_);
00061 }
00062 else
00063 {
00064 connection_->drop(Connection::Destructing);
00065 }
00066 }
00067 }
00068
00069 bool ServiceClientLink::initialize(const ConnectionPtr& connection)
00070 {
00071 connection_ = connection;
00072 dropped_conn_ = connection_->addDropListener(boost::bind(&ServiceClientLink::onConnectionDropped, this, _1));
00073
00074 return true;
00075 }
00076
00077 bool ServiceClientLink::handleHeader(const Header& header)
00078 {
00079 std::string md5sum, service, client_callerid;
00080 if (!header.getValue("md5sum", md5sum)
00081 || !header.getValue("service", service)
00082 || !header.getValue("callerid", client_callerid))
00083 {
00084 std::string msg("bogus tcpros header. did not have the "
00085 "required elements: md5sum, service, callerid");
00086
00087 ROS_ERROR("%s", msg.c_str());
00088 connection_->sendHeaderError(msg);
00089
00090 return false;
00091 }
00092
00093 std::string persistent;
00094 if (header.getValue("persistent", persistent))
00095 {
00096 if (persistent == "1" || persistent == "true")
00097 {
00098 persistent_ = true;
00099 }
00100 }
00101
00102 ROSCPP_LOG_DEBUG("Service client [%s] wants service [%s] with md5sum [%s]", client_callerid.c_str(), service.c_str(), md5sum.c_str());
00103 ServicePublicationPtr ss = ServiceManager::instance()->lookupServicePublication(service);
00104 if (!ss)
00105 {
00106 std::string msg = std::string("received a tcpros connection for a "
00107 "nonexistent service [") +
00108 service + std::string("].");
00109
00110 ROS_ERROR("%s", msg.c_str());
00111 connection_->sendHeaderError(msg);
00112
00113 return false;
00114 }
00115 if (ss->getMD5Sum() != md5sum &&
00116 (md5sum != std::string("*") && ss->getMD5Sum() != std::string("*")))
00117 {
00118 std::string msg = std::string("client wants service ") + service +
00119 std::string(" to have md5sum ") + md5sum +
00120 std::string(", but it has ") + ss->getMD5Sum() +
00121 std::string(". Dropping connection.");
00122
00123 ROS_ERROR("%s", msg.c_str());
00124 connection_->sendHeaderError(msg);
00125
00126 return false;
00127 }
00128
00129
00130
00131
00132
00133 if(ss->isDropped())
00134 {
00135 std::string msg = std::string("received a tcpros connection for a "
00136 "nonexistent service [") +
00137 service + std::string("].");
00138
00139 ROS_ERROR("%s", msg.c_str());
00140 connection_->sendHeaderError(msg);
00141
00142 return false;
00143 }
00144 else
00145 {
00146 parent_ = ServicePublicationWPtr(ss);
00147
00148
00149 M_string m;
00150 m["request_type"] = ss->getRequestDataType();
00151 m["response_type"] = ss->getResponseDataType();
00152 m["type"] = ss->getDataType();
00153 m["md5sum"] = ss->getMD5Sum();
00154 m["callerid"] = this_node::getName();
00155 connection_->writeHeader(m, boost::bind(&ServiceClientLink::onHeaderWritten, this, _1));
00156
00157 ss->addServiceClientLink(shared_from_this());
00158 }
00159
00160 return true;
00161 }
00162
00163 void ServiceClientLink::onConnectionDropped(const ConnectionPtr& conn)
00164 {
00165 ROS_ASSERT(conn == connection_);
00166
00167 if (ServicePublicationPtr parent = parent_.lock())
00168 {
00169 parent->removeServiceClientLink(shared_from_this());
00170 }
00171 }
00172
00173 void ServiceClientLink::onHeaderWritten(const ConnectionPtr& conn)
00174 {
00175 connection_->read(4, boost::bind(&ServiceClientLink::onRequestLength, this, _1, _2, _3, _4));
00176 }
00177
00178 void ServiceClientLink::onRequestLength(const ConnectionPtr& conn, const boost::shared_array<uint8_t>& buffer, uint32_t size, bool success)
00179 {
00180 if (!success)
00181 return;
00182
00183 ROS_ASSERT(conn == connection_);
00184 ROS_ASSERT(size == 4);
00185
00186 uint32_t len = *((uint32_t*)buffer.get());
00187
00188 if (len > 1000000000)
00189 {
00190 ROS_ERROR("a message of over a gigabyte was " \
00191 "predicted in tcpros. that seems highly " \
00192 "unlikely, so I'll assume protocol " \
00193 "synchronization is lost.");
00194 conn->drop(Connection::Destructing);
00195 return;
00196 }
00197
00198 connection_->read(len, boost::bind(&ServiceClientLink::onRequest, this, _1, _2, _3, _4));
00199 }
00200
00201 void ServiceClientLink::onRequest(const ConnectionPtr& conn, const boost::shared_array<uint8_t>& buffer, uint32_t size, bool success)
00202 {
00203 if (!success)
00204 return;
00205
00206 ROS_ASSERT(conn == connection_);
00207
00208 if (ServicePublicationPtr parent = parent_.lock())
00209 {
00210 parent->processRequest(buffer, size, shared_from_this());
00211 }
00212 else
00213 {
00214 ROS_BREAK();
00215 }
00216 }
00217
00218 void ServiceClientLink::onResponseWritten(const ConnectionPtr& conn)
00219 {
00220 ROS_ASSERT(conn == connection_);
00221
00222 if (persistent_)
00223 {
00224 connection_->read(4, boost::bind(&ServiceClientLink::onRequestLength, this, _1, _2, _3, _4));
00225 }
00226 else
00227 {
00228 connection_->drop(Connection::Destructing);
00229 }
00230 }
00231
00232 void ServiceClientLink::processResponse(bool ok, const SerializedMessage& res)
00233 {
00234 connection_->write(res.buf, res.num_bytes, boost::bind(&ServiceClientLink::onResponseWritten, this, _1));
00235 }
00236
00237
00238 }
00239