00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035 #include "ros/service_publication.h"
00036 #include "ros/service_client_link.h"
00037 #include "ros/connection.h"
00038 #include "ros/callback_queue_interface.h"
00039
00040 #include <boost/bind.hpp>
00041
00042 namespace ros
00043 {
00044
00045 ServicePublication::ServicePublication(const std::string& name, const std::string &md5sum, const std::string& data_type, const std::string& request_data_type,
00046 const std::string& response_data_type, const ServiceCallbackHelperPtr& helper, CallbackQueueInterface* callback_queue,
00047 const VoidConstPtr& tracked_object)
00048 : name_(name)
00049 , md5sum_(md5sum)
00050 , data_type_(data_type)
00051 , request_data_type_(request_data_type)
00052 , response_data_type_(response_data_type)
00053 , helper_(helper)
00054 , dropped_(false)
00055 , callback_queue_(callback_queue)
00056 , has_tracked_object_(false)
00057 , tracked_object_(tracked_object)
00058 {
00059 if (tracked_object)
00060 {
00061 has_tracked_object_ = true;
00062 }
00063 }
00064
00065 ServicePublication::~ServicePublication()
00066 {
00067 drop();
00068 }
00069
00070 void ServicePublication::drop()
00071 {
00072
00073
00074 {
00075 boost::mutex::scoped_lock lock(client_links_mutex_);
00076 dropped_ = true;
00077 }
00078
00079 dropAllConnections();
00080
00081 callback_queue_->removeByID((uint64_t)this);
00082 }
00083
00084 class ServiceCallback : public CallbackInterface
00085 {
00086 public:
00087 ServiceCallback(const ServiceCallbackHelperPtr& helper, const boost::shared_array<uint8_t>& buf, size_t num_bytes, const ServiceClientLinkPtr& link, bool has_tracked_object, const VoidConstWPtr& tracked_object)
00088 : helper_(helper)
00089 , buffer_(buf)
00090 , num_bytes_(num_bytes)
00091 , link_(link)
00092 , has_tracked_object_(has_tracked_object)
00093 , tracked_object_(tracked_object)
00094 {
00095 }
00096
00097 virtual CallResult call()
00098 {
00099 if (link_->getConnection()->isDropped())
00100 {
00101 return Invalid;
00102 }
00103
00104 VoidConstPtr tracker;
00105 if (has_tracked_object_)
00106 {
00107 tracker = tracked_object_.lock();
00108
00109 if (!tracker)
00110 {
00111 SerializedMessage res = serialization::serializeServiceResponse(false, 0);
00112 link_->processResponse(false, res);
00113 return Invalid;
00114 }
00115 }
00116
00117 ServiceCallbackHelperCallParams params;
00118 params.request = SerializedMessage(buffer_, num_bytes_);
00119 params.connection_header = link_->getConnection()->getHeader().getValues();
00120 try
00121 {
00122
00123 bool ok = helper_->call(params);
00124 link_->processResponse(ok, params.response);
00125 }
00126 catch (std::exception& e)
00127 {
00128 ROS_ERROR("Exception thrown while processing service call: %s", e.what());
00129 SerializedMessage res = serialization::serializeServiceResponse(false, 0);
00130 link_->processResponse(false, res);
00131 return Invalid;
00132 }
00133
00134 return Success;
00135 }
00136
00137 private:
00138 ServiceCallbackHelperPtr helper_;
00139 boost::shared_array<uint8_t> buffer_;
00140 uint32_t num_bytes_;
00141 ServiceClientLinkPtr link_;
00142 bool has_tracked_object_;
00143 VoidConstWPtr tracked_object_;
00144 };
00145
00146 void ServicePublication::processRequest(boost::shared_array<uint8_t> buf, size_t num_bytes, const ServiceClientLinkPtr& link)
00147 {
00148 CallbackInterfacePtr cb(new ServiceCallback(helper_, buf, num_bytes, link, has_tracked_object_, tracked_object_));
00149 callback_queue_->addCallback(cb, (uint64_t)this);
00150 }
00151
00152 void ServicePublication::addServiceClientLink(const ServiceClientLinkPtr& link)
00153 {
00154 boost::mutex::scoped_lock lock(client_links_mutex_);
00155
00156 client_links_.push_back(link);
00157 }
00158
00159 void ServicePublication::removeServiceClientLink(const ServiceClientLinkPtr& link)
00160 {
00161 boost::mutex::scoped_lock lock(client_links_mutex_);
00162
00163 V_ServiceClientLink::iterator it = std::find(client_links_.begin(), client_links_.end(), link);
00164 if (it != client_links_.end())
00165 {
00166 client_links_.erase(it);
00167 }
00168 }
00169
00170 void ServicePublication::dropAllConnections()
00171 {
00172
00173
00174 V_ServiceClientLink local_links;
00175
00176 {
00177 boost::mutex::scoped_lock lock(client_links_mutex_);
00178
00179 local_links.swap(client_links_);
00180 }
00181
00182 for (V_ServiceClientLink::iterator i = local_links.begin();
00183 i != local_links.end(); ++i)
00184 {
00185 (*i)->getConnection()->drop(Connection::Destructing);
00186 }
00187 }
00188
00189 }