00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035 #include "ros/service_server_link.h"
00036 #include "ros/header.h"
00037 #include "ros/connection.h"
00038 #include "ros/service_manager.h"
00039 #include "ros/transport/transport.h"
00040 #include "ros/this_node.h"
00041 #include "ros/file_log.h"
00042
00043 #include <boost/bind.hpp>
00044
00045 #include <sstream>
00046
00047 namespace ros
00048 {
00049
00050 ServiceServerLink::ServiceServerLink(const std::string& service_name, bool persistent, const std::string& request_md5sum,
00051 const std::string& response_md5sum, const M_string& header_values)
00052 : service_name_(service_name)
00053 , persistent_(persistent)
00054 , request_md5sum_(request_md5sum)
00055 , response_md5sum_(response_md5sum)
00056 , extra_outgoing_header_values_(header_values)
00057 , header_written_(false)
00058 , header_read_(false)
00059 , dropped_(false)
00060 {
00061 }
00062
00063 ServiceServerLink::~ServiceServerLink()
00064 {
00065 ROS_ASSERT(connection_->isDropped());
00066
00067 clearCalls();
00068 }
00069
00070 void ServiceServerLink::cancelCall(const CallInfoPtr& info)
00071 {
00072 CallInfoPtr local = info;
00073 {
00074 boost::mutex::scoped_lock lock(local->finished_mutex_);
00075 local->finished_ = true;
00076 local->finished_condition_.notify_all();
00077 }
00078
00079 if (boost::this_thread::get_id() != info->caller_thread_id_)
00080 {
00081 while (!local->call_finished_)
00082 {
00083 boost::this_thread::yield();
00084 }
00085 }
00086 }
00087
00088 void ServiceServerLink::clearCalls()
00089 {
00090 CallInfoPtr local_current;
00091
00092 {
00093 boost::mutex::scoped_lock lock(call_queue_mutex_);
00094 local_current = current_call_;
00095 }
00096
00097 if (local_current)
00098 {
00099 cancelCall(local_current);
00100 }
00101
00102 boost::mutex::scoped_lock lock(call_queue_mutex_);
00103
00104 while (!call_queue_.empty())
00105 {
00106 CallInfoPtr info = call_queue_.front();
00107
00108 cancelCall(info);
00109
00110 call_queue_.pop();
00111 }
00112 }
00113
00114 bool ServiceServerLink::initialize(const ConnectionPtr& connection)
00115 {
00116 connection_ = connection;
00117 connection_->addDropListener(boost::bind(&ServiceServerLink::onConnectionDropped, this, _1));
00118 connection_->setHeaderReceivedCallback(boost::bind(&ServiceServerLink::onHeaderReceived, this, _1, _2));
00119
00120 M_string header;
00121 header["service"] = service_name_;
00122 header["md5sum"] = request_md5sum_;
00123 header["callerid"] = this_node::getName();
00124 header["persistent"] = persistent_ ? "1" : "0";
00125 header.insert(extra_outgoing_header_values_.begin(), extra_outgoing_header_values_.end());
00126
00127 connection_->writeHeader(header, boost::bind(&ServiceServerLink::onHeaderWritten, this, _1));
00128
00129 return true;
00130 }
00131
00132 void ServiceServerLink::onHeaderWritten(const ConnectionPtr& conn)
00133 {
00134 header_written_ = true;
00135 }
00136
00137 bool ServiceServerLink::onHeaderReceived(const ConnectionPtr& conn, const Header& header)
00138 {
00139 std::string md5sum, type;
00140 if (!header.getValue("md5sum", md5sum))
00141 {
00142 ROS_ERROR("TCPROS header from service server did not have required element: md5sum");
00143 return false;
00144 }
00145
00146 bool empty = false;
00147 {
00148 boost::mutex::scoped_lock lock(call_queue_mutex_);
00149 empty = call_queue_.empty();
00150
00151 if (empty)
00152 {
00153 header_read_ = true;
00154 }
00155 }
00156
00157 if (!empty)
00158 {
00159 processNextCall();
00160
00161 header_read_ = true;
00162 }
00163
00164 return true;
00165 }
00166
00167 void ServiceServerLink::onConnectionDropped(const ConnectionPtr& conn)
00168 {
00169 ROS_ASSERT(conn == connection_);
00170 ROSCPP_LOG_DEBUG("Service client from [%s] for [%s] dropped", conn->getRemoteString().c_str(), service_name_.c_str());
00171
00172 dropped_ = true;
00173 clearCalls();
00174
00175 ServiceManager::instance()->removeServiceServerLink(shared_from_this());
00176 }
00177
00178 void ServiceServerLink::onRequestWritten(const ConnectionPtr& conn)
00179 {
00180
00181 connection_->read(5, boost::bind(&ServiceServerLink::onResponseOkAndLength, this, _1, _2, _3, _4));
00182 }
00183
00184 void ServiceServerLink::onResponseOkAndLength(const ConnectionPtr& conn, const boost::shared_array<uint8_t>& buffer, uint32_t size, bool success)
00185 {
00186 ROS_ASSERT(conn == connection_);
00187 ROS_ASSERT(size == 5);
00188
00189 if (!success)
00190 return;
00191
00192 uint8_t ok = buffer[0];
00193 uint32_t len = *((uint32_t*)(buffer.get() + 1));
00194
00195 if (len > 1000000000)
00196 {
00197 ROS_ERROR("a message of over a gigabyte was " \
00198 "predicted in tcpros. that seems highly " \
00199 "unlikely, so I'll assume protocol " \
00200 "synchronization is lost.");
00201 conn->drop(Connection::Destructing);
00202
00203 return;
00204 }
00205
00206 {
00207 boost::mutex::scoped_lock lock(call_queue_mutex_);
00208 current_call_->success_ = ok;
00209 }
00210
00211 if (len > 0)
00212 {
00213 connection_->read(len, boost::bind(&ServiceServerLink::onResponse, this, _1, _2, _3, _4));
00214 }
00215 else
00216 {
00217 onResponse(conn, boost::shared_array<uint8_t>(), 0, true);
00218 }
00219 }
00220
00221 void ServiceServerLink::onResponse(const ConnectionPtr& conn, const boost::shared_array<uint8_t>& buffer, uint32_t size, bool success)
00222 {
00223 ROS_ASSERT(conn == connection_);
00224
00225 if (!success)
00226 return;
00227
00228 {
00229 boost::mutex::scoped_lock queue_lock(call_queue_mutex_);
00230
00231 if (current_call_->success_)
00232 {
00233 *current_call_->resp_ = SerializedMessage(buffer, size);
00234 }
00235 }
00236
00237 callFinished();
00238 }
00239
00240 void ServiceServerLink::callFinished()
00241 {
00242 CallInfoPtr saved_call;
00243 ServiceServerLinkPtr self;
00244 {
00245 boost::mutex::scoped_lock queue_lock(call_queue_mutex_);
00246 boost::mutex::scoped_lock finished_lock(current_call_->finished_mutex_);
00247
00248 ROS_DEBUG_NAMED("superdebug", "Client to service [%s] call finished with success=[%s]", service_name_.c_str(), current_call_->success_ ? "true" : "false");
00249
00250 current_call_->finished_ = true;
00251 current_call_->finished_condition_.notify_all();
00252
00253 saved_call = current_call_;
00254 current_call_ = CallInfoPtr();
00255
00256
00257
00258
00259 self = shared_from_this();
00260 }
00261
00262 saved_call = CallInfoPtr();
00263
00264 processNextCall();
00265 }
00266
00267 void ServiceServerLink::processNextCall()
00268 {
00269 bool empty = false;
00270 {
00271 boost::mutex::scoped_lock lock(call_queue_mutex_);
00272
00273 if (current_call_)
00274 {
00275 return;
00276 }
00277
00278 if (!call_queue_.empty())
00279 {
00280 ROS_DEBUG_NAMED("superdebug", "[%s] Client to service [%s] processing next service call", persistent_ ? "persistent" : "non-persistent", service_name_.c_str());
00281
00282 current_call_ = call_queue_.front();
00283 call_queue_.pop();
00284 }
00285 else
00286 {
00287 empty = true;
00288 }
00289 }
00290
00291 if (empty)
00292 {
00293 if (!persistent_)
00294 {
00295 ROS_DEBUG_NAMED("superdebug", "Dropping non-persistent client to service [%s]", service_name_.c_str());
00296 connection_->drop(Connection::Destructing);
00297 }
00298 else
00299 {
00300 ROS_DEBUG_NAMED("superdebug", "Keeping persistent client to service [%s]", service_name_.c_str());
00301 }
00302 }
00303 else
00304 {
00305 SerializedMessage request;
00306
00307 {
00308 boost::mutex::scoped_lock lock(call_queue_mutex_);
00309 request = current_call_->req_;
00310 }
00311
00312 connection_->write(request.buf, request.num_bytes, boost::bind(&ServiceServerLink::onRequestWritten, this, _1));
00313 }
00314 }
00315
00316 bool ServiceServerLink::call(const SerializedMessage& req, SerializedMessage& resp)
00317 {
00318 CallInfoPtr info(new CallInfo);
00319 info->req_ = req;
00320 info->resp_ = &resp;
00321 info->success_ = false;
00322 info->finished_ = false;
00323 info->call_finished_ = false;
00324 info->caller_thread_id_ = boost::this_thread::get_id();
00325
00326
00327
00328 bool immediate = false;
00329 {
00330 boost::mutex::scoped_lock lock(call_queue_mutex_);
00331
00332 if (connection_->isDropped())
00333 {
00334
00335 return false;
00336 }
00337
00338 if (call_queue_.empty() && header_written_ && header_read_)
00339 {
00340 immediate = true;
00341 }
00342
00343 call_queue_.push(info);
00344 }
00345
00346 if (immediate)
00347 {
00348 processNextCall();
00349 }
00350
00351 {
00352 boost::mutex::scoped_lock lock(info->finished_mutex_);
00353
00354 while (!info->finished_)
00355 {
00356 info->finished_condition_.wait(lock);
00357 }
00358 }
00359
00360 info->call_finished_ = true;
00361
00362 return info->success_;
00363 }
00364
00365 bool ServiceServerLink::isValid() const
00366 {
00367 return !dropped_;
00368 }
00369
00370 }
00371