00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036 #include "remote_interface.h"
00037 #include "unexpected_receive_timeout.h"
00038
00039 #include <cpr/cpr.h>
00040 #include <json.hpp>
00041
00042
00043 using namespace std;
00044 using json = nlohmann::json;
00045
00046 namespace rc
00047 {
00048 namespace dynamics
00049 {
00050
00051 string toString(cpr::Response resp)
00052 {
00053 stringstream s;
00054 s << "status code: " << resp.status_code << endl
00055 << "url: " << resp.url << endl
00056 << "text: " << resp.text << endl
00057 << "error: " << resp.error.message;
00058 return s.str();
00059 }
00060
00061 string toString(list<string> list)
00062 {
00063 stringstream s;
00064 s << "[";
00065 for (auto it = list.begin(); it != list.end();)
00066 {
00067 s << *it;
00068 if (++it != list.end())
00069 {
00070 s << ", ";
00071 }
00072 }
00073 s << "]";
00074 return s.str();
00075 }
00076
00077 void handleCPRResponse(cpr::Response r)
00078 {
00079 if (r.status_code != 200)
00080 {
00081 throw runtime_error(toString(r));
00082 }
00083 }
00084
00085
00091 class TrackedDataReceiver : public DataReceiver
00092 {
00093 public:
00094
00095 static shared_ptr<TrackedDataReceiver>
00096 create(const string &ip_address, unsigned int &port,
00097 const string &stream,
00098 shared_ptr<RemoteInterface> creator)
00099 {
00100 return shared_ptr<TrackedDataReceiver>(
00101 new TrackedDataReceiver(ip_address, port, stream, creator));
00102 }
00103
00104 virtual ~TrackedDataReceiver()
00105 {
00106 try
00107 {
00108 _creator->deleteDestinationFromStream(_stream, _dest);
00109 }
00110 catch (exception &e)
00111 {
00112 cerr
00113 << "[TrackedDataReceiver] Could not remove my destination "
00114 << _dest << " for stream type " << _stream
00115 << " from rc_visard: "
00116 << e.what() << endl;
00117 }
00118 }
00119
00120 protected:
00121
00122 TrackedDataReceiver(const string &ip_address, unsigned int &port,
00123 const string &stream,
00124 shared_ptr<RemoteInterface> creator)
00125 : DataReceiver(ip_address, port),
00126 _dest(ip_address + ":" + to_string(port)),
00127 _stream(stream),
00128 _creator(creator)
00129 {}
00130
00131 string _dest, _stream;
00132 shared_ptr<RemoteInterface> _creator;
00133 };
00134
00135
00136 map<string, RemoteInterface::Ptr> RemoteInterface::_remoteInterfaces = map<string,RemoteInterface::Ptr>();
00137
00138 RemoteInterface::Ptr
00139 RemoteInterface::create(const string &rcVisardInetAddrs,
00140 unsigned int requestsTimeout)
00141 {
00142
00143 auto found = RemoteInterface::_remoteInterfaces.find(rcVisardInetAddrs);
00144 if (found != RemoteInterface::_remoteInterfaces.end())
00145 {
00146 return found->second;
00147 }
00148
00149
00150 auto newRemoteInterface = Ptr(
00151 new RemoteInterface(rcVisardInetAddrs, requestsTimeout));
00152 RemoteInterface::_remoteInterfaces[rcVisardInetAddrs] = newRemoteInterface;
00153
00154 return newRemoteInterface;
00155 }
00156
00157
00158 RemoteInterface::RemoteInterface(const string &rcVisardIP,
00159 unsigned int requestsTimeout) :
00160 _visardAddrs(rcVisardIP),
00161 _baseUrl("http://" + _visardAddrs + "/api/v1"),
00162 _timeoutCurl(requestsTimeout)
00163 {
00164 _reqStreams.clear();
00165 _protobufMap.clear();
00166
00167
00168 if (!isValidIPAddress(rcVisardIP))
00169 {
00170 throw invalid_argument("Given IP address is not a valid address: "
00171 + rcVisardIP);
00172 }
00173
00174
00175
00176 cpr::Url url = cpr::Url{_baseUrl + "/datastreams"};
00177 auto get = cpr::Get(url, cpr::Timeout{_timeoutCurl});
00178 handleCPRResponse(get);
00179
00180
00181 auto j = json::parse(get.text);
00182 for (const auto& stream : j) {
00183 _availStreams.push_back(stream["name"]);
00184 _protobufMap[stream["name"]] = stream["protobuf"];
00185 }
00186 }
00187
00188
00189 RemoteInterface::~RemoteInterface()
00190 {
00191 cleanUpRequestedStreams();
00192 for (const auto& s : _reqStreams)
00193 {
00194 if (s.second.size() > 0)
00195 {
00196 cerr << "[RemoteInterface] Could not stop all previously requested"
00197 " streams of type " << s.first << " on rc_visard. Please check "
00198 "device manually"
00199 " (" << _baseUrl << "/datastreams/" << s.first << ")"
00200 " for not containing any of the following legacy streams and"
00201 " delete them otherwise, e.g. using the swagger UI ("
00202 << "http://" + _visardAddrs + "/api/swagger/)"
00203 << ": "
00204 << toString(s.second)
00205 << endl;
00206 }
00207 }
00208 }
00209
00210
00211 void RemoteInterface::start(bool flagRestart)
00212 {
00213
00214 string serviceToCall = (flagRestart) ? "restart" : "start";
00215 cpr::Url url = cpr::Url{
00216 _baseUrl + "/nodes/rc_dynamics/services/" + serviceToCall};
00217 auto put = cpr::Put(url, cpr::Timeout{_timeoutCurl});
00218 handleCPRResponse(put);
00219 }
00220
00221
00222 void RemoteInterface::stop()
00223 {
00224
00225 cpr::Url url = cpr::Url{_baseUrl + "/nodes/rc_dynamics/services/stop"};
00226 auto put = cpr::Put(url, cpr::Timeout{_timeoutCurl});
00227 handleCPRResponse(put);
00228 }
00229
00230
00231 RemoteInterface::State RemoteInterface::getState()
00232 {
00233
00234 cpr::Url url = cpr::Url{_baseUrl + "/nodes/rc_dynamics/status"};
00235 auto get = cpr::Get(url, cpr::Timeout{_timeoutCurl});
00236 handleCPRResponse(get);
00237
00238
00239 auto j = json::parse(get.text);
00240 if (j["status"].get<string>() == "running")
00241 return State::RUNNING;
00242 else
00243 return State::STOPPED;
00244 }
00245
00246 list<string> RemoteInterface::getAvailableStreams()
00247 {
00248 return _availStreams;
00249 }
00250
00251
00252 string RemoteInterface::getPbMsgTypeOfStream(const string &stream)
00253 {
00254 checkStreamTypeAvailable(stream);
00255 return _protobufMap[stream];
00256 }
00257
00258
00259 list<string> RemoteInterface::getDestinationsOfStream(const string &stream)
00260 {
00261 checkStreamTypeAvailable(stream);
00262
00263 list<string> destinations;
00264
00265
00266 cpr::Url url = cpr::Url{_baseUrl + "/datastreams/" + stream};
00267 auto get = cpr::Get(url, cpr::Timeout{_timeoutCurl});
00268 handleCPRResponse(get);
00269
00270
00271 auto j = json::parse(get.text);
00272 for (auto dest : j["destinations"])
00273 {
00274 destinations.push_back(dest.get<string>());
00275 }
00276 return destinations;
00277 }
00278
00279
00280 void RemoteInterface::addDestinationToStream(const string &stream,
00281 const string &destination)
00282 {
00283 checkStreamTypeAvailable(stream);
00284
00285
00286 cpr::Url url = cpr::Url{_baseUrl + "/datastreams/" + stream};
00287 auto put = cpr::Put(url, cpr::Timeout{_timeoutCurl},
00288 cpr::Parameters{{"destination", destination}});
00289 handleCPRResponse(put);
00290
00291
00292 _reqStreams[stream].push_back(destination);
00293 }
00294
00295
00296 void RemoteInterface::deleteDestinationFromStream(const string &stream,
00297 const string &destination)
00298 {
00299 checkStreamTypeAvailable(stream);
00300
00301
00302 cpr::Url url = cpr::Url{_baseUrl + "/datastreams/" + stream};
00303 auto del = cpr::Delete(url, cpr::Timeout{_timeoutCurl},
00304 cpr::Parameters{{"destination", destination}});
00305 handleCPRResponse(del);
00306
00307
00308 auto& destinations = _reqStreams[stream];
00309 auto found = find(destinations.begin(), destinations.end(), destination);
00310 if (found != destinations.end())
00311 destinations.erase(found);
00312 }
00313
00314
00315 DataReceiver::Ptr
00316 RemoteInterface::createReceiverForStream(const string &stream,
00317 const string &destInterface,
00318 unsigned int destPort)
00319 {
00320 checkStreamTypeAvailable(stream);
00321
00322
00323 string destAddress;
00324 if (!getThisHostsIP(destAddress, _visardAddrs, destInterface))
00325 {
00326 stringstream msg;
00327 msg << "Could not infer a valid IP address "
00328 "for this host as the destination of the stream! "
00329 "Given network interface specification was '" << destInterface
00330 << "'.";
00331 throw invalid_argument(msg.str());
00332 }
00333
00334
00335 DataReceiver::Ptr receiver = TrackedDataReceiver::create(destAddress,
00336 destPort, stream,
00337 shared_from_this());
00338
00339
00340 string destination = destAddress + ":" + to_string(destPort);
00341 addDestinationToStream(stream, destination);
00342
00343
00344 unsigned int initialTimeOut = 5000;
00345 receiver->setTimeout(initialTimeOut);
00346 if (!receiver->receive(_protobufMap[stream]))
00347 {
00348 throw UnexpectedReceiveTimeout(initialTimeOut);
00349
00350
00351
00352
00353
00354
00355
00356 }
00357
00358
00359 receiver->setTimeout(100);
00360 return receiver;
00361 }
00362
00363
00364 void RemoteInterface::cleanUpRequestedStreams()
00365 {
00366
00367 for (auto const &s : _reqStreams)
00368 {
00369
00370 list<string> rcVisardsActiveStreams;
00371 try
00372 {
00373 rcVisardsActiveStreams = getDestinationsOfStream(s.first);
00374 }
00375 catch (exception &e)
00376 {
00377 cerr << "[RemoteInterface] Could not get list of active " << s.first
00378 << " streams for cleaning up previously requested streams: "
00379 << e.what() << endl;
00380 continue;
00381 }
00382
00383
00384 for (auto activeStream : rcVisardsActiveStreams)
00385 {
00386 auto found = find(s.second.begin(), s.second.end(), activeStream);
00387 if (found != s.second.end())
00388 {
00389 try {
00390 deleteDestinationFromStream(s.first, activeStream);
00391 } catch (exception &e) {
00392 cerr << "[RemoteInterface] Could not delete destination "
00393 << activeStream << " from " << s.first << " stream: "
00394 << e.what() << endl;
00395 }
00396 }
00397 }
00398
00399 }
00400 }
00401
00402 void RemoteInterface::checkStreamTypeAvailable(const string& stream) {
00403 auto found = find(_availStreams.begin(), _availStreams.end(), stream);
00404 if (found == _availStreams.end())
00405 {
00406 stringstream msg;
00407 msg << "Stream of type '" << stream << "' is not available on rc_visard "
00408 << _visardAddrs;
00409 throw invalid_argument(msg.str());
00410 }
00411 }
00412
00413
00414 }
00415 }