Go to the documentation of this file.00001
00002
00003 #include "serial/utils/serial_listener.h"
00004
00005
00006
// Fallback exception handler installed by the SerialListener constructor.
// Writes a fixed prefix followed by the exception's what() text to
// std::cerr, then terminates the line and flushes the stream.
inline void defaultExceptionCallback(const std::exception &error) {
  std::cerr << "SerialListener Unhandled Exception: " << error.what()
            << std::endl;
}
00011
// Comparator that accepts every token. The token argument is never
// inspected; the function unconditionally returns true.
inline bool defaultComparator(const std::string &token) {
  (void)token;  // intentionally unused
  return true;
}
00015
00016 using namespace serial;
00017 using namespace serial::utils;
00018
00019
00020
00021 void
00022 SerialListener::default_handler(const std::string &token) {
00023 if (this->_default_handler != NULL)
00024 this->_default_handler(token);
00025 }
00026
00027 SerialListener::SerialListener(size_t num_threads) : listening(false), chunk_size_(5) {
00028
00029 this->handle_exc = defaultExceptionCallback;
00030
00031
00032 this->_default_handler = NULL;
00033 this->default_comparator = defaultComparator;
00034 DataCallback tmp = boost::bind(&SerialListener::default_handler, this, _1);
00035 this->default_filter = FilterPtr(new Filter(default_comparator, tmp));
00036
00037
00038 this->setTokenizer(delimeter_tokenizer("\r"));
00039
00040
00041 if (num_threads == 0) {
00042 unsigned int hardware_concurrency = boost::thread::hardware_concurrency();
00043 if (hardware_concurrency > 0) {
00044 this->num_threads_ = hardware_concurrency;
00045 } else {
00046 this->num_threads_ = 1;
00047 }
00048 } else {
00049 this->num_threads_ = num_threads;
00050 }
00051 }
00052
00053 SerialListener::~SerialListener() {
00054 if (this->listening) {
00055 this->stopListening();
00056 }
00057 }
00058
00059 void
00060 SerialListener::callback(size_t thread_index) {
00061 try {
00062 while (this->listening) {
00063
00064 std::pair<FilterPtr,TokenPtr> pair;
00065 this->callback_queue.wait_and_pop(pair);
00066 if (this->listening) {
00067 try {
00068 if (pair.first != NULL && pair.second != NULL) {
00069 pair.first->callback_((*pair.second));
00070 }
00071 } catch (std::exception &e) {
00072 this->handle_exc(e);
00073 }
00074 }
00075 }
00076 } catch (std::exception &e) {
00077 this->handle_exc(SerialListenerException(e.what()));
00078 }
00079 }
00080
00081 void
00082 SerialListener::startListening(Serial &serial_port) {
00083 if (this->listening) {
00084 throw(SerialListenerException("Already listening."));
00085 }
00086
00087 this->serial_port_ = &serial_port;
00088 if (!this->serial_port_->isOpen()) {
00089 throw(SerialListenerException("Serial port not open."));
00090 }
00091
00092 this->listening = true;
00093
00094 listen_thread = boost::thread(boost::bind(&SerialListener::listen, this));
00095
00096
00097 for(size_t i = 0; i < this->num_threads_; ++i) {
00098 callback_threads.push_back(new
00099 boost::thread(boost::bind(&SerialListener::callback, this, i)));
00100 }
00101 }
00102
00103 void
00104 SerialListener::stopListening() {
00105
00106 listening = false;
00107 callback_queue.cancel();
00108
00109 listen_thread.join();
00110
00111 callback_queue.cancel();
00112 for(size_t i = 0; i < this->num_threads_; ++i) {
00113 callback_threads[i]->join();
00114 delete callback_threads[i];
00115 }
00116 callback_threads.clear();
00117 callback_queue.clear();
00118 callback_queue.clear_cancel();
00119
00120 this->data_buffer = "";
00121 this->serial_port_ = NULL;
00122 }
00123
00124 size_t
00125 SerialListener::determineAmountToRead() {
00126
00127
00128
00129 return this->chunk_size_;
00130 }
00131
00132 void
00133 SerialListener::filter(std::vector<TokenPtr> &tokens) {
00134
00135 boost::mutex::scoped_lock lock(filter_mux);
00136
00137 std::vector<TokenPtr>::iterator it;
00138 for (it=tokens.begin(); it!=tokens.end(); ++it) {
00139 TokenPtr token = (*it);
00140
00141 if (token->empty()) {
00142 continue;
00143 }
00144 bool matched = false;
00145
00146 std::vector<FilterPtr>::iterator itt;
00147 for (itt=filters.begin(); itt!=filters.end(); ++itt) {
00148 FilterPtr filter = (*itt);
00149 if (filter->comparator_((*token))) {
00150 callback_queue.push(std::make_pair(filter,token));
00151 matched = true;
00152 break;
00153 }
00154 }
00155
00156 if (!matched) {
00157 callback_queue.push(std::make_pair(default_filter,token));
00158 }
00159 }
00160 }
00161
00162 void
00163 SerialListener::listen() {
00164 try {
00165 while (this->listening) {
00166
00167 std::string temp;
00168 this->readSomeData(temp, determineAmountToRead());
00169
00170
00171
00172 if (temp.length() != 0) {
00173
00174 this->data_buffer += temp;
00175
00176 std::vector<TokenPtr> new_tokens;
00177 this->tokenize(this->data_buffer, new_tokens);
00178
00179 this->data_buffer = (*new_tokens.back());
00180 new_tokens.pop_back();
00181
00182 this->filter(new_tokens);
00183 }
00184
00185 }
00186 } catch (std::exception &e) {
00187 this->handle_exc(SerialListenerException(e.what()));
00188 }
00189 }
00190
00191
00192
00193 FilterPtr
00194 SerialListener::createFilter(ComparatorType comparator, DataCallback callback)
00195 {
00196 FilterPtr filter_ptr(new Filter(comparator, callback));
00197
00198 boost::mutex::scoped_lock l(filter_mux);
00199 this->filters.push_back(filter_ptr);
00200
00201 return filter_ptr;
00202 }
00203
00204 BlockingFilterPtr
00205 SerialListener::createBlockingFilter(ComparatorType comparator) {
00206 return BlockingFilterPtr(
00207 new BlockingFilter(comparator, (*this)));
00208 }
00209
00210 BufferedFilterPtr
00211 SerialListener::createBufferedFilter(ComparatorType comparator,
00212 size_t buffer_size)
00213 {
00214 return BufferedFilterPtr(
00215 new BufferedFilter(comparator, buffer_size, (*this)));
00216 }
00217
00218 void
00219 SerialListener::removeFilter(FilterPtr filter_ptr) {
00220 boost::mutex::scoped_lock l(filter_mux);
00221 filters.erase(std::find(filters.begin(),filters.end(),filter_ptr));
00222 }
00223
00224 void
00225 SerialListener::removeFilter(BlockingFilterPtr blocking_filter) {
00226 this->removeFilter(blocking_filter->filter_ptr);
00227 }
00228
00229 void
00230 SerialListener::removeFilter(BufferedFilterPtr buffered_filter) {
00231 this->removeFilter(buffered_filter->filter_ptr);
00232 }
00233
00234 void
00235 SerialListener::removeAllFilters() {
00236 boost::mutex::scoped_lock l(filter_mux);
00237 filters.clear();
00238 callback_queue.clear();
00239 }
00240