Go to the documentation of this file.00001
00036 #ifndef SERIAL_LISTENER_H
00037 #define SERIAL_LISTENER_H
00038
00039 #ifndef SERIAL_LISTENER_DEBUG
00040 #define SERIAL_LISTENER_DEBUG 0
00041 #endif
00042
00043
00044 #include <vector>
00045 #include <stdint.h>
00046 #include <iostream>
00047
00048
00049 #include <serial/serial.h>
00050
00051
00052 #include "serial/utils/concurrent_queue.h"
00053
00054
00055 #include <boost/function.hpp>
00056 #include <boost/algorithm/string.hpp>
00057 #include <boost/date_time/posix_time/posix_time.hpp>
00058 #include <boost/thread.hpp>
00059
00060 #if SERIAL_LISTENER_DEBUG
00061 # warning SerialListener in debug mode
00062 #endif
00063
00064 namespace serial {
00065 namespace utils {
00066
00079 typedef boost::shared_ptr<const std::string> TokenPtr;
00080
00092 typedef boost::function<void(const std::string&)> DataCallback;
00093
00104 typedef boost::function<bool(const std::string&)> ComparatorType;
00105
00126 typedef boost::function<void(const std::string&, std::vector<TokenPtr>&)>
00127 TokenizerType;
00128
00138 typedef boost::function<void(const std::exception&)> ExceptionCallback;
00139
00157 class Filter
00158 {
00159 public:
00160 Filter (ComparatorType comparator, DataCallback callback)
00161 : comparator_(comparator), callback_(callback) {}
00162 virtual ~Filter () {}
00163
00164 ComparatorType comparator_;
00165 DataCallback callback_;
00166
00167 private:
00168
00169 Filter(const Filter&);
00170 void operator=(const Filter&);
00171 const Filter& operator=(Filter);
00172 };
00173
00184 typedef boost::shared_ptr<Filter> FilterPtr;
00185
00186 class BlockingFilter;
00187
00194 typedef boost::shared_ptr<BlockingFilter> BlockingFilterPtr;
00195
00196 class BufferedFilter;
00197
00204 typedef boost::shared_ptr<BufferedFilter> BufferedFilterPtr;
00205
00213 class SerialListenerException : public std::exception {
00214 const std::string e_what_;
00215 public:
00216 SerialListenerException(const std::string &e_what) : e_what_(e_what) {}
00217 ~SerialListenerException() throw() {}
00218
00219 virtual const char* what() const throw() {
00220 std::stringstream ss;
00221 ss << "SerialListenerException: " << this->e_what_;
00222 return ss.str().c_str();
00223 }
00224 };
00225
00229 class SerialListener
00230 {
00231 public:
00235 SerialListener (size_t num_threads = 0);
00236
00240 virtual ~SerialListener ();
00241
00242
00243
00256 void
00257 setTokenizer (TokenizerType tokenizer) {
00258 this->tokenize = tokenizer;
00259 }
00260
00266 void
00267 setChunkSize (size_t chunk_size) {
00268 this->chunk_size_ = chunk_size;
00269 }
00270
00271
00272
00279 void
00280 startListening (serial::Serial &serial_port);
00281
00288 void
00289 stopListening ();
00290
00294 bool
00295 isListening () {
00296 return this->listening;
00297 }
00298
00299
00300
00320 FilterPtr
00321 createFilter (ComparatorType comparator, DataCallback callback);
00322
00343 BlockingFilterPtr
00344 createBlockingFilter (ComparatorType comparator);
00345
00369 BufferedFilterPtr
00370 createBufferedFilter (ComparatorType comparator, size_t buffer_size = 1024);
00371
00379 void
00380 removeFilter (FilterPtr filter_ptr);
00381
00391 void
00392 removeFilter (BlockingFilterPtr blocking_filter);
00393
00403 void
00404 removeFilter (BufferedFilterPtr buffered_filter);
00405
00409 void
00410 removeAllFilters ();
00411
00412
00413
00427 void setDefaultHandler(DataCallback default_handler) {
00428 this->_default_handler = default_handler;
00429 }
00430
00442 void
00443 setExceptionHandler (ExceptionCallback exception_handler) {
00444 this->handle_exc = exception_handler;
00445 }
00446
00447
00448
00454 static void
00455 sleep (long milliseconds) {
00456 boost::int64_t ms(milliseconds);
00457 boost::this_thread::sleep(boost::posix_time::milliseconds(ms));
00458 }
00459
00479 static TokenizerType
00480 delimeter_tokenizer (std::string delimeter) {
00481 return boost::bind(&SerialListener::_delimeter_tokenizer,
00482 _1, _2, delimeter);
00483 }
00484
00505 static ComparatorType
00506 exactly (std::string exact_str) {
00507 return boost::bind(&SerialListener::_exactly, _1, exact_str);
00508 }
00509
00529 static ComparatorType
00530 startsWith (std::string prefix) {
00531 return boost::bind(&SerialListener::_startsWith, _1, prefix);
00532 }
00533
00553 static ComparatorType
00554 endsWith (std::string postfix) {
00555 return boost::bind(&SerialListener::_endsWith, _1, postfix);
00556 }
00557
00578 static ComparatorType
00579 contains (std::string substr) {
00580 return boost::bind(_contains, _1, substr);
00581 }
00582
00583 private:
00584
00585 SerialListener(const SerialListener&);
00586 void operator=(const SerialListener&);
00587 const SerialListener& operator=(SerialListener);
00588
00589 static void
00590 _delimeter_tokenizer (const std::string &data,
00591 std::vector<TokenPtr> &tokens,
00592 std::string delimeter)
00593 {
00594 typedef std::vector<std::string> find_vector_type;
00595 find_vector_type t;
00596 boost::split(t, data, boost::is_any_of(delimeter));
00597 for (find_vector_type::iterator it = t.begin(); it != t.end(); ++it)
00598 tokens.push_back(TokenPtr( new std::string(*it) ));
00599 }
00600
00601 static bool
00602 _exactly (const std::string& token, std::string exact_str) {
00603 #if SERIAL_LISTENER_DEBUG
00604 std::cerr << "In exactly callback(" << token.length() << "): `";
00605 std::cerr << token << "` == `" << exact_str << "`?: ";
00606 if (token == exact_str)
00607 std::cerr << "True";
00608 else
00609 std::cerr << "False";
00610 std::cerr << std::endl;
00611 #endif
00612 return token == exact_str;
00613 }
00614
00615 static bool
00616 _startsWith (const std::string& token, std::string prefix) {
00617 #if SERIAL_LISTENER_DEBUG
00618 std::cerr << "In startsWith callback(" << token.length() << "): `";
00619 std::cerr << token << "` starts with `" << prefix;
00620 std::cerr << "`?: ";
00621 if (token.substr(0,prefix.length()) == prefix)
00622 std::cerr << "True";
00623 else
00624 std::cerr << "False";
00625 std::cerr << std::endl;
00626 #endif
00627 return token.substr(0,prefix.length()) == prefix;
00628 }
00629
00630 static bool
00631 _endsWith (const std::string& token, std::string postfix) {
00632 #if SERIAL_LISTENER_DEBUG
00633 std::cerr << "In endsWith callback(";
00634 std::cerr << token.length();
00635 std::cerr << "): `" << token;
00636 std::cerr << "` ends with `" << postfix << "`?: ";
00637 if (token.substr(token.length()-postfix.length()) == postfix)
00638 std::cerr << "True";
00639 else
00640 std::cerr << "False";
00641 std::cerr << std::endl;
00642 #endif
00643 return token.substr(token.length()-postfix.length()) == postfix;
00644 }
00645
00646 static bool
00647 _contains (const std::string& token, std::string substr) {
00648 #if SERIAL_LISTENER_DEBUG
00649 std::cerr << "In contains callback(";
00650 std::cerr << token.length();
00651 std::cerr << "): `" << token;
00652 std::cerr << "` contains `" << substr << "`?: ";
00653 if (token.find(substr) != std::string::npos)
00654 std::cerr << "True";
00655 else
00656 std::cerr << "False";
00657 std::cerr << std::endl;
00658 #endif
00659 return token.find(substr) != std::string::npos;
00660 }
00661
00662
00663 void readSomeData (std::string &temp, size_t this_many) {
00664
00665 if (this->serial_port_ == NULL && this->listening) {
00666 this->handle_exc(SerialListenerException("Invalid serial port."));
00667 }
00668
00669 if (!this->serial_port_->isOpen() && this->listening) {
00670 this->handle_exc(SerialListenerException("Serial port not open."));
00671 }
00672 if (!this->listening) {
00673 temp = "";
00674 return;
00675 }
00676 temp = this->serial_port_->read(this_many);
00677 }
00678
00679 void filter (std::vector<TokenPtr> &tokens);
00680
00681 void listen ();
00682
00683 void callback (size_t);
00684
00685 size_t determineAmountToRead ();
00686
00687
00688 TokenizerType tokenize;
00689
00690
00691 ExceptionCallback handle_exc;
00692
00693
00694 FilterPtr default_filter;
00695 DataCallback _default_handler;
00696 ComparatorType default_comparator;
00697 void default_handler(const std::string &token);
00698
00699
00700 bool listening;
00701 char serial_port_padding[7];
00702 serial::Serial * serial_port_;
00703 boost::thread listen_thread;
00704 std::string data_buffer;
00705 size_t chunk_size_;
00706
00707
00708
00709
00710 ConcurrentQueue<std::pair<FilterPtr,TokenPtr> >
00711 callback_queue;
00712 size_t num_threads_;
00713 std::vector<boost::thread*> callback_threads;
00714
00715
00716 boost::mutex filter_mux;
00717
00718 std::vector<FilterPtr> filters;
00719
00720 };
00721
00733 class BlockingFilter
00734 {
00735 public:
00736 BlockingFilter (ComparatorType comparator, SerialListener &listener) {
00737 this->listener_ = &listener;
00738 DataCallback cb = boost::bind(&BlockingFilter::callback, this, _1);
00739 this->filter_ptr = this->listener_->createFilter(comparator, cb);
00740 }
00741
00742 virtual ~BlockingFilter () {
00743 this->listener_->removeFilter(filter_ptr);
00744 this->result = "";
00745 this->cond.notify_all();
00746 }
00747
00756 std::string wait(long ms) {
00757 this->result = "";
00758 boost::unique_lock<boost::mutex> lock(this->mutex);
00759 this->cond.timed_wait(lock, boost::posix_time::milliseconds(ms));
00760 return this->result;
00761 }
00762
00763 FilterPtr filter_ptr;
00764
00765 void callback(const std::string& token) {
00766 #if SERIAL_LISTENER_DEBUG
00767 std::cerr << "In BlockingFilter callback(" << token.length() << "): ";
00768 std::cerr << token << std::endl;
00769 #endif
00770 this->cond.notify_all();
00771 this->result = token;
00772 }
00773
00774 private:
00775 SerialListener * listener_;
00776 boost::condition_variable cond;
00777 boost::mutex mutex;
00778 std::string result;
00779
00780 };
00781
00798 class BufferedFilter
00799 {
00800 public:
00801 BufferedFilter (ComparatorType comparator, size_t buffer_size,
00802 SerialListener &listener)
00803 : buffer_size_(buffer_size)
00804 {
00805 this->listener_ = &listener;
00806 DataCallback cb = boost::bind(&BufferedFilter::callback, this, _1);
00807 this->filter_ptr = this->listener_->createFilter(comparator, cb);
00808 }
00809
00810 virtual ~BufferedFilter () {
00811 this->listener_->removeFilter(filter_ptr);
00812 this->queue.clear();
00813 this->result = "";
00814 }
00815
00827 std::string wait(long ms) {
00828 if (ms == 0) {
00829 if (!this->queue.try_pop(this->result)) {
00830 this->result = "";
00831 }
00832 } else {
00833 if (!this->queue.timed_wait_and_pop(this->result, ms)) {
00834 this->result = "";
00835 }
00836 }
00837 return result;
00838 }
00839
00843 void clear() {
00844 queue.clear();
00845 }
00846
00850 size_t count() {
00851 return queue.size();
00852 }
00853
00857 size_t capacity() {
00858 return buffer_size_;
00859 }
00860
00861 FilterPtr filter_ptr;
00862
00863 void callback(const std::string &token) {
00864 #if SERIAL_LISTENER_DEBUG
00865 std::cerr << "In BufferedFilter callback(" << token.length() << "): ";
00866 std::cerr << token << std::endl;
00867 #endif
00868 std::string throw_away;
00869 if (this->queue.size() == this->buffer_size_) {
00870 this->queue.wait_and_pop(throw_away);
00871 }
00872 this->queue.push(token);
00873 }
00874
00875 private:
00876 size_t buffer_size_;
00877 SerialListener * listener_;
00878 ConcurrentQueue<std::string> queue;
00879 std::string result;
00880
00881 };
00882
00883 }
00884 }
00885
00886 #endif // SERIAL_LISTENER_H