00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035 #include <iostream>
00036 #include <boost/array.hpp>
00037 #include <boost/asio.hpp>
00038 #include <boost/thread.hpp>
00039 #include <jsk_network_tools/SilverhammerInternalBuffer.h>
00040 #include <ros/ros.h>
00041 #include <boost/shared_ptr.hpp>
00042 #include <algorithm>
00043
00044 namespace jsk_network_tools
00045 {
00046 typedef unsigned char Base;
00047
00048 class Packet
00049 {
00050 public:
00051 typedef boost::shared_ptr<Packet> Ptr;
00052 size_t seq_id;
00053 size_t packet_id;
00054 size_t packet_num;
00055 std::vector<Base> data;
00056 Packet(size_t aseq_id, size_t apacket_id, size_t apacket_num):
00057 seq_id(aseq_id), packet_id(apacket_id), packet_num(apacket_num)
00058 {
00059 }
00060
00061 void fillData(size_t size, Base* buf)
00062 {
00063 data.resize(size);
00064 memcpy(&data[0], &buf[12], size);
00065 }
00066
00067 size_t fill(std::vector<Base>& target, size_t start, size_t end)
00068 {
00069 memcpy(&target[start], &data[0], std::min(end - start, data.size()));
00070 return std::min(end - start, data.size());
00071 }
00072
00073 bool operator<(const Packet& other) const
00074 {
00075 return packet_id < other.packet_id;
00076 }
00077 };
00078
00079 class SilverhammerHighspeedInternalReceiver
00080 {
00081 public:
00082 typedef std::vector<Packet> PacketArray;
00083 typedef std::map<size_t, PacketArray> PacketTable;
00084 SilverhammerHighspeedInternalReceiver(): initialized_(false)
00085 {
00086 ros::NodeHandle nh, pnh("~");
00087 nh.param("receive_port", receive_port_, 16484);
00088 nh.param("pesimistic", pesimistic_, true);
00089 nh.param("fragment_packets_tolerance", fragment_packets_tolerance_, 0);
00090 nh.param("expected_rate", expected_rate_, 2.0);
00091 pub_ = nh.advertise<SilverhammerInternalBuffer>("input", 1);
00092
00093 boost::asio::io_service io_service;
00094 socket_.reset(new boost::asio::ip::udp::socket(io_service, boost::asio::ip::udp::endpoint(boost::asio::ip::udp::v4(), receive_port_)));
00095 thread_ = boost::thread(boost::bind(&SilverhammerHighspeedInternalReceiver::threadFunc, this));
00096 pub_thread_ = boost::thread(boost::bind(&SilverhammerHighspeedInternalReceiver::publishPacketsFunc, this));
00097
00098 }
00099
00100 protected:
00101
00102 void threadFunc()
00103 {
00104 PacketArray packet_array;
00105
00106 int last_received_seq_id = -1;
00107 while (ros::ok()) {
00108 boost::array<Base, 1500> recv_buf;
00109 boost::asio::ip::udp::endpoint remote_endpoint;
00110 boost::system::error_code error;
00111 size_t len = socket_->receive_from(boost::asio::buffer(recv_buf), remote_endpoint, 0, error);
00112
00113 if (len < 4 + 4 + 4) {
00114 ROS_WARN("too short packet");
00115 continue;
00116 }
00117 size_t seq_id = recv_buf[0] * (1 << 24) + recv_buf[1] * (1 << 16) + recv_buf[2] * (1 << 8) + recv_buf[3] * (1 << 0);
00118 size_t packet_id = recv_buf[0 + 4] * (1 << 24) + recv_buf[1 + 4] * (1 << 16) + recv_buf[2 + 4] * (1 << 8) + recv_buf[3 + 4] * (1 << 0);
00119 size_t packet_num = recv_buf[0 + 8] * (1 << 24) + recv_buf[1 + 8] * (1 << 16) + recv_buf[2 + 8] * (1 << 8) + recv_buf[3 + 8] * (1 << 0);
00120
00121 Packet packet(seq_id, packet_id, packet_num);
00122 packet.fillData(len - 4 * 3, recv_buf.data());
00123
00124
00125
00126
00127 if (last_received_seq_id == -1) {
00128 packet_array.push_back(packet);
00129 last_received_seq_id = seq_id;
00130 }
00131 else if (last_received_seq_id != seq_id) {
00132 ROS_INFO("seq_id := %d, packet_array.size() := %lu, packet_num := %lu",
00133 last_received_seq_id, packet_array.size(), packet_array[0].packet_num);
00134 size_t before_size = packet_array.size();
00135 if (packet_array.size() == packet_array[0].packet_num || !pesimistic_) {
00136 boost::mutex::scoped_lock lock(shared_packet_mutex_);
00137 shared_packet_array_ = packet_array;
00138
00139 }
00140 packet_array = PacketArray();
00141 packet_array.reserve(before_size * 2);
00142 packet_array.push_back(packet);
00143 }
00144 else {
00145 packet_array.push_back(packet);
00146 }
00147 last_received_seq_id = seq_id;
00148 }
00149 }
00150
00151 void publishPacketsFunc()
00152 {
00153 ros::Rate r(expected_rate_);
00154 int published_seq_id = -1;
00155 while (ros::ok()) {
00156 {
00157 PacketArray publish_thread_packet_array;
00158 {
00159 boost::mutex::scoped_lock lock(shared_packet_mutex_);
00160 if (shared_packet_array_.size() != 0) {
00161 publish_thread_packet_array = shared_packet_array_;
00162 }
00163
00164 }
00165 if (publish_thread_packet_array.size() != 0) {
00166 if (publish_thread_packet_array[0].seq_id != published_seq_id) {
00167 publishPackets(publish_thread_packet_array);
00168 published_seq_id = publish_thread_packet_array[0].seq_id;
00169 }
00170 }
00171 r.sleep();
00172 }
00173 }
00174 }
00175
00176 void publishPackets(PacketArray& packet_array)
00177 {
00178 msg_.header.stamp = ros::Time::now();
00179 msg_.seq_id = packet_array[0].seq_id;
00180
00181 std::sort(packet_array.begin(), packet_array.end());
00182
00183 size_t packet_size = packet_array[0].data.size();
00184 size_t num = packet_array[0].packet_num;
00185 if (msg_.data.size() != num * packet_size) {
00186 msg_.data.resize(num * packet_size);
00187 }
00188 std::fill(msg_.data.begin(), msg_.data.end(), 0);
00189 PacketArray::iterator it = packet_array.begin();
00190 size_t target_packet_id = 0;
00191
00192 ROS_INFO("expected data size: %lu", num * packet_size);
00193 size_t len = 0;
00194 while (target_packet_id < num && it != packet_array.end()) {
00195 if (it->packet_id == target_packet_id) {
00196 len += it->fill(msg_.data, target_packet_id * packet_size, (target_packet_id + 1) * packet_size);
00197 ++it;
00198 }
00199 else {
00200
00201 len += packet_size;
00202 }
00203 ++target_packet_id;
00204 }
00205 ROS_INFO("len: %lu", len);
00206 msg_.data.resize(len);
00207 pub_.publish(msg_);
00208 }
00209
00210 PacketArray shared_packet_array_;
00211 SilverhammerInternalBuffer msg_;
00212 bool initialized_;
00213 boost::mutex mutex_;
00214 boost::mutex shared_packet_mutex_;
00215 boost::condition_variable thread_state_;
00216 boost::thread thread_;
00217 boost::thread pub_thread_;
00218 boost::shared_ptr<boost::asio::ip::udp::socket> socket_;
00219 int receive_port_;
00220 int fragment_packets_tolerance_;
00221 std::string receive_ip_;
00222 double expected_rate_;
00223
00224 ros::Publisher pub_;
00225 bool pesimistic_;
00226 private:
00227 };
00228 }
00229
00230 int main(int argc, char** argv)
00231 {
00232 ros::init(argc, argv, "silverhammer_highspeed_internal_receiver");
00233 jsk_network_tools::SilverhammerHighspeedInternalReceiver receiver;
00234 ros::spin();
00235 return 0;
00236 }