comm.cpp
Go to the documentation of this file.
1 /*
2  * Software License Agreement (BSD-3 License)
3  *
4  * Copyright (c) 2018 Daniel Koch.
5  * All rights reserved.
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions are met:
9  *
10  * * Redistributions of source code must retain the above copyright notice, this
11  * list of conditions and the following disclaimer.
12  *
13  * * Redistributions in binary form must reproduce the above copyright notice,
14  * this list of conditions and the following disclaimer in the documentation
15  * and/or other materials provided with the distribution.
16  *
17  * * Neither the name of the copyright holder nor the names of its
18  * contributors may be used to endorse or promote products derived from
19  * this software without specific prior written permission.
20  *
21  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
22  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
23  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
24  * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
25  * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
26  * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
27  * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
28  * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
29  * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31  */
32 
38 #include <async_comm/comm.h>
39 
40 #include <iostream>
41 #include <boost/bind.hpp>
42 
43 namespace async_comm
44 {
45 
46 DefaultMessageHandler Comm::default_message_handler_;
47 
48 Comm::Comm(MessageHandler& message_handler) :
49  message_handler_(message_handler),
50  io_service_(),
51  new_data_(false),
52  shutdown_requested_(false),
53  write_in_progress_(false)
54 {
55 }
56 
58 {
59 }
60 
61 bool Comm::init()
62 {
63  if (!do_init())
64  return false;
65 
66  callback_thread_ = std::thread(std::bind(&Comm::process_callbacks, this));
67 
68  async_read();
69  io_thread_ = std::thread(boost::bind(&boost::asio::io_service::run, &this->io_service_));
70 
71  return true;
72 }
73 
75 {
76  // send shutdown signal to callback thread
77  {
78  std::unique_lock<std::mutex> lock(callback_mutex_);
79  shutdown_requested_ = true;
80  }
81  condition_variable_.notify_one();
82 
83  io_service_.stop();
84  do_close();
85 
86  if (io_thread_.joinable())
87  {
88  io_thread_.join();
89  }
90 
91  if (callback_thread_.joinable())
92  {
93  callback_thread_.join();
94  }
95 }
96 
97 void Comm::send_bytes(const uint8_t *src, size_t len)
98 {
100 
101  for (size_t pos = 0; pos < len; pos += WRITE_BUFFER_SIZE)
102  {
103  size_t num_bytes = (len - pos) > WRITE_BUFFER_SIZE ? WRITE_BUFFER_SIZE : (len - pos);
104  write_queue_.emplace_back(src + pos, num_bytes);
105  }
106 
107  async_write(true);
108 }
109 
110 void Comm::register_receive_callback(std::function<void(const uint8_t*, size_t)> fun)
111 {
112  receive_callback_ = fun;
113 }
114 
116 {
117  listeners_.push_back(listener);
118 }
119 
121 {
122  if (!is_open()) return;
123 
124  do_async_read(boost::asio::buffer(read_buffer_, READ_BUFFER_SIZE),
125  boost::bind(&Comm::async_read_end,
126  this,
127  boost::asio::placeholders::error,
128  boost::asio::placeholders::bytes_transferred));
129 }
130 
131 void Comm::async_read_end(const boost::system::error_code &error, size_t bytes_transferred)
132 {
133  if (error)
134  {
135  message_handler_.error(error.message());
136  close();
137  return;
138  }
139 
140  {
141  std::unique_lock<std::mutex> lock(callback_mutex_);
142  read_queue_.emplace_back(read_buffer_, bytes_transferred);
143  new_data_ = true;
144  }
145  condition_variable_.notify_one();
146 
147  async_read();
148 }
149 
150 void Comm::async_write(bool check_write_state)
151 {
152  if (check_write_state && write_in_progress_)
153  return;
154 
155  mutex_lock lock(write_mutex_);
156  if (write_queue_.empty())
157  return;
158 
159  write_in_progress_ = true;
160  WriteBuffer& buffer = write_queue_.front();
161  do_async_write(boost::asio::buffer(buffer.dpos(), buffer.nbytes()),
162  boost::bind(&Comm::async_write_end,
163  this,
164  boost::asio::placeholders::error,
165  boost::asio::placeholders::bytes_transferred));
166 }
167 
168 void Comm::async_write_end(const boost::system::error_code &error, size_t bytes_transferred)
169 {
170  if (error)
171  {
172  message_handler_.error(error.message());
173  close();
174  return;
175  }
176 
177  mutex_lock lock(write_mutex_);
178  if (write_queue_.empty())
179  {
180  write_in_progress_ = false;
181  return;
182  }
183 
184  WriteBuffer& buffer = write_queue_.front();
185  buffer.pos += bytes_transferred;
186  if (buffer.nbytes() == 0)
187  {
188  write_queue_.pop_front();
189  }
190 
191  if (write_queue_.empty())
192  write_in_progress_ = false;
193  else
194  async_write(false);
195 }
196 
198 {
199  std::list<ReadBuffer> local_queue;
200 
201  while (true)
202  {
203  // wait for either new data or a shutdown request
204  std::unique_lock<std::mutex> lock(callback_mutex_);
205  condition_variable_.wait(lock, [this]{ return new_data_ || shutdown_requested_; });
206 
207  // if shutdown requested, end thread execution
209  {
210  break;
211  }
212 
213  // move data to local buffer
214  local_queue.splice(local_queue.end(), read_queue_);
215 
216  // release mutex to allow continued asynchronous read operations
217  new_data_ = false;
218  lock.unlock();
219 
220  // execute callbacks for all new data
221  while (!local_queue.empty())
222  {
223  ReadBuffer buffer = local_queue.front();
224  if (receive_callback_)
225  {
226  receive_callback_(buffer.data, buffer.len);
227  }
228  for (std::reference_wrapper<CommListener> listener_ref : listeners_)
229  {
230  listener_ref.get().receive_callback(buffer.data, buffer.len);
231  }
232  local_queue.pop_front();
233  }
234  }
235 }
236 
237 } // namespace async_comm
std::thread callback_thread_
Definition: comm.h:203
std::vector< std::reference_wrapper< CommListener > > listeners_
Definition: comm.h:217
Abstract base class for getting comm events via a listener interface.
Definition: comm.h:61
virtual void do_async_read(const boost::asio::mutable_buffers_1 &buffer, boost::function< void(const boost::system::error_code &, size_t)> handler)=0
static constexpr size_t WRITE_BUFFER_SIZE
Definition: comm.h:144
boost::asio::io_service io_service_
Definition: comm.h:157
bool write_in_progress_
Definition: comm.h:214
uint8_t read_buffer_[READ_BUFFER_SIZE]
Definition: comm.h:205
void async_read_end(const boost::system::error_code &error, size_t bytes_transferred)
Definition: comm.cpp:131
void send_bytes(const uint8_t *src, size_t len)
Send bytes from a buffer over the port.
Definition: comm.cpp:97
MessageHandler & message_handler_
Definition: comm.h:156
virtual void error(const std::string &message)=0
std::function< void(const uint8_t *, size_t)> receive_callback_
Definition: comm.h:216
bool init()
Initializes and opens the port.
Definition: comm.cpp:61
std::lock_guard< std::recursive_mutex > mutex_lock
Definition: comm.h:192
virtual bool is_open()=0
static DefaultMessageHandler default_message_handler_
Definition: comm.h:146
size_t nbytes() const
Definition: comm.h:189
void register_receive_callback(std::function< void(const uint8_t *, size_t)> fun)
Register a callback function for when bytes are received on the port.
Definition: comm.cpp:110
std::thread io_thread_
Definition: comm.h:202
std::list< WriteBuffer > write_queue_
Definition: comm.h:212
std::recursive_mutex write_mutex_
Definition: comm.h:213
Comm(MessageHandler &message_handler=default_message_handler_)
Set up asynchronous communication base class.
Definition: comm.cpp:48
void async_write(bool check_write_state)
Definition: comm.cpp:150
void close()
Closes the port.
Definition: comm.cpp:74
const uint8_t * dpos() const
Definition: comm.h:187
static constexpr size_t READ_BUFFER_SIZE
Definition: comm.h:143
void async_write_end(const boost::system::error_code &error, size_t bytes_transferred)
Definition: comm.cpp:168
std::condition_variable condition_variable_
Definition: comm.h:208
Abstract base class for message handler.
void async_read()
Definition: comm.cpp:120
std::list< ReadBuffer > read_queue_
Definition: comm.h:206
virtual bool do_init()=0
bool new_data_
Definition: comm.h:209
void register_listener(CommListener &listener)
Register a listener for when bytes are received on the port.
Definition: comm.cpp:115
virtual ~Comm()
Definition: comm.cpp:57
virtual void do_async_write(const boost::asio::const_buffers_1 &buffer, boost::function< void(const boost::system::error_code &, size_t)> handler)=0
bool shutdown_requested_
Definition: comm.h:210
uint8_t data[READ_BUFFER_SIZE]
Definition: comm.h:163
void process_callbacks()
Definition: comm.cpp:197
std::mutex callback_mutex_
Definition: comm.h:207
virtual void do_close()=0


async_comm
Author(s):
autogenerated on Fri May 14 2021 02:35:38