connection.h
Go to the documentation of this file.
00001 /*
00002  * Copyright 2018 The urg_stamped Authors
00003  *
00004  * Licensed under the Apache License, Version 2.0 (the "License");
00005  * you may not use this file except in compliance with the License.
00006  * You may obtain a copy of the License at
00007  *
00008  * http://www.apache.org/licenses/LICENSE-2.0
00009  *
00010  * Unless required by applicable law or agreed to in writing, software
00011  * distributed under the License is distributed on an "AS IS" BASIS,
00012  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
00013  * See the License for the specific language governing permissions and
00014  * limitations under the License.
00015  */
00016 
00017 #ifndef SCIP2_CONNECTION_H
00018 #define SCIP2_CONNECTION_H
00019 
00020 #include <boost/asio.hpp>
00021 #include <boost/bind/bind.hpp>
00022 
00023 #include <string>
00024 
00025 namespace scip2
00026 {
00027 class Protocol;
00028 class Connection
00029 {
00030   friend class Protocol;
00031 
00032 protected:
00033   using CallbackConnect = boost::function<void(void)>;
00034   using CallbackClose = boost::function<void(void)>;
00035   using CallbackReceive = boost::function<void(
00036       boost::asio::streambuf &, const boost::posix_time::ptime &)>;
00037   using CallbackSend = boost::function<void(
00038       const boost::posix_time::ptime &)>;
00039 
00040   CallbackConnect cb_connect_;
00041   CallbackClose cb_close_;
00042   CallbackReceive cb_receive_;
00043 
00044   void close()
00045   {
00046     if (cb_close_)
00047       cb_close_();
00048   }
00049   void connect()
00050   {
00051     if (cb_connect_)
00052       cb_connect_();
00053   }
00054   void receive(
00055       boost::asio::streambuf &buf,
00056       const boost::posix_time::ptime &time_read)
00057   {
00058     if (cb_receive_)
00059       cb_receive_(buf, time_read);
00060   }
00061 
00062 public:
00063   using Ptr = std::shared_ptr<Connection>;
00064 
00065   virtual void spin() = 0;
00066   virtual void stop() = 0;
00067   virtual void send(const std::string &, CallbackSend = CallbackSend()) = 0;
00068   virtual void startWatchdog(const boost::posix_time::time_duration &) = 0;
00069 
00070   void registerCloseCallback(CallbackClose cb)
00071   {
00072     cb_close_ = cb;
00073   }
00074   void registerReceiveCallback(CallbackReceive cb)
00075   {
00076     cb_receive_ = cb;
00077   }
00078   void registerConnectCallback(CallbackConnect cb)
00079   {
00080     cb_connect_ = cb;
00081   }
00082   Connection()
00083   {
00084   }
00085 };
00086 
00087 class ConnectionTcp : public Connection
00088 {
00089 protected:
00090   boost::asio::io_service io_;
00091   boost::asio::ip::tcp::socket socket_;
00092   boost::asio::streambuf buf_;
00093   boost::asio::deadline_timer timeout_;
00094   boost::asio::deadline_timer watchdog_;
00095   boost::posix_time::time_duration watchdog_duration_;
00096 
00097   void clearWatchdog()
00098   {
00099     if (watchdog_duration_ == boost::posix_time::time_duration())
00100       return;
00101     watchdog_.cancel();
00102 
00103     watchdog_.expires_from_now(watchdog_duration_);
00104     watchdog_.async_wait(
00105         boost::bind(&ConnectionTcp::onWatchdog, this, boost::asio::placeholders::error));
00106   }
00107   void onWatchdog(const boost::system::error_code &error)
00108   {
00109     if (!error)
00110     {
00111       std::cerr << "Watchdog timeout" << std::endl;
00112       close();
00113     }
00114   }
00115 
00116   void onReceive(const boost::system::error_code &error)
00117   {
00118     const auto time_read = boost::posix_time::microsec_clock::universal_time();
00119     if (error)
00120     {
00121       std::cerr << "Receive error" << std::endl;
00122       close();
00123       return;
00124     }
00125     clearWatchdog();
00126     receive(buf_, time_read);
00127     asyncRead();
00128   }
00129   void onSend(const boost::system::error_code &error, CallbackSend cb)
00130   {
00131     const auto time_send = boost::posix_time::microsec_clock::universal_time();
00132     if (error)
00133     {
00134       std::cerr << "Send error" << std::endl;
00135       close();
00136       return;
00137     }
00138     if (cb)
00139       cb(time_send);
00140   }
00141 
00142   void asyncRead()
00143   {
00144     boost::asio::async_read_until(
00145         socket_, buf_, "\n\n",
00146         boost::bind(&ConnectionTcp::onReceive, this, boost::asio::placeholders::error));
00147   }
00148   void onConnect(const boost::system::error_code &error)
00149   {
00150     if (error)
00151     {
00152       std::cerr << "Connection error" << std::endl;
00153       close();
00154       return;
00155     }
00156     timeout_.cancel();
00157     connect();
00158     asyncRead();
00159   }
00160   void onConnectTimeout(const boost::system::error_code &error)
00161   {
00162     if (!error)
00163     {
00164       std::cerr << "Connection timeout" << std::endl;
00165       close();
00166       return;
00167     }
00168   }
00169 
00170 public:
00171   using Ptr = std::shared_ptr<ConnectionTcp>;
00172 
00173   ConnectionTcp(const std::string &ip, const uint16_t port)
00174     : socket_(io_)
00175     , timeout_(io_)
00176     , watchdog_(io_)
00177   {
00178     boost::asio::ip::tcp::endpoint endpoint(
00179         boost::asio::ip::address::from_string(ip), port);
00180     socket_.async_connect(
00181         endpoint,
00182         boost::bind(
00183             &ConnectionTcp::onConnect,
00184             this, boost::asio::placeholders::error));
00185 
00186     timeout_.expires_from_now(boost::posix_time::seconds(2));
00187     timeout_.async_wait(
00188         boost::bind(
00189             &ConnectionTcp::onConnectTimeout,
00190             this, boost::asio::placeholders::error));
00191   }
00192   void spin()
00193   {
00194     io_.run();
00195   }
00196   void stop()
00197   {
00198     io_.stop();
00199   }
00200   void send(const std::string &data, CallbackSend cb = CallbackSend())
00201   {
00202     boost::shared_ptr<std::string> buf(new std::string(data));
00203     boost::asio::async_write(
00204         socket_, boost::asio::buffer(*buf),
00205         boost::bind(
00206             &ConnectionTcp::onSend,
00207             this, boost::asio::placeholders::error, cb));
00208   }
00209   void startWatchdog(const boost::posix_time::time_duration &duration)
00210   {
00211     watchdog_duration_ = duration;
00212     clearWatchdog();
00213   }
00214 };
00215 
00216 }  // namespace scip2
00217 
00218 #endif  // SCIP2_CONNECTION_H


urg_stamped
Author(s): Atsushi Watanabe
autogenerated on Thu Jun 6 2019 18:59:51