udp_client.h
Go to the documentation of this file.
1 // Copyright (c) 2020-2021 Pilz GmbH & Co. KG
2 //
3 // This program is free software: you can redistribute it and/or modify
4 // it under the terms of the GNU Lesser General Public License as published by
5 // the Free Software Foundation, either version 3 of the License, or
6 // (at your option) any later version.
7 //
8 // This program is distributed in the hope that it will be useful,
9 // but WITHOUT ANY WARRANTY; without even the implied warranty of
10 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 // GNU Lesser General Public License for more details.
12 //
13 // You should have received a copy of the GNU Lesser General Public License
14 // along with this program. If not, see <https://www.gnu.org/licenses/>.
15 #ifndef PSEN_SCAN_V2_STANDALONE_UDP_CLIENT_H
16 #define PSEN_SCAN_V2_STANDALONE_UDP_CLIENT_H
17 
18 #include <functional>
19 #include <iostream>
20 #include <memory>
21 #include <stdexcept>
22 #include <string>
23 #include <thread>
24 #include <future>
25 
26 #ifdef __linux__
27 #include <arpa/inet.h>
28 #endif
29 
30 #ifdef _WIN32
31 #include <WinSock2.h>
32 #endif
33 
34 #include <boost/asio.hpp>
35 #include <boost/bind.hpp>
36 
40 
42 {
46 namespace communication_layer
47 {
48 using NewMessageCallback =
49  std::function<void(const data_conversion_layer::RawDataConstPtr&, const std::size_t&, const int64_t& timestamp)>;
50 using ErrorCallback = std::function<void(const std::string&)>;
51 
55 enum class ReceiveMode
56 {
58  single,
62 };
63 
75 {
76 public:
80  class CloseConnectionFailure : public std::runtime_error
81  {
82  public:
83  CloseConnectionFailure(const std::string& msg = "Failure while closing connection");
84  };
85 
89  class OpenConnectionFailure : public std::runtime_error
90  {
91  public:
92  OpenConnectionFailure(const std::string& msg = "Failure while opening connection");
93  };
94 
95 public:
108  UdpClientImpl(const NewMessageCallback& msg_callback,
109  const ErrorCallback& error_callback,
110  const unsigned short& host_port,
111  const unsigned int& endpoint_ip,
112  const unsigned short& endpoint_port);
113 
117  ~UdpClientImpl();
118 
119 public:
129 
135  void write(const data_conversion_layer::RawData& data);
136 
143  void stop();
144 
148  void close();
149 
153  boost::asio::ip::address_v4 hostIp();
154 
155 private:
156  void asyncReceive(const ReceiveMode& modi);
157 
158  void sendCompleteHandler(const boost::system::error_code& error, std::size_t bytes_transferred);
159 
160 private:
161  boost::asio::io_service io_service_;
162  // Prevent the run() method of the io_service from returning when there is no more work.
163  boost::asio::io_service::work work_{ io_service_ };
164  std::thread io_service_thread_;
165 
167 
170 
171  boost::asio::ip::udp::socket socket_;
172  boost::asio::ip::udp::endpoint endpoint_;
173 };
174 
176  const ErrorCallback& error_callback,
177  const unsigned short& host_port,
178  const unsigned int& endpoint_ip,
179  const unsigned short& endpoint_port)
180  : message_callback_(message_callback)
181  , error_callback_(error_callback)
182  , socket_(io_service_, boost::asio::ip::udp::endpoint(boost::asio::ip::udp::v4(), host_port))
183  , endpoint_(boost::asio::ip::address_v4(endpoint_ip), endpoint_port)
184 {
185  if (!message_callback)
186  {
187  throw std::invalid_argument("New message callback is invalid");
188  }
189 
190  if (!error_callback)
191  {
192  throw std::invalid_argument("Error callback is invalid");
193  }
194 
196  try
197  {
198  socket_.connect(endpoint_);
199  }
200  // LCOV_EXCL_START
201  // No coverage check because testing the socket is not the objective here.
202  catch (const boost::system::system_error& ex)
203  {
204  throw OpenConnectionFailure(ex.what());
205  }
206  // LCOV_EXCL_STOP
207 
208  assert(!io_service_thread_.joinable() && "io_service_thread_ is joinable!");
209  io_service_thread_ = std::thread([this]() { io_service_.run(); });
210 }
211 
212 inline void UdpClientImpl::stop()
213 {
214  io_service_.stop();
215 }
216 
217 inline void UdpClientImpl::close()
218 {
219  io_service_.stop();
220  if (io_service_thread_.joinable())
221  {
222  io_service_thread_.join();
223  }
224 
225  try
226  {
227  // Function is intended to be called from the main thread,
228  // therefore, the following socket operation happens on the main thread.
229  // To avoid concurrency issues, it has to be ensured that the io_service thread
230  // is finished before the socket close operation is performed.
231  socket_.close();
232  }
233  // LCOV_EXCL_START
234  // No coverage check because testing the socket is not the objective here.
235  catch (const boost::system::system_error& ex)
236  {
237  throw CloseConnectionFailure(ex.what());
238  }
239  // LCOV_EXCL_STOP
240 }
241 
242 inline boost::asio::ip::address_v4 UdpClientImpl::hostIp()
243 {
244  return socket_.local_endpoint().address().to_v4();
245 }
246 
248 {
249  try
250  {
251  close();
252  }
253  // LCOV_EXCL_START
254  // No coverage check because testing the socket is not the objective here.
255  catch (const CloseConnectionFailure& ex)
256  {
257  PSENSCAN_ERROR("UdpClient", ex.what());
258  }
259  // LCOV_EXCL_STOP
260 }
261 
262 inline void UdpClientImpl::sendCompleteHandler(const boost::system::error_code& error, std::size_t bytes_transferred)
263 {
264  // LCOV_EXCL_START
265  // No coverage check because testing the if-loop is extremly difficult.
266  if (error || bytes_transferred == 0)
267  {
268  PSENSCAN_ERROR("UdpClient", "Failed to send data. Error message: {}", error.message());
269  }
270 
271  // LCOV_EXCL_STOP
272  PSENSCAN_DEBUG("UdpClient", "Data successfully send.");
273 }
274 
276 {
277  io_service_.post([this, data]() {
278  socket_.async_send(boost::asio::buffer(data.data(), data.size()),
280  this,
281  boost::asio::placeholders::error,
282  boost::asio::placeholders::bytes_transferred));
283  });
284 }
285 
287 {
288  std::promise<void> post_done_barrier;
289  const auto post_done_future{ post_done_barrier.get_future() };
290  // Function is intended to be called from main thread.
291  // To ensure that socket operations only happen on one strand (in this case an implicit one),
292  // the asyncReceive() operation is scheduled as task to the io_service thread.
293  io_service_.post([this, modi, &post_done_barrier]() {
294  asyncReceive(modi);
295  post_done_barrier.set_value();
296  });
297  post_done_future.wait();
298 }
299 
300 inline void UdpClientImpl::asyncReceive(const ReceiveMode& modi)
301 {
302  socket_.async_receive(boost::asio::buffer(*received_data_, received_data_->size()),
303  [this, modi](const boost::system::error_code& error_code, const std::size_t& bytes_received) {
304  if (error_code || bytes_received == 0)
305  {
306  error_callback_(error_code.message());
307  }
308  else
309  {
310  message_callback_(received_data_, bytes_received, util::getCurrentTime());
311  }
312  if (modi == ReceiveMode::continuous)
313  {
314  asyncReceive(modi);
315  }
316  });
317 }
318 
319 inline UdpClientImpl::OpenConnectionFailure::OpenConnectionFailure(const std::string& msg) : std::runtime_error(msg)
320 {
321 }
322 
323 inline UdpClientImpl::CloseConnectionFailure::CloseConnectionFailure(const std::string& msg) : std::runtime_error(msg)
324 {
325 }
326 } // namespace communication_layer
327 } // namespace psen_scan_v2_standalone
328 #endif // PSEN_SCAN_V2_STANDALONE_UDP_CLIENT_H
psen_scan_v2_standalone::communication_layer::UdpClientImpl
Helper for asynchronously sending and receiving data via UDP.
Definition: udp_client.h:74
psen_scan_v2_standalone::communication_layer::UdpClientImpl::io_service_
boost::asio::io_service io_service_
Definition: udp_client.h:161
psen_scan_v2_standalone::communication_layer::UdpClientImpl::CloseConnectionFailure::CloseConnectionFailure
CloseConnectionFailure(const std::string &msg="Failure while closing connection")
Definition: udp_client.h:323
psen_scan_v2_standalone::communication_layer::UdpClientImpl::error_callback_
ErrorCallback error_callback_
Definition: udp_client.h:169
psen_scan_v2_standalone::communication_layer::UdpClientImpl::write
void write(const data_conversion_layer::RawData &data)
Asynchronously sends the specified data to the other endpoint.
Definition: udp_client.h:275
psen_scan_v2_standalone::communication_layer::UdpClientImpl::message_callback_
NewMessageCallback message_callback_
Definition: udp_client.h:168
psen_scan_v2_standalone::data_conversion_layer::RawDataConstPtr
std::shared_ptr< const RawData > RawDataConstPtr
Definition: raw_scanner_data.h:28
psen_scan_v2_standalone::communication_layer::UdpClientImpl::stop
void stop()
Stops the underlying io_service so that no messages are received anymore.
Definition: udp_client.h:212
psen_scan_v2_standalone::communication_layer::UdpClientImpl::OpenConnectionFailure::OpenConnectionFailure
OpenConnectionFailure(const std::string &msg="Failure while opening connection")
Definition: udp_client.h:319
boost
PSENSCAN_DEBUG
#define PSENSCAN_DEBUG(name,...)
Definition: logging.h:63
psen_scan_v2_standalone::communication_layer::UdpClientImpl::~UdpClientImpl
~UdpClientImpl()
Closes the UDP connection and stops all pending asynchronous operation.
Definition: udp_client.h:247
psen_scan_v2_standalone::communication_layer::ReceiveMode::single
@ single
Wait for one message and then stop listening.
psen_scan_v2_standalone::data_conversion_layer::RawDataPtr
std::shared_ptr< RawData > RawDataPtr
Definition: raw_scanner_data.h:27
psen_scan_v2_standalone::communication_layer::UdpClientImpl::work_
boost::asio::io_service::work work_
Definition: udp_client.h:163
psen_scan_v2_standalone::communication_layer::UdpClientImpl::io_service_thread_
std::thread io_service_thread_
Definition: udp_client.h:164
psen_scan_v2_standalone::communication_layer::UdpClientImpl::socket_
boost::asio::ip::udp::socket socket_
Definition: udp_client.h:171
raw_scanner_data.h
psen_scan_v2_standalone::communication_layer::UdpClientImpl::UdpClientImpl
UdpClientImpl(const NewMessageCallback &msg_callback, const ErrorCallback &error_callback, const unsigned short &host_port, const unsigned int &endpoint_ip, const unsigned short &endpoint_port)
Opens an UDP connection.
Definition: udp_client.h:175
psen_scan_v2_standalone::communication_layer::UdpClientImpl::asyncReceive
void asyncReceive(const ReceiveMode &modi)
Definition: udp_client.h:300
psen_scan_v2_standalone::data_conversion_layer::MAX_UDP_PAKET_SIZE
static constexpr std::size_t MAX_UDP_PAKET_SIZE
Definition: raw_scanner_data.h:26
psen_scan_v2_standalone::communication_layer::UdpClientImpl::close
void close()
Closes the UDP connection and stops all pending asynchronous operation.
Definition: udp_client.h:217
psen_scan_v2_standalone::communication_layer::UdpClientImpl::OpenConnectionFailure
Exception thrown if the UDP socket cannot be opened.
Definition: udp_client.h:89
psen_scan_v2_standalone::communication_layer::UdpClientImpl::received_data_
data_conversion_layer::RawDataPtr received_data_
Definition: udp_client.h:166
std
psen_scan_v2_standalone::communication_layer::ReceiveMode
ReceiveMode
Lists the different possible receive modi.
Definition: udp_client.h:55
psen_scan_v2_standalone::communication_layer::UdpClientImpl::endpoint_
boost::asio::ip::udp::endpoint endpoint_
Definition: udp_client.h:172
logging.h
psen_scan_v2_standalone
Root namespace in which the software components to communicate with the scanner (firmware-version: 2)...
Definition: udp_client.h:41
psen_scan_v2_standalone::communication_layer::UdpClientImpl::hostIp
boost::asio::ip::address_v4 hostIp()
Returns local ip address of current socket connection.
Definition: udp_client.h:242
PSENSCAN_ERROR
#define PSENSCAN_ERROR(name,...)
Definition: logging.h:60
psen_scan_v2_standalone::communication_layer::ErrorCallback
std::function< void(const std::string &)> ErrorCallback
Definition: udp_client.h:50
psen_scan_v2_standalone::communication_layer::NewMessageCallback
std::function< void(const data_conversion_layer::RawDataConstPtr &, const std::size_t &, const int64_t &timestamp)> NewMessageCallback
Definition: udp_client.h:49
psen_scan_v2_standalone::data_conversion_layer::RawData
std::vector< char > RawData
Definition: raw_scanner_data.h:25
timestamp.h
psen_scan_v2_standalone::communication_layer::ReceiveMode::continuous
@ continuous
Continuously wait for new messages. In other words, after a message is received, automatically start ...
psen_scan_v2_standalone::communication_layer::UdpClientImpl::CloseConnectionFailure
Exception thrown if the UDP socket cannot be closed.
Definition: udp_client.h:80
psen_scan_v2_standalone::communication_layer::UdpClientImpl::sendCompleteHandler
void sendCompleteHandler(const boost::system::error_code &error, std::size_t bytes_transferred)
Definition: udp_client.h:262
psen_scan_v2_standalone::communication_layer::UdpClientImpl::startAsyncReceiving
void startAsyncReceiving(const ReceiveMode &modi=ReceiveMode::continuous)
Starts an asynchronous process listing to (a) new message(s) from the other endpoint....
Definition: udp_client.h:286
error
def error(*args, **kwargs)


psen_scan_v2
Author(s): Pilz GmbH + Co. KG
autogenerated on Sat Nov 25 2023 03:46:26