scan_data_receiver.cpp
Go to the documentation of this file.
1 // Copyright (c) 2014, Pepperl+Fuchs GmbH, Mannheim
2 // Copyright (c) 2014, Denis Dillenberger
3 // All rights reserved.
4 //
5 // Use, modification, and distribution is subject to the
6 // 3-clause BSD license ("Revised BSD License",
7 // "New BSD License", or "Modified BSD License")
8 // You should have received a copy of this license
9 // in a file named COPYING or LICENSE.
10 
11 #include "scan_data_receiver.h"
12 #include <chrono>
13 #include <ctime>
14 
15 namespace pepperl_fuchs {
16 
17 ScanDataReceiver::ScanDataReceiver(const std::string hostname, const int tcp_port):inbuf_(4096),instream_(&inbuf_),ring_buffer_(65536),scan_data_()
18 {
19  last_data_time_ = std::time(0);
20  tcp_socket_ = 0;
21  udp_socket_ = 0;
22  udp_port_ = -1;
23  is_connected_ = false;
24 
25  std::cout << "Connecting to TCP data channel at " << hostname << ":" << tcp_port << " ... ";
26  try
27  {
28  // Resolve hostname/ip
29  boost::asio::ip::tcp::resolver resolver(io_service_);
30  boost::asio::ip::tcp::resolver::query query(hostname, std::to_string(tcp_port));
31  boost::asio::ip::tcp::resolver::iterator endpoint_iterator = resolver.resolve(query);
32  boost::asio::ip::tcp::resolver::iterator end;
33 
34  tcp_socket_ = new boost::asio::ip::tcp::socket(io_service_);
35  boost::system::error_code error = boost::asio::error::host_not_found;
36 
37  // Iterate over endpoints and etablish connection
38  while (error && endpoint_iterator != end)
39  {
40  tcp_socket_->close();
41  tcp_socket_->connect(*endpoint_iterator++, error);
42  }
43  if (error)
44  throw boost::system::system_error(error);
45 
46  // Start async reading
47  boost::asio::async_read(*tcp_socket_, inbuf_, boost::bind(&ScanDataReceiver::handleSocketRead, this, boost::asio::placeholders::error));
48  io_service_thread_ = boost::thread(boost::bind(&boost::asio::io_service::run, &io_service_));
49  is_connected_ = true;
50  }
51  catch (std::exception& e)
52  {
53  std::cerr << "Exception: " << e.what() << std::endl;
54  }
55 }
56 
57 //-----------------------------------------------------------------------------
59 {
60  tcp_socket_ = 0;
61  udp_socket_ = 0;
62  udp_port_ = -1;
63  is_connected_ = false;
64 
65 
66  try
67  {
68  udp_socket_ = new boost::asio::ip::udp::socket(io_service_, boost::asio::ip::udp::v4());
69  udp_socket_->bind(boost::asio::ip::udp::endpoint(boost::asio::ip::udp::v4(), 0));
70  udp_port_ = udp_socket_->local_endpoint().port();
71  // Start async reading
72  udp_socket_->async_receive_from(boost::asio::buffer(&udp_buffer_[0],udp_buffer_.size()), udp_endpoint_,
73  boost::bind(&ScanDataReceiver::handleSocketRead, this,
74  boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred));
75  io_service_thread_ = boost::thread(boost::bind(&boost::asio::io_service::run, &io_service_));
76  is_connected_ = true;
77  }
78  catch (std::exception& e)
79  {
80  std::cerr << "Exception: " << e.what() << std::endl;
81  }
82  std::cout << "Receiving scanner data at local UDP port " << udp_port_ << " ... ";
83 
84 }
85 
86 //-----------------------------------------------------------------------------
88 {
89  disconnect();
90  delete udp_socket_;
91  delete tcp_socket_;
92 }
93 
94 //-----------------------------------------------------------------------------
95 void ScanDataReceiver::handleSocketRead(const boost::system::error_code& error)
96 {
97  if (!error )
98  {
99  // Read all received data and write it to the internal ring buffer
100  instream_.clear();
101  while(!instream_.eof())
102  {
103  char buf[4096];
104  instream_.read(buf,4096);
105  int bytes_read = instream_.gcount();
106  writeBufferBack(buf,bytes_read);
107  }
108 
109  // Handle (read and parse) packets stored in the internal ring buffer
110  while( handleNextPacket() ) {}
111 
112  // Read data asynchronously
113  boost::asio::async_read(*tcp_socket_, inbuf_, boost::bind(&ScanDataReceiver::handleSocketRead, this, boost::asio::placeholders::error));
114  }
115  else
116  {
117  if( error.value() != 995 )
118  std::cerr << "ERROR: " << "data connection error: " << error.message() << "(" << error.value() << ")" << std::endl;
119  disconnect();
120  }
121  last_data_time_ = std::time(0);
122 }
123 
124 //-----------------------------------------------------------------------------
125 void ScanDataReceiver::handleSocketRead(const boost::system::error_code &error, std::size_t bytes_transferred)
126 {
127  if (!error )
128  {
129  // Read all received data and write it to the internal ring buffer
130  writeBufferBack(&udp_buffer_[0],bytes_transferred);
131 
132  // Handle (read and parse) packets stored in the internal ring buffer
133  while( handleNextPacket() ) {}
134 
135  // Read data asynchronously
136  udp_socket_->async_receive_from(boost::asio::buffer(&udp_buffer_[0],udp_buffer_.size()), udp_endpoint_,
137  boost::bind(&ScanDataReceiver::handleSocketRead, this,
138  boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred));
139  }
140  else
141  {
142  if( error.value() != 995 )
143  std::cerr << "ERROR: " << "data connection error: " << error.message() << "(" << error.value() << ")" << std::endl;
144  disconnect();
145  }
146  last_data_time_ = std::time(0);
147 }
148 
149 //-----------------------------------------------------------------------------
151 {
152  // Search for a packet
153  int packet_start = findPacketStart();
154  if( packet_start<0 )
155  return false;
156 
157  // Try to retrieve packet
158  char buf[65536];
159  PacketTypeC* p = (PacketTypeC*) buf;
160  if( !retrievePacket(packet_start,p) )
161  return false;
162 
163  // Lock internal outgoing data queue, automatically unlocks at end of function
164  std::unique_lock<std::mutex> lock(data_mutex_);
165 
166  // Create new scan container if necessary
167  if( p->header.packet_number == 1 || scan_data_.empty() )
168  {
169  scan_data_.emplace_back();
170  if( scan_data_.size()>100 )
171  {
172  scan_data_.pop_front();
173  std::cerr << "Too many scans in receiver queue: Dropping scans!" << std::endl;
174  }
175  data_notifier_.notify_one();
176  }
177  ScanData& scandata = scan_data_.back();
178 
179  // Parse payload of packet
180  std::uint32_t* p_scan_data = (std::uint32_t*) &buf[p->header.header_size];
181  int num_scan_points = p->header.num_points_packet;
182 
183  for( int i=0; i<num_scan_points; i++ )
184  {
185  unsigned int data = p_scan_data[i];
186  unsigned int distance = (data & 0x000FFFFF);
187  unsigned int amplitude = (data & 0xFFFFF000) >> 20;
188 
189  scandata.distance_data.push_back(distance);
190  scandata.amplitude_data.push_back(amplitude);
191  }
192 
193  // Save header
194  scandata.headers.push_back(p->header);
195 
196  return true;
197 }
198 
199 //-----------------------------------------------------------------------------
201 {
202  if( ring_buffer_.size()<60 )
203  return -1;
204  for( std::size_t i=0; i<ring_buffer_.size()-4; i++)
205  {
206  if( ((unsigned char) ring_buffer_[i]) == 0x5c
207  && ((unsigned char) ring_buffer_[i+1]) == 0xa2
208  && ((unsigned char) ring_buffer_[i+2]) == 0x43
209  && ((unsigned char) ring_buffer_[i+3]) == 0x00 )
210  {
211  return i;
212  }
213  }
214  return -2;
215 }
216 
217 //-----------------------------------------------------------------------------
219 {
220  if( ring_buffer_.size()<60 )
221  return false;
222 
223  // Erase preceding bytes
224  ring_buffer_.erase_begin(start);
225 
226  char* pp = (char*) p;
227  // Read header
228  readBufferFront(pp,60);
229 
230  if( ring_buffer_.size() < p->header.packet_size )
231  return false;
232 
233  // Read header+payload data
235 
236  // Erase packet from ring buffer
237  ring_buffer_.erase_begin(p->header.packet_size);
238  return true;
239 }
240 
241 //-----------------------------------------------------------------------------
243 {
244  is_connected_ = false;
245  try
246  {
247  if( tcp_socket_ )
248  tcp_socket_->close();
249  if( udp_socket_ )
250  udp_socket_->close();
251  io_service_.stop();
252  if( boost::this_thread::get_id() != io_service_thread_.get_id() )
253  io_service_thread_.join();
254  }
255  catch (std::exception& e)
256  {
257  std::cerr << "Exception: " << e.what() << std::endl;
258  }
259 }
260 
261 //-----------------------------------------------------------------------------
263 {
264  if( !isConnected() )
265  return false;
266  if( (std::time(0)-last_data_time_) > 2 )
267  {
268  disconnect();
269  return false;
270  }
271  return true;
272 }
273 
274 //-----------------------------------------------------------------------------
276 {
277  std::unique_lock<std::mutex> lock(data_mutex_);
278  ScanData data(std::move(scan_data_.front()));
279  scan_data_.pop_front();
280  return data;
281 }
282 
283 //-----------------------------------------------------------------------------
285 {
286  std::unique_lock<std::mutex> lock(data_mutex_);
287  while( checkConnection() && isConnected() && scan_data_.size()<2 )
288  {
289  data_notifier_.wait_for(lock, std::chrono::seconds(1));
290  }
291  ScanData data;
292  if( scan_data_.size() >= 2 && isConnected() )
293  {
294  data = ScanData(std::move(scan_data_.front()));
295  scan_data_.pop_front();
296  }
297  return data;
298 }
299 
300 //-----------------------------------------------------------------------------
302 {
303  if( scan_data_.size() == 0 )
304  return 0;
305  else
306  return scan_data_.size()-1;
307 }
308 //-----------------------------------------------------------------------------
309 void ScanDataReceiver::writeBufferBack(char *src, std::size_t numbytes)
310 {
311  if( ring_buffer_.size()+numbytes > ring_buffer_.capacity() )
312  throw std::exception();
313  ring_buffer_.resize(ring_buffer_.size()+numbytes);
314  char* pone = ring_buffer_.array_one().first;
315  std::size_t pone_size = ring_buffer_.array_one().second;
316  char* ptwo = ring_buffer_.array_two().first;
317  std::size_t ptwo_size = ring_buffer_.array_two().second;
318 
319  if( ptwo_size >= numbytes )
320  {
321  std::memcpy(ptwo+ptwo_size-numbytes, src, numbytes);
322  }
323  else
324  {
325  std::memcpy(pone+pone_size+ptwo_size-numbytes,
326  src,
327  numbytes-ptwo_size );
328  std::memcpy(ptwo,
329  src+numbytes-ptwo_size,
330  ptwo_size );
331  }
332 }
333 
334 //-----------------------------------------------------------------------------
335 void ScanDataReceiver::readBufferFront(char *dst, std::size_t numbytes)
336 {
337  if( ring_buffer_.size() < numbytes )
338  throw std::exception();
339  char* pone = ring_buffer_.array_one().first;
340  std::size_t pone_size = ring_buffer_.array_one().second;
341  char* ptwo = ring_buffer_.array_two().first;
342  //std::size_t ptwo_size = ring_buffer_.array_two().second;
343 
344  if( pone_size >= numbytes )
345  {
346  std::memcpy( dst, pone, numbytes );
347  }
348  else
349  {
350  std::memcpy( dst, pone, pone_size );
351  std::memcpy( dst+pone_size, ptwo, numbytes-pone_size);
352  }
353 }
354 
355 //-----------------------------------------------------------------------------
356 }
boost::thread io_service_thread_
Event handler thread.
boost::asio::ip::tcp::socket * tcp_socket_
Receiving socket.
void disconnect()
Disconnect and cleanup.
~ScanDataReceiver()
Disconnect cleanly.
std::size_t getFullScansAvailable() const
Get the total number of fully received laserscans available.
std::uint16_t packet_number
Sequence number for packet (counting packets of a particular scan, starting with 1) ...
double last_data_time_
time in seconds since epoch, when last data was received
void handleSocketRead(const boost::system::error_code &error)
Asynchronous callback function, called if data has been reveived by the TCP socket.
boost::asio::streambuf inbuf_
Boost::Asio streambuffer.
boost::asio::ip::udp::socket * udp_socket_
Receiving socket.
Normally contains one complete laserscan (a full rotation of the scanner head)
std::mutex data_mutex_
Protection against data races between ROS and IO threads.
std::uint32_t packet_size
Overall packet size (header+payload), 1404 bytes with maximum payload.
boost::asio::ip::udp::endpoint udp_endpoint_
Endpoint in case of UDP receiver.
std::uint16_t num_points_packet
Total number of scan points within this packet.
std::condition_variable data_notifier_
Data notification condition variable.
std::vector< std::uint32_t > distance_data
Distance data in polar form in millimeter.
bool retrievePacket(std::size_t start, PacketTypeC *p)
boost::circular_buffer< char > ring_buffer_
Internal ringbuffer for temporarily storing reveived data.
ScanDataReceiver()
Open an UDP port and listen on it.
std::vector< std::uint32_t > amplitude_data
Amplitude data in the range 32-4095, values lower than 32 indicate an error or undefined values...
std::vector< PacketHeader > headers
Header received with the distance and amplitude data.
boost::asio::io_service io_service_
Structure of a UDP or TCP data packet from the laserscanner.
std::uint16_t header_size
Header size, defaults to 60 bytes.
bool isConnected() const
Return connection status.
std::array< char, 65536 > udp_buffer_
Buffer in case of UDP receiver.
void readBufferFront(char *dst, std::size_t numbytes)
void writeBufferBack(char *src, std::size_t numbytes)
int udp_port_
Data (UDP) port at local side.
std::deque< ScanData > scan_data_
Double ended queue with sucessfully received and parsed data, organized as single complete scans...
bool is_connected_
Internal connection state.
std::istream instream_
Input stream.


pepperl_fuchs_r2000
Author(s): Denis Dillenberger
autogenerated on Mon Jun 10 2019 14:12:48