Go to the documentation of this file.00001 #ifndef UDPMULTI_MESSAGE_TRANSPORT_SUBSCRIBER_H
00002 #define UDPMULTI_MESSAGE_TRANSPORT_SUBSCRIBER_H
00003
00004 #include <boost/asio.hpp>
00005 #include <boost/bind.hpp>
00006 #include <boost/thread.hpp>
00007
00008 #include <string>
00009
00010 #include <message_transport/simple_subscriber_plugin.h>
00011 #include <udpmulti_transport/UDPMultHeader.h>
00012 #include <udpmulti_transport/udpmulti_publisher.h>
00013
00014 namespace udpmulti_transport {
00015
00016 template <class Base>
00017 class UDPMultiSubscriber : public message_transport::SimpleSubscriberPlugin<Base,udpmulti_transport::UDPMultHeader>
00018 {
00019 public:
00020 UDPMultiSubscriber() : io_service_(), socket_(io_service_), user_cb_(NULL), rec_thread_(NULL) {
00021 }
00022
00023 virtual ~UDPMultiSubscriber() {
00024
00025 ROS_INFO("Shutting down UDPMultiSubscriber");
00026 io_service_.stop();
00027 if (rec_thread_) {
00028 try {
00029 socket_.shutdown(boost::asio::ip::tcp::socket::shutdown_receive);
00030 } catch (boost::exception_detail::clone_impl<boost::exception_detail::error_info_injector<boost::system::system_error> > e) {
00031
00032 }
00033 rec_thread_->interrupt();
00034 rec_thread_->join();
00035 delete rec_thread_;
00036 }
00037 rec_thread_ = NULL;
00038 }
00039
00040 virtual std::string getTransportName() const
00041 {
00042 return "udpmulti";
00043 }
00044
00045 protected:
00046 void receiveThread() {
00047 while (ros::ok()) {
00048 std::size_t rec;
00049
00050 rec = socket_.receive_from(
00051 boost::asio::buffer(data_, MAX_UDP_PACKET_SIZE), endpoint_);
00052
00053 if (!rec) continue;
00054
00055 boost::shared_ptr<Base> message_ptr(new Base);
00056 ros::serialization::IStream in(data_,rec);
00057 ros::serialization::deserialize(in, *message_ptr);
00058
00059 if (user_cb_ && ros::ok()) {
00060 (*user_cb_)(message_ptr);
00061 }
00062 }
00063 }
00064 virtual void internalCallback(const udpmulti_transport::UDPMultHeaderConstPtr& message,
00065 const typename message_transport::SimpleSubscriberPlugin<Base,udpmulti_transport::UDPMultHeader>::Callback& user_cb)
00066 {
00067 user_cb_ = &user_cb;
00068
00069 if (listening_interface_.empty()) {
00070
00071 ros::NodeHandle & nh = this->nh();
00072 nh.param<std::string>("listening_interface",listening_interface_,"0.0.0.0");
00073 multicast_address_ = message->multicast_address;
00074 port_ = message->port;
00075 ROS_INFO("Listening on %s, address '%s:%d'",listening_interface_.c_str(),multicast_address_.c_str(),port_);
00076
00077
00078 boost::asio::ip::udp::endpoint listen_endpoint(
00079 boost::asio::ip::address::from_string(listening_interface_), port_);
00080 socket_.open(listen_endpoint.protocol());
00081 socket_.set_option(boost::asio::ip::udp::socket::reuse_address(true));
00082 socket_.bind(listen_endpoint);
00083
00084
00085 socket_.set_option(boost::asio::ip::multicast::join_group(
00086 boost::asio::ip::address::from_string(multicast_address_)));
00087
00088 socket_.set_option(boost::asio::ip::multicast::enable_loopback(true));
00089
00090 #if 0
00091 socket_.async_receive_from(
00092 boost::asio::buffer(data_, MAX_UDP_PACKET_SIZE), endpoint_,
00093 boost::bind(&UDPMultiSubscriber::handle_receive_from, this,
00094 boost::asio::placeholders::error,
00095 boost::asio::placeholders::bytes_transferred));
00096 #else
00097 rec_thread_ = new boost::thread(&UDPMultiSubscriber::receiveThread,this);
00098 #endif
00099 }
00100 }
00101
00102 #if 0
00103 void handle_receive_from(const boost::system::error_code& error,
00104 size_t bytes_recvd)
00105 {
00106 ROS_INFO("Received datagram");
00107 if (!error)
00108 {
00109 ROS_INFO("Accepted datagram");
00110 boost::shared_ptr<Base> message_ptr(new Base);
00111 ros::serialization::IStream in(data_,MAX_UDP_PACKET_SIZE);
00112 ros::serialization::deserialize(in, *message_ptr);
00113
00114 (*user_cb_)(message_ptr);
00115
00116 socket_.async_receive_from(
00117 boost::asio::buffer(data_, MAX_UDP_PACKET_SIZE), endpoint_,
00118 boost::bind(&UDPMultiSubscriber::handle_receive_from, this,
00119 boost::asio::placeholders::error,
00120 boost::asio::placeholders::bytes_transferred));
00121 }
00122 }
00123 #endif
00124
00125 uint32_t port_;
00126 std::string listening_interface_;
00127 boost::asio::io_service io_service_;
00128 boost::asio::ip::udp::endpoint endpoint_;
00129 boost::asio::ip::udp::socket socket_;
00130 const typename message_transport::SimpleSubscriberPlugin<Base,udpmulti_transport::UDPMultHeader>::Callback* user_cb_;
00131 boost::thread *rec_thread_;
00132 std::string multicast_address_;
00133 uint8_t data_[MAX_UDP_PACKET_SIZE];
00134 };
00135
00136 }
00137
00138 #endif // UDPMULTI_MESSAGE_TRANSPORT_SUBSCRIBER_H