silverhammer_highspeed_internal_receiver.cpp
Go to the documentation of this file.
00001 /*********************************************************************
00002  * Software License Agreement (BSD License)
00003  *
00004  *  Copyright (c) 2014, JSK Lab
00005  *  All rights reserved.
00006  *
00007  *  Redistribution and use in source and binary forms, with or without
00008  *  modification, are permitted provided that the following conditions
00009  *  are met:
00010  *
00011  *   * Redistributions of source code must retain the above copyright
00012  *     notice, this list of conditions and the following disclaimer.
00013  *   * Redistributions in binary form must reproduce the above
00014  *     copyright notice, this list of conditions and the following
00015  *     disclaimer in the documentation and/o2r other materials provided
00016  *     with the distribution.
00017  *   * Neither the name of the JSK Lab nor the names of its
00018  *     contributors may be used to endorse or promote products derived
00019  *     from this software without specific prior written permission.
00020  *
00021  *  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
00022  *  "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
00023  *  LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
00024  *  FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
00025  *  COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
00026  *  INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
00027  *  BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
00028  *  LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
00029  *  CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
00030  *  LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
00031  *  ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00032  *  POSSIBILITY OF SUCH DAMAGE.
00033  *********************************************************************/
00034 
00035 #include <iostream>
00036 #include <boost/array.hpp>
00037 #include <boost/asio.hpp>
00038 #include <boost/thread.hpp>
00039 #include <jsk_network_tools/SilverhammerInternalBuffer.h>
00040 #include <ros/ros.h>
00041 #include <boost/shared_ptr.hpp>
00042 #include <algorithm>
00043 
00044 namespace jsk_network_tools
00045 {
00046   typedef unsigned char Base;
00047   //typedef char Base;
00048   class Packet
00049   {
00050   public:
00051     typedef boost::shared_ptr<Packet> Ptr;
00052     size_t seq_id;
00053     size_t packet_id;
00054     size_t packet_num;
00055     std::vector<Base> data;
00056     Packet(size_t aseq_id, size_t apacket_id, size_t apacket_num):
00057       seq_id(aseq_id), packet_id(apacket_id), packet_num(apacket_num)
00058     {
00059     }
00060 
00061     void fillData(size_t size, Base* buf)
00062     {
00063       data.resize(size);
00064       memcpy(&data[0], &buf[12], size);
00065     }
00066 
00067     size_t fill(std::vector<Base>& target, size_t start, size_t end)
00068     {
00069       memcpy(&target[start], &data[0], std::min(end - start, data.size()));
00070       return std::min(end - start, data.size());
00071     }
00072 
00073     bool operator<(const Packet& other) const
00074     {
00075       return packet_id < other.packet_id;
00076     }
00077   };
00078 
00079   class SilverhammerHighspeedInternalReceiver
00080   {
00081   public:
00082     typedef std::vector<Packet> PacketArray;
00083     typedef std::map<size_t, PacketArray> PacketTable;
00084     SilverhammerHighspeedInternalReceiver(): initialized_(false)
00085     {
00086       ros::NodeHandle nh, pnh("~");
00087       nh.param("receive_port", receive_port_, 16484);
00088       nh.param("pesimistic", pesimistic_, true);
00089       nh.param("fragment_packets_tolerance", fragment_packets_tolerance_, 0);
00090       nh.param("expected_rate", expected_rate_, 2.0);
00091       pub_ = nh.advertise<SilverhammerInternalBuffer>("input", 1);
00092       // nh.param("packet_size", packet_size_, 1400);
00093       boost::asio::io_service io_service;
00094       socket_.reset(new boost::asio::ip::udp::socket(io_service, boost::asio::ip::udp::endpoint(boost::asio::ip::udp::v4(), receive_port_)));
00095       thread_ = boost::thread(boost::bind(&SilverhammerHighspeedInternalReceiver::threadFunc, this));
00096       pub_thread_ = boost::thread(boost::bind(&SilverhammerHighspeedInternalReceiver::publishPacketsFunc, this));
00097       
00098     }
00099 
00100   protected:
00101 
00102     void threadFunc()
00103     {
00104       PacketArray packet_array;
00105       // Is it fast enough?
00106       int last_received_seq_id = -1;
00107       while (ros::ok()) {
00108         boost::array<Base, 1500> recv_buf; // only support 1500 MTU (without header)
00109         boost::asio::ip::udp::endpoint remote_endpoint;
00110         boost::system::error_code error;
00111         size_t len = socket_->receive_from(boost::asio::buffer(recv_buf), remote_endpoint, 0, error);
00112         //ROS_INFO("received packet (%lu bytes = %lu bits)", len, len * 8);
00113         if (len < 4 + 4 + 4) {
00114           ROS_WARN("too short packet");
00115           continue;
00116         }
00117         size_t seq_id = recv_buf[0] * (1 << 24) + recv_buf[1] * (1 << 16) + recv_buf[2] * (1 << 8) + recv_buf[3] * (1 << 0);
00118         size_t packet_id = recv_buf[0 + 4] * (1 << 24) + recv_buf[1 + 4] * (1 << 16) + recv_buf[2 + 4] * (1 << 8) + recv_buf[3 + 4] * (1 << 0);
00119         size_t packet_num = recv_buf[0 + 8] * (1 << 24) + recv_buf[1 + 8] * (1 << 16) + recv_buf[2 + 8] * (1 << 8) + recv_buf[3 + 8] * (1 << 0);
00120         
00121         Packet packet(seq_id, packet_id, packet_num);
00122         packet.fillData(len - 4 * 3, recv_buf.data());
00123         // ROS_INFO("seq_id := %lu", seq_id);
00124         //ROS_INFO("packet_id := %lu", packet_id);
00125         // ROS_INFO("packet_num := %lu", packet_num);
00126 
00127         if (last_received_seq_id == -1) {
00128           packet_array.push_back(packet);
00129           last_received_seq_id = seq_id;
00130         }
00131         else if (last_received_seq_id != seq_id) {
00132           ROS_INFO("seq_id := %d, packet_array.size() := %lu, packet_num := %lu",
00133                    last_received_seq_id, packet_array.size(), packet_array[0].packet_num);
00134           size_t before_size = packet_array.size();
00135           if (packet_array.size() == packet_array[0].packet_num || !pesimistic_) {
00136             boost::mutex::scoped_lock lock(shared_packet_mutex_);
00137             shared_packet_array_ = packet_array;
00138             // Exit from special section as soon as possible
00139           }
00140           packet_array = PacketArray();
00141           packet_array.reserve(before_size * 2);
00142           packet_array.push_back(packet);
00143         }
00144         else {
00145           packet_array.push_back(packet);
00146         }
00147         last_received_seq_id = seq_id;
00148       }
00149     }
00150 
00151     void publishPacketsFunc()
00152     {
00153       ros::Rate r(expected_rate_);
00154       int published_seq_id = -1;
00155       while (ros::ok()) {
00156         {
00157           PacketArray publish_thread_packet_array;
00158           {
00159             boost::mutex::scoped_lock lock(shared_packet_mutex_);
00160             if (shared_packet_array_.size() != 0) {
00161               publish_thread_packet_array = shared_packet_array_;
00162             }
00163             // Exit from special section as soon as possible
00164           }
00165           if (publish_thread_packet_array.size() != 0) {
00166             if (publish_thread_packet_array[0].seq_id != published_seq_id) {
00167               publishPackets(publish_thread_packet_array);
00168               published_seq_id = publish_thread_packet_array[0].seq_id;
00169             }
00170           }
00171           r.sleep();
00172         }
00173       }
00174     }
00175 
00176     void publishPackets(PacketArray& packet_array)
00177     {
00178       msg_.header.stamp = ros::Time::now();
00179       msg_.seq_id = packet_array[0].seq_id;
00180       // Sort packet_array according to packet_id
00181       std::sort(packet_array.begin(), packet_array.end());
00182       // We expect all the packet size are same
00183       size_t packet_size = packet_array[0].data.size();
00184       size_t num = packet_array[0].packet_num;
00185       if (msg_.data.size() != num * packet_size) {
00186         msg_.data.resize(num * packet_size);
00187       }
00188       std::fill(msg_.data.begin(), msg_.data.end(), 0);
00189       PacketArray::iterator it = packet_array.begin();
00190       size_t target_packet_id = 0;
00191       
00192       ROS_INFO("expected data size: %lu", num * packet_size);
00193       size_t len = 0;
00194       while (target_packet_id < num && it != packet_array.end()) {
00195         if (it->packet_id == target_packet_id) {
00196           len += it->fill(msg_.data, target_packet_id * packet_size, (target_packet_id + 1) * packet_size);
00197           ++it;
00198         }
00199         else {                  // fill by dummy 0 data == just skip
00200           // pass
00201           len += packet_size;
00202         }
00203         ++target_packet_id;
00204       }
00205       ROS_INFO("len: %lu", len);
00206       msg_.data.resize(len);
00207       pub_.publish(msg_);
00208     }
00209 
00210     PacketArray shared_packet_array_;
00211     SilverhammerInternalBuffer msg_;
00212     bool initialized_;
00213     boost::mutex mutex_;
00214     boost::mutex shared_packet_mutex_;
00215     boost::condition_variable thread_state_;
00216     boost::thread thread_;
00217     boost::thread pub_thread_;
00218     boost::shared_ptr<boost::asio::ip::udp::socket> socket_;
00219     int receive_port_;
00220     int fragment_packets_tolerance_;
00221     std::string receive_ip_;
00222     double expected_rate_;
00223     //int packet_size_;
00224     ros::Publisher pub_;
00225     bool pesimistic_;
00226   private:
00227   };
00228 }
00229 
00230 int main(int argc, char** argv)
00231 {
00232   ros::init(argc, argv, "silverhammer_highspeed_internal_receiver");
00233   jsk_network_tools::SilverhammerHighspeedInternalReceiver receiver;
00234   ros::spin();
00235   return 0;
00236 }


jsk_network_tools
Author(s): Yusuke Furuta
autogenerated on Fri Sep 8 2017 03:38:47