connection.h
Go to the documentation of this file.
1 /*
2  * Copyright 2018 The urg_stamped Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #ifndef SCIP2_CONNECTION_H
18 #define SCIP2_CONNECTION_H
19 
20 #include <memory>
21 #include <string>
22 
23 #include <boost/asio.hpp>
24 #include <boost/bind/bind.hpp>
25 
26 #include <scip2/logger.h>
27 
28 namespace scip2
29 {
30 class Protocol;
32 {
33  friend class Protocol;
34 
35 protected:
36  using CallbackConnect = boost::function<void(void)>;
37  using CallbackClose = boost::function<void(void)>;
38  using CallbackReceive = boost::function<void(
39  boost::asio::streambuf&, const boost::posix_time::ptime&)>;
40  using CallbackSend = boost::function<void(
41  const boost::posix_time::ptime&)>;
42 
46 
47  void close()
48  {
49  if (cb_close_)
50  cb_close_();
51  }
52  void connect()
53  {
54  if (cb_connect_)
55  cb_connect_();
56  }
57  void receive(
58  boost::asio::streambuf& buf,
59  const boost::posix_time::ptime& time_read)
60  {
61  if (cb_receive_)
62  cb_receive_(buf, time_read);
63  }
64 
65 public:
66  using Ptr = std::shared_ptr<Connection>;
67 
68  virtual void spin() = 0;
69  virtual void stop() = 0;
70  virtual void send(const std::string&, CallbackSend = CallbackSend()) = 0;
71  virtual void startWatchdog(const boost::posix_time::time_duration&) = 0;
72 
74  {
75  cb_close_ = cb;
76  }
78  {
79  cb_receive_ = cb;
80  }
82  {
83  cb_connect_ = cb;
84  }
86  {
87  }
88 };
89 
90 class ConnectionTcp : public Connection
91 {
92 protected:
93  boost::asio::io_service io_;
94  boost::asio::ip::tcp::socket socket_;
95  boost::asio::streambuf buf_;
96  boost::asio::deadline_timer timeout_;
97  boost::asio::deadline_timer watchdog_;
98  boost::posix_time::time_duration watchdog_duration_;
99 
101  {
102  if (watchdog_duration_ == boost::posix_time::time_duration())
103  return;
104  watchdog_.cancel();
105 
106  watchdog_.expires_from_now(watchdog_duration_);
107  watchdog_.async_wait(
109  }
110  void onWatchdog(const boost::system::error_code& error)
111  {
112  if (!error)
113  {
114  logger::fatal() << "Watchdog timeout" << std::endl;
115  close();
116  }
117  }
118 
119  void onReceive(const boost::system::error_code& error)
120  {
121  const auto time_read = boost::posix_time::microsec_clock::universal_time();
122  if (error)
123  {
124  logger::fatal() << "Receive error" << std::endl;
125  close();
126  return;
127  }
128  clearWatchdog();
129  receive(buf_, time_read);
130  asyncRead();
131  }
132  void onSend(const boost::system::error_code& error, CallbackSend cb)
133  {
134  const auto time_send = boost::posix_time::microsec_clock::universal_time();
135  if (error)
136  {
137  logger::fatal() << "Send error" << std::endl;
138  close();
139  return;
140  }
141  if (cb)
142  cb(time_send);
143  }
144 
145  void asyncRead()
146  {
147  boost::asio::async_read_until(
148  socket_, buf_, "\n\n",
150  }
151  void onConnect(const boost::system::error_code& error)
152  {
153  if (error)
154  {
155  logger::fatal() << "Connection error" << std::endl;
156  close();
157  return;
158  }
159  timeout_.cancel();
160  connect();
161  asyncRead();
162  }
163  void onConnectTimeout(const boost::system::error_code& error)
164  {
165  if (!error)
166  {
167  logger::fatal() << "Connection timeout" << std::endl;
168  close();
169  return;
170  }
171  }
172 
173 public:
174  using Ptr = std::shared_ptr<ConnectionTcp>;
175 
176  ConnectionTcp(const std::string& ip, const uint16_t port)
177  : socket_(io_)
178  , timeout_(io_)
179  , watchdog_(io_)
180  {
181  boost::asio::ip::tcp::endpoint endpoint(
182  boost::asio::ip::address::from_string(ip), port);
183  socket_.async_connect(
184  endpoint,
185  boost::bind(
188 
189  timeout_.expires_from_now(boost::posix_time::seconds(2));
190  timeout_.async_wait(
191  boost::bind(
194  }
195  void spin()
196  {
197  io_.run();
198  }
199  void stop()
200  {
201  io_.stop();
202  }
203  void send(const std::string& data, CallbackSend cb = CallbackSend())
204  {
205  boost::shared_ptr<std::string> buf(new std::string(data));
206  boost::asio::async_write(
207  socket_, boost::asio::buffer(*buf),
208  boost::bind(
211  }
212  void startWatchdog(const boost::posix_time::time_duration& duration)
213  {
214  watchdog_duration_ = duration;
215  clearWatchdog();
216  }
217 };
218 
219 } // namespace scip2
220 
221 #endif // SCIP2_CONNECTION_H
virtual void send(const std::string &, CallbackSend=CallbackSend())=0
boost::asio::deadline_timer watchdog_
Definition: connection.h:97
std::shared_ptr< Connection > Ptr
Definition: connection.h:66
void receive(boost::asio::streambuf &buf, const boost::posix_time::ptime &time_read)
Definition: connection.h:57
void onConnectTimeout(const boost::system::error_code &error)
Definition: connection.h:163
void onConnect(const boost::system::error_code &error)
Definition: connection.h:151
void onSend(const boost::system::error_code &error, CallbackSend cb)
Definition: connection.h:132
void registerConnectCallback(CallbackConnect cb)
Definition: connection.h:81
ConnectionTcp(const std::string &ip, const uint16_t port)
Definition: connection.h:176
void startWatchdog(const boost::posix_time::time_duration &duration)
Definition: connection.h:212
virtual void startWatchdog(const boost::posix_time::time_duration &)=0
std::ostream & fatal()
Definition: logger.cpp:109
void send(const std::string &data, CallbackSend cb=CallbackSend())
Definition: connection.h:203
boost::function< void(const boost::posix_time::ptime &)> CallbackSend
Definition: connection.h:41
void registerReceiveCallback(CallbackReceive cb)
Definition: connection.h:77
void onReceive(const boost::system::error_code &error)
Definition: connection.h:119
boost::function< void(boost::asio::streambuf &, const boost::posix_time::ptime &)> CallbackReceive
Definition: connection.h:39
boost::function< void(void)> CallbackConnect
Definition: connection.h:36
boost::asio::ip::tcp::socket socket_
Definition: connection.h:94
boost::function< void(void)> CallbackClose
Definition: connection.h:37
void registerCloseCallback(CallbackClose cb)
Definition: connection.h:73
void onWatchdog(const boost::system::error_code &error)
Definition: connection.h:110
boost::asio::streambuf buf_
Definition: connection.h:95
CallbackConnect cb_connect_
Definition: connection.h:43
CallbackClose cb_close_
Definition: connection.h:44
boost::asio::deadline_timer timeout_
Definition: connection.h:96
boost::asio::io_service io_
Definition: connection.h:93
virtual void spin()=0
boost::posix_time::time_duration watchdog_duration_
Definition: connection.h:98
std::ostream & error()
Definition: logger.cpp:105
virtual void stop()=0
CallbackReceive cb_receive_
Definition: connection.h:45


urg_stamped
Author(s): Atsushi Watanabe
autogenerated on Tue May 11 2021 02:14:05