service_server_link.cpp
Go to the documentation of this file.
00001 /*
00002  * Software License Agreement (BSD License)
00003  *
00004  *  Copyright (c) 2008, Willow Garage, Inc.
00005  *  All rights reserved.
00006  *
00007  *  Redistribution and use in source and binary forms, with or without
00008  *  modification, are permitted provided that the following conditions
00009  *  are met:
00010  *
00011  *   * Redistributions of source code must retain the above copyright
00012  *     notice, this list of conditions and the following disclaimer.
00013  *   * Redistributions in binary form must reproduce the above
00014  *     copyright notice, this list of conditions and the following
00015  *     disclaimer in the documentation and/or other materials provided
00016  *     with the distribution.
00017  *   * Neither the name of Willow Garage, Inc. nor the names of its
00018  *     contributors may be used to endorse or promote products derived
00019  *     from this software without specific prior written permission.
00020  *
00021  *  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
00022  *  "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
00023  *  LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
00024  *  FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
00025  *  COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
00026  *  INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
00027  *  BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
00028  *  LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
00029  *  CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
00030  *  LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
00031  *  ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00032  *  POSSIBILITY OF SUCH DAMAGE.
00033  */
00034 
00035 #include "ros/service_server_link.h"
00036 #include "ros/header.h"
00037 #include "ros/connection.h"
00038 #include "ros/service_manager.h"
00039 #include "ros/transport/transport.h"
00040 #include "ros/this_node.h"
00041 #include "ros/file_log.h"
00042 
00043 #include <boost/bind.hpp>
00044 
00045 #include <sstream>
00046 
00047 namespace ros
00048 {
00049 
00050 ServiceServerLink::ServiceServerLink(const std::string& service_name, bool persistent, const std::string& request_md5sum,
00051                              const std::string& response_md5sum, const M_string& header_values)
00052 : service_name_(service_name)
00053 , persistent_(persistent)
00054 , request_md5sum_(request_md5sum)
00055 , response_md5sum_(response_md5sum)
00056 , extra_outgoing_header_values_(header_values)
00057 , header_written_(false)
00058 , header_read_(false)
00059 , dropped_(false)
00060 {
00061 }
00062 
00063 ServiceServerLink::~ServiceServerLink()
00064 {
00065   ROS_ASSERT(connection_->isDropped());
00066 
00067   clearCalls();
00068 }
00069 
00070 void ServiceServerLink::cancelCall(const CallInfoPtr& info)
00071 {
00072   CallInfoPtr local = info;
00073   {
00074     boost::mutex::scoped_lock lock(local->finished_mutex_);
00075     local->finished_ = true;
00076     local->finished_condition_.notify_all();
00077   }
00078 
00079   if (boost::this_thread::get_id() != info->caller_thread_id_)
00080   {
00081     while (!local->call_finished_)
00082     {
00083       boost::this_thread::yield();
00084     }
00085   }
00086 }
00087 
00088 void ServiceServerLink::clearCalls()
00089 {
00090   CallInfoPtr local_current;
00091 
00092   {
00093     boost::mutex::scoped_lock lock(call_queue_mutex_);
00094     local_current = current_call_;
00095   }
00096 
00097   if (local_current)
00098   {
00099     cancelCall(local_current);
00100   }
00101 
00102   boost::mutex::scoped_lock lock(call_queue_mutex_);
00103 
00104   while (!call_queue_.empty())
00105   {
00106     CallInfoPtr info = call_queue_.front();
00107 
00108     cancelCall(info);
00109 
00110     call_queue_.pop();
00111   }
00112 }
00113 
00114 bool ServiceServerLink::initialize(const ConnectionPtr& connection)
00115 {
00116   connection_ = connection;
00117   connection_->addDropListener(boost::bind(&ServiceServerLink::onConnectionDropped, this, _1));
00118   connection_->setHeaderReceivedCallback(boost::bind(&ServiceServerLink::onHeaderReceived, this, _1, _2));
00119 
00120   M_string header;
00121   header["service"] = service_name_;
00122   header["md5sum"] = request_md5sum_;
00123   header["callerid"] = this_node::getName();
00124   header["persistent"] = persistent_ ? "1" : "0";
00125   header.insert(extra_outgoing_header_values_.begin(), extra_outgoing_header_values_.end());
00126 
00127   connection_->writeHeader(header, boost::bind(&ServiceServerLink::onHeaderWritten, this, _1));
00128 
00129   return true;
00130 }
00131 
00132 void ServiceServerLink::onHeaderWritten(const ConnectionPtr& conn)
00133 {
00134   (void)conn;
00135   header_written_ = true;
00136 }
00137 
00138 bool ServiceServerLink::onHeaderReceived(const ConnectionPtr& conn, const Header& header)
00139 {
00140   (void)conn;
00141   std::string md5sum, type;
00142   if (!header.getValue("md5sum", md5sum))
00143   {
00144     ROS_ERROR("TCPROS header from service server did not have required element: md5sum");
00145     return false;
00146   }
00147 
00148   bool empty = false;
00149   {
00150     boost::mutex::scoped_lock lock(call_queue_mutex_);
00151     empty = call_queue_.empty();
00152 
00153     if (empty)
00154     {
00155       header_read_ = true;
00156     }
00157   }
00158 
00159   if (!empty)
00160   {
00161     processNextCall();
00162 
00163     header_read_ = true;
00164   }
00165 
00166   return true;
00167 }
00168 
00169 void ServiceServerLink::onConnectionDropped(const ConnectionPtr& conn)
00170 {
00171   ROS_ASSERT(conn == connection_);
00172   ROSCPP_LOG_DEBUG("Service client from [%s] for [%s] dropped", conn->getRemoteString().c_str(), service_name_.c_str());
00173 
00174   dropped_ = true;
00175   clearCalls();
00176 
00177   ServiceManager::instance()->removeServiceServerLink(shared_from_this());
00178 }
00179 
00180 void ServiceServerLink::onRequestWritten(const ConnectionPtr& conn)
00181 {
00182   (void)conn;
00183   //ros::WallDuration(0.1).sleep();
00184   connection_->read(5, boost::bind(&ServiceServerLink::onResponseOkAndLength, this, _1, _2, _3, _4));
00185 }
00186 
00187 void ServiceServerLink::onResponseOkAndLength(const ConnectionPtr& conn, const boost::shared_array<uint8_t>& buffer, uint32_t size, bool success)
00188 {
00189   ROS_ASSERT(conn == connection_);
00190   ROS_ASSERT(size == 5);
00191 
00192   if (!success)
00193     return;
00194 
00195   uint8_t ok = buffer[0];
00196   uint32_t len = *((uint32_t*)(buffer.get() + 1));
00197 
00198   if (len > 1000000000)
00199   {
00200     ROS_ERROR("a message of over a gigabyte was " \
00201                 "predicted in tcpros. that seems highly " \
00202                 "unlikely, so I'll assume protocol " \
00203                 "synchronization is lost.");
00204     conn->drop(Connection::Destructing);
00205 
00206     return;
00207   }
00208 
00209   {
00210     boost::mutex::scoped_lock lock(call_queue_mutex_);
00211     if ( ok != 0 ) {
00212         current_call_->success_ = true;
00213     } else {
00214         current_call_->success_ = false;
00215     }
00216   }
00217 
00218   if (len > 0)
00219   {
00220     connection_->read(len, boost::bind(&ServiceServerLink::onResponse, this, _1, _2, _3, _4));
00221   }
00222   else
00223   {
00224     onResponse(conn, boost::shared_array<uint8_t>(), 0, true);
00225   }
00226 }
00227 
00228 void ServiceServerLink::onResponse(const ConnectionPtr& conn, const boost::shared_array<uint8_t>& buffer, uint32_t size, bool success)
00229 {
00230   ROS_ASSERT(conn == connection_);
00231 
00232   if (!success)
00233     return;
00234 
00235   {
00236     boost::mutex::scoped_lock queue_lock(call_queue_mutex_);
00237 
00238     if (current_call_->success_)
00239     {
00240       *current_call_->resp_ = SerializedMessage(buffer, size);
00241     }
00242     else
00243     {
00244       current_call_->exception_string_ = std::string(reinterpret_cast<char*>(buffer.get()), size);
00245     }
00246   }
00247 
00248   callFinished();
00249 }
00250 
00251 void ServiceServerLink::callFinished()
00252 {
00253   CallInfoPtr saved_call;
00254   ServiceServerLinkPtr self;
00255   {
00256     boost::mutex::scoped_lock queue_lock(call_queue_mutex_);
00257     boost::mutex::scoped_lock finished_lock(current_call_->finished_mutex_);
00258 
00259     ROS_DEBUG_NAMED("superdebug", "Client to service [%s] call finished with success=[%s]", service_name_.c_str(), current_call_->success_ ? "true" : "false");
00260 
00261     current_call_->finished_ = true;
00262     current_call_->finished_condition_.notify_all();
00263 
00264     saved_call = current_call_;
00265     current_call_ = CallInfoPtr();
00266 
00267     // If the call queue is empty here, we may be deleted as soon as we release these locks, so keep a shared pointer to ourselves until we return
00268     // ugly
00269     // jfaust TODO there's got to be a better way
00270     self = shared_from_this();
00271   }
00272 
00273   saved_call = CallInfoPtr();
00274 
00275   processNextCall();
00276 }
00277 
00278 void ServiceServerLink::processNextCall()
00279 {
00280   bool empty = false;
00281   {
00282     boost::mutex::scoped_lock lock(call_queue_mutex_);
00283 
00284     if (current_call_)
00285     {
00286       return;
00287     }
00288 
00289     if (!call_queue_.empty())
00290     {
00291       ROS_DEBUG_NAMED("superdebug", "[%s] Client to service [%s] processing next service call", persistent_ ? "persistent" : "non-persistent", service_name_.c_str());
00292 
00293       current_call_ = call_queue_.front();
00294       call_queue_.pop();
00295     }
00296     else
00297     {
00298       empty = true;
00299     }
00300   }
00301 
00302   if (empty)
00303   {
00304     if (!persistent_)
00305     {
00306       ROS_DEBUG_NAMED("superdebug", "Dropping non-persistent client to service [%s]", service_name_.c_str());
00307       connection_->drop(Connection::Destructing);
00308     }
00309     else
00310     {
00311       ROS_DEBUG_NAMED("superdebug", "Keeping persistent client to service [%s]", service_name_.c_str());
00312     }
00313   }
00314   else
00315   {
00316     SerializedMessage request;
00317 
00318     {
00319       boost::mutex::scoped_lock lock(call_queue_mutex_);
00320       request = current_call_->req_;
00321     }
00322 
00323     connection_->write(request.buf, request.num_bytes, boost::bind(&ServiceServerLink::onRequestWritten, this, _1));
00324   }
00325 }
00326 
00327 bool ServiceServerLink::call(const SerializedMessage& req, SerializedMessage& resp)
00328 {
00329   CallInfoPtr info(boost::make_shared<CallInfo>());
00330   info->req_ = req;
00331   info->resp_ = &resp;
00332   info->success_ = false;
00333   info->finished_ = false;
00334   info->call_finished_ = false;
00335   info->caller_thread_id_ = boost::this_thread::get_id();
00336 
00337   //ros::WallDuration(0.1).sleep();
00338 
00339   bool immediate = false;
00340   {
00341     boost::mutex::scoped_lock lock(call_queue_mutex_);
00342 
00343     if (connection_->isDropped())
00344     {
00345       ROSCPP_LOG_DEBUG("ServiceServerLink::call called on dropped connection for service [%s]", service_name_.c_str());
00346       info->call_finished_ = true;
00347       return false;
00348     }
00349 
00350     if (call_queue_.empty() && header_written_ && header_read_)
00351     {
00352       immediate = true;
00353     }
00354 
00355     call_queue_.push(info);
00356   }
00357 
00358   if (immediate)
00359   {
00360     processNextCall();
00361   }
00362 
00363   {
00364     boost::mutex::scoped_lock lock(info->finished_mutex_);
00365 
00366     while (!info->finished_)
00367     {
00368       info->finished_condition_.wait(lock);
00369     }
00370   }
00371 
00372   info->call_finished_ = true;
00373 
00374   if (info->exception_string_.length() > 0)
00375   {
00376     ROS_ERROR("Service call failed: service [%s] responded with an error: %s", service_name_.c_str(), info->exception_string_.c_str());
00377   }
00378 
00379   return info->success_;
00380 }
00381 
00382 bool ServiceServerLink::isValid() const
00383 {
00384   return !dropped_;
00385 }
00386 
00387 } // namespace ros
00388 


roscpp
Author(s): Morgan Quigley, Josh Faust, Brian Gerkey, Troy Straszheim
autogenerated on Tue Mar 7 2017 03:44:47