$search
00001 #ifndef UDPMULTI_MESSAGE_TRANSPORT_SUBSCRIBER_H 00002 #define UDPMULTI_MESSAGE_TRANSPORT_SUBSCRIBER_H 00003 00004 #include <boost/asio.hpp> 00005 #include <boost/bind.hpp> 00006 #include <boost/thread.hpp> 00007 00008 #include <string> 00009 00010 #include <message_transport/simple_subscriber_plugin.h> 00011 #include <udpmulti_transport/UDPMultHeader.h> 00012 #include <udpmulti_transport/udpmulti_publisher.h> 00013 00014 namespace udpmulti_transport { 00015 00016 template <class Base> 00017 class UDPMultiSubscriber : public message_transport::SimpleSubscriberPlugin<Base,udpmulti_transport::UDPMultHeader> 00018 { 00019 public: 00020 UDPMultiSubscriber() : io_service_(), socket_(io_service_), user_cb_(NULL), rec_thread_(NULL) { 00021 } 00022 00023 virtual ~UDPMultiSubscriber() { 00024 // close listener 00025 ROS_INFO("Shutting down UDPMultiSubscriber"); 00026 io_service_.stop(); 00027 if (rec_thread_) { 00028 try { 00029 socket_.shutdown(boost::asio::ip::tcp::socket::shutdown_receive); 00030 } catch (boost::exception_detail::clone_impl<boost::exception_detail::error_info_injector<boost::system::system_error> > e) { 00031 // ignore 00032 } 00033 rec_thread_->interrupt(); 00034 rec_thread_->join(); 00035 delete rec_thread_; 00036 } 00037 rec_thread_ = NULL; 00038 } 00039 00040 virtual std::string getTransportName() const 00041 { 00042 return "udpmulti"; 00043 } 00044 00045 protected: 00046 void receiveThread() { 00047 while (ros::ok()) { 00048 std::size_t rec; 00049 // ROS_INFO("Waiting for datagram"); 00050 rec = socket_.receive_from( 00051 boost::asio::buffer(data_, MAX_UDP_PACKET_SIZE), endpoint_); 00052 // ROS_INFO("Received datagram: %d bytes",rec); 00053 if (!rec) continue; 00054 00055 boost::shared_ptr<Base> message_ptr(new Base); 00056 ros::serialization::IStream in(data_,rec); 00057 ros::serialization::deserialize(in, *message_ptr); 00058 00059 if (user_cb_ && ros::ok()) { 00060 (*user_cb_)(message_ptr); 00061 } 00062 } 00063 } 00064 virtual void internalCallback(const udpmulti_transport::UDPMultHeaderConstPtr& message, 00065 const typename message_transport::SimpleSubscriberPlugin<Base,udpmulti_transport::UDPMultHeader>::Callback& user_cb) 00066 { 00067 user_cb_ = &user_cb; 00068 // Process the header, and get the listener started. 00069 if (listening_interface_.empty()) { 00070 // First message, launch the initialisation 00071 ros::NodeHandle & nh = this->nh(); 00072 nh.param<std::string>("listening_interface",listening_interface_,"0.0.0.0"); 00073 multicast_address_ = message->multicast_address; 00074 port_ = message->port; 00075 ROS_INFO("Listening on %s, address '%s:%d'",listening_interface_.c_str(),multicast_address_.c_str(),port_); 00076 00077 // Create the socket so that multiple may be bound to the same address. 00078 boost::asio::ip::udp::endpoint listen_endpoint( 00079 boost::asio::ip::address::from_string(listening_interface_), port_); 00080 socket_.open(listen_endpoint.protocol()); 00081 socket_.set_option(boost::asio::ip::udp::socket::reuse_address(true)); 00082 socket_.bind(listen_endpoint); 00083 00084 // Join the multicast group. 00085 socket_.set_option(boost::asio::ip::multicast::join_group( 00086 boost::asio::ip::address::from_string(multicast_address_))); 00087 // Allow loopback 00088 socket_.set_option(boost::asio::ip::multicast::enable_loopback(true)); 00089 00090 #if 0 00091 socket_.async_receive_from( 00092 boost::asio::buffer(data_, MAX_UDP_PACKET_SIZE), endpoint_, 00093 boost::bind(&UDPMultiSubscriber::handle_receive_from, this, 00094 boost::asio::placeholders::error, 00095 boost::asio::placeholders::bytes_transferred)); 00096 #else 00097 rec_thread_ = new boost::thread(&UDPMultiSubscriber::receiveThread,this); 00098 #endif 00099 } 00100 } 00101 00102 #if 0 00103 void handle_receive_from(const boost::system::error_code& error, 00104 size_t bytes_recvd) 00105 { 00106 ROS_INFO("Received datagram"); 00107 if (!error) 00108 { 00109 ROS_INFO("Accepted datagram"); 00110 boost::shared_ptr<Base> message_ptr(new Base); 00111 ros::serialization::IStream in(data_,MAX_UDP_PACKET_SIZE); 00112 ros::serialization::deserialize(in, *message_ptr); 00113 00114 (*user_cb_)(message_ptr); 00115 00116 socket_.async_receive_from( 00117 boost::asio::buffer(data_, MAX_UDP_PACKET_SIZE), endpoint_, 00118 boost::bind(&UDPMultiSubscriber::handle_receive_from, this, 00119 boost::asio::placeholders::error, 00120 boost::asio::placeholders::bytes_transferred)); 00121 } 00122 } 00123 #endif 00124 00125 uint32_t port_; 00126 std::string listening_interface_; 00127 boost::asio::io_service io_service_; 00128 boost::asio::ip::udp::endpoint endpoint_; 00129 boost::asio::ip::udp::socket socket_; 00130 const typename message_transport::SimpleSubscriberPlugin<Base,udpmulti_transport::UDPMultHeader>::Callback* user_cb_; 00131 boost::thread *rec_thread_; 00132 std::string multicast_address_; 00133 uint8_t data_[MAX_UDP_PACKET_SIZE]; 00134 }; 00135 00136 } //namespace transport 00137 00138 #endif // UDPMULTI_MESSAGE_TRANSPORT_SUBSCRIBER_H