00001 
00002 
00003 
00004 
00005 
00006 
00007 
00008 
00009 
00010 
00011 
00012 
00013 
00014 
00015 
00016 
00017 
00018 
00019 
00020 
00021 
00022 
00023 
00024 
00025 
00026 
00027 
00028 
00029 
00030 
00031 
00032 
00033 
00034 
00035 
00036 #include "remote_interface.h"
00037 #include "unexpected_receive_timeout.h"
00038 
00039 #include <cpr/cpr.h>
00040 #include <json.hpp>
00041 
00042 
00043 using namespace std;
00044 using json = nlohmann::json;
00045 
00046 namespace rc
00047 {
00048 namespace dynamics
00049 {
00050 
00051 string toString(cpr::Response resp)
00052 {
00053   stringstream s;
00054   s << "status code: " << resp.status_code << endl
00055     << "url: " << resp.url << endl
00056     << "text: " << resp.text << endl
00057     << "error: " << resp.error.message;
00058   return s.str();
00059 }
00060 
/**
 * Formats a list of strings as "[a, b, c]".
 *
 * Elements are separated by ", "; an empty list yields "[]".
 */
std::string toString(std::list<std::string> list)
{
  std::stringstream out;
  out << "[";
  bool needsSeparator = false;
  for (const auto &entry : list)
  {
    if (needsSeparator)
    {
      out << ", ";
    }
    out << entry;
    needsSeparator = true;
  }
  out << "]";
  return out.str();
}
00076 
00077 void handleCPRResponse(cpr::Response r)
00078 {
00079   if (r.status_code != 200)
00080   {
00081     throw runtime_error(toString(r));
00082   }
00083 }
00084 
00085 
00091 class TrackedDataReceiver : public DataReceiver
00092 {
00093   public:
00094 
00095     static shared_ptr<TrackedDataReceiver>
00096     create(const string &ip_address, unsigned int &port,
00097            const string &stream,
00098            shared_ptr<RemoteInterface> creator)
00099     {
00100       return shared_ptr<TrackedDataReceiver>(
00101               new TrackedDataReceiver(ip_address, port, stream, creator));
00102     }
00103 
00104     virtual ~TrackedDataReceiver()
00105     {
00106       try
00107       {
00108         _creator->deleteDestinationFromStream(_stream, _dest);
00109       }
00110       catch (exception &e)
00111       {
00112         cerr
00113                 << "[TrackedDataReceiver] Could not remove my destination "
00114                 << _dest << " for stream type " << _stream
00115                 << " from rc_visard: "
00116                 << e.what() << endl;
00117       }
00118     }
00119 
00120   protected:
00121 
00122     TrackedDataReceiver(const string &ip_address, unsigned int &port,
00123                         const string &stream,
00124                         shared_ptr<RemoteInterface> creator)
00125             : DataReceiver(ip_address, port),
00126               _dest(ip_address + ":" + to_string(port)),
00127               _stream(stream),
00128               _creator(creator)
00129     {}
00130 
00131     string _dest, _stream;
00132     shared_ptr<RemoteInterface> _creator;
00133 };
00134 
00135 
00136 map<string, RemoteInterface::Ptr> RemoteInterface::_remoteInterfaces = map<string,RemoteInterface::Ptr>();
00137 
00138 RemoteInterface::Ptr
00139 RemoteInterface::create(const string &rcVisardInetAddrs,
00140                         unsigned int requestsTimeout)
00141 {
00142   
00143   auto found = RemoteInterface::_remoteInterfaces.find(rcVisardInetAddrs);
00144   if (found != RemoteInterface::_remoteInterfaces.end())
00145   {
00146     return found->second;
00147   }
00148 
00149   
00150   auto newRemoteInterface = Ptr(
00151           new RemoteInterface(rcVisardInetAddrs, requestsTimeout));
00152   RemoteInterface::_remoteInterfaces[rcVisardInetAddrs] = newRemoteInterface;
00153 
00154   return newRemoteInterface;
00155 }
00156 
00157 
00158 RemoteInterface::RemoteInterface(const string &rcVisardIP,
00159                                  unsigned int requestsTimeout) :
00160         _visardAddrs(rcVisardIP),
00161         _baseUrl("http://" + _visardAddrs + "/api/v1"),
00162         _timeoutCurl(requestsTimeout)
00163 {
00164   _reqStreams.clear();
00165   _protobufMap.clear();
00166 
00167   
00168   if (!isValidIPAddress(rcVisardIP))
00169   {
00170     throw invalid_argument("Given IP address is not a valid address: "
00171                            + rcVisardIP);
00172   }
00173 
00174   
00175   
00176   cpr::Url url = cpr::Url{_baseUrl + "/datastreams"};
00177   auto get = cpr::Get(url, cpr::Timeout{_timeoutCurl});
00178   handleCPRResponse(get);
00179 
00180   
00181   auto j = json::parse(get.text);
00182   for (const auto& stream : j) {
00183     _availStreams.push_back(stream["name"]);
00184     _protobufMap[stream["name"]] = stream["protobuf"];
00185   }
00186 }
00187 
00188 
00189 RemoteInterface::~RemoteInterface()
00190 {
00191   cleanUpRequestedStreams();
00192   for (const auto& s : _reqStreams)
00193   {
00194     if (s.second.size() > 0)
00195     {
00196       cerr << "[RemoteInterface] Could not stop all previously requested"
00197               " streams of type " << s.first <<  " on rc_visard. Please check "
00198               "device manually"
00199               " (" << _baseUrl << "/datastreams/" << s.first << ")"
00200               " for not containing any of the following legacy streams and"
00201               " delete them otherwise, e.g. using the swagger UI ("
00202            << "http://" + _visardAddrs + "/api/swagger/)"
00203            << ": "
00204            << toString(s.second)
00205            << endl;
00206     }
00207   }
00208 }
00209 
00210 
00211 void RemoteInterface::start(bool flagRestart)
00212 {
00213   
00214   string serviceToCall = (flagRestart) ? "restart" : "start";
00215   cpr::Url url = cpr::Url{
00216           _baseUrl + "/nodes/rc_dynamics/services/" + serviceToCall};
00217   auto put = cpr::Put(url, cpr::Timeout{_timeoutCurl});
00218   handleCPRResponse(put);
00219 }
00220 
00221 
00222 void RemoteInterface::stop()
00223 {
00224   
00225   cpr::Url url = cpr::Url{_baseUrl + "/nodes/rc_dynamics/services/stop"};
00226   auto put = cpr::Put(url, cpr::Timeout{_timeoutCurl});
00227   handleCPRResponse(put);
00228 }
00229 
00230 
00231 RemoteInterface::State RemoteInterface::getState()
00232 {
00233   
00234   cpr::Url url = cpr::Url{_baseUrl + "/nodes/rc_dynamics/status"};
00235   auto get = cpr::Get(url, cpr::Timeout{_timeoutCurl});
00236   handleCPRResponse(get);
00237 
00238   
00239   auto j = json::parse(get.text);
00240   if (j["status"].get<string>() == "running")
00241     return State::RUNNING;
00242   else
00243     return State::STOPPED;
00244 }
00245 
00246 list<string> RemoteInterface::getAvailableStreams()
00247 {
00248   return _availStreams;
00249 }
00250 
00251 
00252 string RemoteInterface::getPbMsgTypeOfStream(const string &stream)
00253 {
00254   checkStreamTypeAvailable(stream);
00255   return _protobufMap[stream];
00256 }
00257 
00258 
00259 list<string> RemoteInterface::getDestinationsOfStream(const string &stream)
00260 {
00261   checkStreamTypeAvailable(stream);
00262 
00263   list<string> destinations;
00264 
00265   
00266   cpr::Url url = cpr::Url{_baseUrl + "/datastreams/" + stream};
00267   auto get = cpr::Get(url, cpr::Timeout{_timeoutCurl});
00268   handleCPRResponse(get);
00269 
00270   
00271   auto j = json::parse(get.text);
00272   for (auto dest : j["destinations"])
00273   {
00274     destinations.push_back(dest.get<string>());
00275   }
00276   return destinations;
00277 }
00278 
00279 
00280 void RemoteInterface::addDestinationToStream(const string &stream,
00281                                              const string &destination)
00282 {
00283   checkStreamTypeAvailable(stream);
00284 
00285   
00286   cpr::Url url = cpr::Url{_baseUrl + "/datastreams/" + stream};
00287   auto put = cpr::Put(url, cpr::Timeout{_timeoutCurl},
00288                       cpr::Parameters{{"destination", destination}});
00289   handleCPRResponse(put);
00290 
00291   
00292   _reqStreams[stream].push_back(destination);
00293 }
00294 
00295 
00296 void RemoteInterface::deleteDestinationFromStream(const string &stream,
00297                                                   const string &destination)
00298 {
00299   checkStreamTypeAvailable(stream);
00300 
00301   
00302   cpr::Url url = cpr::Url{_baseUrl + "/datastreams/" + stream};
00303   auto del = cpr::Delete(url, cpr::Timeout{_timeoutCurl},
00304                          cpr::Parameters{{"destination", destination}});
00305   handleCPRResponse(del);
00306 
00307   
00308   auto& destinations = _reqStreams[stream];
00309   auto found = find(destinations.begin(), destinations.end(), destination);
00310   if (found != destinations.end())
00311     destinations.erase(found);
00312 }
00313 
00314 
00315 DataReceiver::Ptr
00316 RemoteInterface::createReceiverForStream(const string &stream,
00317                                          const string &destInterface,
00318                                          unsigned int destPort)
00319 {
00320   checkStreamTypeAvailable(stream);
00321 
00322   
00323   string destAddress;
00324   if (!getThisHostsIP(destAddress, _visardAddrs, destInterface))
00325   {
00326     stringstream msg;
00327     msg << "Could not infer a valid IP address "
00328             "for this host as the destination of the stream! "
00329             "Given network interface specification was '" << destInterface
00330         << "'.";
00331     throw invalid_argument(msg.str());
00332   }
00333 
00334   
00335   DataReceiver::Ptr receiver = TrackedDataReceiver::create(destAddress,
00336                                                            destPort, stream,
00337                                                            shared_from_this());
00338 
00339   
00340   string destination = destAddress + ":" + to_string(destPort);
00341   addDestinationToStream(stream, destination);
00342 
00343   
00344   unsigned int initialTimeOut = 5000;
00345   receiver->setTimeout(initialTimeOut);
00346   if (!receiver->receive(_protobufMap[stream]))
00347   {
00348     throw UnexpectedReceiveTimeout(initialTimeOut);
00349 
00350 
00351 
00352 
00353 
00354 
00355 
00356   }
00357 
00358   
00359   receiver->setTimeout(100);
00360   return receiver;
00361 }
00362 
00363 
00364 void RemoteInterface::cleanUpRequestedStreams()
00365 {
00366   
00367   for (auto const &s : _reqStreams)
00368   {
00369     
00370     list<string> rcVisardsActiveStreams;
00371     try
00372     {
00373       rcVisardsActiveStreams = getDestinationsOfStream(s.first);
00374     }
00375     catch (exception &e)
00376     {
00377       cerr << "[RemoteInterface] Could not get list of active " << s.first
00378            << " streams for cleaning up previously requested streams: "
00379            << e.what() << endl;
00380       continue;
00381     }
00382 
00383     
00384     for (auto activeStream : rcVisardsActiveStreams)
00385     {
00386       auto found = find(s.second.begin(), s.second.end(), activeStream);
00387       if (found != s.second.end())
00388       {
00389         try {
00390           deleteDestinationFromStream(s.first, activeStream);
00391         } catch (exception &e) {
00392           cerr << "[RemoteInterface] Could not delete destination "
00393                << activeStream << " from " << s.first << " stream: "
00394                << e.what() << endl;
00395         }
00396       }
00397     }
00398 
00399   }
00400 }
00401 
00402 void RemoteInterface::checkStreamTypeAvailable(const string& stream) {
00403   auto found = find(_availStreams.begin(), _availStreams.end(), stream);
00404   if (found == _availStreams.end())
00405   {
00406     stringstream msg;
00407     msg << "Stream of type '" << stream << "' is not available on rc_visard "
00408         << _visardAddrs;
00409     throw invalid_argument(msg.str());
00410   }
00411 }
00412 
00413 
00414 }
00415 }