connection.h
Go to the documentation of this file.
1 /*
2  * Copyright 2018 The urg_stamped Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #ifndef SCIP2_CONNECTION_H
18 #define SCIP2_CONNECTION_H
19 
20 #include <boost/asio.hpp>
21 #include <boost/bind/bind.hpp>
22 
23 #include <string>
24 
25 namespace scip2
26 {
27 class Protocol;
29 {
30  friend class Protocol;
31 
32 protected:
33  using CallbackConnect = boost::function<void(void)>;
34  using CallbackClose = boost::function<void(void)>;
35  using CallbackReceive = boost::function<void(
36  boost::asio::streambuf &, const boost::posix_time::ptime &)>;
37  using CallbackSend = boost::function<void(
38  const boost::posix_time::ptime &)>;
39 
43 
44  void close()
45  {
46  if (cb_close_)
47  cb_close_();
48  }
49  void connect()
50  {
51  if (cb_connect_)
52  cb_connect_();
53  }
54  void receive(
55  boost::asio::streambuf &buf,
56  const boost::posix_time::ptime &time_read)
57  {
58  if (cb_receive_)
59  cb_receive_(buf, time_read);
60  }
61 
62 public:
63  using Ptr = std::shared_ptr<Connection>;
64 
65  virtual void spin() = 0;
66  virtual void stop() = 0;
67  virtual void send(const std::string &, CallbackSend = CallbackSend()) = 0;
68  virtual void startWatchdog(const boost::posix_time::time_duration &) = 0;
69 
71  {
72  cb_close_ = cb;
73  }
75  {
76  cb_receive_ = cb;
77  }
79  {
80  cb_connect_ = cb;
81  }
83  {
84  }
85 };
86 
87 class ConnectionTcp : public Connection
88 {
89 protected:
90  boost::asio::io_service io_;
91  boost::asio::ip::tcp::socket socket_;
92  boost::asio::streambuf buf_;
93  boost::asio::deadline_timer timeout_;
94  boost::asio::deadline_timer watchdog_;
95  boost::posix_time::time_duration watchdog_duration_;
96 
98  {
99  if (watchdog_duration_ == boost::posix_time::time_duration())
100  return;
101  watchdog_.cancel();
102 
103  watchdog_.expires_from_now(watchdog_duration_);
104  watchdog_.async_wait(
105  boost::bind(&ConnectionTcp::onWatchdog, this, boost::asio::placeholders::error));
106  }
107  void onWatchdog(const boost::system::error_code &error)
108  {
109  if (!error)
110  {
111  std::cerr << "Watchdog timeout" << std::endl;
112  close();
113  }
114  }
115 
116  void onReceive(const boost::system::error_code &error)
117  {
118  const auto time_read = boost::posix_time::microsec_clock::universal_time();
119  if (error)
120  {
121  std::cerr << "Receive error" << std::endl;
122  close();
123  return;
124  }
125  clearWatchdog();
126  receive(buf_, time_read);
127  asyncRead();
128  }
129  void onSend(const boost::system::error_code &error, CallbackSend cb)
130  {
131  const auto time_send = boost::posix_time::microsec_clock::universal_time();
132  if (error)
133  {
134  std::cerr << "Send error" << std::endl;
135  close();
136  return;
137  }
138  if (cb)
139  cb(time_send);
140  }
141 
142  void asyncRead()
143  {
144  boost::asio::async_read_until(
145  socket_, buf_, "\n\n",
146  boost::bind(&ConnectionTcp::onReceive, this, boost::asio::placeholders::error));
147  }
148  void onConnect(const boost::system::error_code &error)
149  {
150  if (error)
151  {
152  std::cerr << "Connection error" << std::endl;
153  close();
154  return;
155  }
156  timeout_.cancel();
157  connect();
158  asyncRead();
159  }
160  void onConnectTimeout(const boost::system::error_code &error)
161  {
162  if (!error)
163  {
164  std::cerr << "Connection timeout" << std::endl;
165  close();
166  return;
167  }
168  }
169 
170 public:
171  using Ptr = std::shared_ptr<ConnectionTcp>;
172 
173  ConnectionTcp(const std::string &ip, const uint16_t port)
174  : socket_(io_)
175  , timeout_(io_)
176  , watchdog_(io_)
177  {
178  boost::asio::ip::tcp::endpoint endpoint(
179  boost::asio::ip::address::from_string(ip), port);
180  socket_.async_connect(
181  endpoint,
182  boost::bind(
184  this, boost::asio::placeholders::error));
185 
186  timeout_.expires_from_now(boost::posix_time::seconds(2));
187  timeout_.async_wait(
188  boost::bind(
190  this, boost::asio::placeholders::error));
191  }
192  void spin()
193  {
194  io_.run();
195  }
196  void stop()
197  {
198  io_.stop();
199  }
200  void send(const std::string &data, CallbackSend cb = CallbackSend())
201  {
202  boost::shared_ptr<std::string> buf(new std::string(data));
203  boost::asio::async_write(
204  socket_, boost::asio::buffer(*buf),
205  boost::bind(
207  this, boost::asio::placeholders::error, cb));
208  }
209  void startWatchdog(const boost::posix_time::time_duration &duration)
210  {
211  watchdog_duration_ = duration;
212  clearWatchdog();
213  }
214 };
215 
216 } // namespace scip2
217 
218 #endif // SCIP2_CONNECTION_H
virtual void send(const std::string &, CallbackSend=CallbackSend())=0
boost::asio::deadline_timer watchdog_
Definition: connection.h:94
std::shared_ptr< Connection > Ptr
Definition: connection.h:63
void receive(boost::asio::streambuf &buf, const boost::posix_time::ptime &time_read)
Definition: connection.h:54
boost::function< void(const boost::posix_time::ptime &)> CallbackSend
Definition: connection.h:38
void onConnectTimeout(const boost::system::error_code &error)
Definition: connection.h:160
void onConnect(const boost::system::error_code &error)
Definition: connection.h:148
void onSend(const boost::system::error_code &error, CallbackSend cb)
Definition: connection.h:129
boost::function< void(boost::asio::streambuf &, const boost::posix_time::ptime &)> CallbackReceive
Definition: connection.h:36
void registerConnectCallback(CallbackConnect cb)
Definition: connection.h:78
ConnectionTcp(const std::string &ip, const uint16_t port)
Definition: connection.h:173
void startWatchdog(const boost::posix_time::time_duration &duration)
Definition: connection.h:209
virtual void startWatchdog(const boost::posix_time::time_duration &)=0
void send(const std::string &data, CallbackSend cb=CallbackSend())
Definition: connection.h:200
void registerReceiveCallback(CallbackReceive cb)
Definition: connection.h:74
void onReceive(const boost::system::error_code &error)
Definition: connection.h:116
boost::function< void(void)> CallbackConnect
Definition: connection.h:33
boost::asio::ip::tcp::socket socket_
Definition: connection.h:91
boost::function< void(void)> CallbackClose
Definition: connection.h:34
void registerCloseCallback(CallbackClose cb)
Definition: connection.h:70
void onWatchdog(const boost::system::error_code &error)
Definition: connection.h:107
boost::asio::streambuf buf_
Definition: connection.h:92
CallbackConnect cb_connect_
Definition: connection.h:40
CallbackClose cb_close_
Definition: connection.h:41
boost::asio::deadline_timer timeout_
Definition: connection.h:93
boost::asio::io_service io_
Definition: connection.h:90
virtual void spin()=0
boost::posix_time::time_duration watchdog_duration_
Definition: connection.h:95
virtual void stop()=0
CallbackReceive cb_receive_
Definition: connection.h:42


urg_stamped
Author(s): Atsushi Watanabe
autogenerated on Thu Jun 6 2019 19:55:58