scan_data_receiver.cpp
Go to the documentation of this file.
00001 // Copyright (c) 2014, Pepperl+Fuchs GmbH, Mannheim
00002 // Copyright (c) 2014, Denis Dillenberger
00003 // All rights reserved.
00004 //
00005 // Use, modification, and distribution is subject to the
00006 // 3-clause BSD license ("Revised BSD License",
00007 // "New BSD License", or "Modified BSD License")
00008 // You should have received a copy of this license
00009 // in a file named COPYING or LICENSE.
00010 
00011 #include "scan_data_receiver.h"
00012 #include <chrono>
00013 #include <ctime>
00014 
00015 namespace pepperl_fuchs {
00016 
00017 ScanDataReceiver::ScanDataReceiver(const std::string hostname, const int tcp_port):inbuf_(4096),instream_(&inbuf_),ring_buffer_(65536),scan_data_()
00018 {
00019     last_data_time_ = std::time(0);
00020     tcp_socket_ = 0;
00021     udp_socket_ = 0;
00022     udp_port_ = -1;
00023     is_connected_ = false;
00024 
00025     std::cout << "Connecting to TCP data channel at " << hostname << ":" << tcp_port << " ... ";
00026     try
00027     {
00028         // Resolve hostname/ip
00029         boost::asio::ip::tcp::resolver resolver(io_service_);
00030         boost::asio::ip::tcp::resolver::query query(hostname, std::to_string(tcp_port));
00031         boost::asio::ip::tcp::resolver::iterator endpoint_iterator = resolver.resolve(query);
00032         boost::asio::ip::tcp::resolver::iterator end;
00033 
00034         tcp_socket_ = new boost::asio::ip::tcp::socket(io_service_);
00035         boost::system::error_code error = boost::asio::error::host_not_found;
00036 
00037         // Iterate over endpoints and etablish connection
00038         while (error && endpoint_iterator != end)
00039         {
00040             tcp_socket_->close();
00041             tcp_socket_->connect(*endpoint_iterator++, error);
00042         }
00043         if (error)
00044             throw boost::system::system_error(error);
00045 
00046         // Start async reading
00047         boost::asio::async_read(*tcp_socket_, inbuf_, boost::bind(&ScanDataReceiver::handleSocketRead, this, boost::asio::placeholders::error));
00048         io_service_thread_ = boost::thread(boost::bind(&boost::asio::io_service::run, &io_service_));
00049         is_connected_ = true;
00050     }
00051     catch (std::exception& e)
00052     {
00053         std::cerr << "Exception: " <<  e.what() << std::endl;
00054     }
00055 }
00056 
00057 //-----------------------------------------------------------------------------
00058 ScanDataReceiver::ScanDataReceiver():inbuf_(4096),instream_(&inbuf_),ring_buffer_(65536),scan_data_()
00059 {
00060     tcp_socket_ = 0;
00061     udp_socket_ = 0;
00062     udp_port_ = -1;
00063     is_connected_ = false;
00064 
00065 
00066     try
00067     {
00068         udp_socket_ = new boost::asio::ip::udp::socket(io_service_, boost::asio::ip::udp::v4());
00069         udp_socket_->bind(boost::asio::ip::udp::endpoint(boost::asio::ip::udp::v4(), 0));
00070         udp_port_ = udp_socket_->local_endpoint().port();
00071         // Start async reading
00072         udp_socket_->async_receive_from(boost::asio::buffer(&udp_buffer_[0],udp_buffer_.size()), udp_endpoint_,
00073                                         boost::bind(&ScanDataReceiver::handleSocketRead, this,
00074                                                     boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred));
00075         io_service_thread_ = boost::thread(boost::bind(&boost::asio::io_service::run, &io_service_));
00076         is_connected_ = true;
00077     }
00078     catch (std::exception& e)
00079     {
00080         std::cerr << "Exception: " <<  e.what() << std::endl;
00081     }
00082     std::cout << "Receiving scanner data at local UDP port " << udp_port_ << " ... ";
00083 
00084 }
00085 
00086 //-----------------------------------------------------------------------------
00087 ScanDataReceiver::~ScanDataReceiver()
00088 {
00089     disconnect();
00090     delete udp_socket_;
00091     delete tcp_socket_;
00092 }
00093 
00094 //-----------------------------------------------------------------------------
00095 void ScanDataReceiver::handleSocketRead(const boost::system::error_code& error)
00096 {
00097     if (!error )
00098     {
00099         // Read all received data and write it to the internal ring buffer
00100         instream_.clear();
00101         while(!instream_.eof())
00102         {
00103             char buf[4096];
00104             instream_.read(buf,4096);
00105             int bytes_read = instream_.gcount();
00106             writeBufferBack(buf,bytes_read);
00107         }
00108 
00109         // Handle (read and parse) packets stored in the internal ring buffer
00110         while( handleNextPacket() ) {}
00111 
00112         // Read data asynchronously
00113         boost::asio::async_read(*tcp_socket_, inbuf_, boost::bind(&ScanDataReceiver::handleSocketRead, this, boost::asio::placeholders::error));
00114     }
00115     else
00116     {
00117         if( error.value() != 995 )
00118             std::cerr << "ERROR: " << "data connection error: " << error.message() << "(" << error.value() << ")" << std::endl;
00119         disconnect();
00120     }
00121     last_data_time_ = std::time(0);
00122 }
00123 
00124 //-----------------------------------------------------------------------------
00125 void ScanDataReceiver::handleSocketRead(const boost::system::error_code &error, std::size_t bytes_transferred)
00126 {
00127     if (!error )
00128     {
00129         // Read all received data and write it to the internal ring buffer
00130         writeBufferBack(&udp_buffer_[0],bytes_transferred);
00131 
00132         // Handle (read and parse) packets stored in the internal ring buffer
00133         while( handleNextPacket() ) {}
00134 
00135         // Read data asynchronously
00136         udp_socket_->async_receive_from(boost::asio::buffer(&udp_buffer_[0],udp_buffer_.size()), udp_endpoint_,
00137                                         boost::bind(&ScanDataReceiver::handleSocketRead, this,
00138                                                     boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred));
00139     }
00140     else
00141     {
00142         if( error.value() != 995 )
00143             std::cerr << "ERROR: " << "data connection error: " << error.message() << "(" << error.value() << ")" << std::endl;
00144         disconnect();
00145     }
00146     last_data_time_ = std::time(0);
00147 }
00148 
00149 //-----------------------------------------------------------------------------
00150 bool ScanDataReceiver::handleNextPacket()
00151 {
00152     // Search for a packet
00153     int packet_start = findPacketStart();
00154     if( packet_start<0 )
00155         return false;
00156 
00157     // Try to retrieve packet
00158     char buf[65536];
00159     PacketTypeC* p = (PacketTypeC*) buf;
00160     if( !retrievePacket(packet_start,p) )
00161         return false;
00162 
00163     // Lock internal outgoing data queue, automatically unlocks at end of function
00164     std::unique_lock<std::mutex> lock(data_mutex_);
00165 
00166     // Create new scan container if necessary
00167     if( p->header.packet_number == 1 || scan_data_.empty() )
00168     {
00169         scan_data_.emplace_back();
00170         if( scan_data_.size()>100 )
00171         {
00172             scan_data_.pop_front();
00173             std::cerr << "Too many scans in receiver queue: Dropping scans!" << std::endl;
00174         }
00175         data_notifier_.notify_one();
00176     }
00177     ScanData& scandata = scan_data_.back();
00178 
00179     // Parse payload of packet
00180     std::uint32_t* p_scan_data = (std::uint32_t*) &buf[p->header.header_size];
00181     int num_scan_points = p->header.num_points_packet;
00182 
00183     for( int i=0; i<num_scan_points; i++ )
00184     {
00185         unsigned int data = p_scan_data[i];
00186         unsigned int distance = (data & 0x000FFFFF);
00187         unsigned int amplitude = (data & 0xFFFFF000) >> 20;
00188 
00189         scandata.distance_data.push_back(distance);
00190         scandata.amplitude_data.push_back(amplitude);
00191     }
00192 
00193     // Save header
00194     scandata.headers.push_back(p->header);
00195 
00196     return true;
00197 }
00198 
00199 //-----------------------------------------------------------------------------
00200 int ScanDataReceiver::findPacketStart()
00201 {
00202     if( ring_buffer_.size()<60 )
00203         return -1;
00204     for( std::size_t i=0; i<ring_buffer_.size()-4; i++)
00205     {
00206         if(   ((unsigned char) ring_buffer_[i])   == 0x5c
00207            && ((unsigned char) ring_buffer_[i+1]) == 0xa2
00208            && ((unsigned char) ring_buffer_[i+2]) == 0x43
00209            && ((unsigned char) ring_buffer_[i+3]) == 0x00 )
00210         {
00211             return i;
00212         }
00213     }
00214     return -2;
00215 }
00216 
00217 //-----------------------------------------------------------------------------
00218 bool ScanDataReceiver::retrievePacket(std::size_t start, PacketTypeC *p)
00219 {
00220     if( ring_buffer_.size()<60 )
00221         return false;
00222 
00223     // Erase preceding bytes
00224     ring_buffer_.erase_begin(start);
00225 
00226     char* pp = (char*) p;
00227     // Read header
00228     readBufferFront(pp,60);
00229 
00230     if( ring_buffer_.size() < p->header.packet_size )
00231         return false;
00232 
00233     // Read header+payload data
00234     readBufferFront(pp,p->header.packet_size);
00235 
00236     // Erase packet from ring buffer
00237     ring_buffer_.erase_begin(p->header.packet_size);
00238     return true;
00239 }
00240 
00241 //-----------------------------------------------------------------------------
00242 void ScanDataReceiver::disconnect()
00243 {
00244     is_connected_ = false;
00245     try
00246     {
00247         if( tcp_socket_ )
00248             tcp_socket_->close();
00249         if( udp_socket_ )
00250             udp_socket_->close();
00251         io_service_.stop();
00252         if( boost::this_thread::get_id() != io_service_thread_.get_id() )
00253             io_service_thread_.join();
00254     }
00255     catch (std::exception& e)
00256     {
00257         std::cerr << "Exception: " <<  e.what() << std::endl;
00258     }
00259 }
00260 
00261 //-----------------------------------------------------------------------------
00262 bool ScanDataReceiver::checkConnection()
00263 {
00264     if( !isConnected() )
00265         return false;
00266     if( (std::time(0)-last_data_time_) > 2 )
00267     {
00268         disconnect();
00269         return false;
00270     }
00271     return true;
00272 }
00273 
00274 //-----------------------------------------------------------------------------
00275 ScanData ScanDataReceiver::getScan()
00276 {
00277     std::unique_lock<std::mutex> lock(data_mutex_);
00278     ScanData data(std::move(scan_data_.front()));
00279     scan_data_.pop_front();
00280     return data;
00281 }
00282 
00283 //-----------------------------------------------------------------------------
00284 ScanData ScanDataReceiver::getFullScan()
00285 {
00286     std::unique_lock<std::mutex> lock(data_mutex_);
00287     while( checkConnection() && isConnected() && scan_data_.size()<2 )
00288     {
00289         data_notifier_.wait_for(lock, std::chrono::seconds(1));
00290     }
00291     ScanData data;
00292     if( scan_data_.size() >= 2 && isConnected() )
00293     {
00294         data = ScanData(std::move(scan_data_.front()));
00295         scan_data_.pop_front();
00296     }
00297     return data;
00298 }
00299 
00300 //-----------------------------------------------------------------------------
00301 std::size_t ScanDataReceiver::getFullScansAvailable() const
00302 {
00303     if( scan_data_.size() == 0 )
00304         return 0;
00305     else
00306         return scan_data_.size()-1;
00307 }
00308 //-----------------------------------------------------------------------------
00309 void ScanDataReceiver::writeBufferBack(char *src, std::size_t numbytes)
00310 {
00311     if( ring_buffer_.size()+numbytes > ring_buffer_.capacity() )
00312         throw std::exception();
00313     ring_buffer_.resize(ring_buffer_.size()+numbytes);
00314     char* pone = ring_buffer_.array_one().first;
00315     std::size_t pone_size = ring_buffer_.array_one().second;
00316     char* ptwo = ring_buffer_.array_two().first;
00317     std::size_t ptwo_size = ring_buffer_.array_two().second;
00318 
00319     if( ptwo_size >= numbytes )
00320     {
00321         std::memcpy(ptwo+ptwo_size-numbytes, src, numbytes);
00322     }
00323     else
00324     {
00325         std::memcpy(pone+pone_size+ptwo_size-numbytes,
00326                     src,
00327                     numbytes-ptwo_size );
00328         std::memcpy(ptwo,
00329                     src+numbytes-ptwo_size,
00330                     ptwo_size );
00331     }
00332 }
00333 
00334 //-----------------------------------------------------------------------------
00335 void ScanDataReceiver::readBufferFront(char *dst, std::size_t numbytes)
00336 {
00337     if( ring_buffer_.size() < numbytes )
00338         throw std::exception();
00339     char* pone = ring_buffer_.array_one().first;
00340     std::size_t pone_size = ring_buffer_.array_one().second;
00341     char* ptwo = ring_buffer_.array_two().first;
00342     //std::size_t ptwo_size = ring_buffer_.array_two().second;
00343 
00344     if( pone_size >= numbytes )
00345     {
00346         std::memcpy( dst, pone, numbytes );
00347     }
00348     else
00349     {
00350         std::memcpy( dst, pone, pone_size );
00351         std::memcpy( dst+pone_size, ptwo, numbytes-pone_size);
00352     }
00353 }
00354 
00355 //-----------------------------------------------------------------------------
00356 }


pepperl_fuchs_r2000
Author(s): Denis Dillenberger
autogenerated on Wed Aug 26 2015 15:22:22