Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00029
00030 #include "HardwareCanSourcePeak.h"
00031
00032 #include <deque>
00033
00034 #include <boost/date_time/posix_time/posix_time.hpp>
00035 #include <boost/filesystem/operations.hpp>
00036 #include <boost/filesystem/fstream.hpp>
00037
00038 #include "icl_hardware_can/Logging.h"
00039 #include "icl_hardware_can/tException.h"
00040
00041 #include <icl_core_thread/Thread.h>
00042 #include <icl_core_thread/Mutex.h>
00043 #include <icl_core_thread/ScopedMutexLock.h>
00044 #include <icl_core_thread/Sem.h>
00045 #include <icl_core_config/Config.h>
00046 #include <icl_sourcesink/SimpleURI.h>
00047
00048 namespace icl_hardware {
00049 namespace can {
00050
00054 class HardwareCanSourcePeak::WorkerThread : public icl_core::thread::Thread
00055 {
00056 public:
00057 WorkerThread(tCanDevice *can_device, std::size_t max_buffer_size = 256)
00058 : icl_core::thread::Thread("CanWorkerThread", -10),
00059 m_can_device(can_device),
00060 m_recv_buffer(),
00061 m_max_buffer_size(max_buffer_size),
00062 m_buffer_mutex(),
00063 m_buffer_semaphore(0),
00064 m_use_can_mask(false),
00065 m_can_mask(),
00066 m_single_can_id(0),
00067 m_dsin(0),
00068 m_sequence_number(0)
00069 { }
00070
00072 virtual void run();
00073
00075 bool hasData() const;
00076
00080 CanMessageStamped::Ptr get();
00081
00083 boost::scoped_ptr<tCanDevice>& canDevice() { return m_can_device; }
00084
00086 void setCanMask(const CanMatrix& can_mask)
00087 {
00088 m_use_can_mask = true;
00089 m_can_mask = can_mask;
00090 }
00091
00093 void setSingleCanID(uint16_t can_id)
00094 {
00095 m_single_can_id = can_id;
00096 }
00097
00098 private:
00100 boost::scoped_ptr<tCanDevice> m_can_device;
00104 std::deque<CanMessageStamped::Ptr> m_recv_buffer;
00106 std::size_t m_max_buffer_size;
00108 mutable icl_core::thread::Mutex m_buffer_mutex;
00110 icl_core::thread::Semaphore m_buffer_semaphore;
00112 bool m_use_can_mask;
00117 CanMatrix m_can_mask;
00121 uint16_t m_single_can_id;
00125 std::size_t m_dsin;
00130 std::size_t m_sequence_number;
00131 };
00132
00133 void HardwareCanSourcePeak::WorkerThread::run()
00134 {
00135 while (execute())
00136 {
00137 if (!m_can_device->IsInitialized())
00138 {
00139
00140 throw tException(ePEAK_DEVICE_ERROR, "Lost connection to peak device.");
00141 }
00142 CanMessageStamped::Ptr msg(new CanMessageStamped);
00143 while (m_can_device->Receive(**msg) > 0)
00144 {
00145 msg->header().timestamp = boost::posix_time::microsec_clock::local_time();
00146 msg->header().dsin = m_dsin++;
00147
00148 {
00149 msg->header().sequence_number = m_sequence_number++;
00150 msg->header().coordinate_system = "";
00151 icl_core::thread::ScopedMutexLock lock(m_buffer_mutex);
00152 m_recv_buffer.push_front(msg);
00153 msg.reset(new CanMessageStamped);
00154 if (m_recv_buffer.size() > m_max_buffer_size)
00155 {
00156 LOGGING_WARNING(CAN, "Buffer overflow, too many outstanding messages!" << endl);
00157 double message_rate = double(m_recv_buffer.size())
00158 / ((m_recv_buffer.front()->header().timestamp
00159 - m_recv_buffer.back()->header().timestamp).total_microseconds()
00160 * 1e-6);
00161 LOGGING_DEBUG(CAN, "Received " << message_rate << " msg/s." << endl);
00162 while (m_recv_buffer.size() > m_max_buffer_size)
00163 {
00164 m_recv_buffer.pop_back();
00165 }
00166 }
00167 }
00168 m_buffer_semaphore.post();
00169
00170
00171
00172 usleep(300);
00173 }
00174 usleep(5000);
00175 }
00176 }
00177
00178 CanMessageStamped::Ptr HardwareCanSourcePeak::WorkerThread::get()
00179 {
00180 m_buffer_semaphore.wait();
00181 icl_core::thread::ScopedMutexLock lock(m_buffer_mutex);
00182 CanMessageStamped::Ptr msg = m_recv_buffer.back();
00183 m_recv_buffer.pop_back();
00184 return msg;
00185 }
00186
00187 bool HardwareCanSourcePeak::WorkerThread::hasData() const
00188 {
00189 icl_core::thread::ScopedMutexLock lock(m_buffer_mutex);
00190 return !m_recv_buffer.empty();
00191 }
00192
00193
00194
00195
00196
00197 HardwareCanSourcePeak::HardwareCanSourcePeak(const std::string& uri, const std::string& name)
00198 : HardwareCanSource(uri, name),
00199 m_buffer(),
00200 m_worker_thread()
00201 {
00202 m_is_good = false;
00203 icl_sourcesink::SimpleURI parsed_uri(uri);
00204
00205 uint32_t can_baudrate = 500;
00206 boost::optional<uint32_t> uri_baudrate = parsed_uri.getQuery<uint32_t>("baudrate");
00207 if (uri_baudrate)
00208 {
00209 can_baudrate = *uri_baudrate;
00210 }
00211
00212 uint32_t can_buffer_size = 256;
00213 boost::optional<uint32_t> uri_buffer_size = parsed_uri.getQuery<uint32_t>("buffer_size");
00214 if (uri_buffer_size)
00215 {
00216 can_buffer_size = *uri_buffer_size;
00217 }
00218
00219 uint16_t can_id = 0;
00220 boost::optional<uint16_t> uri_can_id = parsed_uri.getQuery<uint16_t>("can_id");
00221 if (uri_can_id)
00222 {
00223 if (*uri_can_id > 2047)
00224 {
00225 LOGGING_ERROR(CAN, "Illegal value for 'can_id' (must be <= 2047). Ignoring." << endl);
00226 }
00227 else
00228 {
00229 can_id = *uri_can_id;
00230 }
00231 }
00232
00233 std::string can_mask = "";
00234 boost::optional<std::string> uri_can_mask = parsed_uri.getQuery<std::string>("can_mask");
00235 if (uri_can_mask)
00236 {
00237 can_mask = *uri_can_mask;
00238 }
00239 else
00240 {
00241 can_mask = icl_core::config::getDefault<std::string>("/icl_hardware_can/can_mask", "");
00242 }
00243
00244 LOGGING_DEBUG(CAN, "Device: " << parsed_uri.path() << endl);
00245 LOGGING_DEBUG(CAN, "Baudrate: " << can_baudrate << " kbps" << endl);
00246
00247 LOGGING_DEBUG(CAN, "Opening CAN device... " << endl);
00248 tCanDevice::CheckLXRTInterface();
00249 m_worker_thread.reset(new WorkerThread(
00250 tCanDevice::Create(
00251 parsed_uri.path().c_str(),
00252 O_RDWR | O_NONBLOCK,
00253 0xff,
00254 0xff,
00255 can_baudrate,
00256 300,
00257 8000),
00258 can_buffer_size));
00259
00260
00261 if (m_worker_thread->canDevice()->IsInitialized())
00262 {
00263 LOGGING_DEBUG(CAN, "CAN device successfully initialized." << endl);
00264 m_worker_thread->start();
00265 }
00266 else
00267 {
00268 m_worker_thread->canDevice().reset();
00269 LOGGING_ERROR(CAN, "Error initializing CAN device." << endl);
00270 return;
00271 }
00272
00273
00274 if (can_id > 0)
00275 {
00276 LOGGING_DEBUG(CAN, "Using single CAN ID: " << can_id << endl);
00277 m_worker_thread->setSingleCanID(can_id);
00278 }
00279 else if (can_mask != "")
00280 {
00281 tCanMatrixParser parser(can_mask);
00282 if (parser.isActive())
00283 {
00284 m_worker_thread->setCanMask(parser.getCanMatrix());
00285 LOGGING_DEBUG(CAN, "Using CAN Mask: " << can_mask << endl);
00286 }
00287 else
00288 {
00289 LOGGING_WARNING(CAN, "Could not use provided CAN mask " << can_mask << endl);
00290 }
00291 }
00292
00293 m_sleep_time = 500000/can_baudrate;
00294
00295
00296 m_is_good = advance();
00297 }
00298
00299 HardwareCanSourcePeak::~HardwareCanSourcePeak()
00300 {
00301 if (m_worker_thread)
00302 {
00303 m_worker_thread->stop();
00304 m_worker_thread->join();
00305 }
00306 }
00307
00308 bool HardwareCanSourcePeak::advance()
00309 {
00310 if (!m_worker_thread)
00311 {
00312 LOGGING_ERROR(CAN, "Worker thread nonexistent for some reason." << endl);
00313 m_is_good = false;
00314 }
00315 else if (!m_worker_thread->canDevice())
00316 {
00317 LOGGING_ERROR(CAN, "CAN device object disappeared." << endl);
00318 m_is_good = false;
00319 }
00320 else if (!m_worker_thread->canDevice()->IsInitialized())
00321 {
00322 LOGGING_ERROR(CAN, "CAN device is closed." << endl);
00323 m_is_good = false;
00324 }
00325 else if (!m_worker_thread->running())
00326 {
00327 LOGGING_ERROR(CAN, "Worker thread was terminated unexpectedly." << endl);
00328 m_is_good = false;
00329 }
00330 m_buffer = m_worker_thread->get();
00331 return bool(m_buffer);
00332 }
00333
00334 }
00335 }