async_manager.hpp
Go to the documentation of this file.
1 // *****************************************************************************
2 //
3 // © Copyright 2020, Septentrio NV/SA.
4 // All rights reserved.
5 //
6 // Redistribution and use in source and binary forms, with or without
7 // modification, are permitted provided that the following conditions are met:
8 // 1. Redistributions of source code must retain the above copyright
9 // notice, this list of conditions and the following disclaimer.
10 // 2. Redistributions in binary form must reproduce the above copyright
11 // notice, this list of conditions and the following disclaimer in the
12 // documentation and/or other materials provided with the distribution.
13 // 3. Neither the name of the copyright holder nor the names of its
14 // contributors may be used to endorse or promote products derived
15 // from this software without specific prior written permission.
16 //
17 // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 // AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 // IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 // ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
21 // LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22 // CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 // SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24 // INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25 // CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26 // ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 // POSSIBILITY OF SUCH DAMAGE.
28 //
29 // *****************************************************************************
30 
31 // *****************************************************************************
32 //
33 // Boost Software License - Version 1.0 - August 17th, 2003
34 //
35 // Permission is hereby granted, free of charge, to any person or organization
36 // obtaining a copy of the software and accompanying documentation covered by
37 // this license (the "Software") to use, reproduce, display, distribute,
38 // execute, and transmit the Software, and to prepare derivative works of the
39 // Software, and to permit third-parties to whom the Software is furnished to
40 // do so, all subject to the following:
41 
42 // The copyright notices in the Software and this entire statement, including
43 // the above license grant, this restriction and the following disclaimer,
44 // must be included in all copies of the Software, in whole or in part, and
45 // all derivative works of the Software, unless such copies or derivative
46 // works are solely in the form of machine-executable object code generated by
47 // a source language processor.
48 //
49 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
50 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
51 // FITNESS FOR A PARTICULAR PURPOSE, TITLE AND NON-INFRINGEMENT. IN NO EVENT
52 // SHALL THE COPYRIGHT HOLDERS OR ANYONE DISTRIBUTING THE SOFTWARE BE LIABLE
53 // FOR ANY DAMAGES OR OTHER LIABILITY, WHETHER IN CONTRACT, TORT OR OTHERWISE,
54 // ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
55 // DEALINGS IN THE SOFTWARE.
56 //
57 // *****************************************************************************
58 
59 // Boost includes
60 #include <boost/algorithm/string/join.hpp>
61 #include <boost/asio.hpp>
62 #include <boost/bind.hpp>
63 #include <boost/date_time/posix_time/posix_time.hpp>
64 #include <boost/function.hpp>
65 #include <boost/system/error_code.hpp>
66 #include <boost/thread.hpp>
67 #include <boost/thread/condition.hpp>
68 
69 // ROSaic includes
71 
72 #ifndef ASYNC_MANAGER_HPP
73 #define ASYNC_MANAGER_HPP
74 
84 namespace io_comm_rx {
85 
91  class Manager
92  {
93  public:
94  typedef boost::function<void(Timestamp, const uint8_t*, std::size_t&)>
96  virtual ~Manager() {}
98  virtual void setCallback(const Callback& callback) = 0;
100  virtual bool send(const std::string& cmd) = 0;
103  virtual void wait(uint16_t* count) = 0;
105  virtual bool isOpen() const = 0;
106  };
107 
115  template <typename StreamT>
116  class AsyncManager : public Manager
117  {
118  public:
129  std::size_t buffer_size = 16384);
130  virtual ~AsyncManager();
131 
137  void setCallback(const Callback& callback) { read_callback_ = callback; }
138 
139  void wait(uint16_t* count);
140 
145  bool send(const std::string& cmd);
146 
147  bool isOpen() const { return stream_->is_open(); }
148 
149  private:
152 
153  protected:
156  void read();
157 
159  void asyncReadSomeHandler(const boost::system::error_code& error,
160  std::size_t bytes_transferred);
161 
163  void write(const std::string& cmd);
164 
166  void close();
167 
170  void tryParsing();
171 
173  boost::mutex parse_mutex_;
174 
177 
181 
183  boost::condition_variable parsing_condition_;
184 
187 
190 
192  std::vector<uint8_t> in_;
193 
197 
200 
203 
206 
209 
211  bool stopping_;
212 
214  const std::size_t buffer_size_;
215 
218  boost::asio::deadline_timer timer_;
219 
222  const uint16_t count_max_;
223 
225  void callAsyncWait(uint16_t* count);
226 
229  uint16_t do_read_count_;
230 
233  };
234 
235  template <typename StreamT>
237  {
238  uint8_t* to_be_parsed = new uint8_t[buffer_size_ * 16];
239  uint8_t* to_be_parsed_index = to_be_parsed;
240  bool timed_out = false;
241  std::size_t shift_bytes = 0;
242  std::size_t arg_for_read_callback = 0;
243 
244  while (!timed_out &&
245  !stopping_) // Loop will stop if condition variable timed out
246  {
247  boost::mutex::scoped_lock lock(parse_mutex_);
248  parsing_condition_.wait_for(lock, boost::chrono::seconds(10),
249  [this]() { return try_parsing_; });
250  bool timed_out = !try_parsing_;
251  if (timed_out)
252  break;
253  try_parsing_ = false;
254  allow_writing_ = true;
255  std::size_t current_buffer_size = circular_buffer_.size();
256  arg_for_read_callback += current_buffer_size;
257  circular_buffer_.read(to_be_parsed + shift_bytes, current_buffer_size);
258  Timestamp revcTime = recvTime_;
259  lock.unlock();
260  parsing_condition_.notify_one();
261 
262  try
263  {
264  node_->log(
266  "Calling read_callback_() method, with number of bytes to be parsed being " +
267  std::to_string(arg_for_read_callback));
268  read_callback_(revcTime, to_be_parsed_index, arg_for_read_callback);
269  } catch (std::size_t& parsing_failed_here)
270  {
271  to_be_parsed_index += parsing_failed_here;
272  arg_for_read_callback -= parsing_failed_here;
273  node_->log(LogLevel::DEBUG, "Current buffer size is " +
274  std::to_string(current_buffer_size) +
275  " and parsing_failed_here is " +
276  std::to_string(parsing_failed_here));
277  if (arg_for_read_callback < 0) // In case some parsing error was not
278  // caught, which should never happen..
279  {
280  to_be_parsed_index = to_be_parsed;
281  shift_bytes = 0;
282  arg_for_read_callback = 0;
283  continue;
284  }
285  shift_bytes += current_buffer_size;
286  continue;
287  }
288  to_be_parsed_index = to_be_parsed;
289  shift_bytes = 0;
290  arg_for_read_callback = 0;
291  }
292  node_->log(
294  "TryParsing() method finished since it did not receive anything to parse for 10 seconds..");
295  delete[] to_be_parsed; // Freeing memory
296  }
297 
298  template <typename StreamT>
299  bool AsyncManager<StreamT>::send(const std::string& cmd)
300  {
301  if (cmd.size() == 0)
302  {
303  node_->log(LogLevel::ERROR,
304  "Message size to be sent to the Rx would be 0");
305  return true;
306  }
307 
308  io_service_->post(boost::bind(&AsyncManager<StreamT>::write, this, cmd));
309  return true;
310  }
311 
312  template <typename StreamT>
313  void AsyncManager<StreamT>::write(const std::string& cmd)
314  {
315  boost::asio::write(*stream_, boost::asio::buffer(cmd.data(), cmd.size()));
316  // Prints the data that was sent
317  node_->log(LogLevel::DEBUG, "Sent the following " +
318  std::to_string(cmd.size()) +
319  " bytes to the Rx: \n" + cmd);
320  }
321 
322  template <typename StreamT>
324  {
325  timer_.async_wait(boost::bind(&AsyncManager::wait, this, count));
326  }
327 
328  template <typename StreamT>
332  std::size_t buffer_size) :
333  node_(node),
334  timer_(*(io_service.get()), boost::posix_time::seconds(1)), stopping_(false),
335  try_parsing_(false), allow_writing_(true), do_read_count_(0),
336  buffer_size_(buffer_size), count_max_(6), circular_buffer_(node, buffer_size)
337  // Since buffer_size = 16384 in declaration, no need in definition anymore (even
338  // yields error message, due to "overwrite").
339  {
340  node_->log(
342  "Setting the private stream variable of the AsyncManager instance.");
343  stream_ = stream;
344  io_service_ = io_service;
345  in_.resize(buffer_size_);
346 
347  io_service_->post(boost::bind(&AsyncManager<StreamT>::read, this));
348  // This function is used to ask the io_service to execute the given handler,
349  // but without allowing the io_service to call the handler from inside this
350  // function. The function signature of the handler must be: void handler();
351  // The io_service guarantees that the handler (given as parameter) will only
352  // be called in a thread in which the run(), run_one(), poll() or poll_one()
353  // member functions is currently being invoked. So the fundamental difference
354  // is that dispatch will execute the work right away if it can and queue it
355  // otherwise while post queues the work no matter what.
356  async_background_thread_.reset(new boost::thread(
357  boost::bind(&boost::asio::io_service::run, io_service_)));
358  // Note that io_service_ is already pointer, hence need dereferencing
359  // operator & (ampersand). If the value of the pointer for the current thread
360  // is changed using reset(), then the previous value is destroyed by calling
361  // the cleanup routine. Alternatively, the stored value can be reset to NULL
362  // and the prior value returned by calling the release() member function,
363  // allowing the application to take back responsibility for destroying the
364  // object.
365  uint16_t count = 0;
366  waiting_thread_.reset(new boost::thread(
367  boost::bind(&AsyncManager::callAsyncWait, this, &count)));
368 
369  node_->log(LogLevel::DEBUG, "Launching tryParsing() thread..");
370  parsing_thread_.reset(
371  new boost::thread(boost::bind(&AsyncManager::tryParsing, this)));
372  } // Calls std::terminate() on thread just created
373 
374  template <typename StreamT>
376  {
377  close();
378  io_service_->stop();
379  try_parsing_ = true;
380  parsing_condition_.notify_one();
381  parsing_thread_->join();
382  waiting_thread_->join();
383  async_background_thread_->join();
384  }
385 
386  template <typename StreamT>
388  {
389  stream_->async_read_some(
390  boost::asio::buffer(in_.data(), in_.size()),
392  boost::asio::placeholders::error,
393  boost::asio::placeholders::bytes_transferred));
394  // The handler is async_read_some_handler, whose call is postponed to
395  // when async_read_some completes.
396  if (do_read_count_ < 5)
397  ++do_read_count_;
398  }
399 
400  template <typename StreamT>
402  const boost::system::error_code& error, std::size_t bytes_transferred)
403  {
404  if (error)
405  {
407  "Rx ASIO input buffer read error: " + error.message() + ", " +
408  std::to_string(bytes_transferred));
409  } else if (bytes_transferred > 0)
410  {
411  Timestamp inTime = node_->getTime();
412  if (read_callback_ &&
413  !stopping_) // Will be false in InitializeSerial (first call)
414  // since read_callback_ not added yet..
415  {
416  boost::mutex::scoped_lock lock(parse_mutex_);
417  parsing_condition_.wait(lock, [this]() { return allow_writing_; });
418  circular_buffer_.write(in_.data(), bytes_transferred);
419  allow_writing_ = false;
420  try_parsing_ = true;
421  recvTime_ = inTime;
422  lock.unlock();
423  parsing_condition_.notify_one();
424  }
425  }
426 
427  if (!stopping_)
428  io_service_->post(boost::bind(&AsyncManager<StreamT>::read, this));
429  }
430 
431  template <typename StreamT>
433  {
434  stopping_ = true;
435  boost::system::error_code error;
436  stream_->close(error);
437  if (error)
438  {
440  "Error while closing the AsyncManager: " + error.message());
441  }
442  }
443 
444  template <typename StreamT>
445  void AsyncManager<StreamT>::wait(uint16_t* count)
446  {
447  if (*count < count_max_)
448  {
449  ++(*count);
450  timer_.expires_at(timer_.expires_at() + boost::posix_time::seconds(1));
451  if (!(*count == count_max_))
452  {
453  timer_.async_wait(boost::bind(&AsyncManager::wait, this, count));
454  }
455  }
456  if ((*count == count_max_) && (do_read_count_ < 3))
457  // Why 3? Even if there are no incoming messages, read() is called once.
458  // It will be called a second time in TCP/IP mode since (just example)
459  // "IP10<" is transmitted.
460  {
461  node_->log(
463  "No incoming messages, driver stopped, ros::spin() will spin forever unless you hit Ctrl+C.");
464  async_background_thread_->interrupt();
465  }
466  }
467 } // namespace io_comm_rx
468 
469 #endif // for ASYNC_MANAGER_HPP
const std::size_t buffer_size_
Size of in_ buffers.
string cmd
ROSaicNodeBase * node_
Pointer to the node.
bool isOpen() const
Determines whether or not the connection is open.
bool try_parsing_
Determines when the tryParsing() method will attempt parsing SBF/NMEA.
void setCallback(const Callback &callback)
Allows to connect to the CallbackHandlers class.
bool stopping_
Whether or not we want to sever the connection to the Rx.
Timestamp recvTime_
Timestamp of receiving buffer.
void wait(uint16_t *count)
void log(LogLevel logLevel, const std::string &s)
Log function to provide abstraction of ROS loggers.
Definition: typedefs.hpp:231
boost::asio::deadline_timer timer_
uint64_t Timestamp
Definition: typedefs.hpp:88
virtual void wait(uint16_t *count)=0
std::size_t write(const uint8_t *data, std::size_t bytes)
Returns number of bytes written.
boost::shared_ptr< boost::asio::io_service > io_service_
io_context object
virtual bool isOpen() const =0
Determines whether or not the connection is open.
AsyncManager(ROSaicNodeBase *node, boost::shared_ptr< StreamT > stream, boost::shared_ptr< boost::asio::io_service > io_service, std::size_t buffer_size=16384)
Class constructor.
boost::mutex parse_mutex_
Mutex to control changes of class variable "try_parsing".
void close()
Closes stream "stream_".
boost::shared_ptr< boost::thread > waiting_thread_
New thread for receiving incoming messages.
Class for creating, writing to and reading from a circular buffer.
virtual void setCallback(const Callback &callback)=0
Sets the callback function.
ROSCPP_DECL bool get(const std::string &key, std::string &s)
boost::shared_ptr< boost::thread > async_background_thread_
New thread for receiving incoming messages.
void callAsyncWait(uint16_t *count)
Handles the ROS_INFO throwing (if no incoming message)
boost::condition_variable parsing_condition_
Condition variable complementing "parse_mutex".
boost::function< void(Timestamp, const uint8_t *, std::size_t &)> Callback
Callback read_callback_
Callback to be called once message arrives.
bool send(const std::string &cmd)
Sends commands via the I/O stream.
Declares a class for creating, writing to and reading from a circular bufffer.
This is the central interface between ROSaic and the Rx(s), managing I/O operations such as reading m...
Timestamp getTime()
Gets current timestamp.
Definition: typedefs.hpp:259
CircularBuffer circular_buffer_
void write(const std::string &cmd)
Sends command "cmd" to the Rx.
This class is the base class for abstraction.
Definition: typedefs.hpp:174
void callback(const sensor_msgs::NavSatFixConstPtr &fix)
virtual bool send(const std::string &cmd)=0
Sends commands to the receiver.
boost::shared_ptr< boost::thread > parsing_thread_
New thread for receiving incoming messages.
std::vector< uint8_t > in_
Buffer for async_read_some() to read continuous SBF/NMEA stream.
boost::shared_ptr< StreamT > stream_
Stream, represents either serial or TCP/IP connection.
Interface (in C++ terms), that could be used for any I/O manager, synchronous and asynchronous alike...
void asyncReadSomeHandler(const boost::system::error_code &error, std::size_t bytes_transferred)
Handler for async_read_some (Boost library)..


septentrio_gnss_driver
Author(s): Tibor Dome
autogenerated on Sat Mar 11 2023 03:12:55