topic_handlers.h
Go to the documentation of this file.
00001 
00035 #ifndef ROSSERIAL_SERVER_TOPIC_HANDLERS_H
00036 #define ROSSERIAL_SERVER_TOPIC_HANDLERS_H
00037 
00038 #include <boost/thread/condition_variable.hpp>
00039 #include <boost/thread/mutex.hpp>
00040 #include <ros/ros.h>
00041 #include <rosserial_msgs/TopicInfo.h>
00042 #include <rosserial_msgs/RequestMessageInfo.h>
00043 #include <rosserial_msgs/RequestServiceInfo.h>
00044 #include <topic_tools/shape_shifter.h>
00045 
00046 namespace rosserial_server
00047 {
00048 
00049 class Publisher {
00050 public:
00051   Publisher(ros::NodeHandle& nh, const rosserial_msgs::TopicInfo& topic_info) {
00052     if (!message_service_.isValid()) {
00053       // lazy-initialize the service caller.
00054       message_service_ = nh.serviceClient<rosserial_msgs::RequestMessageInfo>("message_info");
00055       if (!message_service_.waitForExistence(ros::Duration(5.0))) {
00056         ROS_WARN("Timed out waiting for message_info service to become available.");
00057       }
00058     }
00059 
00060     rosserial_msgs::RequestMessageInfo info;
00061     info.request.type = topic_info.message_type;
00062     if (message_service_.call(info)) {
00063       if (info.response.md5 != topic_info.md5sum) {
00064         ROS_WARN_STREAM("Message" << topic_info.message_type  << "MD5 sum from client does not match that in system. Will avoid using system's message definition.");
00065         info.response.definition = "";
00066       }
00067     } else {
00068       ROS_WARN("Failed to call message_info service. Proceeding without full message definition.");
00069     }
00070 
00071     message_.morph(topic_info.md5sum, topic_info.message_type, info.response.definition, "false");
00072     publisher_ = message_.advertise(nh, topic_info.topic_name, 1);
00073   }
00074 
00075   void handle(ros::serialization::IStream stream) {
00076     ros::serialization::Serializer<topic_tools::ShapeShifter>::read(stream, message_);
00077     publisher_.publish(message_);
00078   }
00079 
00080   ROS_DEPRECATED std::string get_topic() {
00081       return get_name();
00082   }
00083   std::string get_name() {
00084     return publisher_.getTopic();
00085   }
00086 
00087 private:
00088   ros::Publisher publisher_;
00089   topic_tools::ShapeShifter message_;
00090 
00091   static ros::ServiceClient message_service_;
00092 };
00093 
00094 ros::ServiceClient Publisher::message_service_;
00095 typedef boost::shared_ptr<Publisher> PublisherPtr;
00096 
00097 
00098 class Subscriber {
00099 public:
00100   Subscriber(ros::NodeHandle& nh, rosserial_msgs::TopicInfo& topic_info,
00101       boost::function<void(std::vector<uint8_t> buffer)> write_fn)
00102     : write_fn_(write_fn) {
00103     ros::SubscribeOptions opts;
00104     opts.init<topic_tools::ShapeShifter>(
00105         topic_info.topic_name, 1, boost::bind(&Subscriber::handle, this, _1));
00106     opts.md5sum = topic_info.md5sum;
00107     opts.datatype = topic_info.message_type;
00108     subscriber_ = nh.subscribe(opts);
00109   }
00110 
00111   ROS_DEPRECATED std::string get_topic() {
00112       return get_name();
00113   }
00114   std::string get_name() {
00115     return subscriber_.getTopic();
00116   }
00117 
00118 private:
00119   void handle(const boost::shared_ptr<topic_tools::ShapeShifter const>& msg) {
00120     size_t length = ros::serialization::serializationLength(*msg);
00121     std::vector<uint8_t> buffer(length);
00122 
00123     ros::serialization::OStream ostream(&buffer[0], length);
00124     ros::serialization::Serializer<topic_tools::ShapeShifter>::write(ostream, *msg);
00125 
00126     write_fn_(buffer);
00127   }
00128 
00129   ros::Subscriber subscriber_;
00130   boost::function<void(std::vector<uint8_t> buffer)> write_fn_;
00131 };
00132 
00133 typedef boost::shared_ptr<Subscriber> SubscriberPtr;
00134 
00135 class ServiceClient {
00136 public:
00137   ServiceClient(ros::NodeHandle& nh, rosserial_msgs::TopicInfo& topic_info,
00138       boost::function<void(std::vector<uint8_t> buffer, const uint16_t topic_id)> write_fn)
00139     : write_fn_(write_fn) {
00140     topic_id_ = -1;
00141     if (!service_info_service_.isValid()) {
00142       // lazy-initialize the service caller.
00143       service_info_service_ = nh.serviceClient<rosserial_msgs::RequestServiceInfo>("service_info");
00144       if (!service_info_service_.waitForExistence(ros::Duration(5.0))) {
00145         ROS_WARN("Timed out waiting for service_info service to become available.");
00146       }
00147     }
00148 
00149     rosserial_msgs::RequestServiceInfo info;
00150     info.request.service = topic_info.message_type;
00151     ROS_DEBUG("Calling service_info service for topic name %s",topic_info.topic_name.c_str());
00152     if (service_info_service_.call(info)) {
00153       request_message_md5_ = info.response.request_md5;
00154       response_message_md5_ = info.response.response_md5;
00155     } else {
00156       ROS_WARN("Failed to call service_info service. The service client will be created with blank md5sum.");
00157     }
00158     ros::ServiceClientOptions opts;
00159     opts.service = topic_info.topic_name;
00160     opts.md5sum = service_md5_ = info.response.service_md5;
00161     opts.persistent = false; // always false for now
00162     service_client_ = nh.serviceClient(opts);
00163   }
00164   void setTopicId(uint16_t topic_id) {
00165     topic_id_ = topic_id;
00166   }
00167   std::string getRequestMessageMD5() {
00168     return request_message_md5_;
00169   }
00170   std::string getResponseMessageMD5() {
00171     return response_message_md5_;
00172   }
00173 
00174   void handle(ros::serialization::IStream stream) {
00175     // deserialize request message
00176     ros::serialization::Serializer<topic_tools::ShapeShifter>::read(stream, request_message_);
00177 
00178     // perform service call
00179     // note that at present, at least for rosserial-windows a service call returns nothing,
00180     // so we discard the return value of this call() invocation.
00181     service_client_.call(request_message_, response_message_, service_md5_);
00182 
00183     // write service response over the wire
00184     size_t length = ros::serialization::serializationLength(response_message_);
00185     std::vector<uint8_t> buffer(length);
00186     ros::serialization::OStream ostream(&buffer[0], length);
00187     ros::serialization::Serializer<topic_tools::ShapeShifter>::write(ostream, response_message_);
00188     write_fn_(buffer,topic_id_);
00189   }
00190 
00191   std::string get_name() {
00192     return service_client_.getService();
00193   }
00194 
00195 private:
00196   topic_tools::ShapeShifter request_message_;
00197   topic_tools::ShapeShifter response_message_;
00198   ros::ServiceClient service_client_;
00199   static ros::ServiceClient service_info_service_;
00200   boost::function<void(std::vector<uint8_t> buffer, const uint16_t topic_id)> write_fn_;
00201   std::string service_md5_;
00202   std::string request_message_md5_;
00203   std::string response_message_md5_;
00204   uint16_t topic_id_;
00205 };
00206 
00207 ros::ServiceClient ServiceClient::service_info_service_;
00208 typedef boost::shared_ptr<ServiceClient> ServiceClientPtr;
00209 
00210 class ServiceServer {
00211   class ServiceCallbackHelper : public ros::ServiceCallbackHelper {
00212     boost::function<void(std::vector<uint8_t> buffer, const uint16_t topic_id)> write_fn_;
00213     uint16_t topic_id_;
00214     int got_response_;
00215     boost::mutex mutex_;
00216     boost::condition_variable cond_;
00217     topic_tools::ShapeShifter response_message_;
00218  public:
00219     ServiceCallbackHelper(boost::function<void(std::vector<uint8_t> buffer, const uint16_t topic_id)> write_fn):
00220     write_fn_(write_fn), topic_id_(-1) {}
00221     virtual bool call(ros::ServiceCallbackHelperCallParams& params){
00222       std::vector<uint8_t> buffer(params.request.message_start, params.request.message_start + params.request.num_bytes);
00223 
00224       boost::mutex::scoped_lock lock(mutex_);
00225       got_response_ = 0;
00226       write_fn_(buffer,topic_id_);
00227 
00228       while (!got_response_){
00229           cond_.wait(lock);
00230       }
00231 
00232       if(got_response_ == 1) {
00233         params.response = ros::serialization::serializeServiceResponse(true, response_message_);
00234         return true;
00235       }
00236       return false;
00237     }
00238     void handle(ros::serialization::IStream stream) {
00239       ros::serialization::Serializer<topic_tools::ShapeShifter>::read(stream, response_message_);
00240       boost::mutex::scoped_lock lock(mutex_);
00241       got_response_ = 1;
00242       cond_.notify_all();
00243     }
00244 
00245     void abort() {
00246       boost::mutex::scoped_lock lock(mutex_);
00247       got_response_ = -1;
00248       cond_.notify_all();
00249     }
00250 
00251     void setTopicId(uint16_t topic_id) {
00252         topic_id_ = topic_id;
00253     }
00254   };
00255 public:
00256   ServiceServer(ros::NodeHandle& nh, rosserial_msgs::TopicInfo& topic_info,
00257       boost::function<void(std::vector<uint8_t> buffer, const uint16_t topic_id)> write_fn)
00258   {
00259     if (!service_info_service_.isValid()) {
00260       // lazy-initialize the service caller.
00261       service_info_service_ = nh.serviceClient<rosserial_msgs::RequestServiceInfo>("service_info");
00262       if (!service_info_service_.waitForExistence(ros::Duration(5.0))) {
00263         ROS_WARN("Timed out waiting for service_info service to become available.");
00264       }
00265     }
00266 
00267     rosserial_msgs::RequestServiceInfo info;
00268     info.request.service = topic_info.message_type;
00269     ROS_DEBUG("Calling service_info service for topic name %s",topic_info.topic_name.c_str());
00270     if (service_info_service_.call(info)) {
00271       request_message_md5_ = info.response.request_md5;
00272       response_message_md5_ = info.response.response_md5;
00273     } else {
00274       ROS_WARN("Failed to call service_info service. The service client will be created with blank md5sum.");
00275     }
00276 
00277     ros::AdvertiseServiceOptions opts;
00278     opts.service = topic_info.topic_name;
00279     opts.md5sum = info.response.service_md5;
00280     opts.datatype = topic_info.message_type;
00281     opts.req_datatype = request_message_md5_;
00282     opts.res_datatype = response_message_md5_;
00283 
00284     service_call_helper_.reset(new ServiceCallbackHelper(write_fn));
00285     opts.helper = service_call_helper_;
00286 
00287     service_server_ = nh.advertiseService(opts);
00288   }
00289   void setTopicId(uint16_t topic_id) {
00290     service_call_helper_->setTopicId(topic_id);
00291   }
00292   std::string getRequestMessageMD5() {
00293     return request_message_md5_;
00294   }
00295   std::string getResponseMessageMD5() {
00296     return response_message_md5_;
00297   }
00298   void handle(ros::serialization::IStream stream) {
00299     service_call_helper_->handle(stream);
00300   }
00301   void abort() {
00302     service_call_helper_->abort();
00303   }
00304   std::string get_name() {
00305     return service_server_.getService();
00306   }
00307 
00308 private:
00309   ros::ServiceServer service_server_;
00310   static ros::ServiceClient service_info_service_;
00311   boost::shared_ptr<ServiceCallbackHelper> service_call_helper_;
00312   std::string request_message_md5_;
00313   std::string response_message_md5_;
00314 };
00315 
00316 ros::ServiceClient ServiceServer::service_info_service_;
00317 typedef boost::shared_ptr<ServiceServer> ServiceServerPtr;
00318 
00319 
00320 }  // namespace
00321 
00322 #endif  // ROSSERIAL_SERVER_TOPIC_HANDLERS_H


rosserial_server
Author(s): Mike Purvis
autogenerated on Thu Jun 6 2019 19:56:34