00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035 #include "ros/service_publication.h"
00036 #include "ros/service_client_link.h"
00037 #include "ros/connection.h"
00038 #include "ros/callback_queue_interface.h"
00039
00040 #include <boost/bind.hpp>
00041
00042 #include <std_msgs/String.h>
00043
00044 namespace ros
00045 {
00046
00047 ServicePublication::ServicePublication(const std::string& name, const std::string &md5sum, const std::string& data_type, const std::string& request_data_type,
00048 const std::string& response_data_type, const ServiceCallbackHelperPtr& helper, CallbackQueueInterface* callback_queue,
00049 const VoidConstPtr& tracked_object)
00050 : name_(name)
00051 , md5sum_(md5sum)
00052 , data_type_(data_type)
00053 , request_data_type_(request_data_type)
00054 , response_data_type_(response_data_type)
00055 , helper_(helper)
00056 , dropped_(false)
00057 , callback_queue_(callback_queue)
00058 , has_tracked_object_(false)
00059 , tracked_object_(tracked_object)
00060 {
00061 if (tracked_object)
00062 {
00063 has_tracked_object_ = true;
00064 }
00065 }
00066
00067 ServicePublication::~ServicePublication()
00068 {
00069 drop();
00070 }
00071
00072 void ServicePublication::drop()
00073 {
00074
00075
00076 {
00077 boost::mutex::scoped_lock lock(client_links_mutex_);
00078 dropped_ = true;
00079 }
00080
00081 dropAllConnections();
00082
00083 callback_queue_->removeByID((uint64_t)this);
00084 }
00085
00086 class ServiceCallback : public CallbackInterface
00087 {
00088 public:
00089 ServiceCallback(const ServiceCallbackHelperPtr& helper, const boost::shared_array<uint8_t>& buf, size_t num_bytes, const ServiceClientLinkPtr& link, bool has_tracked_object, const VoidConstWPtr& tracked_object)
00090 : helper_(helper)
00091 , buffer_(buf)
00092 , num_bytes_(num_bytes)
00093 , link_(link)
00094 , has_tracked_object_(has_tracked_object)
00095 , tracked_object_(tracked_object)
00096 {
00097 }
00098
00099 virtual CallResult call()
00100 {
00101 if (link_->getConnection()->isDropped())
00102 {
00103 return Invalid;
00104 }
00105
00106 VoidConstPtr tracker;
00107 if (has_tracked_object_)
00108 {
00109 tracker = tracked_object_.lock();
00110
00111 if (!tracker)
00112 {
00113 SerializedMessage res = serialization::serializeServiceResponse<uint32_t>(false, 0);
00114 link_->processResponse(false, res);
00115 return Invalid;
00116 }
00117 }
00118
00119 ServiceCallbackHelperCallParams params;
00120 params.request = SerializedMessage(buffer_, num_bytes_);
00121 params.connection_header = link_->getConnection()->getHeader().getValues();
00122 try
00123 {
00124
00125 bool ok = helper_->call(params);
00126 if (ok != 0)
00127 {
00128 link_->processResponse(true, params.response);
00129 }
00130 else
00131 {
00132 SerializedMessage res = serialization::serializeServiceResponse<uint32_t>(false, 0);
00133 link_->processResponse(false, res);
00134 }
00135 }
00136 catch (std::exception& e)
00137 {
00138 ROS_ERROR("Exception thrown while processing service call: %s", e.what());
00139 std_msgs::String error_string;
00140 error_string.data = e.what();
00141 SerializedMessage res = serialization::serializeServiceResponse(false, error_string);
00142 link_->processResponse(false, res);
00143 return Invalid;
00144 }
00145
00146 return Success;
00147 }
00148
00149 private:
00150 ServiceCallbackHelperPtr helper_;
00151 boost::shared_array<uint8_t> buffer_;
00152 uint32_t num_bytes_;
00153 ServiceClientLinkPtr link_;
00154 bool has_tracked_object_;
00155 VoidConstWPtr tracked_object_;
00156 };
00157
00158 void ServicePublication::processRequest(boost::shared_array<uint8_t> buf, size_t num_bytes, const ServiceClientLinkPtr& link)
00159 {
00160 CallbackInterfacePtr cb(new ServiceCallback(helper_, buf, num_bytes, link, has_tracked_object_, tracked_object_));
00161 callback_queue_->addCallback(cb, (uint64_t)this);
00162 }
00163
00164 void ServicePublication::addServiceClientLink(const ServiceClientLinkPtr& link)
00165 {
00166 boost::mutex::scoped_lock lock(client_links_mutex_);
00167
00168 client_links_.push_back(link);
00169 }
00170
00171 void ServicePublication::removeServiceClientLink(const ServiceClientLinkPtr& link)
00172 {
00173 boost::mutex::scoped_lock lock(client_links_mutex_);
00174
00175 V_ServiceClientLink::iterator it = std::find(client_links_.begin(), client_links_.end(), link);
00176 if (it != client_links_.end())
00177 {
00178 client_links_.erase(it);
00179 }
00180 }
00181
00182 void ServicePublication::dropAllConnections()
00183 {
00184
00185
00186 V_ServiceClientLink local_links;
00187
00188 {
00189 boost::mutex::scoped_lock lock(client_links_mutex_);
00190
00191 local_links.swap(client_links_);
00192 }
00193
00194 for (V_ServiceClientLink::iterator i = local_links.begin();
00195 i != local_links.end(); ++i)
00196 {
00197 (*i)->getConnection()->drop(Connection::Destructing);
00198 }
00199 }
00200
00201 }