async_manager.hpp
Go to the documentation of this file.
1 // *****************************************************************************
2 //
3 // © Copyright 2020, Septentrio NV/SA.
4 // All rights reserved.
5 //
6 // Redistribution and use in source and binary forms, with or without
7 // modification, are permitted provided that the following conditions are met:
8 // 1. Redistributions of source code must retain the above copyright
9 // notice, this list of conditions and the following disclaimer.
10 // 2. Redistributions in binary form must reproduce the above copyright
11 // notice, this list of conditions and the following disclaimer in the
12 // documentation and/or other materials provided with the distribution.
13 // 3. Neither the name of the copyright holder nor the names of its
14 // contributors may be used to endorse or promote products derived
15 // from this software without specific prior written permission.
16 //
17 // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 // AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 // IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 // ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
21 // LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22 // CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 // SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24 // INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25 // CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26 // ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 // POSSIBILITY OF SUCH DAMAGE.
28 //
29 // *****************************************************************************
30 
31 // *****************************************************************************
32 //
33 // Boost Software License - Version 1.0 - August 17th, 2003
34 //
35 // Permission is hereby granted, free of charge, to any person or organization
36 // obtaining a copy of the software and accompanying documentation covered by
37 // this license (the "Software") to use, reproduce, display, distribute,
38 // execute, and transmit the Software, and to prepare derivative works of the
39 // Software, and to permit third-parties to whom the Software is furnished to
40 // do so, all subject to the following:
41 
42 // The copyright notices in the Software and this entire statement, including
43 // the above license grant, this restriction and the following disclaimer,
44 // must be included in all copies of the Software, in whole or in part, and
45 // all derivative works of the Software, unless such copies or derivative
46 // works are solely in the form of machine-executable object code generated by
47 // a source language processor.
48 //
49 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
50 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
51 // FITNESS FOR A PARTICULAR PURPOSE, TITLE AND NON-INFRINGEMENT. IN NO EVENT
52 // SHALL THE COPYRIGHT HOLDERS OR ANYONE DISTRIBUTING THE SOFTWARE BE LIABLE
53 // FOR ANY DAMAGES OR OTHER LIABILITY, WHETHER IN CONTRACT, TORT OR OTHERWISE,
54 // ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
55 // DEALINGS IN THE SOFTWARE.
56 //
57 // *****************************************************************************
58 
59 // Boost includes
60 #include <boost/algorithm/string/join.hpp>
61 #include <boost/asio.hpp>
62 #include <boost/bind.hpp>
63 #include <boost/date_time/posix_time/posix_time.hpp>
64 #include <boost/function.hpp>
65 #include <boost/system/error_code.hpp>
66 #include <boost/thread.hpp>
67 #include <boost/thread/condition.hpp>
68 
69 // ROSaic includes
71 
72 #ifndef ASYNC_MANAGER_HPP
73 #define ASYNC_MANAGER_HPP
74 
84 namespace io_comm_rx {
85 
91  class Manager
92  {
93  public:
94  typedef boost::function<void(Timestamp, const uint8_t*, std::size_t&)> Callback;
95  virtual ~Manager() {}
97  virtual void setCallback(const Callback& callback) = 0;
99  virtual bool send(std::string cmd, std::size_t size) = 0;
102  virtual void wait(uint16_t* count) = 0;
104  virtual bool isOpen() const = 0;
105  };
106 
114  template <typename StreamT>
115  class AsyncManager : public Manager
116  {
117  public:
129  std::size_t buffer_size = 16384);
130  virtual ~AsyncManager();
131 
137  void setCallback(const Callback& callback) { read_callback_ = callback; }
138 
139  void wait(uint16_t* count);
140 
146  bool send(std::string cmd, std::size_t size);
147 
148  bool isOpen() const { return stream_->is_open(); }
149 
150  private:
153 
154  protected:
157  void read();
158 
160  void asyncReadSomeHandler(const boost::system::error_code& error,
161  std::size_t bytes_transferred);
162 
164  void write(std::string cmd, std::size_t size);
165 
167  void close();
168 
171  void tryParsing();
172 
174  boost::mutex parse_mutex_;
175 
178 
182 
184  boost::condition_variable parsing_condition_;
185 
188 
191 
193  std::vector<uint8_t> in_;
194 
198 
201 
204 
207 
210 
212  bool stopping_;
213 
215  const std::size_t buffer_size_;
216 
219  boost::asio::deadline_timer timer_;
220 
223  const uint16_t count_max_;
224 
226  void callAsyncWait(uint16_t* count);
227 
230  uint16_t do_read_count_;
231 
234  };
235 
236  template <typename StreamT>
238  {
239  uint8_t* to_be_parsed = new uint8_t[buffer_size_ * 16];
240  uint8_t* to_be_parsed_index = to_be_parsed;
241  bool timed_out = false;
242  std::size_t shift_bytes = 0;
243  std::size_t arg_for_read_callback = 0;
244 
245  while (!timed_out && !stopping_) // Loop will stop if condition variable timed out
246  {
247  boost::mutex::scoped_lock lock(parse_mutex_);
248  parsing_condition_.wait_for(lock, boost::chrono::seconds(10),
249  [this]() { return try_parsing_; });
250  bool timed_out = !try_parsing_;
251  if (timed_out)
252  break;
253  try_parsing_ = false;
254  allow_writing_ = true;
255  std::size_t current_buffer_size = circular_buffer_.size();
256  arg_for_read_callback += current_buffer_size;
257  circular_buffer_.read(to_be_parsed + shift_bytes, current_buffer_size);
258  Timestamp revcTime = recvTime_;
259  lock.unlock();
260  parsing_condition_.notify_one();
261 
262  try
263  {
264  node_->log(LogLevel::DEBUG,
265  "Calling read_callback_() method, with number of bytes to be parsed being " +
266  std::to_string(arg_for_read_callback));
267  read_callback_(revcTime, to_be_parsed_index, arg_for_read_callback);
268  } catch (std::size_t& parsing_failed_here)
269  {
270  to_be_parsed_index += parsing_failed_here;
271  arg_for_read_callback -= parsing_failed_here;
272  node_->log(LogLevel::DEBUG,
273  "Current buffer size is " + std::to_string(current_buffer_size) +
274  " and parsing_failed_here is " + std::to_string(parsing_failed_here));
275  if (arg_for_read_callback < 0) // In case some parsing error was not
276  // caught, which should never happen..
277  {
278  to_be_parsed_index = to_be_parsed;
279  shift_bytes = 0;
280  arg_for_read_callback = 0;
281  continue;
282  }
283  shift_bytes += current_buffer_size;
284  continue;
285  }
286  to_be_parsed_index = to_be_parsed;
287  shift_bytes = 0;
288  arg_for_read_callback = 0;
289  }
290  node_->log(LogLevel::INFO,
291  "TryParsing() method finished since it did not receive anything to parse for 10 seconds..");
292  delete[] to_be_parsed; // Freeing memory
293  }
294 
295  template <typename StreamT>
296  bool AsyncManager<StreamT>::send(std::string cmd, std::size_t size)
297  {
298  if (size == 0)
299  {
300  node_->log(LogLevel::ERROR, "Message size to be sent to the Rx would be 0");
301  return true;
302  }
303 
304  std::vector<uint8_t> vector_temp(cmd.begin(), cmd.end());
305  uint8_t* p = &vector_temp[0];
306 
307  io_service_->post(
308  boost::bind(&AsyncManager<StreamT>::write, this, cmd, size));
309  return true;
310  }
311 
312  template <typename StreamT>
313  void AsyncManager<StreamT>::write(std::string cmd, std::size_t size)
314  {
315  boost::asio::write(*stream_, boost::asio::buffer(cmd.data(), size));
316  // Prints the data that was sent
317  node_->log(LogLevel::DEBUG, "Sent the following " + std::to_string(size) + " bytes to the Rx: \n" + cmd);
318  }
319 
320  template <typename StreamT>
322  {
323  timer_.async_wait(boost::bind(&AsyncManager::wait, this, count));
324  }
325 
326  template <typename StreamT>
328  ROSaicNodeBase* node,
331  std::size_t buffer_size) :
332  node_(node),
333  timer_(*(io_service.get()), boost::posix_time::seconds(1)),
334  stopping_(false), try_parsing_(false), allow_writing_(true),
335  do_read_count_(0), buffer_size_(buffer_size), count_max_(6),
336  circular_buffer_(node, buffer_size)
337  // Since buffer_size = 16384 in declaration, no need in definition anymore (even
338  // yields error message, due to "overwrite").
339  {
341  "Setting the private stream variable of the AsyncManager instance.");
342  stream_ = stream;
343  io_service_ = io_service;
344  in_.resize(buffer_size_);
345 
346  io_service_->post(boost::bind(&AsyncManager<StreamT>::read, this));
347  // This function is used to ask the io_service to execute the given handler,
348  // but without allowing the io_service to call the handler from inside this
349  // function. The function signature of the handler must be: void handler();
350  // The io_service guarantees that the handler (given as parameter) will only
351  // be called in a thread in which the run(), run_one(), poll() or poll_one()
352  // member functions is currently being invoked. So the fundamental difference
353  // is that dispatch will execute the work right away if it can and queue it
354  // otherwise while post queues the work no matter what.
355  async_background_thread_.reset(new boost::thread(
356  boost::bind(&boost::asio::io_service::run, io_service_)));
357  // Note that io_service_ is already pointer, hence need dereferencing
358  // operator & (ampersand). If the value of the pointer for the current thread
359  // is changed using reset(), then the previous value is destroyed by calling
360  // the cleanup routine. Alternatively, the stored value can be reset to NULL
361  // and the prior value returned by calling the release() member function,
362  // allowing the application to take back responsibility for destroying the
363  // object.
364  uint16_t count = 0;
365  waiting_thread_.reset(new boost::thread(boost::bind(&AsyncManager::callAsyncWait, this, &count)));
366 
367  node_->log(LogLevel::DEBUG, "Launching tryParsing() thread..");
368  parsing_thread_.reset(new boost::thread(boost::bind(&AsyncManager::tryParsing, this)));
369  } // Calls std::terminate() on thread just created
370 
371  template <typename StreamT>
373  {
374  close();
375  io_service_->stop();
376  try_parsing_ = true;
377  parsing_condition_.notify_one();
378  parsing_thread_->join();
379  waiting_thread_->join();
380  async_background_thread_->join();
381  }
382 
383  template <typename StreamT>
385  {
386  stream_->async_read_some(
387  boost::asio::buffer(in_.data(), in_.size()),
389  boost::asio::placeholders::error,
390  boost::asio::placeholders::bytes_transferred));
391  // The handler is async_read_some_handler, whose call is postponed to
392  // when async_read_some completes.
393  if (do_read_count_ < 5)
394  ++do_read_count_;
395  }
396 
397  template <typename StreamT>
399  const boost::system::error_code& error, std::size_t bytes_transferred)
400  {
401  if (error)
402  {
403  node_->log(LogLevel::ERROR, "Rx ASIO input buffer read error: " + error.message() + ", " +
404  std::to_string(bytes_transferred));
405  } else if (bytes_transferred > 0)
406  {
407  Timestamp inTime = node_->getTime();
408  if (read_callback_ && !stopping_) // Will be false in InitializeSerial (first call)
409  // since read_callback_ not added yet..
410  {
411  boost::mutex::scoped_lock lock(parse_mutex_);
412  parsing_condition_.wait(lock, [this]() { return allow_writing_; });
413  circular_buffer_.write(in_.data(), bytes_transferred);
414  allow_writing_ = false;
415  try_parsing_ = true;
416  recvTime_ = inTime;
417  lock.unlock();
418  parsing_condition_.notify_one();
419  std::vector<uint8_t> empty;
420  in_ = empty;
421  in_.resize(buffer_size_);
422  }
423  }
424 
425  if (!stopping_)
426  io_service_->post(boost::bind(&AsyncManager<StreamT>::read, this));
427  }
428 
429  template <typename StreamT>
431  {
432  stopping_ = true;
433  boost::system::error_code error;
434  stream_->close(error);
435  if (error)
436  {
438  "Error while closing the AsyncManager: " + error.message());
439  }
440  }
441 
442  template <typename StreamT>
443  void AsyncManager<StreamT>::wait(uint16_t* count)
444  {
445  if (*count < count_max_)
446  {
447  ++(*count);
448  timer_.expires_at(timer_.expires_at() + boost::posix_time::seconds(1));
449  if (!(*count == count_max_))
450  {
451  timer_.async_wait(boost::bind(&AsyncManager::wait, this, count));
452  }
453  }
454  if ((*count == count_max_) && (do_read_count_ < 3))
455  // Why 3? Even if there are no incoming messages, read() is called once.
456  // It will be called a second time in TCP/IP mode since (just example)
457  // "IP10<" is transmitted.
458  {
460  "No incoming messages, driver stopped, ros::spin() will spin forever unless you hit Ctrl+C.");
461  async_background_thread_->interrupt();
462  }
463  }
464 } // namespace io_comm_rx
465 
466 #endif // for ASYNC_MANAGER_HPP
const std::size_t buffer_size_
Size of in_ buffers.
virtual bool send(std::string cmd, std::size_t size)=0
Sends commands to the receiver.
string cmd
ROSaicNodeBase * node_
Pointer to the node.
bool isOpen() const
Determines whether or not the connection is open.
bool try_parsing_
Determines when the tryParsing() method will attempt parsing SBF/NMEA.
void setCallback(const Callback &callback)
Allows to connect to the CallbackHandlers class.
bool stopping_
Whether or not we want to sever the connection to the Rx.
Timestamp recvTime_
Timestamp of receiving buffer.
void wait(uint16_t *count)
void log(LogLevel logLevel, const std::string &s)
Log function to provide abstraction of ROS loggers.
Definition: typedefs.hpp:208
boost::asio::deadline_timer timer_
uint64_t Timestamp
Definition: typedefs.hpp:77
virtual void wait(uint16_t *count)=0
std::size_t write(const uint8_t *data, std::size_t bytes)
Returns number of bytes written.
boost::shared_ptr< boost::asio::io_service > io_service_
io_context object
virtual bool isOpen() const =0
Determines whether or not the connection is open.
AsyncManager(ROSaicNodeBase *node, boost::shared_ptr< StreamT > stream, boost::shared_ptr< boost::asio::io_service > io_service, std::size_t buffer_size=16384)
Class constructor.
boost::mutex parse_mutex_
Mutex to control changes of class variable "try_parsing".
void close()
Closes stream "stream_".
boost::shared_ptr< boost::thread > waiting_thread_
New thread for receiving incoming messages.
Class for creating, writing to and reading from a circular buffer.
virtual void setCallback(const Callback &callback)=0
Sets the callback function.
ROSCPP_DECL bool get(const std::string &key, std::string &s)
boost::shared_ptr< boost::thread > async_background_thread_
New thread for receiving incoming messages.
void callAsyncWait(uint16_t *count)
Handles the ROS_INFO throwing (if no incoming message)
boost::condition_variable parsing_condition_
Condition variable complementing "parse_mutex".
bool send(std::string cmd, std::size_t size)
Sends commands via the I/O stream.
boost::function< void(Timestamp, const uint8_t *, std::size_t &)> Callback
Callback read_callback_
Callback to be called once message arrives.
Declares a class for creating, writing to and reading from a circular bufffer.
This is the central interface between ROSaic and the Rx(s), managing I/O operations such as reading m...
Timestamp getTime()
Gets current timestamp.
Definition: typedefs.hpp:236
CircularBuffer circular_buffer_
void write(std::string cmd, std::size_t size)
Sends command "cmd" to the Rx.
This class is the base class for abstraction.
Definition: typedefs.hpp:160
void callback(const sensor_msgs::NavSatFixConstPtr &fix)
boost::shared_ptr< boost::thread > parsing_thread_
New thread for receiving incoming messages.
std::vector< uint8_t > in_
Buffer for async_read_some() to read continuous SBF/NMEA stream.
boost::shared_ptr< StreamT > stream_
Stream, represents either serial or TCP/IP connection.
Interface (in C++ terms), that could be used for any I/O manager, synchronous and asynchronous alike...
void asyncReadSomeHandler(const boost::system::error_code &error, std::size_t bytes_transferred)
Handler for async_read_some (Boost library)..


septentrio_gnss_driver
Author(s): Tibor Dome
autogenerated on Thu Jun 23 2022 02:11:38