67 static uint32_t
crc32(uint32_t crc,
const uint8_t* buf,
size_t len)
73 for (
int k = 0; k < 8; k++)
74 crc = crc & 1 ? (crc >> 1) ^ 0xedb88320 : crc >> 1;
85 m_udp_recv_buffer_size(0), m_udp_timeout_recv_nonblocking(0), m_udp_sender_timeout(0)
108 if (m_socket_impl || m_fifo_impl || m_receiver_thread)
111 m_udp_recv_buffer_size = 64 * 1024;
112 m_udp_msg_start_seq = { 0x02, 0x02, 0x02, 0x02 };
113 m_udp_timeout_recv_nonblocking = 1.0;
114 #if defined RASPBERRY && RASPBERRY > 0
115 m_udp_timeout_recv_nonblocking = -1.0;
117 m_udp_sender_timeout = 2.0;
119 m_export_udp_msg = export_udp_msg;
120 m_scandataformat = scandataformat;
123 ROS_ERROR_STREAM(
"## ERROR UdpReceiver::Init(): invalid scandataformat configuration, unsupported scandataformat=" << m_scandataformat
131 m_fifo_impl_created =
false;
135 m_fifo_impl =
new PayloadFifo(udp_input_fifolength);
136 m_fifo_impl_created =
true;
139 if (!m_socket_impl->Init(udp_sender,
udp_port))
141 ROS_ERROR_STREAM(
"## ERROR UdpReceiver::Init(): UdpReceiverSocketImpl::Init(" << udp_sender <<
"," <<
udp_port <<
") failed.");
152 m_run_receiver_thread =
true;
162 m_run_receiver_thread =
false;
164 m_socket_impl->running() =
false;
165 if (do_join && m_receiver_thread && m_receiver_thread->joinable())
166 m_receiver_thread->join();
174 m_run_receiver_thread =
false;
176 m_socket_impl->running() =
false;
177 if (m_fifo_impl && m_fifo_impl_created)
179 m_fifo_impl->Shutdown();
181 if (m_receiver_thread)
183 if (m_receiver_thread->joinable())
184 m_receiver_thread->join();
185 delete m_receiver_thread;
186 m_receiver_thread = 0;
190 delete m_socket_impl;
193 if (m_fifo_impl && m_fifo_impl_created)
198 m_fifo_impl_created =
false;
208 ROS_ERROR_STREAM(
"## ERROR UdpReceiver::Run(): UdpReceiver not initialized, call UdpReceiver::Init() first.");
213 size_t udp_recv_counter = 0;
214 std::vector<uint8_t> udp_payload(m_udp_recv_buffer_size, 0);
215 double udp_recv_timeout = -1;
219 while (m_run_receiver_thread)
221 size_t bytes_received = m_socket_impl->Receive(udp_payload, udp_recv_timeout, m_udp_msg_start_seq);
225 ROS_DEBUG_STREAM(
"UdpReceiver::Run(): " << bytes_received <<
" bytes received (port " << m_socket_impl->port() <<
", udp_receiver.cpp:" << __LINE__ <<
")");
226 if(m_run_receiver_thread && bytes_received > m_udp_msg_start_seq.size() + 8 && std::equal(udp_payload.begin(), udp_payload.begin() + m_udp_msg_start_seq.size(), m_udp_msg_start_seq.begin()))
229 bool crc_error =
false, check_crc =
true;
230 uint32_t payload_length_bytes = 0;
231 uint32_t bytes_to_receive = 0;
232 uint32_t udp_payload_offset = 0;
236 payload_length_bytes =
Convert4Byte(udp_payload.data() + m_udp_msg_start_seq.size());
237 bytes_to_receive = (uint32_t)(payload_length_bytes + m_udp_msg_start_seq.size() + 2 *
sizeof(uint32_t));
238 udp_payload_offset = m_udp_msg_start_seq.size() +
sizeof(uint32_t);
242 bool parse_success =
false;
243 uint32_t num_bytes_required = 0;
245 while (m_run_receiver_thread &&
249 if (num_bytes_required > 1024*1024)
251 parse_success =
false;
252 ROS_ERROR_STREAM(
"## ERROR UdpReceiver::Run(): " << bytes_received <<
" bytes received (compact), " << (num_bytes_required +
sizeof(uint32_t)) <<
" bytes or more required, probably incorrect payload");
257 ROS_DEBUG_STREAM(
"UdpReceiver::Run(): " << bytes_received <<
" bytes received (compact), " << (num_bytes_required +
sizeof(uint32_t)) <<
" bytes or more required");
258 while(m_run_receiver_thread && bytes_received < num_bytes_required +
sizeof(uint32_t) &&
261 std::vector<uint8_t> chunk_payload(m_udp_recv_buffer_size, 0);
262 size_t chunk_bytes_received = m_socket_impl->Receive(chunk_payload);
264 ROS_INFO_STREAM(
"UdpReceiver::Run(): chunk of " << chunk_bytes_received <<
" bytes received (udp_receiver.cpp:" << __LINE__ <<
")");
265 udp_payload.insert(udp_payload.begin() + bytes_received, chunk_payload.begin(), chunk_payload.begin() + chunk_bytes_received);
266 bytes_received += chunk_bytes_received;
271 ROS_ERROR_STREAM(
"## ERROR UdpReceiver::Run(): CompactDataParser::ParseSegment failed");
274 bytes_to_receive = (uint32_t)(payload_length_bytes +
sizeof(uint32_t));
275 udp_payload_offset = 0;
276 ROS_DEBUG_STREAM(
"UdpReceiver::Run(): payload_length_bytes=" << payload_length_bytes <<
", bytes_to_receive= " << bytes_to_receive <<
", bytes_received=" << bytes_received <<
" (udp_receiver.cpp:" << __LINE__ <<
")");
280 ROS_ERROR_STREAM(
"## ERROR UdpReceiver::Run(): invalid scandataformat configuration, unsupported scandataformat=" << m_scandataformat
282 m_run_receiver_thread =
false;
283 m_socket_impl->running() =
false;
286 if (bytes_received != bytes_to_receive)
288 if (bytes_received < bytes_to_receive)
290 ROS_ERROR_STREAM(
"## ERROR UdpReceiver::Run(): " << bytes_received <<
" bytes received, " << bytes_to_receive <<
" bytes expected, payload_length=" << payload_length_bytes <<
" bytes");
295 ROS_INFO_STREAM(
"UdpReceiver::Run(): " << bytes_received <<
" bytes received, " << bytes_to_receive <<
" bytes expected, payload_length=" << payload_length_bytes <<
" bytes");
301 size_t bytes_valid = std::min<size_t>(bytes_received, (
size_t)bytes_to_receive);
302 uint32_t u32PayloadCRC =
Convert4Byte(udp_payload.data() + bytes_valid -
sizeof(uint32_t));
303 std::vector<uint8_t> msgpack_payload(udp_payload.begin() + udp_payload_offset, udp_payload.begin() + bytes_valid -
sizeof(uint32_t));
304 uint32_t u32MsgPackCRC =
crc32(0, msgpack_payload.data(), msgpack_payload.size());
305 if (check_crc && u32PayloadCRC != u32MsgPackCRC)
308 if (do_print_crc_error)
310 ROS_ERROR_STREAM(
"## ERROR UdpReceiver::Run(): CRC 0x" << std::setfill(
'0') << std::setw(2) << std::hex << u32PayloadCRC
311 <<
" received from " << std::dec << (bytes_valid -
sizeof(uint32_t)) <<
" udp bytes different to CRC 0x"
312 << std::setfill(
'0') << std::setw(2) << std::hex << u32MsgPackCRC <<
" computed from "
313 << std::dec << (msgpack_payload.size()) <<
" byte payload, message dropped");
314 ROS_ERROR_STREAM(
"## ERROR UdpReceiver::Run(): decoded payload size: " << payload_length_bytes <<
" bytes, bytes_to_receive (expected udp message length): "
315 << bytes_to_receive <<
" byte, bytes_valid (received udp message length): " << bytes_valid <<
" byte");
320 if (payload_length_bytes != msgpack_payload.size() && do_print)
322 ROS_ERROR_STREAM(
"## ERROR UdpReceiver::Run(): payload_length_bytes=" << payload_length_bytes <<
" different to decoded payload size " << msgpack_payload.size());
328 size_t fifo_length = m_fifo_impl->Push(msgpack_payload,
fifo_clock::now(), udp_recv_counter);
330 if (m_verbose && do_print)
332 ROS_INFO_STREAM(
"UdpReceiver::Run(): " << bytes_received <<
" bytes received: " << ToPrintableString(udp_payload, bytes_received));
333 ROS_INFO_STREAM(
"UdpReceiver::Run(): " << fifo_length <<
" messages currently in paylod buffer, totally received " << udp_recv_counter <<
" udp packages");
337 if (m_export_udp_msg)
339 std::ofstream udp_ostream(std::string(
"udp_received_bin_") +
sick_scansegment_xd::FormatNumber(udp_recv_counter, 3,
true,
false, -1) +
".udp", std::ofstream::binary);
340 std::ofstream msg_ostream(std::string(
"udp_received_msg_") +
sick_scansegment_xd::FormatNumber(udp_recv_counter, 3,
true,
false, -1) +
".msg", std::ofstream::binary);
341 if (udp_ostream.is_open() && msg_ostream.is_open())
343 udp_ostream.write((
const char*)udp_payload.data(), bytes_received);
344 msg_ostream.write((
const char*)msgpack_payload.data(), msgpack_payload.size());
348 else if(bytes_received > 0)
352 ROS_ERROR_STREAM(
"## ERROR UdpReceiver::Run(): Received " << bytes_received <<
" unexpected bytes");
358 if(bytes_received > 0)
361 udp_recv_timeout = -1;
363 udp_recv_timeout = m_udp_timeout_recv_nonblocking;
365 m_run_receiver_thread =
false;
366 m_socket_impl->running() =
false;
369 catch (std::exception & e)
373 m_run_receiver_thread =
false;
374 m_socket_impl->running() =
false;
385 std::stringstream hexstream;
386 for (
size_t n = 0; n < bytes_received; n++)
388 hexstream << (n > 0 ?
" " :
"") << std::hex << (
int)(
payload[n] & 0xFF);
390 return hexstream.str();
400 std::vector<uint8_t> payload_printable(bytes_received + 1);
401 for (
size_t n = 0; n < bytes_received; n++)
404 payload_printable[n] =
payload[n];
406 payload_printable[n] =
'.';
408 payload_printable[bytes_received] = 0;
409 return std::string((
const char*)payload_printable.data());