00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035 #include "ros/service_server_link.h"
00036 #include "ros/header.h"
00037 #include "ros/connection.h"
00038 #include "ros/service_manager.h"
00039 #include "ros/transport/transport.h"
00040 #include "ros/this_node.h"
00041 #include "ros/file_log.h"
00042
00043 #include <boost/bind.hpp>
00044
00045 #include <sstream>
00046
00047 namespace ros
00048 {
00049
00050 ServiceServerLink::ServiceServerLink(const std::string& service_name, bool persistent, const std::string& request_md5sum,
00051 const std::string& response_md5sum, const M_string& header_values)
00052 : service_name_(service_name)
00053 , persistent_(persistent)
00054 , request_md5sum_(request_md5sum)
00055 , response_md5sum_(response_md5sum)
00056 , extra_outgoing_header_values_(header_values)
00057 , header_written_(false)
00058 , header_read_(false)
00059 , dropped_(false)
00060 {
00061 }
00062
00063 ServiceServerLink::~ServiceServerLink()
00064 {
00065 ROS_ASSERT(connection_->isDropped());
00066
00067 clearCalls();
00068 }
00069
00070 void ServiceServerLink::cancelCall(const CallInfoPtr& info)
00071 {
00072 CallInfoPtr local = info;
00073 {
00074 boost::mutex::scoped_lock lock(local->finished_mutex_);
00075 local->finished_ = true;
00076 local->finished_condition_.notify_all();
00077 }
00078
00079 if (boost::this_thread::get_id() != info->caller_thread_id_)
00080 {
00081 while (!local->call_finished_)
00082 {
00083 boost::this_thread::yield();
00084 }
00085 }
00086 }
00087
00088 void ServiceServerLink::clearCalls()
00089 {
00090 CallInfoPtr local_current;
00091
00092 {
00093 boost::mutex::scoped_lock lock(call_queue_mutex_);
00094 local_current = current_call_;
00095 }
00096
00097 if (local_current)
00098 {
00099 cancelCall(local_current);
00100 }
00101
00102 boost::mutex::scoped_lock lock(call_queue_mutex_);
00103
00104 while (!call_queue_.empty())
00105 {
00106 CallInfoPtr info = call_queue_.front();
00107
00108 cancelCall(info);
00109
00110 call_queue_.pop();
00111 }
00112 }
00113
00114 bool ServiceServerLink::initialize(const ConnectionPtr& connection)
00115 {
00116 connection_ = connection;
00117 connection_->addDropListener(boost::bind(&ServiceServerLink::onConnectionDropped, this, _1));
00118 connection_->setHeaderReceivedCallback(boost::bind(&ServiceServerLink::onHeaderReceived, this, _1, _2));
00119
00120 M_string header;
00121 header["service"] = service_name_;
00122 header["md5sum"] = request_md5sum_;
00123 header["callerid"] = this_node::getName();
00124 header["persistent"] = persistent_ ? "1" : "0";
00125 header.insert(extra_outgoing_header_values_.begin(), extra_outgoing_header_values_.end());
00126
00127 connection_->writeHeader(header, boost::bind(&ServiceServerLink::onHeaderWritten, this, _1));
00128
00129 return true;
00130 }
00131
00132 void ServiceServerLink::onHeaderWritten(const ConnectionPtr& conn)
00133 {
00134 header_written_ = true;
00135 }
00136
00137 bool ServiceServerLink::onHeaderReceived(const ConnectionPtr& conn, const Header& header)
00138 {
00139 std::string md5sum, type;
00140 if (!header.getValue("md5sum", md5sum))
00141 {
00142 ROS_ERROR("TCPROS header from service server did not have required element: md5sum");
00143 return false;
00144 }
00145
00146 bool empty = false;
00147 {
00148 boost::mutex::scoped_lock lock(call_queue_mutex_);
00149 empty = call_queue_.empty();
00150
00151 if (empty)
00152 {
00153 header_read_ = true;
00154 }
00155 }
00156
00157 if (!empty)
00158 {
00159 processNextCall();
00160
00161 header_read_ = true;
00162 }
00163
00164 return true;
00165 }
00166
00167 void ServiceServerLink::onConnectionDropped(const ConnectionPtr& conn)
00168 {
00169 ROS_ASSERT(conn == connection_);
00170 ROSCPP_LOG_DEBUG("Service client from [%s] for [%s] dropped", conn->getRemoteString().c_str(), service_name_.c_str());
00171
00172 dropped_ = true;
00173 clearCalls();
00174
00175 ServiceManager::instance()->removeServiceServerLink(shared_from_this());
00176 }
00177
00178 void ServiceServerLink::onRequestWritten(const ConnectionPtr& conn)
00179 {
00180
00181 connection_->read(5, boost::bind(&ServiceServerLink::onResponseOkAndLength, this, _1, _2, _3, _4));
00182 }
00183
00184 void ServiceServerLink::onResponseOkAndLength(const ConnectionPtr& conn, const boost::shared_array<uint8_t>& buffer, uint32_t size, bool success)
00185 {
00186 ROS_ASSERT(conn == connection_);
00187 ROS_ASSERT(size == 5);
00188
00189 if (!success)
00190 return;
00191
00192 uint8_t ok = buffer[0];
00193 uint32_t len = *((uint32_t*)(buffer.get() + 1));
00194
00195 if (len > 1000000000)
00196 {
00197 ROS_ERROR("a message of over a gigabyte was " \
00198 "predicted in tcpros. that seems highly " \
00199 "unlikely, so I'll assume protocol " \
00200 "synchronization is lost.");
00201 conn->drop(Connection::Destructing);
00202
00203 return;
00204 }
00205
00206 {
00207 boost::mutex::scoped_lock lock(call_queue_mutex_);
00208 if ( ok != 0 ) {
00209 current_call_->success_ = true;
00210 } else {
00211 current_call_->success_ = false;
00212 }
00213 }
00214
00215 if (len > 0)
00216 {
00217 connection_->read(len, boost::bind(&ServiceServerLink::onResponse, this, _1, _2, _3, _4));
00218 }
00219 else
00220 {
00221 onResponse(conn, boost::shared_array<uint8_t>(), 0, true);
00222 }
00223 }
00224
00225 void ServiceServerLink::onResponse(const ConnectionPtr& conn, const boost::shared_array<uint8_t>& buffer, uint32_t size, bool success)
00226 {
00227 ROS_ASSERT(conn == connection_);
00228
00229 if (!success)
00230 return;
00231
00232 {
00233 boost::mutex::scoped_lock queue_lock(call_queue_mutex_);
00234
00235 if (current_call_->success_)
00236 {
00237 *current_call_->resp_ = SerializedMessage(buffer, size);
00238 }
00239 }
00240
00241 callFinished();
00242 }
00243
00244 void ServiceServerLink::callFinished()
00245 {
00246 CallInfoPtr saved_call;
00247 ServiceServerLinkPtr self;
00248 {
00249 boost::mutex::scoped_lock queue_lock(call_queue_mutex_);
00250 boost::mutex::scoped_lock finished_lock(current_call_->finished_mutex_);
00251
00252 ROS_DEBUG_NAMED("superdebug", "Client to service [%s] call finished with success=[%s]", service_name_.c_str(), current_call_->success_ ? "true" : "false");
00253
00254 current_call_->finished_ = true;
00255 current_call_->finished_condition_.notify_all();
00256
00257 saved_call = current_call_;
00258 current_call_ = CallInfoPtr();
00259
00260
00261
00262
00263 self = shared_from_this();
00264 }
00265
00266 saved_call = CallInfoPtr();
00267
00268 processNextCall();
00269 }
00270
00271 void ServiceServerLink::processNextCall()
00272 {
00273 bool empty = false;
00274 {
00275 boost::mutex::scoped_lock lock(call_queue_mutex_);
00276
00277 if (current_call_)
00278 {
00279 return;
00280 }
00281
00282 if (!call_queue_.empty())
00283 {
00284 ROS_DEBUG_NAMED("superdebug", "[%s] Client to service [%s] processing next service call", persistent_ ? "persistent" : "non-persistent", service_name_.c_str());
00285
00286 current_call_ = call_queue_.front();
00287 call_queue_.pop();
00288 }
00289 else
00290 {
00291 empty = true;
00292 }
00293 }
00294
00295 if (empty)
00296 {
00297 if (!persistent_)
00298 {
00299 ROS_DEBUG_NAMED("superdebug", "Dropping non-persistent client to service [%s]", service_name_.c_str());
00300 connection_->drop(Connection::Destructing);
00301 }
00302 else
00303 {
00304 ROS_DEBUG_NAMED("superdebug", "Keeping persistent client to service [%s]", service_name_.c_str());
00305 }
00306 }
00307 else
00308 {
00309 SerializedMessage request;
00310
00311 {
00312 boost::mutex::scoped_lock lock(call_queue_mutex_);
00313 request = current_call_->req_;
00314 }
00315
00316 connection_->write(request.buf, request.num_bytes, boost::bind(&ServiceServerLink::onRequestWritten, this, _1));
00317 }
00318 }
00319
00320 bool ServiceServerLink::call(const SerializedMessage& req, SerializedMessage& resp)
00321 {
00322 CallInfoPtr info(new CallInfo);
00323 info->req_ = req;
00324 info->resp_ = &resp;
00325 info->success_ = false;
00326 info->finished_ = false;
00327 info->call_finished_ = false;
00328 info->caller_thread_id_ = boost::this_thread::get_id();
00329
00330
00331
00332 bool immediate = false;
00333 {
00334 boost::mutex::scoped_lock lock(call_queue_mutex_);
00335
00336 if (connection_->isDropped())
00337 {
00338 ROSCPP_LOG_DEBUG("ServiceServerLink::call called on dropped connection for service [%s]", service_name_.c_str());
00339 info->call_finished_ = true;
00340 return false;
00341 }
00342
00343 if (call_queue_.empty() && header_written_ && header_read_)
00344 {
00345 immediate = true;
00346 }
00347
00348 call_queue_.push(info);
00349 }
00350
00351 if (immediate)
00352 {
00353 processNextCall();
00354 }
00355
00356 {
00357 boost::mutex::scoped_lock lock(info->finished_mutex_);
00358
00359 while (!info->finished_)
00360 {
00361 info->finished_condition_.wait(lock);
00362 }
00363 }
00364
00365 info->call_finished_ = true;
00366
00367 return info->success_;
00368 }
00369
00370 bool ServiceServerLink::isValid() const
00371 {
00372 return !dropped_;
00373 }
00374
00375 }
00376