serial_listener.h
Go to the documentation of this file.
00001 
00036 #ifndef SERIAL_LISTENER_H
00037 #define SERIAL_LISTENER_H
00038 
00039 #ifndef SERIAL_LISTENER_DEBUG
00040 #define SERIAL_LISTENER_DEBUG 0
00041 #endif
00042 
00043 // STL
00044 #include <vector>
00045 #include <stdint.h>
00046 #include <iostream>
00047 
00048 // Serial
00049 #include <serial/serial.h>
00050 
00051 // Concurrent Queue
00052 #include "serial/utils/concurrent_queue.h"
00053 
00054 // Boost
00055 #include <boost/function.hpp>
00056 #include <boost/algorithm/string.hpp>
00057 #include <boost/date_time/posix_time/posix_time.hpp>
00058 #include <boost/thread.hpp>
00059 
00060 #if SERIAL_LISTENER_DEBUG
00061 # warning SerialListener in debug mode
00062 #endif
00063 
00064 namespace serial {
00065   namespace utils {
00066 
00079 typedef boost::shared_ptr<const std::string> TokenPtr;
00080 
00092 typedef boost::function<void(const std::string&)> DataCallback;
00093 
00104 typedef boost::function<bool(const std::string&)> ComparatorType;
00105 
00126 typedef boost::function<void(const std::string&, std::vector<TokenPtr>&)>
00127 TokenizerType;
00128 
00138 typedef boost::function<void(const std::exception&)> ExceptionCallback;
00139 
00157 class Filter
00158 {
00159 public:
00160   Filter (ComparatorType comparator, DataCallback callback)
00161   : comparator_(comparator), callback_(callback) {}
00162   virtual ~Filter () {}
00163 
00164   ComparatorType comparator_;
00165   DataCallback callback_;
00166 
00167 private:
00168   // Disable copy constructors
00169   Filter(const Filter&);
00170   void operator=(const Filter&);
00171   const Filter& operator=(Filter);
00172 };
00173 
00184 typedef boost::shared_ptr<Filter> FilterPtr;
00185 
00186 class BlockingFilter; // Forward declaration
00187 
00194 typedef boost::shared_ptr<BlockingFilter> BlockingFilterPtr;
00195 
00196 class BufferedFilter; // Forward declaration
00197 
00204 typedef boost::shared_ptr<BufferedFilter> BufferedFilterPtr;
00205 
00213 class SerialListenerException : public std::exception {
00214   const std::string e_what_;
00215 public:
00216   SerialListenerException(const std::string &e_what) : e_what_(e_what) {}
00217   ~SerialListenerException() throw() {}
00218 
00219   virtual const char* what() const throw() {
00220     std::stringstream ss;
00221     ss << "SerialListenerException: " << this->e_what_;
00222     return ss.str().c_str();
00223   }
00224 };
00225 
00229 class SerialListener
00230 {
00231 public:
00235   SerialListener (size_t num_threads = 0);
00236 
00240   virtual ~SerialListener ();
00241 
00242 /***** Configurations ******/
00243 
00256   void
00257   setTokenizer (TokenizerType tokenizer) {
00258     this->tokenize = tokenizer;
00259   }
00260 
00266   void
00267   setChunkSize (size_t chunk_size) {
00268     this->chunk_size_ = chunk_size;
00269   }
00270 
00271 /***** Start and Stop Listening ******/
00272 
00279   void
00280   startListening (serial::Serial &serial_port);
00281 
00288   void
00289   stopListening ();
00290 
00294   bool
00295   isListening () {
00296     return this->listening;
00297   }
00298 
00299 /***** Filter Functions ******/
00300 
00320   FilterPtr
00321   createFilter (ComparatorType comparator, DataCallback callback);
00322 
00343   BlockingFilterPtr
00344   createBlockingFilter (ComparatorType comparator);
00345 
00369   BufferedFilterPtr
00370   createBufferedFilter (ComparatorType comparator, size_t buffer_size = 1024);
00371 
00379   void
00380   removeFilter (FilterPtr filter_ptr);
00381 
00391   void
00392   removeFilter (BlockingFilterPtr blocking_filter);
00393 
00403   void
00404   removeFilter (BufferedFilterPtr buffered_filter);
00405 
00409   void
00410   removeAllFilters ();
00411 
00412 /***** Hooks and Handlers ******/
00413 
00427   void setDefaultHandler(DataCallback default_handler) {
00428     this->_default_handler = default_handler;
00429   }
00430 
00442   void
00443   setExceptionHandler (ExceptionCallback exception_handler) {
00444     this->handle_exc = exception_handler;
00445   }
00446 
00447 /***** Static Functions ******/
00448 
00454   static void
00455   sleep (long milliseconds) {
00456     boost::int64_t ms(milliseconds);
00457     boost::this_thread::sleep(boost::posix_time::milliseconds(ms));
00458   }
00459 
00479   static TokenizerType
00480   delimeter_tokenizer (std::string delimeter) {
00481     return boost::bind(&SerialListener::_delimeter_tokenizer,
00482                        _1, _2, delimeter);
00483   }
00484 
00505   static ComparatorType
00506   exactly (std::string exact_str) {
00507     return boost::bind(&SerialListener::_exactly, _1, exact_str);
00508   }
00509 
00529   static ComparatorType
00530   startsWith (std::string prefix) {
00531     return boost::bind(&SerialListener::_startsWith, _1, prefix);
00532   }
00533 
00553   static ComparatorType
00554   endsWith (std::string postfix) {
00555     return boost::bind(&SerialListener::_endsWith, _1, postfix);
00556   }
00557 
00578   static ComparatorType
00579   contains (std::string substr) {
00580     return boost::bind(_contains, _1, substr);
00581   }
00582 
00583 private:
00584   // Disable copy constructors
00585   SerialListener(const SerialListener&);
00586   void operator=(const SerialListener&);
00587   const SerialListener& operator=(SerialListener);
00588   // delimeter tokenizer function
00589   static void
00590   _delimeter_tokenizer (const std::string &data,
00591                         std::vector<TokenPtr> &tokens,
00592                         std::string delimeter)
00593   {
00594     typedef std::vector<std::string> find_vector_type;
00595     find_vector_type t;
00596     boost::split(t, data, boost::is_any_of(delimeter));
00597     for (find_vector_type::iterator it = t.begin(); it != t.end(); ++it)
00598       tokens.push_back(TokenPtr( new std::string(*it) ));
00599   }
00600   // exact comparator function
00601   static bool
00602   _exactly (const std::string& token, std::string exact_str) {
00603 #if SERIAL_LISTENER_DEBUG
00604     std::cerr << "In exactly callback(" << token.length() << "): `";
00605     std::cerr << token << "` == `" << exact_str << "`?: ";
00606     if (token == exact_str)
00607       std::cerr << "True";
00608     else
00609       std::cerr << "False";
00610     std::cerr << std::endl;
00611 #endif
00612     return token == exact_str;
00613   }
00614   // startswith comparator function
00615   static bool
00616   _startsWith (const std::string& token, std::string prefix) {
00617 #if SERIAL_LISTENER_DEBUG
00618     std::cerr << "In startsWith callback(" << token.length() << "): `";
00619     std::cerr << token << "` starts with `" << prefix;
00620     std::cerr << "`?: ";
00621     if (token.substr(0,prefix.length()) == prefix)
00622       std::cerr << "True";
00623     else
00624       std::cerr << "False";
00625     std::cerr << std::endl;
00626 #endif
00627     return token.substr(0,prefix.length()) == prefix;
00628   }
00629   // endswith comparator function
00630   static bool
00631   _endsWith (const std::string& token, std::string postfix) {
00632 #if SERIAL_LISTENER_DEBUG
00633     std::cerr << "In endsWith callback(";
00634     std::cerr << token.length();
00635     std::cerr << "): `" << token;
00636     std::cerr << "` ends with `" << postfix << "`?: ";
00637     if (token.substr(token.length()-postfix.length()) == postfix)
00638       std::cerr << "True";
00639     else
00640       std::cerr << "False";
00641     std::cerr << std::endl;
00642 #endif
00643     return token.substr(token.length()-postfix.length()) == postfix;
00644   }
00645   // contains comparator function
00646   static bool
00647   _contains (const std::string& token, std::string substr) {
00648 #if SERIAL_LISTENER_DEBUG
00649     std::cerr << "In contains callback(";
00650     std::cerr << token.length();
00651     std::cerr << "): `" << token;
00652     std::cerr << "` contains `" << substr << "`?: ";
00653     if (token.find(substr) != std::string::npos)
00654       std::cerr << "True";
00655     else
00656       std::cerr << "False";
00657     std::cerr << std::endl;
00658 #endif
00659     return token.find(substr) != std::string::npos;
00660   }
00661 
00662   // Gets some data from the serial port
00663   void readSomeData (std::string &temp, size_t this_many) {
00664     // Make sure there is a serial port
00665     if (this->serial_port_ == NULL && this->listening) {
00666       this->handle_exc(SerialListenerException("Invalid serial port."));
00667     }
00668     // Make sure the serial port is open
00669     if (!this->serial_port_->isOpen() && this->listening) {
00670       this->handle_exc(SerialListenerException("Serial port not open."));
00671     }
00672     if (!this->listening) {
00673       temp = "";
00674       return;
00675     }
00676     temp = this->serial_port_->read(this_many);
00677   }
00678   // Runs the new_tokens through all the filters
00679   void filter (std::vector<TokenPtr> &tokens);
00680   // Function that loops while listening is true
00681   void listen ();
00682   // Target of callback thread
00683   void callback (size_t);
00684   // Determines how much to read on each loop of listen
00685   size_t determineAmountToRead ();
00686 
00687   // Tokenizer
00688   TokenizerType tokenize;
00689 
00690   // Exception handler
00691   ExceptionCallback handle_exc;
00692 
00693   // Default handler
00694   FilterPtr default_filter;
00695   DataCallback _default_handler;
00696   ComparatorType default_comparator;
00697   void default_handler(const std::string &token);
00698 
00699   // Persistent listening variables
00700   bool listening;
00701   char serial_port_padding[7];
00702   serial::Serial * serial_port_;
00703   boost::thread listen_thread;
00704   std::string data_buffer;
00705   size_t chunk_size_;
00706 
00707   // Callback related variables
00708   // filter id, token
00709   // filter id == 0 is going to be default handled
00710   ConcurrentQueue<std::pair<FilterPtr,TokenPtr> >
00711   callback_queue;
00712   size_t num_threads_;
00713   std::vector<boost::thread*> callback_threads;
00714 
00715   // Mutex for locking use of filters
00716   boost::mutex filter_mux;
00717   // vector of filter ids
00718   std::vector<FilterPtr> filters;
00719 
00720 };
00721 
00733 class BlockingFilter
00734 {
00735 public:
00736   BlockingFilter (ComparatorType comparator, SerialListener &listener) {
00737     this->listener_ = &listener;
00738     DataCallback cb = boost::bind(&BlockingFilter::callback, this, _1);
00739     this->filter_ptr = this->listener_->createFilter(comparator, cb);
00740   }
00741 
00742   virtual ~BlockingFilter () {
00743     this->listener_->removeFilter(filter_ptr);
00744     this->result = "";
00745     this->cond.notify_all();
00746   }
00747 
00756  std::string wait(long ms) {
00757     this->result = "";
00758     boost::unique_lock<boost::mutex> lock(this->mutex);
00759     this->cond.timed_wait(lock, boost::posix_time::milliseconds(ms));
00760     return this->result;
00761   }
00762 
00763   FilterPtr filter_ptr;
00764 
00765   void callback(const std::string& token) {
00766 #if SERIAL_LISTENER_DEBUG
00767     std::cerr << "In BlockingFilter callback(" << token.length() << "): ";
00768     std::cerr << token << std::endl;
00769 #endif
00770     this->cond.notify_all();
00771     this->result = token;
00772   }
00773 
00774 private:
00775   SerialListener * listener_;
00776   boost::condition_variable cond;
00777   boost::mutex mutex;
00778   std::string result;
00779 
00780 };
00781 
00798 class BufferedFilter
00799 {
00800 public:
00801   BufferedFilter (ComparatorType comparator, size_t buffer_size,
00802                   SerialListener &listener)
00803   : buffer_size_(buffer_size)
00804   {
00805     this->listener_ = &listener;
00806     DataCallback cb = boost::bind(&BufferedFilter::callback, this, _1);
00807     this->filter_ptr = this->listener_->createFilter(comparator, cb);
00808   }
00809 
00810   virtual ~BufferedFilter () {
00811     this->listener_->removeFilter(filter_ptr);
00812     this->queue.clear();
00813     this->result = "";
00814   }
00815 
00827   std::string wait(long ms) {
00828     if (ms == 0) {
00829       if (!this->queue.try_pop(this->result)) {
00830         this->result = "";
00831       }
00832     } else {
00833       if (!this->queue.timed_wait_and_pop(this->result, ms)) {
00834         this->result = "";
00835       }
00836     }
00837     return result;
00838   }
00839 
00843   void clear() {
00844     queue.clear();
00845   }
00846 
00850   size_t count() {
00851     return queue.size();
00852   }
00853 
00857   size_t capacity() {
00858     return buffer_size_;
00859   }
00860 
00861   FilterPtr filter_ptr;
00862 
00863   void callback(const std::string &token) {
00864 #if SERIAL_LISTENER_DEBUG
00865     std::cerr << "In BufferedFilter callback(" << token.length() << "): ";
00866     std::cerr << token << std::endl;
00867 #endif
00868     std::string throw_away;
00869     if (this->queue.size() == this->buffer_size_) {
00870       this->queue.wait_and_pop(throw_away);
00871     }
00872     this->queue.push(token);
00873   }
00874 
00875 private:
00876   size_t buffer_size_;
00877   SerialListener * listener_;
00878   ConcurrentQueue<std::string> queue;
00879   std::string result;
00880 
00881 };
00882 
00883   } // namespace utils
00884 } // namespace serial
00885 
00886 #endif // SERIAL_LISTENER_H


serial_utils
Author(s): William Woodall , John Harrison
autogenerated on Thu Jun 6 2019 19:02:26