HardwareCanSourcePeak.cpp
Go to the documentation of this file.
1 // this is for emacs file handling -*- mode: c++; indent-tabs-mode: nil -*-
2 
3 // -- BEGIN LICENSE BLOCK ----------------------------------------------
4 // This file is part of FZIs ic_workspace.
5 //
6 // This program is free software licensed under the LGPL
7 // (GNU LESSER GENERAL PUBLIC LICENSE Version 3).
8 // You can find a copy of this license in LICENSE folder in the top
9 // directory of the source code.
10 //
11 // © Copyright 2016 FZI Forschungszentrum Informatik, Karlsruhe, Germany
12 //
13 // -- END LICENSE BLOCK ------------------------------------------------
14 
15 //----------------------------------------------------------------------
29 //----------------------------------------------------------------------
30 #include "HardwareCanSourcePeak.h"
31 
32 #include <deque>
33 
34 #include <boost/date_time/posix_time/posix_time.hpp>
35 #include <boost/filesystem/operations.hpp>
36 #include <boost/filesystem/fstream.hpp>
37 
40 
41 #include <icl_core_thread/Thread.h>
42 #include <icl_core_thread/Mutex.h>
44 #include <icl_core_thread/Sem.h>
45 #include <icl_core_config/Config.h>
46 #include <icl_sourcesink/SimpleURI.h>
47 
48 namespace icl_hardware {
49 namespace can {
50 
55 {
56 public:
57  WorkerThread(tCanDevice *can_device, std::size_t max_buffer_size = 256)
58  : icl_core::thread::Thread("CanWorkerThread", -10),
59  m_can_device(can_device),
60  m_recv_buffer(),
61  m_max_buffer_size(max_buffer_size),
64  m_use_can_mask(false),
65  m_can_mask(),
66  m_single_can_id(0),
67  m_dsin(0),
69  { }
70 
72  virtual void run();
73 
75  bool hasData() const;
76 
81 
83  boost::scoped_ptr<tCanDevice>& canDevice() { return m_can_device; }
84 
86  void setCanMask(const CanMatrix& can_mask)
87  {
88  m_use_can_mask = true;
89  m_can_mask = can_mask;
90  }
91 
93  void setSingleCanID(uint16_t can_id)
94  {
95  m_single_can_id = can_id;
96  }
97 
98 private:
100  boost::scoped_ptr<tCanDevice> m_can_device;
104  std::deque<CanMessageStamped::Ptr> m_recv_buffer;
106  std::size_t m_max_buffer_size;
125  std::size_t m_dsin;
130  std::size_t m_sequence_number;
131 };
132 
134 {
135  while (execute())
136  {
137  if (!m_can_device->IsInitialized())
138  {
139  // TODO: Try to reconnect
140  throw tException(ePEAK_DEVICE_ERROR, "Lost connection to peak device.");
141  }
143  while (m_can_device->Receive(**msg) > 0)
144  {
145  msg->header().timestamp = boost::posix_time::microsec_clock::local_time();
146  msg->header().dsin = m_dsin++;
147  // TODO: Check if message ID is in CAN map
148  {
149  msg->header().sequence_number = m_sequence_number++;
150  msg->header().coordinate_system = "";
152  m_recv_buffer.push_front(msg);
153  msg.reset(new CanMessageStamped);
154  if (m_recv_buffer.size() > m_max_buffer_size)
155  {
156  LOGGING_WARNING(CAN, "Buffer overflow, too many outstanding messages!" << endl);
157  double message_rate = double(m_recv_buffer.size())
158  / ((m_recv_buffer.front()->header().timestamp
159  - m_recv_buffer.back()->header().timestamp).total_microseconds()
160  * 1e-6);
161  LOGGING_DEBUG(CAN, "Received " << message_rate << " msg/s." << endl);
162  while (m_recv_buffer.size() > m_max_buffer_size)
163  {
164  m_recv_buffer.pop_back();
165  }
166  }
167  }
169  // Minimal usleep at this point, to decrease the number of
170  // receivable incoming messages
171  // TODO: Check with Hugo why this is necessary
172  usleep(300);
173  }
174  usleep(5000);
175  }
176 }
177 
179 {
183  m_recv_buffer.pop_back();
184  return msg;
185 }
186 
188 {
190  return !m_recv_buffer.empty();
191 }
192 
193 //----------------------------------------------------------------------
194 // Main class
195 //----------------------------------------------------------------------
196 
197 HardwareCanSourcePeak::HardwareCanSourcePeak(const std::string& uri, const std::string& name)
198  : HardwareCanSource(uri, name),
199  m_buffer(),
201 {
202  m_is_good = false;
203  icl_sourcesink::SimpleURI parsed_uri(uri);
204 
205  uint32_t can_baudrate = 500;
206  boost::optional<uint32_t> uri_baudrate = parsed_uri.getQuery<uint32_t>("baudrate");
207  if (uri_baudrate)
208  {
209  can_baudrate = *uri_baudrate;
210  }
211 
212  uint32_t can_buffer_size = 256;
213  boost::optional<uint32_t> uri_buffer_size = parsed_uri.getQuery<uint32_t>("buffer_size");
214  if (uri_buffer_size)
215  {
216  can_buffer_size = *uri_buffer_size;
217  }
218 
219  uint16_t can_id = 0;
220  boost::optional<uint16_t> uri_can_id = parsed_uri.getQuery<uint16_t>("can_id");
221  if (uri_can_id)
222  {
223  if (*uri_can_id > 2047)
224  {
225  LOGGING_ERROR(CAN, "Illegal value for 'can_id' (must be <= 2047). Ignoring." << endl);
226  }
227  else
228  {
229  can_id = *uri_can_id;
230  }
231  }
232 
233  std::string can_mask = "";
234  boost::optional<std::string> uri_can_mask = parsed_uri.getQuery<std::string>("can_mask");
235  if (uri_can_mask)
236  {
237  can_mask = *uri_can_mask;
238  }
239  else
240  {
241  can_mask = icl_core::config::getDefault<std::string>("/icl_hardware_can/can_mask", "");
242  }
243 
244  LOGGING_DEBUG(CAN, "Device: " << parsed_uri.path() << endl);
245  LOGGING_DEBUG(CAN, "Baudrate: " << can_baudrate << " kbps" << endl);
246 
247  LOGGING_DEBUG(CAN, "Opening CAN device... " << endl);
249  m_worker_thread.reset(new WorkerThread(
251  parsed_uri.path().c_str(),
252  O_RDWR | O_NONBLOCK,
253  0xff,
254  0xff,
255  can_baudrate,
256  300,
257  8000),
258  can_buffer_size));
259 
260  // Check if CAN device was initialized successfully.
261  if (m_worker_thread->canDevice()->IsInitialized())
262  {
263  LOGGING_DEBUG(CAN, "CAN device successfully initialized." << endl);
264  m_worker_thread->start();
265  }
266  else
267  {
268  m_worker_thread->canDevice().reset();
269  LOGGING_ERROR(CAN, "Error initializing CAN device." << endl);
270  return;
271  }
272 
273  // Set CAN mask
274  if (can_id > 0)
275  {
276  LOGGING_DEBUG(CAN, "Using single CAN ID: " << can_id << endl);
277  m_worker_thread->setSingleCanID(can_id);
278  }
279  else if (can_mask != "")
280  {
281  tCanMatrixParser parser(can_mask);
282  if (parser.isActive())
283  {
284  m_worker_thread->setCanMask(parser.getCanMatrix());
285  LOGGING_DEBUG(CAN, "Using CAN Mask: " << can_mask << endl);
286  }
287  else
288  {
289  LOGGING_WARNING(CAN, "Could not use provided CAN mask " << can_mask << endl);
290  }
291  }
292 
293  m_sleep_time = 500000/can_baudrate;
294 
295  // Wait for first CAN message.
296  m_is_good = advance();
297 }
298 
300 {
301  if (m_worker_thread)
302  {
303  m_worker_thread->stop();
304  m_worker_thread->join();
305  }
306 }
307 
309 {
310  if (!m_worker_thread)
311  {
312  LOGGING_ERROR(CAN, "Worker thread nonexistent for some reason." << endl);
313  m_is_good = false;
314  }
315  else if (!m_worker_thread->canDevice())
316  {
317  LOGGING_ERROR(CAN, "CAN device object disappeared." << endl);
318  m_is_good = false;
319  }
320  else if (!m_worker_thread->canDevice()->IsInitialized())
321  {
322  LOGGING_ERROR(CAN, "CAN device is closed." << endl);
323  m_is_good = false;
324  }
325  else if (!m_worker_thread->running())
326  {
327  LOGGING_ERROR(CAN, "Worker thread was terminated unexpectedly." << endl);
328  m_is_good = false;
329  }
330  m_buffer = m_worker_thread->get();
331  return bool(m_buffer);
332 }
333 
334 }
335 }
void setSingleCanID(uint16_t can_id)
Sets a single CAN ID used to filter out relevant CAN messages.
unsigned int uint32_t
bool hasData() const
Returns true if the buffer is nonempty.
icl_sourcesink::DataSource< tCanMessage > HardwareCanSource
Base type for all sources providing tCanMessage data.
Contains icl_hardware::tException.
#define LOGGING_DEBUG(streamname, arg)
HardwareCanSourcePeak(const std::string &uri="", const std::string &name="HardwareCanSourcePeak")
Constructor.
static tCanDevice * Create(const char *device_name, int flags, unsigned char acceptance_code, unsigned char acceptance_mask, unsigned int baud_rate, unsigned send_fifo_size, unsigned receive_fifo_size)
Definition: tCanDevice.cpp:96
bool advance()
Advance to the next data element.
boost::shared_ptr< Stamped< DataType > > Ptr
CanMessageStamped::Ptr m_buffer
Buffers the latest data element.
#define LOGGING_ERROR(streamname, arg)
static void CheckLXRTInterface()
Definition: tCanDevice.cpp:72
Thread(const icl_core::String &description, icl_core::ThreadPriority priority=0)
ThreadStream & endl(ThreadStream &stream)
icl_core::thread::Semaphore m_buffer_semaphore
Handles producer/consumer notification.
boost::scoped_ptr< tCanDevice > m_can_device
The CAN device.
boost::scoped_ptr< tCanDevice > & canDevice()
Access the CAN device.
boost::scoped_ptr< WorkerThread > m_worker_thread
virtual void run()
Worker thread main method which receives the CAN messages.
#define LOGGING_WARNING(streamname, arg)
icl_core::thread::Mutex m_buffer_mutex
Synchronizes receive buffer access.
void setCanMask(const CanMatrix &can_mask)
Sets the CAN mask used to filter out relevant CAN messages.
WorkerThread(tCanDevice *can_device, std::size_t max_buffer_size=256)
unsigned short uint16_t
bool m_use_can_mask
If true, m_can_mask is used to filter the CAN stream.
const CanMatrix & getCanMatrix() const
std::map< unsigned int, std::vector< CanMatrixElement > > CanMatrix
int usleep(unsigned long useconds)


fzi_icl_can
Author(s):
autogenerated on Mon Jun 10 2019 13:17:02