$search
00001 /* 00002 * Software License Agreement (BSD License) 00003 * 00004 * Copyright (c) 2008, Willow Garage, Inc. 00005 * All rights reserved. 00006 * 00007 * Redistribution and use in source and binary forms, with or without 00008 * modification, are permitted provided that the following conditions 00009 * are met: 00010 * 00011 * * Redistributions of source code must retain the above copyright 00012 * notice, this list of conditions and the following disclaimer. 00013 * * Redistributions in binary form must reproduce the above 00014 * copyright notice, this list of conditions and the following 00015 * disclaimer in the documentation and/or other materials provided 00016 * with the distribution. 00017 * * Neither the name of Willow Garage, Inc. nor the names of its 00018 * contributors may be used to endorse or promote products derived 00019 * from this software without specific prior written permission. 00020 * 00021 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 00022 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 00023 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 00024 * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 00025 * COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 00026 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, 00027 * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 00028 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 00029 * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 00030 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN 00031 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 00032 * POSSIBILITY OF SUCH DAMAGE. 00033 */ 00034 00035 #include "ros/service_server_link.h" 00036 #include "ros/header.h" 00037 #include "ros/connection.h" 00038 #include "ros/service_manager.h" 00039 #include "ros/transport/transport.h" 00040 #include "ros/this_node.h" 00041 #include "ros/file_log.h" 00042 00043 #include <boost/bind.hpp> 00044 00045 #include <sstream> 00046 00047 namespace ros 00048 { 00049 00050 ServiceServerLink::ServiceServerLink(const std::string& service_name, bool persistent, const std::string& request_md5sum, 00051 const std::string& response_md5sum, const M_string& header_values) 00052 : service_name_(service_name) 00053 , persistent_(persistent) 00054 , request_md5sum_(request_md5sum) 00055 , response_md5sum_(response_md5sum) 00056 , extra_outgoing_header_values_(header_values) 00057 , header_written_(false) 00058 , header_read_(false) 00059 , dropped_(false) 00060 { 00061 } 00062 00063 ServiceServerLink::~ServiceServerLink() 00064 { 00065 ROS_ASSERT(connection_->isDropped()); 00066 00067 clearCalls(); 00068 } 00069 00070 void ServiceServerLink::cancelCall(const CallInfoPtr& info) 00071 { 00072 CallInfoPtr local = info; 00073 { 00074 boost::mutex::scoped_lock lock(local->finished_mutex_); 00075 local->finished_ = true; 00076 local->finished_condition_.notify_all(); 00077 } 00078 00079 if (boost::this_thread::get_id() != info->caller_thread_id_) 00080 { 00081 while (!local->call_finished_) 00082 { 00083 boost::this_thread::yield(); 00084 } 00085 } 00086 } 00087 00088 void ServiceServerLink::clearCalls() 00089 { 00090 CallInfoPtr local_current; 00091 00092 { 00093 boost::mutex::scoped_lock lock(call_queue_mutex_); 00094 local_current = current_call_; 00095 } 00096 00097 if (local_current) 00098 { 00099 cancelCall(local_current); 00100 } 00101 00102 boost::mutex::scoped_lock lock(call_queue_mutex_); 00103 00104 while (!call_queue_.empty()) 00105 { 00106 CallInfoPtr info = call_queue_.front(); 00107 00108 cancelCall(info); 00109 00110 call_queue_.pop(); 00111 } 00112 } 00113 00114 bool ServiceServerLink::initialize(const ConnectionPtr& connection) 00115 { 00116 connection_ = connection; 00117 connection_->addDropListener(boost::bind(&ServiceServerLink::onConnectionDropped, this, _1)); 00118 connection_->setHeaderReceivedCallback(boost::bind(&ServiceServerLink::onHeaderReceived, this, _1, _2)); 00119 00120 M_string header; 00121 header["service"] = service_name_; 00122 header["md5sum"] = request_md5sum_; 00123 header["callerid"] = this_node::getName(); 00124 header["persistent"] = persistent_ ? "1" : "0"; 00125 header.insert(extra_outgoing_header_values_.begin(), extra_outgoing_header_values_.end()); 00126 00127 connection_->writeHeader(header, boost::bind(&ServiceServerLink::onHeaderWritten, this, _1)); 00128 00129 return true; 00130 } 00131 00132 void ServiceServerLink::onHeaderWritten(const ConnectionPtr& conn) 00133 { 00134 header_written_ = true; 00135 } 00136 00137 bool ServiceServerLink::onHeaderReceived(const ConnectionPtr& conn, const Header& header) 00138 { 00139 std::string md5sum, type; 00140 if (!header.getValue("md5sum", md5sum)) 00141 { 00142 ROS_ERROR("TCPROS header from service server did not have required element: md5sum"); 00143 return false; 00144 } 00145 00146 bool empty = false; 00147 { 00148 boost::mutex::scoped_lock lock(call_queue_mutex_); 00149 empty = call_queue_.empty(); 00150 00151 if (empty) 00152 { 00153 header_read_ = true; 00154 } 00155 } 00156 00157 if (!empty) 00158 { 00159 processNextCall(); 00160 00161 header_read_ = true; 00162 } 00163 00164 return true; 00165 } 00166 00167 void ServiceServerLink::onConnectionDropped(const ConnectionPtr& conn) 00168 { 00169 ROS_ASSERT(conn == connection_); 00170 ROSCPP_LOG_DEBUG("Service client from [%s] for [%s] dropped", conn->getRemoteString().c_str(), service_name_.c_str()); 00171 00172 dropped_ = true; 00173 clearCalls(); 00174 00175 ServiceManager::instance()->removeServiceServerLink(shared_from_this()); 00176 } 00177 00178 void ServiceServerLink::onRequestWritten(const ConnectionPtr& conn) 00179 { 00180 //ros::WallDuration(0.1).sleep(); 00181 connection_->read(5, boost::bind(&ServiceServerLink::onResponseOkAndLength, this, _1, _2, _3, _4)); 00182 } 00183 00184 void ServiceServerLink::onResponseOkAndLength(const ConnectionPtr& conn, const boost::shared_array<uint8_t>& buffer, uint32_t size, bool success) 00185 { 00186 ROS_ASSERT(conn == connection_); 00187 ROS_ASSERT(size == 5); 00188 00189 if (!success) 00190 return; 00191 00192 uint8_t ok = buffer[0]; 00193 uint32_t len = *((uint32_t*)(buffer.get() + 1)); 00194 00195 if (len > 1000000000) 00196 { 00197 ROS_ERROR("a message of over a gigabyte was " \ 00198 "predicted in tcpros. that seems highly " \ 00199 "unlikely, so I'll assume protocol " \ 00200 "synchronization is lost."); 00201 conn->drop(Connection::Destructing); 00202 00203 return; 00204 } 00205 00206 { 00207 boost::mutex::scoped_lock lock(call_queue_mutex_); 00208 if ( ok != 0 ) { 00209 current_call_->success_ = true; 00210 } else { 00211 current_call_->success_ = false; 00212 } 00213 } 00214 00215 if (len > 0) 00216 { 00217 connection_->read(len, boost::bind(&ServiceServerLink::onResponse, this, _1, _2, _3, _4)); 00218 } 00219 else 00220 { 00221 onResponse(conn, boost::shared_array<uint8_t>(), 0, true); 00222 } 00223 } 00224 00225 void ServiceServerLink::onResponse(const ConnectionPtr& conn, const boost::shared_array<uint8_t>& buffer, uint32_t size, bool success) 00226 { 00227 ROS_ASSERT(conn == connection_); 00228 00229 if (!success) 00230 return; 00231 00232 { 00233 boost::mutex::scoped_lock queue_lock(call_queue_mutex_); 00234 00235 if (current_call_->success_) 00236 { 00237 *current_call_->resp_ = SerializedMessage(buffer, size); 00238 } 00239 } 00240 00241 callFinished(); 00242 } 00243 00244 void ServiceServerLink::callFinished() 00245 { 00246 CallInfoPtr saved_call; 00247 ServiceServerLinkPtr self; 00248 { 00249 boost::mutex::scoped_lock queue_lock(call_queue_mutex_); 00250 boost::mutex::scoped_lock finished_lock(current_call_->finished_mutex_); 00251 00252 ROS_DEBUG_NAMED("superdebug", "Client to service [%s] call finished with success=[%s]", service_name_.c_str(), current_call_->success_ ? "true" : "false"); 00253 00254 current_call_->finished_ = true; 00255 current_call_->finished_condition_.notify_all(); 00256 00257 saved_call = current_call_; 00258 current_call_ = CallInfoPtr(); 00259 00260 // If the call queue is empty here, we may be deleted as soon as we release these locks, so keep a shared pointer to ourselves until we return 00261 // ugly 00262 // jfaust TODO there's got to be a better way 00263 self = shared_from_this(); 00264 } 00265 00266 saved_call = CallInfoPtr(); 00267 00268 processNextCall(); 00269 } 00270 00271 void ServiceServerLink::processNextCall() 00272 { 00273 bool empty = false; 00274 { 00275 boost::mutex::scoped_lock lock(call_queue_mutex_); 00276 00277 if (current_call_) 00278 { 00279 return; 00280 } 00281 00282 if (!call_queue_.empty()) 00283 { 00284 ROS_DEBUG_NAMED("superdebug", "[%s] Client to service [%s] processing next service call", persistent_ ? "persistent" : "non-persistent", service_name_.c_str()); 00285 00286 current_call_ = call_queue_.front(); 00287 call_queue_.pop(); 00288 } 00289 else 00290 { 00291 empty = true; 00292 } 00293 } 00294 00295 if (empty) 00296 { 00297 if (!persistent_) 00298 { 00299 ROS_DEBUG_NAMED("superdebug", "Dropping non-persistent client to service [%s]", service_name_.c_str()); 00300 connection_->drop(Connection::Destructing); 00301 } 00302 else 00303 { 00304 ROS_DEBUG_NAMED("superdebug", "Keeping persistent client to service [%s]", service_name_.c_str()); 00305 } 00306 } 00307 else 00308 { 00309 SerializedMessage request; 00310 00311 { 00312 boost::mutex::scoped_lock lock(call_queue_mutex_); 00313 request = current_call_->req_; 00314 } 00315 00316 connection_->write(request.buf, request.num_bytes, boost::bind(&ServiceServerLink::onRequestWritten, this, _1)); 00317 } 00318 } 00319 00320 bool ServiceServerLink::call(const SerializedMessage& req, SerializedMessage& resp) 00321 { 00322 CallInfoPtr info(new CallInfo); 00323 info->req_ = req; 00324 info->resp_ = &resp; 00325 info->success_ = false; 00326 info->finished_ = false; 00327 info->call_finished_ = false; 00328 info->caller_thread_id_ = boost::this_thread::get_id(); 00329 00330 //ros::WallDuration(0.1).sleep(); 00331 00332 bool immediate = false; 00333 { 00334 boost::mutex::scoped_lock lock(call_queue_mutex_); 00335 00336 if (connection_->isDropped()) 00337 { 00338 ROSCPP_LOG_DEBUG("ServiceServerLink::call called on dropped connection for service [%s]", service_name_.c_str()); 00339 info->call_finished_ = true; 00340 return false; 00341 } 00342 00343 if (call_queue_.empty() && header_written_ && header_read_) 00344 { 00345 immediate = true; 00346 } 00347 00348 call_queue_.push(info); 00349 } 00350 00351 if (immediate) 00352 { 00353 processNextCall(); 00354 } 00355 00356 { 00357 boost::mutex::scoped_lock lock(info->finished_mutex_); 00358 00359 while (!info->finished_) 00360 { 00361 info->finished_condition_.wait(lock); 00362 } 00363 } 00364 00365 info->call_finished_ = true; 00366 00367 return info->success_; 00368 } 00369 00370 bool ServiceServerLink::isValid() const 00371 { 00372 return !dropped_; 00373 } 00374 00375 } // namespace ros 00376