36 #include <boost/array.hpp> 37 #include <boost/asio.hpp> 38 #include <boost/thread.hpp> 39 #include <jsk_network_tools/SilverhammerInternalBuffer.h> 41 #include <boost/shared_ptr.hpp> 46 typedef unsigned char Base;
56 Packet(
size_t aseq_id,
size_t apacket_id,
size_t apacket_num):
57 seq_id(aseq_id), packet_id(apacket_id), packet_num(apacket_num)
64 memcpy(&data[0], &buf[12], size);
67 size_t fill(std::vector<Base>& target,
size_t start,
size_t end)
69 memcpy(&target[start], &data[0], std::min(end - start, data.size()));
70 return std::min(end - start, data.size());
87 nh.
param(
"receive_port", receive_port_, 16484);
88 nh.
param(
"pesimistic", pesimistic_,
true);
89 nh.
param(
"fragment_packets_tolerance", fragment_packets_tolerance_, 0);
90 nh.
param(
"expected_rate", expected_rate_, 2.0);
91 pub_ = nh.
advertise<SilverhammerInternalBuffer>(
"input", 1);
93 boost::asio::io_service io_service;
94 socket_.reset(
new boost::asio::ip::udp::socket(io_service, boost::asio::ip::udp::endpoint(boost::asio::ip::udp::v4(), receive_port_)));
104 PacketArray packet_array;
106 int last_received_seq_id = -1;
108 boost::array<Base, 1500> recv_buf;
109 boost::asio::ip::udp::endpoint remote_endpoint;
110 boost::system::error_code error;
111 size_t len = socket_->receive_from(boost::asio::buffer(recv_buf), remote_endpoint, 0, error);
113 if (len < 4 + 4 + 4) {
117 size_t seq_id = recv_buf[0] * (1 << 24) + recv_buf[1] * (1 << 16) + recv_buf[2] * (1 << 8) + recv_buf[3] * (1 << 0);
118 size_t packet_id = recv_buf[0 + 4] * (1 << 24) + recv_buf[1 + 4] * (1 << 16) + recv_buf[2 + 4] * (1 << 8) + recv_buf[3 + 4] * (1 << 0);
119 size_t packet_num = recv_buf[0 + 8] * (1 << 24) + recv_buf[1 + 8] * (1 << 16) + recv_buf[2 + 8] * (1 << 8) + recv_buf[3 + 8] * (1 << 0);
121 Packet packet(seq_id, packet_id, packet_num);
122 packet.
fillData(len - 4 * 3, recv_buf.data());
127 if (last_received_seq_id == -1) {
128 packet_array.push_back(packet);
129 last_received_seq_id =
seq_id;
131 else if (last_received_seq_id != seq_id) {
132 ROS_INFO(
"seq_id := %d, packet_array.size() := %lu, packet_num := %lu",
133 last_received_seq_id, packet_array.size(), packet_array[0].packet_num);
134 size_t before_size = packet_array.size();
135 if (packet_array.size() == packet_array[0].packet_num || !pesimistic_) {
136 boost::mutex::scoped_lock lock(shared_packet_mutex_);
137 shared_packet_array_ = packet_array;
140 packet_array = PacketArray();
141 packet_array.reserve(before_size * 2);
142 packet_array.push_back(packet);
145 packet_array.push_back(packet);
147 last_received_seq_id =
seq_id;
154 int published_seq_id = -1;
157 PacketArray publish_thread_packet_array;
159 boost::mutex::scoped_lock lock(shared_packet_mutex_);
160 if (shared_packet_array_.size() != 0) {
161 publish_thread_packet_array = shared_packet_array_;
165 if (publish_thread_packet_array.size() != 0) {
166 if (publish_thread_packet_array[0].
seq_id != published_seq_id) {
167 publishPackets(publish_thread_packet_array);
168 published_seq_id = publish_thread_packet_array[0].seq_id;
179 msg_.seq_id = packet_array[0].seq_id;
181 std::sort(packet_array.begin(), packet_array.end());
183 size_t packet_size = packet_array[0].data.size();
184 size_t num = packet_array[0].packet_num;
185 if (msg_.data.size() != num * packet_size) {
186 msg_.data.resize(num * packet_size);
188 std::fill(msg_.data.begin(), msg_.data.end(), 0);
189 PacketArray::iterator it = packet_array.begin();
190 size_t target_packet_id = 0;
192 ROS_INFO(
"expected data size: %lu", num * packet_size);
194 while (target_packet_id < num && it != packet_array.end()) {
195 if (it->packet_id == target_packet_id) {
196 len += it->fill(msg_.data, target_packet_id * packet_size, (target_packet_id + 1) * packet_size);
206 msg_.data.resize(len);
211 SilverhammerInternalBuffer
msg_;
230 int main(
int argc,
char** argv)
232 ros::init(argc, argv,
"silverhammer_highspeed_internal_receiver");
ROSCPP_DECL void init(int &argc, char **argv, const std::string &name, uint32_t options=0)
bool param(const std::string ¶m_name, T ¶m_val, const T &default_val) const
Publisher advertise(const std::string &topic, uint32_t queue_size, bool latch=false)
int main(int argc, char **argv)