service_server_link.cpp
Go to the documentation of this file.
00001 /*
00002  * Software License Agreement (BSD License)
00003  *
00004  *  Copyright (c) 2008, Willow Garage, Inc.
00005  *  All rights reserved.
00006  *
00007  *  Redistribution and use in source and binary forms, with or without
00008  *  modification, are permitted provided that the following conditions
00009  *  are met:
00010  *
00011  *   * Redistributions of source code must retain the above copyright
00012  *     notice, this list of conditions and the following disclaimer.
00013  *   * Redistributions in binary form must reproduce the above
00014  *     copyright notice, this list of conditions and the following
00015  *     disclaimer in the documentation and/or other materials provided
00016  *     with the distribution.
00017  *   * Neither the name of Willow Garage, Inc. nor the names of its
00018  *     contributors may be used to endorse or promote products derived
00019  *     from this software without specific prior written permission.
00020  *
00021  *  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
00022  *  "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
00023  *  LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
00024  *  FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
00025  *  COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
00026  *  INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
00027  *  BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
00028  *  LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
00029  *  CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
00030  *  LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
00031  *  ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00032  *  POSSIBILITY OF SUCH DAMAGE.
00033  */
00034 
00035 #include "ros/service_server_link.h"
00036 #include "ros/header.h"
00037 #include "ros/connection.h"
00038 #include "ros/service_manager.h"
00039 #include "ros/transport/transport.h"
00040 #include "ros/this_node.h"
00041 #include "ros/file_log.h"
00042 
00043 #include <boost/bind.hpp>
00044 
00045 #include <sstream>
00046 
00047 namespace ros
00048 {
00049 
00050 ServiceServerLink::ServiceServerLink(const std::string& service_name, bool persistent, const std::string& request_md5sum,
00051                              const std::string& response_md5sum, const M_string& header_values)
00052 : service_name_(service_name)
00053 , persistent_(persistent)
00054 , request_md5sum_(request_md5sum)
00055 , response_md5sum_(response_md5sum)
00056 , extra_outgoing_header_values_(header_values)
00057 , header_written_(false)
00058 , header_read_(false)
00059 , dropped_(false)
00060 {
00061 }
00062 
00063 ServiceServerLink::~ServiceServerLink()
00064 {
00065   ROS_ASSERT(connection_->isDropped());
00066 
00067   clearCalls();
00068 }
00069 
00070 void ServiceServerLink::cancelCall(const CallInfoPtr& info)
00071 {
00072   CallInfoPtr local = info;
00073   {
00074     boost::mutex::scoped_lock lock(local->finished_mutex_);
00075     local->finished_ = true;
00076     local->finished_condition_.notify_all();
00077   }
00078 
00079   if (boost::this_thread::get_id() != info->caller_thread_id_)
00080   {
00081     while (!local->call_finished_)
00082     {
00083       boost::this_thread::yield();
00084     }
00085   }
00086 }
00087 
00088 void ServiceServerLink::clearCalls()
00089 {
00090   CallInfoPtr local_current;
00091 
00092   {
00093     boost::mutex::scoped_lock lock(call_queue_mutex_);
00094     local_current = current_call_;
00095   }
00096 
00097   if (local_current)
00098   {
00099     cancelCall(local_current);
00100   }
00101 
00102   boost::mutex::scoped_lock lock(call_queue_mutex_);
00103 
00104   while (!call_queue_.empty())
00105   {
00106     CallInfoPtr info = call_queue_.front();
00107 
00108     cancelCall(info);
00109 
00110     call_queue_.pop();
00111   }
00112 }
00113 
00114 bool ServiceServerLink::initialize(const ConnectionPtr& connection)
00115 {
00116   connection_ = connection;
00117   connection_->addDropListener(boost::bind(&ServiceServerLink::onConnectionDropped, this, _1));
00118   connection_->setHeaderReceivedCallback(boost::bind(&ServiceServerLink::onHeaderReceived, this, _1, _2));
00119 
00120   M_string header;
00121   header["service"] = service_name_;
00122   header["md5sum"] = request_md5sum_;
00123   header["callerid"] = this_node::getName();
00124   header["persistent"] = persistent_ ? "1" : "0";
00125   header.insert(extra_outgoing_header_values_.begin(), extra_outgoing_header_values_.end());
00126 
00127   connection_->writeHeader(header, boost::bind(&ServiceServerLink::onHeaderWritten, this, _1));
00128 
00129   return true;
00130 }
00131 
00132 void ServiceServerLink::onHeaderWritten(const ConnectionPtr& conn)
00133 {
00134   header_written_ = true;
00135 }
00136 
00137 bool ServiceServerLink::onHeaderReceived(const ConnectionPtr& conn, const Header& header)
00138 {
00139   std::string md5sum, type;
00140   if (!header.getValue("md5sum", md5sum))
00141   {
00142     ROS_ERROR("TCPROS header from service server did not have required element: md5sum");
00143     return false;
00144   }
00145 
00146   bool empty = false;
00147   {
00148     boost::mutex::scoped_lock lock(call_queue_mutex_);
00149     empty = call_queue_.empty();
00150 
00151     if (empty)
00152     {
00153       header_read_ = true;
00154     }
00155   }
00156 
00157   if (!empty)
00158   {
00159     processNextCall();
00160 
00161     header_read_ = true;
00162   }
00163 
00164   return true;
00165 }
00166 
00167 void ServiceServerLink::onConnectionDropped(const ConnectionPtr& conn)
00168 {
00169   ROS_ASSERT(conn == connection_);
00170   ROSCPP_LOG_DEBUG("Service client from [%s] for [%s] dropped", conn->getRemoteString().c_str(), service_name_.c_str());
00171 
00172   dropped_ = true;
00173   clearCalls();
00174 
00175   ServiceManager::instance()->removeServiceServerLink(shared_from_this());
00176 }
00177 
00178 void ServiceServerLink::onRequestWritten(const ConnectionPtr& conn)
00179 {
00180   //ros::WallDuration(0.1).sleep();
00181   connection_->read(5, boost::bind(&ServiceServerLink::onResponseOkAndLength, this, _1, _2, _3, _4));
00182 }
00183 
00184 void ServiceServerLink::onResponseOkAndLength(const ConnectionPtr& conn, const boost::shared_array<uint8_t>& buffer, uint32_t size, bool success)
00185 {
00186   ROS_ASSERT(conn == connection_);
00187   ROS_ASSERT(size == 5);
00188 
00189   if (!success)
00190     return;
00191 
00192   uint8_t ok = buffer[0];
00193   uint32_t len = *((uint32_t*)(buffer.get() + 1));
00194 
00195   if (len > 1000000000)
00196   {
00197     ROS_ERROR("a message of over a gigabyte was " \
00198                 "predicted in tcpros. that seems highly " \
00199                 "unlikely, so I'll assume protocol " \
00200                 "synchronization is lost.");
00201     conn->drop(Connection::Destructing);
00202 
00203     return;
00204   }
00205 
00206   {
00207     boost::mutex::scoped_lock lock(call_queue_mutex_);
00208     if ( ok != 0 ) {
00209         current_call_->success_ = true;
00210     } else {
00211         current_call_->success_ = false;
00212     }
00213   }
00214 
00215   if (len > 0)
00216   {
00217     connection_->read(len, boost::bind(&ServiceServerLink::onResponse, this, _1, _2, _3, _4));
00218   }
00219   else
00220   {
00221     onResponse(conn, boost::shared_array<uint8_t>(), 0, true);
00222   }
00223 }
00224 
00225 void ServiceServerLink::onResponse(const ConnectionPtr& conn, const boost::shared_array<uint8_t>& buffer, uint32_t size, bool success)
00226 {
00227   ROS_ASSERT(conn == connection_);
00228 
00229   if (!success)
00230     return;
00231 
00232   {
00233     boost::mutex::scoped_lock queue_lock(call_queue_mutex_);
00234 
00235     if (current_call_->success_)
00236     {
00237       *current_call_->resp_ = SerializedMessage(buffer, size);
00238     }
00239     else
00240     {
00241       current_call_->exception_string_ = std::string(reinterpret_cast<char*>(buffer.get()), size);
00242     }
00243   }
00244 
00245   callFinished();
00246 }
00247 
00248 void ServiceServerLink::callFinished()
00249 {
00250   CallInfoPtr saved_call;
00251   ServiceServerLinkPtr self;
00252   {
00253     boost::mutex::scoped_lock queue_lock(call_queue_mutex_);
00254     boost::mutex::scoped_lock finished_lock(current_call_->finished_mutex_);
00255 
00256     ROS_DEBUG_NAMED("superdebug", "Client to service [%s] call finished with success=[%s]", service_name_.c_str(), current_call_->success_ ? "true" : "false");
00257 
00258     current_call_->finished_ = true;
00259     current_call_->finished_condition_.notify_all();
00260 
00261     saved_call = current_call_;
00262     current_call_ = CallInfoPtr();
00263 
00264     // If the call queue is empty here, we may be deleted as soon as we release these locks, so keep a shared pointer to ourselves until we return
00265     // ugly
00266     // jfaust TODO there's got to be a better way
00267     self = shared_from_this();
00268   }
00269 
00270   saved_call = CallInfoPtr();
00271 
00272   processNextCall();
00273 }
00274 
00275 void ServiceServerLink::processNextCall()
00276 {
00277   bool empty = false;
00278   {
00279     boost::mutex::scoped_lock lock(call_queue_mutex_);
00280 
00281     if (current_call_)
00282     {
00283       return;
00284     }
00285 
00286     if (!call_queue_.empty())
00287     {
00288       ROS_DEBUG_NAMED("superdebug", "[%s] Client to service [%s] processing next service call", persistent_ ? "persistent" : "non-persistent", service_name_.c_str());
00289 
00290       current_call_ = call_queue_.front();
00291       call_queue_.pop();
00292     }
00293     else
00294     {
00295       empty = true;
00296     }
00297   }
00298 
00299   if (empty)
00300   {
00301     if (!persistent_)
00302     {
00303       ROS_DEBUG_NAMED("superdebug", "Dropping non-persistent client to service [%s]", service_name_.c_str());
00304       connection_->drop(Connection::Destructing);
00305     }
00306     else
00307     {
00308       ROS_DEBUG_NAMED("superdebug", "Keeping persistent client to service [%s]", service_name_.c_str());
00309     }
00310   }
00311   else
00312   {
00313     SerializedMessage request;
00314 
00315     {
00316       boost::mutex::scoped_lock lock(call_queue_mutex_);
00317       request = current_call_->req_;
00318     }
00319 
00320     connection_->write(request.buf, request.num_bytes, boost::bind(&ServiceServerLink::onRequestWritten, this, _1));
00321   }
00322 }
00323 
00324 bool ServiceServerLink::call(const SerializedMessage& req, SerializedMessage& resp)
00325 {
00326   CallInfoPtr info(new CallInfo);
00327   info->req_ = req;
00328   info->resp_ = &resp;
00329   info->success_ = false;
00330   info->finished_ = false;
00331   info->call_finished_ = false;
00332   info->caller_thread_id_ = boost::this_thread::get_id();
00333 
00334   //ros::WallDuration(0.1).sleep();
00335 
00336   bool immediate = false;
00337   {
00338     boost::mutex::scoped_lock lock(call_queue_mutex_);
00339 
00340     if (connection_->isDropped())
00341     {
00342       ROSCPP_LOG_DEBUG("ServiceServerLink::call called on dropped connection for service [%s]", service_name_.c_str());
00343       info->call_finished_ = true;
00344       return false;
00345     }
00346 
00347     if (call_queue_.empty() && header_written_ && header_read_)
00348     {
00349       immediate = true;
00350     }
00351 
00352     call_queue_.push(info);
00353   }
00354 
00355   if (immediate)
00356   {
00357     processNextCall();
00358   }
00359 
00360   {
00361     boost::mutex::scoped_lock lock(info->finished_mutex_);
00362 
00363     while (!info->finished_)
00364     {
00365       info->finished_condition_.wait(lock);
00366     }
00367   }
00368 
00369   info->call_finished_ = true;
00370 
00371   if (info->exception_string_.length() > 0)
00372   {
00373     ROS_ERROR("Service call failed: service [%s] responded with an error: %s", service_name_.c_str(), info->exception_string_.c_str());
00374   }
00375 
00376   return info->success_;
00377 }
00378 
00379 bool ServiceServerLink::isValid() const
00380 {
00381   return !dropped_;
00382 }
00383 
00384 } // namespace ros
00385 


roscpp
Author(s): Morgan Quigley, Josh Faust, Brian Gerkey, Troy Straszheim
autogenerated on Fri Aug 28 2015 12:33:11