00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035 #include "ros/service_server_link.h"
00036 #include "ros/header.h"
00037 #include "ros/connection.h"
00038 #include "ros/service_manager.h"
00039 #include "ros/transport/transport.h"
00040 #include "ros/this_node.h"
00041 #include "ros/file_log.h"
00042
00043 #include <boost/bind.hpp>
00044
00045 #include <sstream>
00046
00047 namespace ros
00048 {
00049
00050 ServiceServerLink::ServiceServerLink(const std::string& service_name, bool persistent, const std::string& request_md5sum,
00051 const std::string& response_md5sum, const M_string& header_values)
00052 : service_name_(service_name)
00053 , persistent_(persistent)
00054 , request_md5sum_(request_md5sum)
00055 , response_md5sum_(response_md5sum)
00056 , extra_outgoing_header_values_(header_values)
00057 , header_written_(false)
00058 , header_read_(false)
00059 , dropped_(false)
00060 {
00061 }
00062
00063 ServiceServerLink::~ServiceServerLink()
00064 {
00065 ROS_ASSERT(connection_->isDropped());
00066
00067 clearCalls();
00068 }
00069
00070 void ServiceServerLink::cancelCall(const CallInfoPtr& info)
00071 {
00072 CallInfoPtr local = info;
00073 {
00074 boost::mutex::scoped_lock lock(local->finished_mutex_);
00075 local->finished_ = true;
00076 local->finished_condition_.notify_all();
00077 }
00078
00079 if (boost::this_thread::get_id() != info->caller_thread_id_)
00080 {
00081 while (!local->call_finished_)
00082 {
00083 boost::this_thread::yield();
00084 }
00085 }
00086 }
00087
00088 void ServiceServerLink::clearCalls()
00089 {
00090 CallInfoPtr local_current;
00091
00092 {
00093 boost::mutex::scoped_lock lock(call_queue_mutex_);
00094 local_current = current_call_;
00095 }
00096
00097 if (local_current)
00098 {
00099 cancelCall(local_current);
00100 }
00101
00102 boost::mutex::scoped_lock lock(call_queue_mutex_);
00103
00104 while (!call_queue_.empty())
00105 {
00106 CallInfoPtr info = call_queue_.front();
00107
00108 cancelCall(info);
00109
00110 call_queue_.pop();
00111 }
00112 }
00113
00114 bool ServiceServerLink::initialize(const ConnectionPtr& connection)
00115 {
00116 connection_ = connection;
00117 connection_->addDropListener(boost::bind(&ServiceServerLink::onConnectionDropped, this, _1));
00118 connection_->setHeaderReceivedCallback(boost::bind(&ServiceServerLink::onHeaderReceived, this, _1, _2));
00119
00120 M_string header;
00121 header["service"] = service_name_;
00122 header["md5sum"] = request_md5sum_;
00123 header["callerid"] = this_node::getName();
00124 header["persistent"] = persistent_ ? "1" : "0";
00125 header.insert(extra_outgoing_header_values_.begin(), extra_outgoing_header_values_.end());
00126
00127 connection_->writeHeader(header, boost::bind(&ServiceServerLink::onHeaderWritten, this, _1));
00128
00129 return true;
00130 }
00131
00132 void ServiceServerLink::onHeaderWritten(const ConnectionPtr& conn)
00133 {
00134 header_written_ = true;
00135 }
00136
00137 bool ServiceServerLink::onHeaderReceived(const ConnectionPtr& conn, const Header& header)
00138 {
00139 std::string md5sum, type;
00140 if (!header.getValue("md5sum", md5sum))
00141 {
00142 ROS_ERROR("TCPROS header from service server did not have required element: md5sum");
00143 return false;
00144 }
00145
00146 bool empty = false;
00147 {
00148 boost::mutex::scoped_lock lock(call_queue_mutex_);
00149 empty = call_queue_.empty();
00150
00151 if (empty)
00152 {
00153 header_read_ = true;
00154 }
00155 }
00156
00157 if (!empty)
00158 {
00159 processNextCall();
00160
00161 header_read_ = true;
00162 }
00163
00164 return true;
00165 }
00166
00167 void ServiceServerLink::onConnectionDropped(const ConnectionPtr& conn)
00168 {
00169 ROS_ASSERT(conn == connection_);
00170 ROSCPP_LOG_DEBUG("Service client from [%s] for [%s] dropped", conn->getRemoteString().c_str(), service_name_.c_str());
00171
00172 dropped_ = true;
00173 clearCalls();
00174
00175 ServiceManager::instance()->removeServiceServerLink(shared_from_this());
00176 }
00177
00178 void ServiceServerLink::onRequestWritten(const ConnectionPtr& conn)
00179 {
00180
00181 connection_->read(5, boost::bind(&ServiceServerLink::onResponseOkAndLength, this, _1, _2, _3, _4));
00182 }
00183
00184 void ServiceServerLink::onResponseOkAndLength(const ConnectionPtr& conn, const boost::shared_array<uint8_t>& buffer, uint32_t size, bool success)
00185 {
00186 ROS_ASSERT(conn == connection_);
00187 ROS_ASSERT(size == 5);
00188
00189 if (!success)
00190 return;
00191
00192 uint8_t ok = buffer[0];
00193 uint32_t len = *((uint32_t*)(buffer.get() + 1));
00194
00195 if (len > 1000000000)
00196 {
00197 ROS_ERROR("a message of over a gigabyte was " \
00198 "predicted in tcpros. that seems highly " \
00199 "unlikely, so I'll assume protocol " \
00200 "synchronization is lost.");
00201 conn->drop(Connection::Destructing);
00202
00203 return;
00204 }
00205
00206 {
00207 boost::mutex::scoped_lock lock(call_queue_mutex_);
00208 if ( ok != 0 ) {
00209 current_call_->success_ = true;
00210 } else {
00211 current_call_->success_ = false;
00212 }
00213 }
00214
00215 if (len > 0)
00216 {
00217 connection_->read(len, boost::bind(&ServiceServerLink::onResponse, this, _1, _2, _3, _4));
00218 }
00219 else
00220 {
00221 onResponse(conn, boost::shared_array<uint8_t>(), 0, true);
00222 }
00223 }
00224
00225 void ServiceServerLink::onResponse(const ConnectionPtr& conn, const boost::shared_array<uint8_t>& buffer, uint32_t size, bool success)
00226 {
00227 ROS_ASSERT(conn == connection_);
00228
00229 if (!success)
00230 return;
00231
00232 {
00233 boost::mutex::scoped_lock queue_lock(call_queue_mutex_);
00234
00235 if (current_call_->success_)
00236 {
00237 *current_call_->resp_ = SerializedMessage(buffer, size);
00238 }
00239 else
00240 {
00241 current_call_->exception_string_ = std::string(reinterpret_cast<char*>(buffer.get()), size);
00242 }
00243 }
00244
00245 callFinished();
00246 }
00247
00248 void ServiceServerLink::callFinished()
00249 {
00250 CallInfoPtr saved_call;
00251 ServiceServerLinkPtr self;
00252 {
00253 boost::mutex::scoped_lock queue_lock(call_queue_mutex_);
00254 boost::mutex::scoped_lock finished_lock(current_call_->finished_mutex_);
00255
00256 ROS_DEBUG_NAMED("superdebug", "Client to service [%s] call finished with success=[%s]", service_name_.c_str(), current_call_->success_ ? "true" : "false");
00257
00258 current_call_->finished_ = true;
00259 current_call_->finished_condition_.notify_all();
00260
00261 saved_call = current_call_;
00262 current_call_ = CallInfoPtr();
00263
00264
00265
00266
00267 self = shared_from_this();
00268 }
00269
00270 saved_call = CallInfoPtr();
00271
00272 processNextCall();
00273 }
00274
00275 void ServiceServerLink::processNextCall()
00276 {
00277 bool empty = false;
00278 {
00279 boost::mutex::scoped_lock lock(call_queue_mutex_);
00280
00281 if (current_call_)
00282 {
00283 return;
00284 }
00285
00286 if (!call_queue_.empty())
00287 {
00288 ROS_DEBUG_NAMED("superdebug", "[%s] Client to service [%s] processing next service call", persistent_ ? "persistent" : "non-persistent", service_name_.c_str());
00289
00290 current_call_ = call_queue_.front();
00291 call_queue_.pop();
00292 }
00293 else
00294 {
00295 empty = true;
00296 }
00297 }
00298
00299 if (empty)
00300 {
00301 if (!persistent_)
00302 {
00303 ROS_DEBUG_NAMED("superdebug", "Dropping non-persistent client to service [%s]", service_name_.c_str());
00304 connection_->drop(Connection::Destructing);
00305 }
00306 else
00307 {
00308 ROS_DEBUG_NAMED("superdebug", "Keeping persistent client to service [%s]", service_name_.c_str());
00309 }
00310 }
00311 else
00312 {
00313 SerializedMessage request;
00314
00315 {
00316 boost::mutex::scoped_lock lock(call_queue_mutex_);
00317 request = current_call_->req_;
00318 }
00319
00320 connection_->write(request.buf, request.num_bytes, boost::bind(&ServiceServerLink::onRequestWritten, this, _1));
00321 }
00322 }
00323
00324 bool ServiceServerLink::call(const SerializedMessage& req, SerializedMessage& resp)
00325 {
00326 CallInfoPtr info(new CallInfo);
00327 info->req_ = req;
00328 info->resp_ = &resp;
00329 info->success_ = false;
00330 info->finished_ = false;
00331 info->call_finished_ = false;
00332 info->caller_thread_id_ = boost::this_thread::get_id();
00333
00334
00335
00336 bool immediate = false;
00337 {
00338 boost::mutex::scoped_lock lock(call_queue_mutex_);
00339
00340 if (connection_->isDropped())
00341 {
00342 ROSCPP_LOG_DEBUG("ServiceServerLink::call called on dropped connection for service [%s]", service_name_.c_str());
00343 info->call_finished_ = true;
00344 return false;
00345 }
00346
00347 if (call_queue_.empty() && header_written_ && header_read_)
00348 {
00349 immediate = true;
00350 }
00351
00352 call_queue_.push(info);
00353 }
00354
00355 if (immediate)
00356 {
00357 processNextCall();
00358 }
00359
00360 {
00361 boost::mutex::scoped_lock lock(info->finished_mutex_);
00362
00363 while (!info->finished_)
00364 {
00365 info->finished_condition_.wait(lock);
00366 }
00367 }
00368
00369 info->call_finished_ = true;
00370
00371 if (info->exception_string_.length() > 0)
00372 {
00373 ROS_ERROR("Service call failed: service [%s] responded with an error: %s", service_name_.c_str(), info->exception_string_.c_str());
00374 }
00375
00376 return info->success_;
00377 }
00378
00379 bool ServiceServerLink::isValid() const
00380 {
00381 return !dropped_;
00382 }
00383
00384 }
00385