Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017 #ifndef SCIP2_CONNECTION_H
00018 #define SCIP2_CONNECTION_H
00019
00020 #include <boost/asio.hpp>
00021 #include <boost/bind/bind.hpp>
00022
00023 #include <string>
00024
00025 namespace scip2
00026 {
00027 class Protocol;
00028 class Connection
00029 {
00030 friend class Protocol;
00031
00032 protected:
00033 using CallbackConnect = boost::function<void(void)>;
00034 using CallbackClose = boost::function<void(void)>;
00035 using CallbackReceive = boost::function<void(
00036 boost::asio::streambuf &, const boost::posix_time::ptime &)>;
00037 using CallbackSend = boost::function<void(
00038 const boost::posix_time::ptime &)>;
00039
00040 CallbackConnect cb_connect_;
00041 CallbackClose cb_close_;
00042 CallbackReceive cb_receive_;
00043
00044 void close()
00045 {
00046 if (cb_close_)
00047 cb_close_();
00048 }
00049 void connect()
00050 {
00051 if (cb_connect_)
00052 cb_connect_();
00053 }
00054 void receive(
00055 boost::asio::streambuf &buf,
00056 const boost::posix_time::ptime &time_read)
00057 {
00058 if (cb_receive_)
00059 cb_receive_(buf, time_read);
00060 }
00061
00062 public:
00063 using Ptr = std::shared_ptr<Connection>;
00064
00065 virtual void spin() = 0;
00066 virtual void stop() = 0;
00067 virtual void send(const std::string &, CallbackSend = CallbackSend()) = 0;
00068 virtual void startWatchdog(const boost::posix_time::time_duration &) = 0;
00069
00070 void registerCloseCallback(CallbackClose cb)
00071 {
00072 cb_close_ = cb;
00073 }
00074 void registerReceiveCallback(CallbackReceive cb)
00075 {
00076 cb_receive_ = cb;
00077 }
00078 void registerConnectCallback(CallbackConnect cb)
00079 {
00080 cb_connect_ = cb;
00081 }
00082 Connection()
00083 {
00084 }
00085 };
00086
00087 class ConnectionTcp : public Connection
00088 {
00089 protected:
00090 boost::asio::io_service io_;
00091 boost::asio::ip::tcp::socket socket_;
00092 boost::asio::streambuf buf_;
00093 boost::asio::deadline_timer timeout_;
00094 boost::asio::deadline_timer watchdog_;
00095 boost::posix_time::time_duration watchdog_duration_;
00096
00097 void clearWatchdog()
00098 {
00099 if (watchdog_duration_ == boost::posix_time::time_duration())
00100 return;
00101 watchdog_.cancel();
00102
00103 watchdog_.expires_from_now(watchdog_duration_);
00104 watchdog_.async_wait(
00105 boost::bind(&ConnectionTcp::onWatchdog, this, boost::asio::placeholders::error));
00106 }
00107 void onWatchdog(const boost::system::error_code &error)
00108 {
00109 if (!error)
00110 {
00111 std::cerr << "Watchdog timeout" << std::endl;
00112 close();
00113 }
00114 }
00115
00116 void onReceive(const boost::system::error_code &error)
00117 {
00118 const auto time_read = boost::posix_time::microsec_clock::universal_time();
00119 if (error)
00120 {
00121 std::cerr << "Receive error" << std::endl;
00122 close();
00123 return;
00124 }
00125 clearWatchdog();
00126 receive(buf_, time_read);
00127 asyncRead();
00128 }
00129 void onSend(const boost::system::error_code &error, CallbackSend cb)
00130 {
00131 const auto time_send = boost::posix_time::microsec_clock::universal_time();
00132 if (error)
00133 {
00134 std::cerr << "Send error" << std::endl;
00135 close();
00136 return;
00137 }
00138 if (cb)
00139 cb(time_send);
00140 }
00141
00142 void asyncRead()
00143 {
00144 boost::asio::async_read_until(
00145 socket_, buf_, "\n\n",
00146 boost::bind(&ConnectionTcp::onReceive, this, boost::asio::placeholders::error));
00147 }
00148 void onConnect(const boost::system::error_code &error)
00149 {
00150 if (error)
00151 {
00152 std::cerr << "Connection error" << std::endl;
00153 close();
00154 return;
00155 }
00156 timeout_.cancel();
00157 connect();
00158 asyncRead();
00159 }
00160 void onConnectTimeout(const boost::system::error_code &error)
00161 {
00162 if (!error)
00163 {
00164 std::cerr << "Connection timeout" << std::endl;
00165 close();
00166 return;
00167 }
00168 }
00169
00170 public:
00171 using Ptr = std::shared_ptr<ConnectionTcp>;
00172
00173 ConnectionTcp(const std::string &ip, const uint16_t port)
00174 : socket_(io_)
00175 , timeout_(io_)
00176 , watchdog_(io_)
00177 {
00178 boost::asio::ip::tcp::endpoint endpoint(
00179 boost::asio::ip::address::from_string(ip), port);
00180 socket_.async_connect(
00181 endpoint,
00182 boost::bind(
00183 &ConnectionTcp::onConnect,
00184 this, boost::asio::placeholders::error));
00185
00186 timeout_.expires_from_now(boost::posix_time::seconds(2));
00187 timeout_.async_wait(
00188 boost::bind(
00189 &ConnectionTcp::onConnectTimeout,
00190 this, boost::asio::placeholders::error));
00191 }
00192 void spin()
00193 {
00194 io_.run();
00195 }
00196 void stop()
00197 {
00198 io_.stop();
00199 }
00200 void send(const std::string &data, CallbackSend cb = CallbackSend())
00201 {
00202 boost::shared_ptr<std::string> buf(new std::string(data));
00203 boost::asio::async_write(
00204 socket_, boost::asio::buffer(*buf),
00205 boost::bind(
00206 &ConnectionTcp::onSend,
00207 this, boost::asio::placeholders::error, cb));
00208 }
00209 void startWatchdog(const boost::posix_time::time_duration &duration)
00210 {
00211 watchdog_duration_ = duration;
00212 clearWatchdog();
00213 }
00214 };
00215
00216 }
00217
00218 #endif // SCIP2_CONNECTION_H