00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011 #include "scan_data_receiver.h"
00012 #include <chrono>
00013 #include <ctime>
00014
00015 namespace pepperl_fuchs {
00016
00017 ScanDataReceiver::ScanDataReceiver(const std::string hostname, const int tcp_port):inbuf_(4096),instream_(&inbuf_),ring_buffer_(65536),scan_data_()
00018 {
00019 last_data_time_ = std::time(0);
00020 tcp_socket_ = 0;
00021 udp_socket_ = 0;
00022 udp_port_ = -1;
00023 is_connected_ = false;
00024
00025 std::cout << "Connecting to TCP data channel at " << hostname << ":" << tcp_port << " ... ";
00026 try
00027 {
00028
00029 boost::asio::ip::tcp::resolver resolver(io_service_);
00030 boost::asio::ip::tcp::resolver::query query(hostname, std::to_string(tcp_port));
00031 boost::asio::ip::tcp::resolver::iterator endpoint_iterator = resolver.resolve(query);
00032 boost::asio::ip::tcp::resolver::iterator end;
00033
00034 tcp_socket_ = new boost::asio::ip::tcp::socket(io_service_);
00035 boost::system::error_code error = boost::asio::error::host_not_found;
00036
00037
00038 while (error && endpoint_iterator != end)
00039 {
00040 tcp_socket_->close();
00041 tcp_socket_->connect(*endpoint_iterator++, error);
00042 }
00043 if (error)
00044 throw boost::system::system_error(error);
00045
00046
00047 boost::asio::async_read(*tcp_socket_, inbuf_, boost::bind(&ScanDataReceiver::handleSocketRead, this, boost::asio::placeholders::error));
00048 io_service_thread_ = boost::thread(boost::bind(&boost::asio::io_service::run, &io_service_));
00049 is_connected_ = true;
00050 }
00051 catch (std::exception& e)
00052 {
00053 std::cerr << "Exception: " << e.what() << std::endl;
00054 }
00055 }
00056
00057
00058 ScanDataReceiver::ScanDataReceiver():inbuf_(4096),instream_(&inbuf_),ring_buffer_(65536),scan_data_()
00059 {
00060 tcp_socket_ = 0;
00061 udp_socket_ = 0;
00062 udp_port_ = -1;
00063 is_connected_ = false;
00064
00065
00066 try
00067 {
00068 udp_socket_ = new boost::asio::ip::udp::socket(io_service_, boost::asio::ip::udp::v4());
00069 udp_socket_->bind(boost::asio::ip::udp::endpoint(boost::asio::ip::udp::v4(), 0));
00070 udp_port_ = udp_socket_->local_endpoint().port();
00071
00072 udp_socket_->async_receive_from(boost::asio::buffer(&udp_buffer_[0],udp_buffer_.size()), udp_endpoint_,
00073 boost::bind(&ScanDataReceiver::handleSocketRead, this,
00074 boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred));
00075 io_service_thread_ = boost::thread(boost::bind(&boost::asio::io_service::run, &io_service_));
00076 is_connected_ = true;
00077 }
00078 catch (std::exception& e)
00079 {
00080 std::cerr << "Exception: " << e.what() << std::endl;
00081 }
00082 std::cout << "Receiving scanner data at local UDP port " << udp_port_ << " ... ";
00083
00084 }
00085
00086
00087 ScanDataReceiver::~ScanDataReceiver()
00088 {
00089 disconnect();
00090 delete udp_socket_;
00091 delete tcp_socket_;
00092 }
00093
00094
00095 void ScanDataReceiver::handleSocketRead(const boost::system::error_code& error)
00096 {
00097 if (!error )
00098 {
00099
00100 instream_.clear();
00101 while(!instream_.eof())
00102 {
00103 char buf[4096];
00104 instream_.read(buf,4096);
00105 int bytes_read = instream_.gcount();
00106 writeBufferBack(buf,bytes_read);
00107 }
00108
00109
00110 while( handleNextPacket() ) {}
00111
00112
00113 boost::asio::async_read(*tcp_socket_, inbuf_, boost::bind(&ScanDataReceiver::handleSocketRead, this, boost::asio::placeholders::error));
00114 }
00115 else
00116 {
00117 if( error.value() != 995 )
00118 std::cerr << "ERROR: " << "data connection error: " << error.message() << "(" << error.value() << ")" << std::endl;
00119 disconnect();
00120 }
00121 last_data_time_ = std::time(0);
00122 }
00123
00124
00125 void ScanDataReceiver::handleSocketRead(const boost::system::error_code &error, std::size_t bytes_transferred)
00126 {
00127 if (!error )
00128 {
00129
00130 writeBufferBack(&udp_buffer_[0],bytes_transferred);
00131
00132
00133 while( handleNextPacket() ) {}
00134
00135
00136 udp_socket_->async_receive_from(boost::asio::buffer(&udp_buffer_[0],udp_buffer_.size()), udp_endpoint_,
00137 boost::bind(&ScanDataReceiver::handleSocketRead, this,
00138 boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred));
00139 }
00140 else
00141 {
00142 if( error.value() != 995 )
00143 std::cerr << "ERROR: " << "data connection error: " << error.message() << "(" << error.value() << ")" << std::endl;
00144 disconnect();
00145 }
00146 last_data_time_ = std::time(0);
00147 }
00148
00149
00150 bool ScanDataReceiver::handleNextPacket()
00151 {
00152
00153 int packet_start = findPacketStart();
00154 if( packet_start<0 )
00155 return false;
00156
00157
00158 char buf[65536];
00159 PacketTypeC* p = (PacketTypeC*) buf;
00160 if( !retrievePacket(packet_start,p) )
00161 return false;
00162
00163
00164 std::unique_lock<std::mutex> lock(data_mutex_);
00165
00166
00167 if( p->header.packet_number == 1 || scan_data_.empty() )
00168 {
00169 scan_data_.emplace_back();
00170 if( scan_data_.size()>100 )
00171 {
00172 scan_data_.pop_front();
00173 std::cerr << "Too many scans in receiver queue: Dropping scans!" << std::endl;
00174 }
00175 data_notifier_.notify_one();
00176 }
00177 ScanData& scandata = scan_data_.back();
00178
00179
00180 std::uint32_t* p_scan_data = (std::uint32_t*) &buf[p->header.header_size];
00181 int num_scan_points = p->header.num_points_packet;
00182
00183 for( int i=0; i<num_scan_points; i++ )
00184 {
00185 unsigned int data = p_scan_data[i];
00186 unsigned int distance = (data & 0x000FFFFF);
00187 unsigned int amplitude = (data & 0xFFFFF000) >> 20;
00188
00189 scandata.distance_data.push_back(distance);
00190 scandata.amplitude_data.push_back(amplitude);
00191 }
00192
00193
00194 scandata.headers.push_back(p->header);
00195
00196 return true;
00197 }
00198
00199
00200 int ScanDataReceiver::findPacketStart()
00201 {
00202 if( ring_buffer_.size()<60 )
00203 return -1;
00204 for( std::size_t i=0; i<ring_buffer_.size()-4; i++)
00205 {
00206 if( ((unsigned char) ring_buffer_[i]) == 0x5c
00207 && ((unsigned char) ring_buffer_[i+1]) == 0xa2
00208 && ((unsigned char) ring_buffer_[i+2]) == 0x43
00209 && ((unsigned char) ring_buffer_[i+3]) == 0x00 )
00210 {
00211 return i;
00212 }
00213 }
00214 return -2;
00215 }
00216
00217
00218 bool ScanDataReceiver::retrievePacket(std::size_t start, PacketTypeC *p)
00219 {
00220 if( ring_buffer_.size()<60 )
00221 return false;
00222
00223
00224 ring_buffer_.erase_begin(start);
00225
00226 char* pp = (char*) p;
00227
00228 readBufferFront(pp,60);
00229
00230 if( ring_buffer_.size() < p->header.packet_size )
00231 return false;
00232
00233
00234 readBufferFront(pp,p->header.packet_size);
00235
00236
00237 ring_buffer_.erase_begin(p->header.packet_size);
00238 return true;
00239 }
00240
00241
00242 void ScanDataReceiver::disconnect()
00243 {
00244 is_connected_ = false;
00245 try
00246 {
00247 if( tcp_socket_ )
00248 tcp_socket_->close();
00249 if( udp_socket_ )
00250 udp_socket_->close();
00251 io_service_.stop();
00252 if( boost::this_thread::get_id() != io_service_thread_.get_id() )
00253 io_service_thread_.join();
00254 }
00255 catch (std::exception& e)
00256 {
00257 std::cerr << "Exception: " << e.what() << std::endl;
00258 }
00259 }
00260
00261
00262 bool ScanDataReceiver::checkConnection()
00263 {
00264 if( !isConnected() )
00265 return false;
00266 if( (std::time(0)-last_data_time_) > 2 )
00267 {
00268 disconnect();
00269 return false;
00270 }
00271 return true;
00272 }
00273
00274
00275 ScanData ScanDataReceiver::getScan()
00276 {
00277 std::unique_lock<std::mutex> lock(data_mutex_);
00278 ScanData data(std::move(scan_data_.front()));
00279 scan_data_.pop_front();
00280 return data;
00281 }
00282
00283
00284 ScanData ScanDataReceiver::getFullScan()
00285 {
00286 std::unique_lock<std::mutex> lock(data_mutex_);
00287 while( checkConnection() && isConnected() && scan_data_.size()<2 )
00288 {
00289 data_notifier_.wait_for(lock, std::chrono::seconds(1));
00290 }
00291 ScanData data;
00292 if( scan_data_.size() >= 2 && isConnected() )
00293 {
00294 data = ScanData(std::move(scan_data_.front()));
00295 scan_data_.pop_front();
00296 }
00297 return data;
00298 }
00299
00300
00301 std::size_t ScanDataReceiver::getFullScansAvailable() const
00302 {
00303 if( scan_data_.size() == 0 )
00304 return 0;
00305 else
00306 return scan_data_.size()-1;
00307 }
00308
00309 void ScanDataReceiver::writeBufferBack(char *src, std::size_t numbytes)
00310 {
00311 if( ring_buffer_.size()+numbytes > ring_buffer_.capacity() )
00312 throw std::exception();
00313 ring_buffer_.resize(ring_buffer_.size()+numbytes);
00314 char* pone = ring_buffer_.array_one().first;
00315 std::size_t pone_size = ring_buffer_.array_one().second;
00316 char* ptwo = ring_buffer_.array_two().first;
00317 std::size_t ptwo_size = ring_buffer_.array_two().second;
00318
00319 if( ptwo_size >= numbytes )
00320 {
00321 std::memcpy(ptwo+ptwo_size-numbytes, src, numbytes);
00322 }
00323 else
00324 {
00325 std::memcpy(pone+pone_size+ptwo_size-numbytes,
00326 src,
00327 numbytes-ptwo_size );
00328 std::memcpy(ptwo,
00329 src+numbytes-ptwo_size,
00330 ptwo_size );
00331 }
00332 }
00333
00334
00335 void ScanDataReceiver::readBufferFront(char *dst, std::size_t numbytes)
00336 {
00337 if( ring_buffer_.size() < numbytes )
00338 throw std::exception();
00339 char* pone = ring_buffer_.array_one().first;
00340 std::size_t pone_size = ring_buffer_.array_one().second;
00341 char* ptwo = ring_buffer_.array_two().first;
00342
00343
00344 if( pone_size >= numbytes )
00345 {
00346 std::memcpy( dst, pone, numbytes );
00347 }
00348 else
00349 {
00350 std::memcpy( dst, pone, pone_size );
00351 std::memcpy( dst+pone_size, ptwo, numbytes-pone_size);
00352 }
00353 }
00354
00355
00356 }