serial_listener.cc
Go to the documentation of this file.
00001 /* Copyright 2012 William Woodall and John Harrison */
00002 
00003 #include "serial/utils/serial_listener.h"
00004 
00005 /***** Inline Functions *****/
00006 
00007 inline void defaultExceptionCallback(const std::exception &error) {
00008   std::cerr << "SerialListener Unhandled Exception: " << error.what();
00009   std::cerr << std::endl;
00010 }
00011 
00012 inline bool defaultComparator(const std::string &token) {
00013   return true;
00014 }
00015 
00016 using namespace serial;
00017 using namespace serial::utils;
00018 
00019 /***** Listener Class Functions *****/
00020 
00021 void
00022 SerialListener::default_handler(const std::string &token) {
00023   if (this->_default_handler != NULL)
00024     this->_default_handler(token);
00025 }
00026 
00027 SerialListener::SerialListener(size_t num_threads) : listening(false), chunk_size_(5) {
00028   // Set default callbacks
00029   this->handle_exc = defaultExceptionCallback;
00030 
00031   // Default handler stuff
00032   this->_default_handler = NULL;
00033   this->default_comparator = defaultComparator;
00034   DataCallback tmp = boost::bind(&SerialListener::default_handler, this, _1);
00035   this->default_filter = FilterPtr(new Filter(default_comparator, tmp));
00036 
00037   // Set default tokenizer
00038   this->setTokenizer(delimeter_tokenizer("\r"));
00039 
00040   // Set the number of callback threads
00041   if (num_threads == 0) {
00042      unsigned int hardware_concurrency = boost::thread::hardware_concurrency();
00043     if (hardware_concurrency > 0) {
00044       this->num_threads_ = hardware_concurrency;
00045     } else {
00046       this->num_threads_ = 1;
00047     }
00048   } else {
00049     this->num_threads_ = num_threads;
00050   }
00051 }
00052 
00053 SerialListener::~SerialListener() {
00054   if (this->listening) {
00055     this->stopListening();
00056   }
00057 }
00058 
00059 void
00060 SerialListener::callback(size_t thread_index) {
00061   try {
00062     while (this->listening) {
00063       // <filter id, token>
00064       std::pair<FilterPtr,TokenPtr> pair;
00065       this->callback_queue.wait_and_pop(pair);
00066       if (this->listening) {
00067         try {
00068           if (pair.first != NULL && pair.second != NULL) {
00069             pair.first->callback_((*pair.second));
00070           }
00071         } catch (std::exception &e) {
00072           this->handle_exc(e);
00073         }// try callback
00074       } // if listening
00075     } // while (this->listening)
00076   } catch (std::exception &e) {
00077     this->handle_exc(SerialListenerException(e.what()));
00078   }
00079 }
00080 
00081 void
00082 SerialListener::startListening(Serial &serial_port) {
00083   if (this->listening) {
00084     throw(SerialListenerException("Already listening."));
00085   }
00086 
00087   this->serial_port_ = &serial_port;
00088   if (!this->serial_port_->isOpen()) {
00089     throw(SerialListenerException("Serial port not open."));
00090   }
00091 
00092   this->listening = true;
00093 
00094   listen_thread = boost::thread(boost::bind(&SerialListener::listen, this));
00095 
00096   // Start the callback threads
00097   for(size_t i = 0; i < this->num_threads_; ++i) {
00098     callback_threads.push_back(new
00099       boost::thread(boost::bind(&SerialListener::callback, this, i)));
00100   }
00101 }
00102 
00103 void
00104 SerialListener::stopListening() {
00105   // Stop listening and clear buffers
00106   listening = false;
00107   callback_queue.cancel();
00108 
00109   listen_thread.join();
00110 
00111   callback_queue.cancel();
00112   for(size_t i = 0; i < this->num_threads_; ++i) {
00113     callback_threads[i]->join();
00114     delete callback_threads[i];
00115   }
00116   callback_threads.clear();
00117   callback_queue.clear();
00118   callback_queue.clear_cancel();
00119 
00120   this->data_buffer = "";
00121   this->serial_port_ = NULL;
00122 }
00123 
00124 size_t
00125 SerialListener::determineAmountToRead() {
00126   // TODO: Make a more intelligent method based on the length of the things
00127   //  filters are looking for.  e.g.: if the filter is looking for 'V=XX\r'
00128   //  make the read amount at least 5.
00129   return this->chunk_size_;
00130 }
00131 
00132 void
00133 SerialListener::filter(std::vector<TokenPtr> &tokens) {
00134   // Lock the filters while filtering
00135   boost::mutex::scoped_lock lock(filter_mux);
00136   // Iterate through each new token and filter them
00137   std::vector<TokenPtr>::iterator it;
00138   for (it=tokens.begin(); it!=tokens.end(); ++it) {
00139     TokenPtr token = (*it);
00140     // If it is empty then pass it
00141     if (token->empty()) {
00142       continue;
00143     }
00144     bool matched = false;
00145     // Iterate through each filter
00146     std::vector<FilterPtr>::iterator itt;
00147     for (itt=filters.begin(); itt!=filters.end(); ++itt) {
00148       FilterPtr filter = (*itt);
00149       if (filter->comparator_((*token))) {
00150         callback_queue.push(std::make_pair(filter,token));
00151         matched = true;
00152         break;
00153       }
00154     } // for (itt=filters.begin(); itt!=filters.end(); itt++)
00155     // If matched is false then send it to the default handler
00156     if (!matched) {
00157       callback_queue.push(std::make_pair(default_filter,token));
00158     }
00159   } // for (it=tokens.begin(); it!=tokens.end(); it++)
00160 }
00161 
00162 void
00163 SerialListener::listen() {
00164   try {
00165     while (this->listening) {
00166       // Read some data
00167       std::string temp;
00168       this->readSomeData(temp, determineAmountToRead());
00169       // std::cout << "SerialListener::listen read(" << temp.length() << "): " << temp << std::endl;
00170       // If nothing was read then we
00171       //  don't need to iterate through the filters
00172       if (temp.length() != 0) {
00173         // Add the new data to the buffer
00174         this->data_buffer += temp;
00175         // Call the tokenizer on the updated buffer
00176         std::vector<TokenPtr> new_tokens;
00177         this->tokenize(this->data_buffer, new_tokens);
00178         // Put the last token back in the data buffer
00179         this->data_buffer = (*new_tokens.back());
00180         new_tokens.pop_back();
00181         // Run the new tokens through existing filters
00182         this->filter(new_tokens);
00183       }
00184       // Done parsing lines and buffer should now be set to the left overs
00185     } // while (this->listening)
00186   } catch (std::exception &e) {
00187     this->handle_exc(SerialListenerException(e.what()));
00188   }
00189 }
00190 
00191 /***** Filter Functions *****/
00192 
00193 FilterPtr
00194 SerialListener::createFilter(ComparatorType comparator, DataCallback callback)
00195 {
00196   FilterPtr filter_ptr(new Filter(comparator, callback));
00197 
00198   boost::mutex::scoped_lock l(filter_mux);
00199   this->filters.push_back(filter_ptr);
00200 
00201   return filter_ptr;
00202 }
00203 
00204 BlockingFilterPtr
00205 SerialListener::createBlockingFilter(ComparatorType comparator) {
00206   return BlockingFilterPtr(
00207     new BlockingFilter(comparator, (*this)));
00208 }
00209 
00210 BufferedFilterPtr
00211 SerialListener::createBufferedFilter(ComparatorType comparator,
00212                                      size_t buffer_size)
00213 {
00214   return BufferedFilterPtr(
00215     new BufferedFilter(comparator, buffer_size, (*this)));
00216 }
00217 
00218 void
00219 SerialListener::removeFilter(FilterPtr filter_ptr) {
00220   boost::mutex::scoped_lock l(filter_mux);
00221   filters.erase(std::find(filters.begin(),filters.end(),filter_ptr));
00222 }
00223 
00224 void
00225 SerialListener::removeFilter(BlockingFilterPtr blocking_filter) {
00226   this->removeFilter(blocking_filter->filter_ptr);
00227 }
00228 
00229 void
00230 SerialListener::removeFilter(BufferedFilterPtr buffered_filter) {
00231   this->removeFilter(buffered_filter->filter_ptr);
00232 }
00233 
00234 void
00235 SerialListener::removeAllFilters() {
00236   boost::mutex::scoped_lock l(filter_mux);
00237   filters.clear();
00238   callback_queue.clear();
00239 }
00240 


serial_utils
Author(s): William Woodall , John Harrison
autogenerated on Thu Jun 6 2019 19:02:26