60 #include <boost/algorithm/string/join.hpp>
61 #include <boost/asio.hpp>
62 #include <boost/bind.hpp>
63 #include <boost/date_time/posix_time/posix_time.hpp>
64 #include <boost/function.hpp>
65 #include <boost/system/error_code.hpp>
66 #include <boost/thread.hpp>
67 #include <boost/thread/condition.hpp>
72 #ifndef ASYNC_MANAGER_HPP
73 #define ASYNC_MANAGER_HPP
94 typedef boost::function<void(
Timestamp,
const uint8_t*, std::size_t&)>
100 virtual bool send(
const std::string& cmd) = 0;
103 virtual void wait(uint16_t* count) = 0;
105 virtual bool isOpen()
const = 0;
115 template <
typename StreamT>
129 std::size_t buffer_size = 16384);
139 void wait(uint16_t* count);
145 bool send(
const std::string& cmd);
160 std::size_t bytes_transferred);
163 void write(
const std::string& cmd);
235 template <
typename StreamT>
238 uint8_t* to_be_parsed =
new uint8_t[buffer_size_ * 16];
239 uint8_t* to_be_parsed_index = to_be_parsed;
240 bool timed_out =
false;
241 std::size_t shift_bytes = 0;
242 std::size_t arg_for_read_callback = 0;
247 boost::mutex::scoped_lock lock(parse_mutex_);
248 parsing_condition_.wait_for(lock, boost::chrono::seconds(10),
249 [
this]() {
return try_parsing_; });
250 bool timed_out = !try_parsing_;
253 try_parsing_ =
false;
254 allow_writing_ =
true;
255 std::size_t current_buffer_size = circular_buffer_.size();
256 arg_for_read_callback += current_buffer_size;
257 circular_buffer_.read(to_be_parsed + shift_bytes, current_buffer_size);
260 parsing_condition_.notify_one();
266 "Calling read_callback_() method, with number of bytes to be parsed being " +
267 std::to_string(arg_for_read_callback));
268 read_callback_(revcTime, to_be_parsed_index, arg_for_read_callback);
269 }
catch (std::size_t& parsing_failed_here)
271 to_be_parsed_index += parsing_failed_here;
272 arg_for_read_callback -= parsing_failed_here;
274 std::to_string(current_buffer_size) +
275 " and parsing_failed_here is " +
276 std::to_string(parsing_failed_here));
277 if (arg_for_read_callback < 0)
280 to_be_parsed_index = to_be_parsed;
282 arg_for_read_callback = 0;
285 shift_bytes += current_buffer_size;
288 to_be_parsed_index = to_be_parsed;
290 arg_for_read_callback = 0;
294 "TryParsing() method finished since it did not receive anything to parse for 10 seconds..");
295 delete[] to_be_parsed;
298 template <
typename StreamT>
304 "Message size to be sent to the Rx would be 0");
312 template <
typename StreamT>
315 boost::asio::write(*stream_, boost::asio::buffer(cmd.data(), cmd.size()));
318 std::to_string(cmd.size()) +
319 " bytes to the Rx: \n" + cmd);
322 template <
typename StreamT>
328 template <
typename StreamT>
332 std::size_t buffer_size) :
334 timer_(*(io_service.
get()),
boost::posix_time::seconds(1)), stopping_(false),
335 try_parsing_(false), allow_writing_(true), do_read_count_(0),
336 buffer_size_(buffer_size), count_max_(6), circular_buffer_(node, buffer_size)
342 "Setting the private stream variable of the AsyncManager instance.");
357 boost::bind(&boost::asio::io_service::run,
io_service_)));
374 template <
typename StreamT>
380 parsing_condition_.notify_one();
381 parsing_thread_->join();
382 waiting_thread_->join();
383 async_background_thread_->join();
386 template <
typename StreamT>
389 stream_->async_read_some(
390 boost::asio::buffer(in_.data(), in_.size()),
392 boost::asio::placeholders::error,
393 boost::asio::placeholders::bytes_transferred));
396 if (do_read_count_ < 5)
400 template <
typename StreamT>
402 const boost::system::error_code& error, std::size_t bytes_transferred)
407 "Rx ASIO input buffer read error: " + error.message() +
", " +
408 std::to_string(bytes_transferred));
409 }
else if (bytes_transferred > 0)
412 if (read_callback_ &&
416 boost::mutex::scoped_lock lock(parse_mutex_);
417 parsing_condition_.wait(lock, [
this]() {
return allow_writing_; });
418 circular_buffer_.write(in_.data(), bytes_transferred);
419 allow_writing_ =
false;
423 parsing_condition_.notify_one();
431 template <
typename StreamT>
435 boost::system::error_code error;
436 stream_->close(error);
440 "Error while closing the AsyncManager: " + error.message());
444 template <
typename StreamT>
447 if (*count < count_max_)
450 timer_.expires_at(timer_.expires_at() + boost::posix_time::seconds(1));
451 if (!(*count == count_max_))
456 if ((*count == count_max_) && (do_read_count_ < 3))
463 "No incoming messages, driver stopped, ros::spin() will spin forever unless you hit Ctrl+C.");
464 async_background_thread_->interrupt();
469 #endif // for ASYNC_MANAGER_HPP