comm.cpp
Go to the documentation of this file.
1 /*
2  * Software License Agreement (BSD-3 License)
3  *
4  * Copyright (c) 2018 Daniel Koch.
5  * All rights reserved.
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions are met:
9  *
10  * * Redistributions of source code must retain the above copyright notice, this
11  * list of conditions and the following disclaimer.
12  *
13  * * Redistributions in binary form must reproduce the above copyright notice,
14  * this list of conditions and the following disclaimer in the documentation
15  * and/or other materials provided with the distribution.
16  *
17  * * Neither the name of the copyright holder nor the names of its
18  * contributors may be used to endorse or promote products derived from
19  * this software without specific prior written permission.
20  *
21  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
22  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
23  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
24  * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
25  * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
26  * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
27  * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
28  * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
29  * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31  */
32 
38 #include <async_comm/comm.h>
39 
40 #include <iostream>
41 #include <boost/bind.hpp>
42 
43 namespace async_comm
44 {
45 
47  io_service_(),
48  new_data_(false),
49  shutdown_requested_(false),
50  write_in_progress_(false)
51 {
52 }
53 
55 {
56 }
57 
58 bool Comm::init()
59 {
60  if (!do_init())
61  return false;
62 
63  callback_thread_ = std::thread(std::bind(&Comm::process_callbacks, this));
64 
65  async_read();
66  io_thread_ = std::thread(boost::bind(&boost::asio::io_service::run, &this->io_service_));
67 
68  return true;
69 }
70 
72 {
73  // send shutdown signal to callback thread
74  {
75  std::unique_lock<std::mutex> lock(callback_mutex_);
76  shutdown_requested_ = true;
77  }
78  condition_variable_.notify_one();
79 
80  io_service_.stop();
81  do_close();
82 
83  if (io_thread_.joinable())
84  {
85  io_thread_.join();
86  }
87 
88  if (callback_thread_.joinable())
89  {
90  callback_thread_.join();
91  }
92 }
93 
94 void Comm::send_bytes(const uint8_t *src, size_t len)
95 {
97 
98  for (size_t pos = 0; pos < len; pos += WRITE_BUFFER_SIZE)
99  {
100  size_t num_bytes = (len - pos) > WRITE_BUFFER_SIZE ? WRITE_BUFFER_SIZE : (len - pos);
101  write_queue_.emplace_back(src + pos, num_bytes);
102  }
103 
104  async_write(true);
105 }
106 
107 void Comm::register_receive_callback(std::function<void(const uint8_t*, size_t)> fun)
108 {
109  receive_callback_ = fun;
110 }
111 
113 {
114  if (!is_open()) return;
115 
116  do_async_read(boost::asio::buffer(read_buffer_, READ_BUFFER_SIZE),
117  boost::bind(&Comm::async_read_end,
118  this,
119  boost::asio::placeholders::error,
120  boost::asio::placeholders::bytes_transferred));
121 }
122 
123 void Comm::async_read_end(const boost::system::error_code &error, size_t bytes_transferred)
124 {
125  if (error)
126  {
127  std::cerr << error.message() << std::endl;
128  close();
129  return;
130  }
131 
132  {
133  std::unique_lock<std::mutex> lock(callback_mutex_);
134  read_queue_.emplace_back(read_buffer_, bytes_transferred);
135  new_data_ = true;
136  }
137  condition_variable_.notify_one();
138 
139  async_read();
140 }
141 
142 void Comm::async_write(bool check_write_state)
143 {
144  if (check_write_state && write_in_progress_)
145  return;
146 
147  mutex_lock lock(write_mutex_);
148  if (write_queue_.empty())
149  return;
150 
151  write_in_progress_ = true;
152  WriteBuffer& buffer = write_queue_.front();
153  do_async_write(boost::asio::buffer(buffer.dpos(), buffer.nbytes()),
154  boost::bind(&Comm::async_write_end,
155  this,
156  boost::asio::placeholders::error,
157  boost::asio::placeholders::bytes_transferred));
158 }
159 
160 void Comm::async_write_end(const boost::system::error_code &error, size_t bytes_transferred)
161 {
162  if (error)
163  {
164  std::cerr << error.message() << std::endl;
165  close();
166  return;
167  }
168 
169  mutex_lock lock(write_mutex_);
170  if (write_queue_.empty())
171  {
172  write_in_progress_ = false;
173  return;
174  }
175 
176  WriteBuffer& buffer = write_queue_.front();
177  buffer.pos += bytes_transferred;
178  if (buffer.nbytes() == 0)
179  {
180  write_queue_.pop_front();
181  }
182 
183  if (write_queue_.empty())
184  write_in_progress_ = false;
185  else
186  async_write(false);
187 }
188 
190 {
191  std::list<ReadBuffer> local_queue;
192 
193  while (true)
194  {
195  // wait for either new data or a shutdown request
196  std::unique_lock<std::mutex> lock(callback_mutex_);
197  condition_variable_.wait(lock, [this]{ return new_data_ || shutdown_requested_; });
198 
199  // if shutdown requested, end thread execution
201  {
202  break;
203  }
204 
205  // move data to local buffer
206  local_queue.splice(local_queue.end(), read_queue_);
207 
208  // release mutex to allow continued asynchronous read operations
209  new_data_ = false;
210  lock.unlock();
211 
212  // execute callback for all new data
213  while (!local_queue.empty())
214  {
215  ReadBuffer buffer = local_queue.front();
216  receive_callback_(buffer.data, buffer.len);
217  local_queue.pop_front();
218  }
219  }
220 }
221 
222 } // namespace async_comm
std::thread callback_thread_
Definition: comm.h:163
virtual void do_async_read(const boost::asio::mutable_buffers_1 &buffer, boost::function< void(const boost::system::error_code &, size_t)> handler)=0
static constexpr size_t WRITE_BUFFER_SIZE
Definition: comm.h:107
boost::asio::io_service io_service_
Definition: comm.h:117
bool write_in_progress_
Definition: comm.h:174
uint8_t read_buffer_[READ_BUFFER_SIZE]
Definition: comm.h:165
void async_read_end(const boost::system::error_code &error, size_t bytes_transferred)
Definition: comm.cpp:123
void send_bytes(const uint8_t *src, size_t len)
Send bytes from a buffer over the port.
Definition: comm.cpp:94
bool init()
Initializes and opens the port.
Definition: comm.cpp:58
std::lock_guard< std::recursive_mutex > mutex_lock
Definition: comm.h:152
virtual bool is_open()=0
size_t nbytes() const
Definition: comm.h:149
void register_receive_callback(std::function< void(const uint8_t *, size_t)> fun)
Register a callback function for when bytes are received on the port.
Definition: comm.cpp:107
std::thread io_thread_
Definition: comm.h:162
std::list< WriteBuffer > write_queue_
Definition: comm.h:172
std::recursive_mutex write_mutex_
Definition: comm.h:173
void async_write(bool check_write_state)
Definition: comm.cpp:142
void close()
Closes the port.
Definition: comm.cpp:71
const uint8_t * dpos() const
Definition: comm.h:147
static constexpr size_t READ_BUFFER_SIZE
Definition: comm.h:106
void async_write_end(const boost::system::error_code &error, size_t bytes_transferred)
Definition: comm.cpp:160
std::condition_variable condition_variable_
Definition: comm.h:168
void async_read()
Definition: comm.cpp:112
std::list< ReadBuffer > read_queue_
Definition: comm.h:166
virtual bool do_init()=0
bool new_data_
Definition: comm.h:169
std::function< void(const uint8_t *, size_t)> receive_callback_
Definition: comm.h:176
virtual ~Comm()
Definition: comm.cpp:54
virtual void do_async_write(const boost::asio::const_buffers_1 &buffer, boost::function< void(const boost::system::error_code &, size_t)> handler)=0
bool shutdown_requested_
Definition: comm.h:170
uint8_t data[READ_BUFFER_SIZE]
Definition: comm.h:123
void process_callbacks()
Definition: comm.cpp:189
std::mutex callback_mutex_
Definition: comm.h:167
virtual void do_close()=0


async_comm
Author(s):
autogenerated on Thu May 16 2019 03:03:47