Go to the documentation of this file.00001
00035 #ifndef ROSSERIAL_SERVER_TOPIC_HANDLERS_H
00036 #define ROSSERIAL_SERVER_TOPIC_HANDLERS_H
00037
00038 #include <boost/thread/condition_variable.hpp>
00039 #include <boost/thread/mutex.hpp>
00040 #include <ros/ros.h>
00041 #include <rosserial_msgs/TopicInfo.h>
00042 #include <rosserial_msgs/RequestMessageInfo.h>
00043 #include <rosserial_msgs/RequestServiceInfo.h>
00044 #include <topic_tools/shape_shifter.h>
00045
00046 namespace rosserial_server
00047 {
00048
00049 class Publisher {
00050 public:
00051 Publisher(ros::NodeHandle& nh, const rosserial_msgs::TopicInfo& topic_info) {
00052 if (!message_service_.isValid()) {
00053
00054 message_service_ = nh.serviceClient<rosserial_msgs::RequestMessageInfo>("message_info");
00055 if (!message_service_.waitForExistence(ros::Duration(5.0))) {
00056 ROS_WARN("Timed out waiting for message_info service to become available.");
00057 }
00058 }
00059
00060 rosserial_msgs::RequestMessageInfo info;
00061 info.request.type = topic_info.message_type;
00062 if (message_service_.call(info)) {
00063 if (info.response.md5 != topic_info.md5sum) {
00064 ROS_WARN_STREAM("Message" << topic_info.message_type << "MD5 sum from client does not match that in system. Will avoid using system's message definition.");
00065 info.response.definition = "";
00066 }
00067 } else {
00068 ROS_WARN("Failed to call message_info service. Proceeding without full message definition.");
00069 }
00070
00071 message_.morph(topic_info.md5sum, topic_info.message_type, info.response.definition, "false");
00072 publisher_ = message_.advertise(nh, topic_info.topic_name, 1);
00073 }
00074
00075 void handle(ros::serialization::IStream stream) {
00076 ros::serialization::Serializer<topic_tools::ShapeShifter>::read(stream, message_);
00077 publisher_.publish(message_);
00078 }
00079
00080 ROS_DEPRECATED std::string get_topic() {
00081 return get_name();
00082 }
00083 std::string get_name() {
00084 return publisher_.getTopic();
00085 }
00086
00087 private:
00088 ros::Publisher publisher_;
00089 topic_tools::ShapeShifter message_;
00090
00091 static ros::ServiceClient message_service_;
00092 };
00093
00094 ros::ServiceClient Publisher::message_service_;
00095 typedef boost::shared_ptr<Publisher> PublisherPtr;
00096
00097
00098 class Subscriber {
00099 public:
00100 Subscriber(ros::NodeHandle& nh, rosserial_msgs::TopicInfo& topic_info,
00101 boost::function<void(std::vector<uint8_t> buffer)> write_fn)
00102 : write_fn_(write_fn) {
00103 ros::SubscribeOptions opts;
00104 opts.init<topic_tools::ShapeShifter>(
00105 topic_info.topic_name, 1, boost::bind(&Subscriber::handle, this, _1));
00106 opts.md5sum = topic_info.md5sum;
00107 opts.datatype = topic_info.message_type;
00108 subscriber_ = nh.subscribe(opts);
00109 }
00110
00111 ROS_DEPRECATED std::string get_topic() {
00112 return get_name();
00113 }
00114 std::string get_name() {
00115 return subscriber_.getTopic();
00116 }
00117
00118 private:
00119 void handle(const boost::shared_ptr<topic_tools::ShapeShifter const>& msg) {
00120 size_t length = ros::serialization::serializationLength(*msg);
00121 std::vector<uint8_t> buffer(length);
00122
00123 ros::serialization::OStream ostream(&buffer[0], length);
00124 ros::serialization::Serializer<topic_tools::ShapeShifter>::write(ostream, *msg);
00125
00126 write_fn_(buffer);
00127 }
00128
00129 ros::Subscriber subscriber_;
00130 boost::function<void(std::vector<uint8_t> buffer)> write_fn_;
00131 };
00132
00133 typedef boost::shared_ptr<Subscriber> SubscriberPtr;
00134
00135 class ServiceClient {
00136 public:
00137 ServiceClient(ros::NodeHandle& nh, rosserial_msgs::TopicInfo& topic_info,
00138 boost::function<void(std::vector<uint8_t> buffer, const uint16_t topic_id)> write_fn)
00139 : write_fn_(write_fn) {
00140 topic_id_ = -1;
00141 if (!service_info_service_.isValid()) {
00142
00143 service_info_service_ = nh.serviceClient<rosserial_msgs::RequestServiceInfo>("service_info");
00144 if (!service_info_service_.waitForExistence(ros::Duration(5.0))) {
00145 ROS_WARN("Timed out waiting for service_info service to become available.");
00146 }
00147 }
00148
00149 rosserial_msgs::RequestServiceInfo info;
00150 info.request.service = topic_info.message_type;
00151 ROS_DEBUG("Calling service_info service for topic name %s",topic_info.topic_name.c_str());
00152 if (service_info_service_.call(info)) {
00153 request_message_md5_ = info.response.request_md5;
00154 response_message_md5_ = info.response.response_md5;
00155 } else {
00156 ROS_WARN("Failed to call service_info service. The service client will be created with blank md5sum.");
00157 }
00158 ros::ServiceClientOptions opts;
00159 opts.service = topic_info.topic_name;
00160 opts.md5sum = service_md5_ = info.response.service_md5;
00161 opts.persistent = false;
00162 service_client_ = nh.serviceClient(opts);
00163 }
00164 void setTopicId(uint16_t topic_id) {
00165 topic_id_ = topic_id;
00166 }
00167 std::string getRequestMessageMD5() {
00168 return request_message_md5_;
00169 }
00170 std::string getResponseMessageMD5() {
00171 return response_message_md5_;
00172 }
00173
00174 void handle(ros::serialization::IStream stream) {
00175
00176 ros::serialization::Serializer<topic_tools::ShapeShifter>::read(stream, request_message_);
00177
00178
00179
00180
00181 service_client_.call(request_message_, response_message_, service_md5_);
00182
00183
00184 size_t length = ros::serialization::serializationLength(response_message_);
00185 std::vector<uint8_t> buffer(length);
00186 ros::serialization::OStream ostream(&buffer[0], length);
00187 ros::serialization::Serializer<topic_tools::ShapeShifter>::write(ostream, response_message_);
00188 write_fn_(buffer,topic_id_);
00189 }
00190
00191 std::string get_name() {
00192 return service_client_.getService();
00193 }
00194
00195 private:
00196 topic_tools::ShapeShifter request_message_;
00197 topic_tools::ShapeShifter response_message_;
00198 ros::ServiceClient service_client_;
00199 static ros::ServiceClient service_info_service_;
00200 boost::function<void(std::vector<uint8_t> buffer, const uint16_t topic_id)> write_fn_;
00201 std::string service_md5_;
00202 std::string request_message_md5_;
00203 std::string response_message_md5_;
00204 uint16_t topic_id_;
00205 };
00206
00207 ros::ServiceClient ServiceClient::service_info_service_;
00208 typedef boost::shared_ptr<ServiceClient> ServiceClientPtr;
00209
00210 class ServiceServer {
00211 class ServiceCallbackHelper : public ros::ServiceCallbackHelper {
00212 boost::function<void(std::vector<uint8_t> buffer, const uint16_t topic_id)> write_fn_;
00213 uint16_t topic_id_;
00214 int got_response_;
00215 boost::mutex mutex_;
00216 boost::condition_variable cond_;
00217 topic_tools::ShapeShifter response_message_;
00218 public:
00219 ServiceCallbackHelper(boost::function<void(std::vector<uint8_t> buffer, const uint16_t topic_id)> write_fn):
00220 write_fn_(write_fn), topic_id_(-1) {}
00221 virtual bool call(ros::ServiceCallbackHelperCallParams& params){
00222 std::vector<uint8_t> buffer(params.request.message_start, params.request.message_start + params.request.num_bytes);
00223
00224 boost::mutex::scoped_lock lock(mutex_);
00225 got_response_ = 0;
00226 write_fn_(buffer,topic_id_);
00227
00228 while (!got_response_){
00229 cond_.wait(lock);
00230 }
00231
00232 if(got_response_ == 1) {
00233 params.response = ros::serialization::serializeServiceResponse(true, response_message_);
00234 return true;
00235 }
00236 return false;
00237 }
00238 void handle(ros::serialization::IStream stream) {
00239 ros::serialization::Serializer<topic_tools::ShapeShifter>::read(stream, response_message_);
00240 boost::mutex::scoped_lock lock(mutex_);
00241 got_response_ = 1;
00242 cond_.notify_all();
00243 }
00244
00245 void abort() {
00246 boost::mutex::scoped_lock lock(mutex_);
00247 got_response_ = -1;
00248 cond_.notify_all();
00249 }
00250
00251 void setTopicId(uint16_t topic_id) {
00252 topic_id_ = topic_id;
00253 }
00254 };
00255 public:
00256 ServiceServer(ros::NodeHandle& nh, rosserial_msgs::TopicInfo& topic_info,
00257 boost::function<void(std::vector<uint8_t> buffer, const uint16_t topic_id)> write_fn)
00258 {
00259 if (!service_info_service_.isValid()) {
00260
00261 service_info_service_ = nh.serviceClient<rosserial_msgs::RequestServiceInfo>("service_info");
00262 if (!service_info_service_.waitForExistence(ros::Duration(5.0))) {
00263 ROS_WARN("Timed out waiting for service_info service to become available.");
00264 }
00265 }
00266
00267 rosserial_msgs::RequestServiceInfo info;
00268 info.request.service = topic_info.message_type;
00269 ROS_DEBUG("Calling service_info service for topic name %s",topic_info.topic_name.c_str());
00270 if (service_info_service_.call(info)) {
00271 request_message_md5_ = info.response.request_md5;
00272 response_message_md5_ = info.response.response_md5;
00273 } else {
00274 ROS_WARN("Failed to call service_info service. The service client will be created with blank md5sum.");
00275 }
00276
00277 ros::AdvertiseServiceOptions opts;
00278 opts.service = topic_info.topic_name;
00279 opts.md5sum = info.response.service_md5;
00280 opts.datatype = topic_info.message_type;
00281 opts.req_datatype = request_message_md5_;
00282 opts.res_datatype = response_message_md5_;
00283
00284 service_call_helper_.reset(new ServiceCallbackHelper(write_fn));
00285 opts.helper = service_call_helper_;
00286
00287 service_server_ = nh.advertiseService(opts);
00288 }
00289 void setTopicId(uint16_t topic_id) {
00290 service_call_helper_->setTopicId(topic_id);
00291 }
00292 std::string getRequestMessageMD5() {
00293 return request_message_md5_;
00294 }
00295 std::string getResponseMessageMD5() {
00296 return response_message_md5_;
00297 }
00298 void handle(ros::serialization::IStream stream) {
00299 service_call_helper_->handle(stream);
00300 }
00301 void abort() {
00302 service_call_helper_->abort();
00303 }
00304 std::string get_name() {
00305 return service_server_.getService();
00306 }
00307
00308 private:
00309 ros::ServiceServer service_server_;
00310 static ros::ServiceClient service_info_service_;
00311 boost::shared_ptr<ServiceCallbackHelper> service_call_helper_;
00312 std::string request_message_md5_;
00313 std::string response_message_md5_;
00314 };
00315
00316 ros::ServiceClient ServiceServer::service_info_service_;
00317 typedef boost::shared_ptr<ServiceServer> ServiceServerPtr;
00318
00319
00320 }
00321
00322 #endif // ROSSERIAL_SERVER_TOPIC_HANDLERS_H