00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035 #include "ros/service_server_link.h"
00036 #include "ros/header.h"
00037 #include "ros/connection.h"
00038 #include "ros/service_manager.h"
00039 #include "ros/transport/transport.h"
00040 #include "ros/this_node.h"
00041 #include "ros/file_log.h"
00042
00043 #include <boost/bind.hpp>
00044
00045 #include <sstream>
00046
00047 namespace ros
00048 {
00049
00050 ServiceServerLink::ServiceServerLink(const std::string& service_name, bool persistent, const std::string& request_md5sum,
00051 const std::string& response_md5sum, const M_string& header_values)
00052 : service_name_(service_name)
00053 , persistent_(persistent)
00054 , request_md5sum_(request_md5sum)
00055 , response_md5sum_(response_md5sum)
00056 , extra_outgoing_header_values_(header_values)
00057 , header_written_(false)
00058 , header_read_(false)
00059 , dropped_(false)
00060 {
00061 }
00062
00063 ServiceServerLink::~ServiceServerLink()
00064 {
00065 ROS_ASSERT(connection_->isDropped());
00066
00067 clearCalls();
00068 }
00069
00070 void ServiceServerLink::cancelCall(const CallInfoPtr& info)
00071 {
00072 CallInfoPtr local = info;
00073 {
00074 boost::mutex::scoped_lock lock(local->finished_mutex_);
00075 local->finished_ = true;
00076 local->finished_condition_.notify_all();
00077 }
00078
00079 if (boost::this_thread::get_id() != info->caller_thread_id_)
00080 {
00081 while (!local->call_finished_)
00082 {
00083 boost::this_thread::yield();
00084 }
00085 }
00086 }
00087
00088 void ServiceServerLink::clearCalls()
00089 {
00090 CallInfoPtr local_current;
00091
00092 {
00093 boost::mutex::scoped_lock lock(call_queue_mutex_);
00094 local_current = current_call_;
00095 }
00096
00097 if (local_current)
00098 {
00099 cancelCall(local_current);
00100 }
00101
00102 boost::mutex::scoped_lock lock(call_queue_mutex_);
00103
00104 while (!call_queue_.empty())
00105 {
00106 CallInfoPtr info = call_queue_.front();
00107
00108 cancelCall(info);
00109
00110 call_queue_.pop();
00111 }
00112 }
00113
00114 bool ServiceServerLink::initialize(const ConnectionPtr& connection)
00115 {
00116 connection_ = connection;
00117 connection_->addDropListener(boost::bind(&ServiceServerLink::onConnectionDropped, this, _1));
00118 connection_->setHeaderReceivedCallback(boost::bind(&ServiceServerLink::onHeaderReceived, this, _1, _2));
00119
00120 M_string header;
00121 header["service"] = service_name_;
00122 header["md5sum"] = request_md5sum_;
00123 header["callerid"] = this_node::getName();
00124 header["persistent"] = persistent_ ? "1" : "0";
00125 header.insert(extra_outgoing_header_values_.begin(), extra_outgoing_header_values_.end());
00126
00127 connection_->writeHeader(header, boost::bind(&ServiceServerLink::onHeaderWritten, this, _1));
00128
00129 return true;
00130 }
00131
00132 void ServiceServerLink::onHeaderWritten(const ConnectionPtr& conn)
00133 {
00134 (void)conn;
00135 header_written_ = true;
00136 }
00137
00138 bool ServiceServerLink::onHeaderReceived(const ConnectionPtr& conn, const Header& header)
00139 {
00140 (void)conn;
00141 std::string md5sum, type;
00142 if (!header.getValue("md5sum", md5sum))
00143 {
00144 ROS_ERROR("TCPROS header from service server did not have required element: md5sum");
00145 return false;
00146 }
00147
00148 bool empty = false;
00149 {
00150 boost::mutex::scoped_lock lock(call_queue_mutex_);
00151 empty = call_queue_.empty();
00152
00153 if (empty)
00154 {
00155 header_read_ = true;
00156 }
00157 }
00158
00159 if (!empty)
00160 {
00161 processNextCall();
00162
00163 header_read_ = true;
00164 }
00165
00166 return true;
00167 }
00168
00169 void ServiceServerLink::onConnectionDropped(const ConnectionPtr& conn)
00170 {
00171 ROS_ASSERT(conn == connection_);
00172 ROSCPP_LOG_DEBUG("Service client from [%s] for [%s] dropped", conn->getRemoteString().c_str(), service_name_.c_str());
00173
00174 dropped_ = true;
00175 clearCalls();
00176
00177 ServiceManager::instance()->removeServiceServerLink(shared_from_this());
00178 }
00179
00180 void ServiceServerLink::onRequestWritten(const ConnectionPtr& conn)
00181 {
00182 (void)conn;
00183
00184 connection_->read(5, boost::bind(&ServiceServerLink::onResponseOkAndLength, this, _1, _2, _3, _4));
00185 }
00186
00187 void ServiceServerLink::onResponseOkAndLength(const ConnectionPtr& conn, const boost::shared_array<uint8_t>& buffer, uint32_t size, bool success)
00188 {
00189 ROS_ASSERT(conn == connection_);
00190 ROS_ASSERT(size == 5);
00191
00192 if (!success)
00193 return;
00194
00195 uint8_t ok = buffer[0];
00196 uint32_t len = *((uint32_t*)(buffer.get() + 1));
00197
00198 if (len > 1000000000)
00199 {
00200 ROS_ERROR("a message of over a gigabyte was " \
00201 "predicted in tcpros. that seems highly " \
00202 "unlikely, so I'll assume protocol " \
00203 "synchronization is lost.");
00204 conn->drop(Connection::Destructing);
00205
00206 return;
00207 }
00208
00209 {
00210 boost::mutex::scoped_lock lock(call_queue_mutex_);
00211 if ( ok != 0 ) {
00212 current_call_->success_ = true;
00213 } else {
00214 current_call_->success_ = false;
00215 }
00216 }
00217
00218 if (len > 0)
00219 {
00220 connection_->read(len, boost::bind(&ServiceServerLink::onResponse, this, _1, _2, _3, _4));
00221 }
00222 else
00223 {
00224 onResponse(conn, boost::shared_array<uint8_t>(), 0, true);
00225 }
00226 }
00227
00228 void ServiceServerLink::onResponse(const ConnectionPtr& conn, const boost::shared_array<uint8_t>& buffer, uint32_t size, bool success)
00229 {
00230 ROS_ASSERT(conn == connection_);
00231
00232 if (!success)
00233 return;
00234
00235 {
00236 boost::mutex::scoped_lock queue_lock(call_queue_mutex_);
00237
00238 if (current_call_->success_)
00239 {
00240 *current_call_->resp_ = SerializedMessage(buffer, size);
00241 }
00242 else
00243 {
00244 current_call_->exception_string_ = std::string(reinterpret_cast<char*>(buffer.get()), size);
00245 }
00246 }
00247
00248 callFinished();
00249 }
00250
00251 void ServiceServerLink::callFinished()
00252 {
00253 CallInfoPtr saved_call;
00254 ServiceServerLinkPtr self;
00255 {
00256 boost::mutex::scoped_lock queue_lock(call_queue_mutex_);
00257 boost::mutex::scoped_lock finished_lock(current_call_->finished_mutex_);
00258
00259 ROS_DEBUG_NAMED("superdebug", "Client to service [%s] call finished with success=[%s]", service_name_.c_str(), current_call_->success_ ? "true" : "false");
00260
00261 current_call_->finished_ = true;
00262 current_call_->finished_condition_.notify_all();
00263
00264 saved_call = current_call_;
00265 current_call_ = CallInfoPtr();
00266
00267
00268
00269
00270 self = shared_from_this();
00271 }
00272
00273 saved_call = CallInfoPtr();
00274
00275 processNextCall();
00276 }
00277
00278 void ServiceServerLink::processNextCall()
00279 {
00280 bool empty = false;
00281 {
00282 boost::mutex::scoped_lock lock(call_queue_mutex_);
00283
00284 if (current_call_)
00285 {
00286 return;
00287 }
00288
00289 if (!call_queue_.empty())
00290 {
00291 ROS_DEBUG_NAMED("superdebug", "[%s] Client to service [%s] processing next service call", persistent_ ? "persistent" : "non-persistent", service_name_.c_str());
00292
00293 current_call_ = call_queue_.front();
00294 call_queue_.pop();
00295 }
00296 else
00297 {
00298 empty = true;
00299 }
00300 }
00301
00302 if (empty)
00303 {
00304 if (!persistent_)
00305 {
00306 ROS_DEBUG_NAMED("superdebug", "Dropping non-persistent client to service [%s]", service_name_.c_str());
00307 connection_->drop(Connection::Destructing);
00308 }
00309 else
00310 {
00311 ROS_DEBUG_NAMED("superdebug", "Keeping persistent client to service [%s]", service_name_.c_str());
00312 }
00313 }
00314 else
00315 {
00316 SerializedMessage request;
00317
00318 {
00319 boost::mutex::scoped_lock lock(call_queue_mutex_);
00320 request = current_call_->req_;
00321 }
00322
00323 connection_->write(request.buf, request.num_bytes, boost::bind(&ServiceServerLink::onRequestWritten, this, _1));
00324 }
00325 }
00326
00327 bool ServiceServerLink::call(const SerializedMessage& req, SerializedMessage& resp)
00328 {
00329 CallInfoPtr info(boost::make_shared<CallInfo>());
00330 info->req_ = req;
00331 info->resp_ = &resp;
00332 info->success_ = false;
00333 info->finished_ = false;
00334 info->call_finished_ = false;
00335 info->caller_thread_id_ = boost::this_thread::get_id();
00336
00337
00338
00339 bool immediate = false;
00340 {
00341 boost::mutex::scoped_lock lock(call_queue_mutex_);
00342
00343 if (connection_->isDropped())
00344 {
00345 ROSCPP_LOG_DEBUG("ServiceServerLink::call called on dropped connection for service [%s]", service_name_.c_str());
00346 info->call_finished_ = true;
00347 return false;
00348 }
00349
00350 if (call_queue_.empty() && header_written_ && header_read_)
00351 {
00352 immediate = true;
00353 }
00354
00355 call_queue_.push(info);
00356 }
00357
00358 if (immediate)
00359 {
00360 processNextCall();
00361 }
00362
00363 {
00364 boost::mutex::scoped_lock lock(info->finished_mutex_);
00365
00366 while (!info->finished_)
00367 {
00368 info->finished_condition_.wait(lock);
00369 }
00370 }
00371
00372 info->call_finished_ = true;
00373
00374 if (info->exception_string_.length() > 0)
00375 {
00376 ROS_ERROR("Service call failed: service [%s] responded with an error: %s", service_name_.c_str(), info->exception_string_.c_str());
00377 }
00378
00379 return info->success_;
00380 }
00381
00382 bool ServiceServerLink::isValid() const
00383 {
00384 return !dropped_;
00385 }
00386
00387 }
00388