udpmulti_subscriber.h
Go to the documentation of this file.
00001 #ifndef UDPMULTI_MESSAGE_TRANSPORT_SUBSCRIBER_H
00002 #define UDPMULTI_MESSAGE_TRANSPORT_SUBSCRIBER_H
00003 
00004 #include <boost/asio.hpp>
00005 #include <boost/bind.hpp>
00006 #include <boost/thread.hpp>
00007 
00008 #include <string>
00009 
00010 #include <message_transport/simple_subscriber_plugin.h>
00011 #include <udpmulti_transport/UDPMultHeader.h>
00012 #include <udpmulti_transport/udpmulti_publisher.h>
00013 
00014 namespace udpmulti_transport {
00015 
00016         template <class Base>
00017         class UDPMultiSubscriber : public message_transport::SimpleSubscriberPlugin<Base,udpmulti_transport::UDPMultHeader>
00018         {
00019                 public:
00020                         UDPMultiSubscriber() : io_service_(), socket_(io_service_), user_cb_(NULL), rec_thread_(NULL) {
00021                         }
00022 
00023                         virtual ~UDPMultiSubscriber() {
00024                                 // close listener
00025                 ROS_INFO("Shutting down UDPMultiSubscriber");
00026                 io_service_.stop();
00027                 if (rec_thread_) {
00028                     try {
00029                         socket_.shutdown(boost::asio::ip::tcp::socket::shutdown_receive);
00030                     } catch (boost::exception_detail::clone_impl<boost::exception_detail::error_info_injector<boost::system::system_error> > e) {
00031                         // ignore
00032                     }
00033                     rec_thread_->interrupt();
00034                     rec_thread_->join();
00035                     delete rec_thread_;
00036                 }
00037                 rec_thread_ = NULL;
00038                         }
00039 
00040                         virtual std::string getTransportName() const
00041                         {
00042                                 return "udpmulti";
00043                         }
00044 
00045                 protected:
00046             void receiveThread() {
00047                 while (ros::ok()) {
00048                     std::size_t rec;
00049                     // ROS_INFO("Waiting for datagram");
00050                     rec = socket_.receive_from(
00051                             boost::asio::buffer(data_, MAX_UDP_PACKET_SIZE), endpoint_);
00052                     // ROS_INFO("Received datagram: %d bytes",rec);
00053                     if (!rec) continue;
00054 
00055                     boost::shared_ptr<Base> message_ptr(new Base);
00056                     ros::serialization::IStream in(data_,rec);
00057                     ros::serialization::deserialize(in, *message_ptr);
00058 
00059                     if (user_cb_ && ros::ok()) {
00060                         (*user_cb_)(message_ptr);
00061                     }
00062                 }
00063             }
00064                         virtual void internalCallback(const udpmulti_transport::UDPMultHeaderConstPtr& message,
00065                                         const typename message_transport::SimpleSubscriberPlugin<Base,udpmulti_transport::UDPMultHeader>::Callback& user_cb)
00066                         {
00067                                 user_cb_ = &user_cb;
00068                                 // Process the header, and get the listener started.
00069                 if (listening_interface_.empty()) {
00070                     // First message, launch the initialisation
00071                     ros::NodeHandle & nh = this->nh();
00072                     nh.param<std::string>("listening_interface",listening_interface_,"0.0.0.0");
00073                     multicast_address_ = message->multicast_address;
00074                     port_ = message->port;
00075                     ROS_INFO("Listening on %s, address '%s:%d'",listening_interface_.c_str(),multicast_address_.c_str(),port_);
00076 
00077                     // Create the socket so that multiple may be bound to the same address.
00078                     boost::asio::ip::udp::endpoint listen_endpoint(
00079                             boost::asio::ip::address::from_string(listening_interface_), port_);
00080                     socket_.open(listen_endpoint.protocol());
00081                     socket_.set_option(boost::asio::ip::udp::socket::reuse_address(true));
00082                     socket_.bind(listen_endpoint);
00083 
00084                     // Join the multicast group.
00085                     socket_.set_option(boost::asio::ip::multicast::join_group(
00086                                 boost::asio::ip::address::from_string(multicast_address_)));
00087                     // Allow loopback
00088                     socket_.set_option(boost::asio::ip::multicast::enable_loopback(true));
00089 
00090 #if 0
00091                     socket_.async_receive_from(
00092                             boost::asio::buffer(data_, MAX_UDP_PACKET_SIZE), endpoint_,
00093                             boost::bind(&UDPMultiSubscriber::handle_receive_from, this,
00094                                 boost::asio::placeholders::error,
00095                                 boost::asio::placeholders::bytes_transferred));
00096 #else
00097                     rec_thread_ = new  boost::thread(&UDPMultiSubscriber::receiveThread,this);
00098 #endif
00099                 }
00100                         }
00101 
00102 #if 0
00103             void handle_receive_from(const boost::system::error_code& error,
00104                     size_t bytes_recvd)
00105             {
00106                 ROS_INFO("Received datagram");
00107                 if (!error)
00108                 {
00109                     ROS_INFO("Accepted datagram");
00110                     boost::shared_ptr<Base> message_ptr(new Base);
00111                     ros::serialization::IStream in(data_,MAX_UDP_PACKET_SIZE);
00112                     ros::serialization::deserialize(in, *message_ptr);
00113 
00114                     (*user_cb_)(message_ptr);
00115 
00116                     socket_.async_receive_from(
00117                             boost::asio::buffer(data_, MAX_UDP_PACKET_SIZE), endpoint_,
00118                             boost::bind(&UDPMultiSubscriber::handle_receive_from, this,
00119                                 boost::asio::placeholders::error,
00120                                 boost::asio::placeholders::bytes_transferred));
00121                 }
00122             }
00123 #endif
00124 
00125                         uint32_t port_;
00126                         std::string listening_interface_;
00127             boost::asio::io_service io_service_;
00128             boost::asio::ip::udp::endpoint endpoint_;
00129             boost::asio::ip::udp::socket socket_;
00130                         const typename message_transport::SimpleSubscriberPlugin<Base,udpmulti_transport::UDPMultHeader>::Callback* user_cb_;
00131             boost::thread *rec_thread_;
00132                         std::string multicast_address_;
00133             uint8_t data_[MAX_UDP_PACKET_SIZE];
00134         };
00135 
00136 } //namespace transport
00137 
00138 #endif // UDPMULTI_MESSAGE_TRANSPORT_SUBSCRIBER_H


udpmulti_transport
Author(s): Cedric Pradalier
autogenerated on Sat Dec 28 2013 16:57:21