60 #include <boost/algorithm/string/join.hpp> 61 #include <boost/asio.hpp> 62 #include <boost/bind.hpp> 63 #include <boost/date_time/posix_time/posix_time.hpp> 64 #include <boost/function.hpp> 65 #include <boost/system/error_code.hpp> 66 #include <boost/thread.hpp> 67 #include <boost/thread/condition.hpp> 72 #ifndef ASYNC_MANAGER_HPP 73 #define ASYNC_MANAGER_HPP 94 typedef boost::function<void(Timestamp, const uint8_t*, std::size_t&)>
100 virtual bool send(
const std::string&
cmd) = 0;
103 virtual void wait(uint16_t* count) = 0;
105 virtual bool isOpen()
const = 0;
115 template <
typename StreamT>
129 std::size_t buffer_size = 16384);
139 void wait(uint16_t* count);
145 bool send(
const std::string&
cmd);
147 bool isOpen()
const {
return stream_->is_open(); }
159 void asyncReadSomeHandler(
const boost::system::error_code& error,
160 std::size_t bytes_transferred);
163 void write(
const std::string& cmd);
225 void callAsyncWait(uint16_t* count);
235 template <
typename StreamT>
238 uint8_t* to_be_parsed =
new uint8_t[buffer_size_ * 16];
239 uint8_t* to_be_parsed_index = to_be_parsed;
240 bool timed_out =
false;
241 std::size_t shift_bytes = 0;
242 std::size_t arg_for_read_callback = 0;
247 boost::mutex::scoped_lock lock(parse_mutex_);
248 parsing_condition_.wait_for(lock, boost::chrono::seconds(10),
249 [
this]() {
return try_parsing_; });
250 bool timed_out = !try_parsing_;
253 try_parsing_ =
false;
254 allow_writing_ =
true;
255 std::size_t current_buffer_size = circular_buffer_.size();
256 arg_for_read_callback += current_buffer_size;
257 circular_buffer_.read(to_be_parsed + shift_bytes, current_buffer_size);
260 parsing_condition_.notify_one();
266 "Calling read_callback_() method, with number of bytes to be parsed being " +
267 std::to_string(arg_for_read_callback));
268 read_callback_(revcTime, to_be_parsed_index, arg_for_read_callback);
269 }
catch (std::size_t& parsing_failed_here)
271 to_be_parsed_index += parsing_failed_here;
272 arg_for_read_callback -= parsing_failed_here;
274 std::to_string(current_buffer_size) +
275 " and parsing_failed_here is " +
276 std::to_string(parsing_failed_here));
277 if (arg_for_read_callback < 0)
280 to_be_parsed_index = to_be_parsed;
282 arg_for_read_callback = 0;
285 shift_bytes += current_buffer_size;
288 to_be_parsed_index = to_be_parsed;
290 arg_for_read_callback = 0;
294 "TryParsing() method finished since it did not receive anything to parse for 10 seconds..");
295 delete[] to_be_parsed;
298 template <
typename StreamT>
304 "Message size to be sent to the Rx would be 0");
312 template <
typename StreamT>
315 boost::asio::write(*stream_, boost::asio::buffer(cmd.data(), cmd.size()));
318 std::to_string(cmd.size()) +
319 " bytes to the Rx: \n" + cmd);
322 template <
typename StreamT>
328 template <
typename StreamT>
332 std::size_t buffer_size) :
334 timer_(*(io_service.
get()),
boost::posix_time::seconds(1)), stopping_(false),
335 try_parsing_(false), allow_writing_(true), do_read_count_(0),
336 buffer_size_(buffer_size), count_max_(6), circular_buffer_(node, buffer_size)
342 "Setting the private stream variable of the AsyncManager instance.");
357 boost::bind(&boost::asio::io_service::run,
io_service_)));
374 template <
typename StreamT>
386 template <
typename StreamT>
390 boost::asio::buffer(
in_.data(),
in_.size()),
392 boost::asio::placeholders::error,
393 boost::asio::placeholders::bytes_transferred));
400 template <
typename StreamT>
402 const boost::system::error_code& error, std::size_t bytes_transferred)
407 "Rx ASIO input buffer read error: " + error.message() +
", " +
408 std::to_string(bytes_transferred));
409 }
else if (bytes_transferred > 0)
431 template <
typename StreamT>
435 boost::system::error_code error;
440 "Error while closing the AsyncManager: " + error.message());
444 template <
typename StreamT>
450 timer_.expires_at(
timer_.expires_at() + boost::posix_time::seconds(1));
463 "No incoming messages, driver stopped, ros::spin() will spin forever unless you hit Ctrl+C.");
469 #endif // for ASYNC_MANAGER_HPP const std::size_t buffer_size_
Size of in_ buffers.
ROSaicNodeBase * node_
Pointer to the node.
bool isOpen() const
Determines whether or not the connection is open.
bool try_parsing_
Determines when the tryParsing() method will attempt parsing SBF/NMEA.
void setCallback(const Callback &callback)
Allows to connect to the CallbackHandlers class.
bool stopping_
Whether or not we want to sever the connection to the Rx.
Timestamp recvTime_
Timestamp of receiving buffer.
void wait(uint16_t *count)
void log(LogLevel logLevel, const std::string &s)
Log function to provide abstraction of ROS loggers.
boost::asio::deadline_timer timer_
virtual void wait(uint16_t *count)=0
std::size_t write(const uint8_t *data, std::size_t bytes)
Returns number of bytes written.
boost::shared_ptr< boost::asio::io_service > io_service_
io_context object
virtual bool isOpen() const =0
Determines whether or not the connection is open.
AsyncManager(ROSaicNodeBase *node, boost::shared_ptr< StreamT > stream, boost::shared_ptr< boost::asio::io_service > io_service, std::size_t buffer_size=16384)
Class constructor.
boost::mutex parse_mutex_
Mutex to control changes of class variable "try_parsing".
void close()
Closes stream "stream_".
boost::shared_ptr< boost::thread > waiting_thread_
New thread for receiving incoming messages.
Class for creating, writing to and reading from a circular buffer.
virtual void setCallback(const Callback &callback)=0
Sets the callback function.
ROSCPP_DECL bool get(const std::string &key, std::string &s)
const uint16_t count_max_
boost::shared_ptr< boost::thread > async_background_thread_
New thread for receiving incoming messages.
void callAsyncWait(uint16_t *count)
Handles the ROS_INFO throwing (if no incoming message)
boost::condition_variable parsing_condition_
Condition variable complementing "parse_mutex".
boost::function< void(Timestamp, const uint8_t *, std::size_t &)> Callback
Callback read_callback_
Callback to be called once message arrives.
bool send(const std::string &cmd)
Sends commands via the I/O stream.
Declares a class for creating, writing to and reading from a circular bufffer.
This is the central interface between ROSaic and the Rx(s), managing I/O operations such as reading m...
Timestamp getTime()
Gets current timestamp.
CircularBuffer circular_buffer_
void write(const std::string &cmd)
Sends command "cmd" to the Rx.
This class is the base class for abstraction.
void callback(const sensor_msgs::NavSatFixConstPtr &fix)
virtual bool send(const std::string &cmd)=0
Sends commands to the receiver.
boost::shared_ptr< boost::thread > parsing_thread_
New thread for receiving incoming messages.
std::vector< uint8_t > in_
Buffer for async_read_some() to read continuous SBF/NMEA stream.
boost::shared_ptr< StreamT > stream_
Stream, represents either serial or TCP/IP connection.
Interface (in C++ terms), that could be used for any I/O manager, synchronous and asynchronous alike...
void asyncReadSomeHandler(const boost::system::error_code &error, std::size_t bytes_transferred)
Handler for async_read_some (Boost library)..