silverhammer_highspeed_internal_receiver.cpp
Go to the documentation of this file.
1 /*********************************************************************
2  * Software License Agreement (BSD License)
3  *
4  * Copyright (c) 2014, JSK Lab
5  * All rights reserved.
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions
9  * are met:
10  *
11  * * Redistributions of source code must retain the above copyright
12  * notice, this list of conditions and the following disclaimer.
13  * * Redistributions in binary form must reproduce the above
14  * copyright notice, this list of conditions and the following
15  * disclaimer in the documentation and/o2r other materials provided
16  * with the distribution.
17  * * Neither the name of the JSK Lab nor the names of its
18  * contributors may be used to endorse or promote products derived
19  * from this software without specific prior written permission.
20  *
21  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
25  * COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26  * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
27  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
29  * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
30  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
31  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
32  * POSSIBILITY OF SUCH DAMAGE.
33  *********************************************************************/
34 
35 #include <iostream>
36 #include <boost/array.hpp>
37 #include <boost/asio.hpp>
38 #include <boost/thread.hpp>
39 #include <jsk_network_tools/SilverhammerInternalBuffer.h>
40 #include <ros/ros.h>
41 #include <boost/shared_ptr.hpp>
42 #include <algorithm>
43 
44 namespace jsk_network_tools
45 {
46  typedef unsigned char Base;
47  //typedef char Base;
48  class Packet
49  {
50  public:
52  size_t seq_id;
53  size_t packet_id;
54  size_t packet_num;
55  std::vector<Base> data;
56  Packet(size_t aseq_id, size_t apacket_id, size_t apacket_num):
57  seq_id(aseq_id), packet_id(apacket_id), packet_num(apacket_num)
58  {
59  }
60 
61  void fillData(size_t size, Base* buf)
62  {
63  data.resize(size);
64  memcpy(&data[0], &buf[12], size);
65  }
66 
67  size_t fill(std::vector<Base>& target, size_t start, size_t end)
68  {
69  memcpy(&target[start], &data[0], std::min(end - start, data.size()));
70  return std::min(end - start, data.size());
71  }
72 
73  bool operator<(const Packet& other) const
74  {
75  return packet_id < other.packet_id;
76  }
77  };
78 
80  {
81  public:
82  typedef std::vector<Packet> PacketArray;
83  typedef std::map<size_t, PacketArray> PacketTable;
85  {
86  ros::NodeHandle nh, pnh("~");
87  nh.param("receive_port", receive_port_, 16484);
88  nh.param("pesimistic", pesimistic_, true);
89  nh.param("fragment_packets_tolerance", fragment_packets_tolerance_, 0);
90  nh.param("expected_rate", expected_rate_, 2.0);
91  pub_ = nh.advertise<SilverhammerInternalBuffer>("input", 1);
92  // nh.param("packet_size", packet_size_, 1400);
93  boost::asio::io_service io_service;
94  socket_.reset(new boost::asio::ip::udp::socket(io_service, boost::asio::ip::udp::endpoint(boost::asio::ip::udp::v4(), receive_port_)));
95  thread_ = boost::thread(boost::bind(&SilverhammerHighspeedInternalReceiver::threadFunc, this));
96  pub_thread_ = boost::thread(boost::bind(&SilverhammerHighspeedInternalReceiver::publishPacketsFunc, this));
97 
98  }
99 
100  protected:
101 
102  void threadFunc()
103  {
104  PacketArray packet_array;
105  // Is it fast enough?
106  int last_received_seq_id = -1;
107  while (ros::ok()) {
108  boost::array<Base, 1500> recv_buf; // only support 1500 MTU (without header)
109  boost::asio::ip::udp::endpoint remote_endpoint;
110  boost::system::error_code error;
111  size_t len = socket_->receive_from(boost::asio::buffer(recv_buf), remote_endpoint, 0, error);
112  //ROS_INFO("received packet (%lu bytes = %lu bits)", len, len * 8);
113  if (len < 4 + 4 + 4) {
114  ROS_WARN("too short packet");
115  continue;
116  }
117  size_t seq_id = recv_buf[0] * (1 << 24) + recv_buf[1] * (1 << 16) + recv_buf[2] * (1 << 8) + recv_buf[3] * (1 << 0);
118  size_t packet_id = recv_buf[0 + 4] * (1 << 24) + recv_buf[1 + 4] * (1 << 16) + recv_buf[2 + 4] * (1 << 8) + recv_buf[3 + 4] * (1 << 0);
119  size_t packet_num = recv_buf[0 + 8] * (1 << 24) + recv_buf[1 + 8] * (1 << 16) + recv_buf[2 + 8] * (1 << 8) + recv_buf[3 + 8] * (1 << 0);
120 
121  Packet packet(seq_id, packet_id, packet_num);
122  packet.fillData(len - 4 * 3, recv_buf.data());
123  // ROS_INFO("seq_id := %lu", seq_id);
124  //ROS_INFO("packet_id := %lu", packet_id);
125  // ROS_INFO("packet_num := %lu", packet_num);
126 
127  if (last_received_seq_id == -1) {
128  packet_array.push_back(packet);
129  last_received_seq_id = seq_id;
130  }
131  else if (last_received_seq_id != seq_id) {
132  ROS_INFO("seq_id := %d, packet_array.size() := %lu, packet_num := %lu",
133  last_received_seq_id, packet_array.size(), packet_array[0].packet_num);
134  size_t before_size = packet_array.size();
135  if (packet_array.size() == packet_array[0].packet_num || !pesimistic_) {
136  boost::mutex::scoped_lock lock(shared_packet_mutex_);
137  shared_packet_array_ = packet_array;
138  // Exit from special section as soon as possible
139  }
140  packet_array = PacketArray();
141  packet_array.reserve(before_size * 2);
142  packet_array.push_back(packet);
143  }
144  else {
145  packet_array.push_back(packet);
146  }
147  last_received_seq_id = seq_id;
148  }
149  }
150 
152  {
153  ros::Rate r(expected_rate_);
154  int published_seq_id = -1;
155  while (ros::ok()) {
156  {
157  PacketArray publish_thread_packet_array;
158  {
159  boost::mutex::scoped_lock lock(shared_packet_mutex_);
160  if (shared_packet_array_.size() != 0) {
161  publish_thread_packet_array = shared_packet_array_;
162  }
163  // Exit from special section as soon as possible
164  }
165  if (publish_thread_packet_array.size() != 0) {
166  if (publish_thread_packet_array[0].seq_id != published_seq_id) {
167  publishPackets(publish_thread_packet_array);
168  published_seq_id = publish_thread_packet_array[0].seq_id;
169  }
170  }
171  r.sleep();
172  }
173  }
174  }
175 
176  void publishPackets(PacketArray& packet_array)
177  {
178  msg_.header.stamp = ros::Time::now();
179  msg_.seq_id = packet_array[0].seq_id;
180  // Sort packet_array according to packet_id
181  std::sort(packet_array.begin(), packet_array.end());
182  // We expect all the packet size are same
183  size_t packet_size = packet_array[0].data.size();
184  size_t num = packet_array[0].packet_num;
185  if (msg_.data.size() != num * packet_size) {
186  msg_.data.resize(num * packet_size);
187  }
188  std::fill(msg_.data.begin(), msg_.data.end(), 0);
189  PacketArray::iterator it = packet_array.begin();
190  size_t target_packet_id = 0;
191 
192  ROS_INFO("expected data size: %lu", num * packet_size);
193  size_t len = 0;
194  while (target_packet_id < num && it != packet_array.end()) {
195  if (it->packet_id == target_packet_id) {
196  len += it->fill(msg_.data, target_packet_id * packet_size, (target_packet_id + 1) * packet_size);
197  ++it;
198  }
199  else { // fill by dummy 0 data == just skip
200  // pass
201  len += packet_size;
202  }
203  ++target_packet_id;
204  }
205  ROS_INFO("len: %lu", len);
206  msg_.data.resize(len);
207  pub_.publish(msg_);
208  }
209 
210  PacketArray shared_packet_array_;
211  SilverhammerInternalBuffer msg_;
213  boost::mutex mutex_;
214  boost::mutex shared_packet_mutex_;
216  boost::thread thread_;
217  boost::thread pub_thread_;
221  std::string receive_ip_;
223  //int packet_size_;
226  private:
227  };
228 }
229 
230 int main(int argc, char** argv)
231 {
232  ros::init(argc, argv, "silverhammer_highspeed_internal_receiver");
234  ros::spin();
235  return 0;
236 }
ROSCPP_DECL void init(int &argc, char **argv, const std::string &name, uint32_t options=0)
#define ROS_WARN(...)
#define ROS_INFO(...)
bool param(const std::string &param_name, T &param_val, const T &default_val) const
ROSCPP_DECL bool ok()
Packet(size_t aseq_id, size_t apacket_id, size_t apacket_num)
size_t fill(std::vector< Base > &target, size_t start, size_t end)
Publisher advertise(const std::string &topic, uint32_t queue_size, bool latch=false)
ROSCPP_DECL void spin()
bool sleep()
int main(int argc, char **argv)
static Time now()


jsk_network_tools
Author(s): Yusuke Furuta
autogenerated on Tue Feb 6 2018 03:45:07