remote_interface.cc
Go to the documentation of this file.
00001 /*
00002  * This file is part of the rc_dynamics_api package.
00003  *
00004  * Copyright (c) 2017 Roboception GmbH
00005  * All rights reserved
00006  *
00007  * Author: Christian Emmerich
00008  *
00009  * Redistribution and use in source and binary forms, with or without
00010  * modification, are permitted provided that the following conditions are met:
00011  *
00012  * 1. Redistributions of source code must retain the above copyright notice,
00013  * this list of conditions and the following disclaimer.
00014  *
00015  * 2. Redistributions in binary form must reproduce the above copyright notice,
00016  * this list of conditions and the following disclaimer in the documentation
00017  * and/or other materials provided with the distribution.
00018  *
00019  * 3. Neither the name of the copyright holder nor the names of its contributors
00020  * may be used to endorse or promote products derived from this software without
00021  * specific prior written permission.
00022  *
00023  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
00024  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
00025  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
00026  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
00027  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
00028  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
00029  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
00030  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
00031  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
00032  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00033  * POSSIBILITY OF SUCH DAMAGE.
00034  */
00035 
00036 #include "remote_interface.h"
00037 #include "unexpected_receive_timeout.h"
00038 
00039 #include <cpr/cpr.h>
00040 #include <json.hpp>
00041 
00042 
00043 using namespace std;
00044 using json = nlohmann::json;
00045 
00046 namespace rc
00047 {
00048 namespace dynamics
00049 {
00050 
00051 string toString(cpr::Response resp)
00052 {
00053   stringstream s;
00054   s << "status code: " << resp.status_code << endl
00055     << "url: " << resp.url << endl
00056     << "text: " << resp.text << endl
00057     << "error: " << resp.error.message;
00058   return s.str();
00059 }
00060 
00061 string toString(list<string> list)
00062 {
00063   stringstream s;
00064   s << "[";
00065   for (auto it = list.begin(); it != list.end();)
00066   {
00067     s << *it;
00068     if (++it != list.end())
00069     {
00070       s << ", ";
00071     }
00072   }
00073   s << "]";
00074   return s.str();
00075 }
00076 
00077 void handleCPRResponse(cpr::Response r)
00078 {
00079   if (r.status_code != 200)
00080   {
00081     throw runtime_error(toString(r));
00082   }
00083 }
00084 
00085 
00091 class TrackedDataReceiver : public DataReceiver
00092 {
00093   public:
00094 
00095     static shared_ptr<TrackedDataReceiver>
00096     create(const string &ip_address, unsigned int &port,
00097            const string &stream,
00098            shared_ptr<RemoteInterface> creator)
00099     {
00100       return shared_ptr<TrackedDataReceiver>(
00101               new TrackedDataReceiver(ip_address, port, stream, creator));
00102     }
00103 
00104     virtual ~TrackedDataReceiver()
00105     {
00106       try
00107       {
00108         _creator->deleteDestinationFromStream(_stream, _dest);
00109       }
00110       catch (exception &e)
00111       {
00112         cerr
00113                 << "[TrackedDataReceiver] Could not remove my destination "
00114                 << _dest << " for stream type " << _stream
00115                 << " from rc_visard: "
00116                 << e.what() << endl;
00117       }
00118     }
00119 
00120   protected:
00121 
00122     TrackedDataReceiver(const string &ip_address, unsigned int &port,
00123                         const string &stream,
00124                         shared_ptr<RemoteInterface> creator)
00125             : DataReceiver(ip_address, port),
00126               _dest(ip_address + ":" + to_string(port)),
00127               _stream(stream),
00128               _creator(creator)
00129     {}
00130 
00131     string _dest, _stream;
00132     shared_ptr<RemoteInterface> _creator;
00133 };
00134 
00135 // map to store already created RemoteInterface objects
00136 map<string, RemoteInterface::Ptr> RemoteInterface::_remoteInterfaces = map<string,RemoteInterface::Ptr>();
00137 
00138 RemoteInterface::Ptr
00139 RemoteInterface::create(const string &rcVisardInetAddrs,
00140                         unsigned int requestsTimeout)
00141 {
00142   // check if interface is already opened
00143   auto found = RemoteInterface::_remoteInterfaces.find(rcVisardInetAddrs);
00144   if (found != RemoteInterface::_remoteInterfaces.end())
00145   {
00146     return found->second;
00147   }
00148 
00149   // if not, create it
00150   auto newRemoteInterface = Ptr(
00151           new RemoteInterface(rcVisardInetAddrs, requestsTimeout));
00152   RemoteInterface::_remoteInterfaces[rcVisardInetAddrs] = newRemoteInterface;
00153 
00154   return newRemoteInterface;
00155 }
00156 
00157 
00158 RemoteInterface::RemoteInterface(const string &rcVisardIP,
00159                                  unsigned int requestsTimeout) :
00160         _visardAddrs(rcVisardIP),
00161         _baseUrl("http://" + _visardAddrs + "/api/v1"),
00162         _timeoutCurl(requestsTimeout)
00163 {
00164   _reqStreams.clear();
00165   _protobufMap.clear();
00166 
00167   // check if given string is a valid IP address
00168   if (!isValidIPAddress(rcVisardIP))
00169   {
00170     throw invalid_argument("Given IP address is not a valid address: "
00171                            + rcVisardIP);
00172   }
00173 
00174   // initial connection to rc_visard and get streams, i.e. do get request on
00175   // respective url (no parameters needed for this simple service call)
00176   cpr::Url url = cpr::Url{_baseUrl + "/datastreams"};
00177   auto get = cpr::Get(url, cpr::Timeout{_timeoutCurl});
00178   handleCPRResponse(get);
00179 
00180   // parse text of response into json object
00181   auto j = json::parse(get.text);
00182   for (const auto& stream : j) {
00183     _availStreams.push_back(stream["name"]);
00184     _protobufMap[stream["name"]] = stream["protobuf"];
00185   }
00186 }
00187 
00188 
00189 RemoteInterface::~RemoteInterface()
00190 {
00191   cleanUpRequestedStreams();
00192   for (const auto& s : _reqStreams)
00193   {
00194     if (s.second.size() > 0)
00195     {
00196       cerr << "[RemoteInterface] Could not stop all previously requested"
00197               " streams of type " << s.first <<  " on rc_visard. Please check "
00198               "device manually"
00199               " (" << _baseUrl << "/datastreams/" << s.first << ")"
00200               " for not containing any of the following legacy streams and"
00201               " delete them otherwise, e.g. using the swagger UI ("
00202            << "http://" + _visardAddrs + "/api/swagger/)"
00203            << ": "
00204            << toString(s.second)
00205            << endl;
00206     }
00207   }
00208 }
00209 
00210 
00211 void RemoteInterface::start(bool flagRestart)
00212 {
00213   // do put request on respective url (no parameters needed for this simple service call)
00214   string serviceToCall = (flagRestart) ? "restart" : "start";
00215   cpr::Url url = cpr::Url{
00216           _baseUrl + "/nodes/rc_dynamics/services/" + serviceToCall};
00217   auto put = cpr::Put(url, cpr::Timeout{_timeoutCurl});
00218   handleCPRResponse(put);
00219 }
00220 
00221 
00222 void RemoteInterface::stop()
00223 {
00224   // do put request on respective url (no parameters needed for this simple service call)
00225   cpr::Url url = cpr::Url{_baseUrl + "/nodes/rc_dynamics/services/stop"};
00226   auto put = cpr::Put(url, cpr::Timeout{_timeoutCurl});
00227   handleCPRResponse(put);
00228 }
00229 
00230 
00231 RemoteInterface::State RemoteInterface::getState()
00232 {
00233   // do get request on respective url (no parameters needed for this simple service call)
00234   cpr::Url url = cpr::Url{_baseUrl + "/nodes/rc_dynamics/status"};
00235   auto get = cpr::Get(url, cpr::Timeout{_timeoutCurl});
00236   handleCPRResponse(get);
00237 
00238   // parse text of response into json object
00239   auto j = json::parse(get.text);
00240   if (j["status"].get<string>() == "running")
00241     return State::RUNNING;
00242   else
00243     return State::STOPPED;
00244 }
00245 
00246 list<string> RemoteInterface::getAvailableStreams()
00247 {
00248   return _availStreams;
00249 }
00250 
00251 
00252 string RemoteInterface::getPbMsgTypeOfStream(const string &stream)
00253 {
00254   checkStreamTypeAvailable(stream);
00255   return _protobufMap[stream];
00256 }
00257 
00258 
00259 list<string> RemoteInterface::getDestinationsOfStream(const string &stream)
00260 {
00261   checkStreamTypeAvailable(stream);
00262 
00263   list<string> destinations;
00264 
00265   // do get request on respective url (no parameters needed for this simple service call)
00266   cpr::Url url = cpr::Url{_baseUrl + "/datastreams/" + stream};
00267   auto get = cpr::Get(url, cpr::Timeout{_timeoutCurl});
00268   handleCPRResponse(get);
00269 
00270   // parse result as json
00271   auto j = json::parse(get.text);
00272   for (auto dest : j["destinations"])
00273   {
00274     destinations.push_back(dest.get<string>());
00275   }
00276   return destinations;
00277 }
00278 
00279 
00280 void RemoteInterface::addDestinationToStream(const string &stream,
00281                                              const string &destination)
00282 {
00283   checkStreamTypeAvailable(stream);
00284 
00285   // do put request on respective url (no parameters needed for this simple service call)
00286   cpr::Url url = cpr::Url{_baseUrl + "/datastreams/" + stream};
00287   auto put = cpr::Put(url, cpr::Timeout{_timeoutCurl},
00288                       cpr::Parameters{{"destination", destination}});
00289   handleCPRResponse(put);
00290 
00291   // keep track of added destinations
00292   _reqStreams[stream].push_back(destination);
00293 }
00294 
00295 
00296 void RemoteInterface::deleteDestinationFromStream(const string &stream,
00297                                                   const string &destination)
00298 {
00299   checkStreamTypeAvailable(stream);
00300 
00301   // do delete request on respective url (no parameters needed for this simple service call)
00302   cpr::Url url = cpr::Url{_baseUrl + "/datastreams/" + stream};
00303   auto del = cpr::Delete(url, cpr::Timeout{_timeoutCurl},
00304                          cpr::Parameters{{"destination", destination}});
00305   handleCPRResponse(del);
00306 
00307   // delete destination also from list of requested streams
00308   auto& destinations = _reqStreams[stream];
00309   auto found = find(destinations.begin(), destinations.end(), destination);
00310   if (found != destinations.end())
00311     destinations.erase(found);
00312 }
00313 
00314 
00315 DataReceiver::Ptr
00316 RemoteInterface::createReceiverForStream(const string &stream,
00317                                          const string &destInterface,
00318                                          unsigned int destPort)
00319 {
00320   checkStreamTypeAvailable(stream);
00321 
00322   // figure out local inet address for streaming
00323   string destAddress;
00324   if (!getThisHostsIP(destAddress, _visardAddrs, destInterface))
00325   {
00326     stringstream msg;
00327     msg << "Could not infer a valid IP address "
00328             "for this host as the destination of the stream! "
00329             "Given network interface specification was '" << destInterface
00330         << "'.";
00331     throw invalid_argument(msg.str());
00332   }
00333 
00334   // create data receiver with port as specified
00335   DataReceiver::Ptr receiver = TrackedDataReceiver::create(destAddress,
00336                                                            destPort, stream,
00337                                                            shared_from_this());
00338 
00339   // do REST-API call requesting a UDP stream from rc_visard device
00340   string destination = destAddress + ":" + to_string(destPort);
00341   addDestinationToStream(stream, destination);
00342 
00343   // waiting for first message; we set a long timeout for receiving data
00344   unsigned int initialTimeOut = 5000;
00345   receiver->setTimeout(initialTimeOut);
00346   if (!receiver->receive(_protobufMap[stream]))
00347   {
00348     throw UnexpectedReceiveTimeout(initialTimeOut);
00349 //    stringstream msg;
00350 //    msg << "Did not receive any data within the last "
00351 //        << initialTimeOut << " ms. "
00352 //        << "Either rc_visard does not seem to send the data properly "
00353 //                "(is rc_dynamics module running?) or you seem to have serious "
00354 //                "network/connection problems!";
00355 //    throw runtime_error(msg.str());
00356   }
00357 
00358   // stream established, prepare everything for normal pose receiving
00359   receiver->setTimeout(100);
00360   return receiver;
00361 }
00362 
00363 
00364 void RemoteInterface::cleanUpRequestedStreams()
00365 {
00366   // for each stream type check currently running streams on rc_visard device
00367   for (auto const &s : _reqStreams)
00368   {
00369     // get a list of currently active streams of this type on rc_visard device
00370     list<string> rcVisardsActiveStreams;
00371     try
00372     {
00373       rcVisardsActiveStreams = getDestinationsOfStream(s.first);
00374     }
00375     catch (exception &e)
00376     {
00377       cerr << "[RemoteInterface] Could not get list of active " << s.first
00378            << " streams for cleaning up previously requested streams: "
00379            << e.what() << endl;
00380       continue;
00381     }
00382 
00383     // try to stop all previously requested streams of this type
00384     for (auto activeStream : rcVisardsActiveStreams)
00385     {
00386       auto found = find(s.second.begin(), s.second.end(), activeStream);
00387       if (found != s.second.end())
00388       {
00389         try {
00390           deleteDestinationFromStream(s.first, activeStream);
00391         } catch (exception &e) {
00392           cerr << "[RemoteInterface] Could not delete destination "
00393                << activeStream << " from " << s.first << " stream: "
00394                << e.what() << endl;
00395         }
00396       }
00397     }
00398 
00399   }
00400 }
00401 
00402 void RemoteInterface::checkStreamTypeAvailable(const string& stream) {
00403   auto found = find(_availStreams.begin(), _availStreams.end(), stream);
00404   if (found == _availStreams.end())
00405   {
00406     stringstream msg;
00407     msg << "Stream of type '" << stream << "' is not available on rc_visard "
00408         << _visardAddrs;
00409     throw invalid_argument(msg.str());
00410   }
00411 }
00412 
00413 
00414 }
00415 }


rc_visard_driver
Author(s): Heiko Hirschmueller , Christian Emmerich , Felix Ruess
autogenerated on Thu Jun 6 2019 20:43:06