HardwareCanSourcePeak.cpp
Go to the documentation of this file.
00001 // this is for emacs file handling -*- mode: c++; indent-tabs-mode: nil -*-
00002 
00003 // -- BEGIN LICENSE BLOCK ----------------------------------------------
00004 // This file is part of FZIs ic_workspace.
00005 //
00006 // This program is free software licensed under the LGPL
00007 // (GNU LESSER GENERAL PUBLIC LICENSE Version 3).
00008 // You can find a copy of this license in LICENSE folder in the top
00009 // directory of the source code.
00010 //
00011 // © Copyright 2016 FZI Forschungszentrum Informatik, Karlsruhe, Germany
00012 //
00013 // -- END LICENSE BLOCK ------------------------------------------------
00014 
00015 //----------------------------------------------------------------------
00029 //----------------------------------------------------------------------
00030 #include "HardwareCanSourcePeak.h"
00031 
00032 #include <deque>
00033 
00034 #include <boost/date_time/posix_time/posix_time.hpp>
00035 #include <boost/filesystem/operations.hpp>
00036 #include <boost/filesystem/fstream.hpp>
00037 
00038 #include "icl_hardware_can/Logging.h"
00039 #include "icl_hardware_can/tException.h"
00040 
00041 #include <icl_core_thread/Thread.h>
00042 #include <icl_core_thread/Mutex.h>
00043 #include <icl_core_thread/ScopedMutexLock.h>
00044 #include <icl_core_thread/Sem.h>
00045 #include <icl_core_config/Config.h>
00046 #include <icl_sourcesink/SimpleURI.h>
00047 
00048 namespace icl_hardware {
00049 namespace can {
00050 
00054 class HardwareCanSourcePeak::WorkerThread : public icl_core::thread::Thread
00055 {
00056 public:
00057   WorkerThread(tCanDevice *can_device, std::size_t max_buffer_size = 256)
00058     : icl_core::thread::Thread("CanWorkerThread", -10),
00059       m_can_device(can_device),
00060       m_recv_buffer(),
00061       m_max_buffer_size(max_buffer_size),
00062       m_buffer_mutex(),
00063       m_buffer_semaphore(0),
00064       m_use_can_mask(false),
00065       m_can_mask(),
00066       m_single_can_id(0),
00067       m_dsin(0),
00068       m_sequence_number(0)
00069   { }
00070 
00072   virtual void run();
00073 
00075   bool hasData() const;
00076 
00080   CanMessageStamped::Ptr get();
00081 
00083   boost::scoped_ptr<tCanDevice>& canDevice() { return m_can_device; }
00084 
00086   void setCanMask(const CanMatrix& can_mask)
00087   {
00088     m_use_can_mask = true;
00089     m_can_mask = can_mask;
00090   }
00091 
00093   void setSingleCanID(uint16_t can_id)
00094   {
00095     m_single_can_id = can_id;
00096   }
00097 
00098 private:
00100   boost::scoped_ptr<tCanDevice> m_can_device;
00104   std::deque<CanMessageStamped::Ptr> m_recv_buffer;
00106   std::size_t m_max_buffer_size;
00108   mutable icl_core::thread::Mutex m_buffer_mutex;
00110   icl_core::thread::Semaphore m_buffer_semaphore;
00112   bool m_use_can_mask;
00117   CanMatrix m_can_mask;
00121   uint16_t m_single_can_id;
00125   std::size_t m_dsin;
00130   std::size_t m_sequence_number;
00131 };
00132 
00133 void HardwareCanSourcePeak::WorkerThread::run()
00134 {
00135   while (execute())
00136   {
00137     if (!m_can_device->IsInitialized())
00138     {
00139       // TODO: Try to reconnect
00140       throw tException(ePEAK_DEVICE_ERROR, "Lost connection to peak device.");
00141     }
00142     CanMessageStamped::Ptr msg(new CanMessageStamped);
00143     while (m_can_device->Receive(**msg) > 0)
00144     {
00145       msg->header().timestamp = boost::posix_time::microsec_clock::local_time();
00146       msg->header().dsin = m_dsin++;
00147       // TODO: Check if message ID is in CAN map
00148       {
00149         msg->header().sequence_number = m_sequence_number++;
00150         msg->header().coordinate_system = "";
00151         icl_core::thread::ScopedMutexLock lock(m_buffer_mutex);
00152         m_recv_buffer.push_front(msg);
00153         msg.reset(new CanMessageStamped);
00154         if (m_recv_buffer.size() > m_max_buffer_size)
00155         {
00156           LOGGING_WARNING(CAN, "Buffer overflow, too many outstanding messages!" << endl);
00157           double message_rate = double(m_recv_buffer.size())
00158             / ((m_recv_buffer.front()->header().timestamp
00159                 - m_recv_buffer.back()->header().timestamp).total_microseconds()
00160                * 1e-6);
00161           LOGGING_DEBUG(CAN, "Received " << message_rate << " msg/s." << endl);
00162           while (m_recv_buffer.size() > m_max_buffer_size)
00163           {
00164             m_recv_buffer.pop_back();
00165           }
00166         }
00167       }
00168       m_buffer_semaphore.post();
00169       // Minimal usleep at this point, to decrease the number of
00170       // receivable incoming messages
00171       // TODO: Check with Hugo why this is necessary
00172       usleep(300);
00173     }
00174     usleep(5000);
00175   }
00176 }
00177 
00178 CanMessageStamped::Ptr HardwareCanSourcePeak::WorkerThread::get()
00179 {
00180   m_buffer_semaphore.wait();
00181   icl_core::thread::ScopedMutexLock lock(m_buffer_mutex);
00182   CanMessageStamped::Ptr msg = m_recv_buffer.back();
00183   m_recv_buffer.pop_back();
00184   return msg;
00185 }
00186 
00187 bool HardwareCanSourcePeak::WorkerThread::hasData() const
00188 {
00189   icl_core::thread::ScopedMutexLock lock(m_buffer_mutex);
00190   return !m_recv_buffer.empty();
00191 }
00192 
00193 //----------------------------------------------------------------------
00194 // Main class
00195 //----------------------------------------------------------------------
00196 
00197 HardwareCanSourcePeak::HardwareCanSourcePeak(const std::string& uri, const std::string& name)
00198   : HardwareCanSource(uri, name),
00199     m_buffer(),
00200     m_worker_thread()
00201 {
00202   m_is_good = false;
00203   icl_sourcesink::SimpleURI parsed_uri(uri);
00204 
00205   uint32_t can_baudrate = 500;
00206   boost::optional<uint32_t> uri_baudrate = parsed_uri.getQuery<uint32_t>("baudrate");
00207   if (uri_baudrate)
00208   {
00209     can_baudrate = *uri_baudrate;
00210   }
00211 
00212   uint32_t can_buffer_size = 256;
00213   boost::optional<uint32_t> uri_buffer_size = parsed_uri.getQuery<uint32_t>("buffer_size");
00214   if (uri_buffer_size)
00215   {
00216     can_buffer_size = *uri_buffer_size;
00217   }
00218 
00219   uint16_t can_id = 0;
00220   boost::optional<uint16_t> uri_can_id = parsed_uri.getQuery<uint16_t>("can_id");
00221   if (uri_can_id)
00222   {
00223     if (*uri_can_id > 2047)
00224     {
00225       LOGGING_ERROR(CAN, "Illegal value for 'can_id' (must be <= 2047). Ignoring." << endl);
00226     }
00227     else
00228     {
00229       can_id = *uri_can_id;
00230     }
00231   }
00232 
00233   std::string can_mask = "";
00234   boost::optional<std::string> uri_can_mask = parsed_uri.getQuery<std::string>("can_mask");
00235   if (uri_can_mask)
00236   {
00237     can_mask = *uri_can_mask;
00238   }
00239   else
00240   {
00241     can_mask = icl_core::config::getDefault<std::string>("/icl_hardware_can/can_mask", "");
00242   }
00243 
00244   LOGGING_DEBUG(CAN, "Device: " << parsed_uri.path() << endl);
00245   LOGGING_DEBUG(CAN, "Baudrate: " << can_baudrate << " kbps" << endl);
00246 
00247   LOGGING_DEBUG(CAN, "Opening CAN device... " << endl);
00248   tCanDevice::CheckLXRTInterface();
00249   m_worker_thread.reset(new WorkerThread(
00250                           tCanDevice::Create(
00251                             parsed_uri.path().c_str(),
00252                             O_RDWR | O_NONBLOCK,
00253                             0xff,
00254                             0xff,
00255                             can_baudrate,
00256                             300,
00257                             8000),
00258                           can_buffer_size));
00259 
00260   // Check if CAN device was initialized successfully.
00261   if (m_worker_thread->canDevice()->IsInitialized())
00262   {
00263     LOGGING_DEBUG(CAN, "CAN device successfully initialized." << endl);
00264     m_worker_thread->start();
00265   }
00266   else
00267   {
00268     m_worker_thread->canDevice().reset();
00269     LOGGING_ERROR(CAN, "Error initializing CAN device." << endl);
00270     return;
00271   }
00272 
00273   // Set CAN mask
00274   if (can_id > 0)
00275   {
00276     LOGGING_DEBUG(CAN, "Using single CAN ID: " << can_id << endl);
00277     m_worker_thread->setSingleCanID(can_id);
00278   }
00279   else if (can_mask != "")
00280   {
00281     tCanMatrixParser parser(can_mask);
00282     if (parser.isActive())
00283     {
00284       m_worker_thread->setCanMask(parser.getCanMatrix());
00285       LOGGING_DEBUG(CAN, "Using CAN Mask: " << can_mask << endl);
00286     }
00287     else
00288     {
00289       LOGGING_WARNING(CAN, "Could not use provided CAN mask " << can_mask << endl);
00290     }
00291   }
00292 
00293   m_sleep_time = 500000/can_baudrate;
00294 
00295   // Wait for first CAN message.
00296   m_is_good = advance();
00297 }
00298 
00299 HardwareCanSourcePeak::~HardwareCanSourcePeak()
00300 {
00301   if (m_worker_thread)
00302   {
00303     m_worker_thread->stop();
00304     m_worker_thread->join();
00305   }
00306 }
00307 
00308 bool HardwareCanSourcePeak::advance()
00309 {
00310   if (!m_worker_thread)
00311   {
00312     LOGGING_ERROR(CAN, "Worker thread nonexistent for some reason." << endl);
00313     m_is_good = false;
00314   }
00315   else if (!m_worker_thread->canDevice())
00316   {
00317     LOGGING_ERROR(CAN, "CAN device object disappeared." << endl);
00318     m_is_good = false;
00319   }
00320   else if (!m_worker_thread->canDevice()->IsInitialized())
00321   {
00322     LOGGING_ERROR(CAN, "CAN device is closed." << endl);
00323     m_is_good = false;
00324   }
00325   else if (!m_worker_thread->running())
00326   {
00327     LOGGING_ERROR(CAN, "Worker thread was terminated unexpectedly." << endl);
00328     m_is_good = false;
00329   }
00330   m_buffer = m_worker_thread->get();
00331   return bool(m_buffer);
00332 }
00333 
00334 }
00335 }


fzi_icl_can
Author(s):
autogenerated on Thu Jun 6 2019 20:26:01