Go to the documentation of this file.00001 #ifndef H_CAN_BUFFERED_READER
00002 #define H_CAN_BUFFERED_READER
00003
00004 #include <socketcan_interface/interface.h>
00005 #include <deque>
00006
00007 #include <boost/thread/mutex.hpp>
00008 #include <boost/thread/condition_variable.hpp>
00009 #include <boost/chrono.hpp>
00010
00011 namespace can{
00012
00013 class BufferedReader {
00014 std::deque<can::Frame> buffer_;
00015 boost::mutex mutex_;
00016 boost::condition_variable cond_;
00017 CommInterface::FrameListener::Ptr listener_;
00018 bool enabled_;
00019 size_t max_len_;
00020
00021 void trim(){
00022 if(max_len_ > 0){
00023 while(buffer_.size() > max_len_){
00024 LOG("buffer overflow, discarded oldest message " );
00025 buffer_.pop_front();
00026 }
00027 }
00028 }
00029 void handleFrame(const can::Frame & msg){
00030 boost::mutex::scoped_lock lock(mutex_);
00031 if(enabled_){
00032 buffer_.push_back(msg);
00033 trim();
00034 cond_.notify_one();
00035 }else{
00036 LOG("discarded message " );
00037 }
00038 }
00039 public:
00040 class ScopedEnabler{
00041 BufferedReader &reader_;
00042 bool before_;
00043 public:
00044 ScopedEnabler(BufferedReader &reader) : reader_(reader), before_(reader_.setEnabled(true)) {}
00045 ~ScopedEnabler() { reader_.setEnabled(before_); }
00046 };
00047
00048 BufferedReader() : enabled_(true), max_len_(0) {}
00049 BufferedReader(bool enable, size_t max_len = 0) : enabled_(enable), max_len_(max_len) {}
00050
00051 void flush(){
00052 boost::mutex::scoped_lock lock(mutex_);
00053 buffer_.clear();
00054 }
00055 void setMaxLen(size_t max_len){
00056 boost::mutex::scoped_lock lock(mutex_);
00057 max_len_ = max_len;
00058 trim();
00059 }
00060 bool isEnabled(){
00061 boost::mutex::scoped_lock lock(mutex_);
00062 return enabled_;
00063 }
00064 bool setEnabled(bool enabled){
00065 boost::mutex::scoped_lock lock(mutex_);
00066 bool before = enabled_;
00067 enabled_ = enabled;
00068 return before;
00069 }
00070 void enable(){
00071 boost::mutex::scoped_lock lock(mutex_);
00072 enabled_ = true;
00073 }
00074
00075 void disable(){
00076 boost::mutex::scoped_lock lock(mutex_);
00077 enabled_ = false;
00078 }
00079
00080 void listen(boost::shared_ptr<CommInterface> interface){
00081 boost::mutex::scoped_lock lock(mutex_);
00082 listener_ = interface->createMsgListener(CommInterface::FrameDelegate(this, &BufferedReader::handleFrame));
00083 buffer_.clear();
00084 }
00085 void listen(boost::shared_ptr<CommInterface> interface, const Frame::Header& h){
00086 boost::mutex::scoped_lock lock(mutex_);
00087 listener_ = interface->createMsgListener(h, CommInterface::FrameDelegate(this, &BufferedReader::handleFrame));
00088 buffer_.clear();
00089 }
00090
00091 template<typename DurationType> bool read(can::Frame * msg, const DurationType &duration){
00092 return readUntil(msg, boost::chrono::high_resolution_clock::now() + duration);
00093 }
00094 bool readUntil(can::Frame * msg, boost::chrono::high_resolution_clock::time_point abs_time){
00095 boost::mutex::scoped_lock lock(mutex_);
00096
00097 while(buffer_.empty() && cond_.wait_until(lock,abs_time) != boost::cv_status::timeout)
00098 {}
00099
00100 if(buffer_.empty()){
00101 return false;
00102 }
00103
00104 if(msg){
00105 *msg = buffer_.front();
00106 buffer_.pop_front();
00107 }
00108 return true;
00109 }
00110
00111 };
00112
00113 }
00114 #endif