00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036 #include "remote_interface.h"
00037 #include "unexpected_receive_timeout.h"
00038
00039 #include "json.hpp"
00040 #include <cpr/cpr.h>
00041 #include <regex>
00042
00043 using namespace std;
00044 using json = nlohmann::json;
00045
00046 namespace rc
00047 {
00048 namespace dynamics
00049 {
00050
00051 const std::string RemoteInterface::State::IDLE = "IDLE";
00052 const std::string RemoteInterface::State::RUNNING = "RUNNING";
00053 const std::string RemoteInterface::State::FATAL = "FATAL";
00054 const std::string RemoteInterface::State::WAITING_FOR_INS = "WAITING_FOR_INS";
00055 const std::string RemoteInterface::State::WAITING_FOR_INS_AND_SLAM = "WAITING_FOR_INS_AND_SLAM";
00056 const std::string RemoteInterface::State::WAITING_FOR_SLAM = "WAITING_FOR_SLAM";
00057 const std::string RemoteInterface::State::RUNNING_WITH_SLAM = "RUNNING_WITH_SLAM";
00058 const std::string RemoteInterface::State::UNKNOWN = "UNKNOWN";
00059
00060 string toString(cpr::Response resp)
00061 {
00062 stringstream s;
00063 s << "status code: " << resp.status_code << endl
00064 << "url: " << resp.url << endl
00065 << "text: " << resp.text << endl
00066 << "error: " << resp.error.message;
00067 return s.str();
00068 }
00069
00070 string toString(list<string> list)
00071 {
00072 stringstream s;
00073 s << "[";
00074 for (auto it = list.begin(); it != list.end();)
00075 {
00076 s << *it;
00077 if (++it != list.end())
00078 {
00079 s << ", ";
00080 }
00081 }
00082 s << "]";
00083 return s.str();
00084 }
00085
00086 void handleCPRResponse(cpr::Response r)
00087 {
00088 switch (r.status_code) {
00089 case 200:
00090 return;
00091 case 429:
00092 throw RemoteInterface::TooManyRequests(r.url);
00093 default:
00094 throw runtime_error(toString(r));
00095 }
00096 }
00097
00098 namespace {
00099
00100 vector<int> wait_before_retry = { 50, 200, 500, 1000, 2000};
00101
00102
00103 cpr::Response cprGetWithRetry(cpr::Url url, cpr::Timeout timeout) {
00104 for (int retry : wait_before_retry) {
00105 auto response = cpr::Get(url, timeout, cpr::Header{ { "accept", "application/json" }});
00106 if (response.status_code == 429) {
00107 cout << "WARNING: Got http code 429 (too many requests) on "
00108 << url << ". Retrying in " << retry << "ms..." << endl;
00109 usleep(1000 * retry);
00110 continue;
00111 }
00112 return response;
00113 }
00114 throw RemoteInterface::TooManyRequests(url);
00115 }
00116
00117
00118 cpr::Response cprPutWithRetry(cpr::Url url, cpr::Timeout timeout, cpr::Body body = cpr::Body{}) {
00119
00120
00121 cpr::Header header;
00122 if (body == cpr::Body{}) {
00123 header = cpr::Header{ { "accept", "application/json" }};
00124 } else {
00125 header = cpr::Header{ { "accept", "application/json" }, { "Content-Type", "application/json" }};
00126 }
00127
00128 for (int retry : wait_before_retry) {
00129 auto response = cpr::Put(url, timeout, body, header);
00130 if (response.status_code==429) {
00131 cout << "WARNING: Got http code 429 (too many requests) on "
00132 << url << ". Retrying in " << retry << "ms..." << endl;
00133 usleep(1000 * retry);
00134 continue;
00135 }
00136 return response;
00137 }
00138 throw RemoteInterface::TooManyRequests(url);
00139 }
00140
00141
00142 cpr::Response cprDeleteWithRetry(cpr::Url url, cpr::Timeout timeout, cpr::Body body = cpr::Body{}) {
00143
00144
00145 cpr::Header header;
00146 if (body == cpr::Body{}) {
00147 header = cpr::Header{ { "accept", "application/json" }};
00148 } else {
00149 header = cpr::Header{ { "accept", "application/json" }, { "Content-Type", "application/json" }};
00150 }
00151
00152 for (int retry : wait_before_retry) {
00153 auto response = cpr::Delete(url, timeout, body, header);
00154 if (response.status_code==429) {
00155 cout << "WARNING: Got http code 429 (too many requests) on "
00156 << url << ". Retrying in " << retry << "ms..." << endl;
00157 usleep(1000 * retry);
00158 continue;
00159 }
00160 return response;
00161 }
00162 throw RemoteInterface::TooManyRequests(url);
00163 }
00164
00165 }
00166
00172 class TrackedDataReceiver : public DataReceiver
00173 {
00174 public:
00175 static shared_ptr<TrackedDataReceiver> create(const string& ip_address, unsigned int& port, const string& stream,
00176 shared_ptr<RemoteInterface> creator)
00177 {
00178 return shared_ptr<TrackedDataReceiver>(new TrackedDataReceiver(ip_address, port, stream, creator));
00179 }
00180
00181 virtual ~TrackedDataReceiver()
00182 {
00183 try
00184 {
00185 creator_->deleteDestinationFromStream(stream_, dest_);
00186 }
00187 catch (exception& e)
00188 {
00189 cerr << "[TrackedDataReceiver] Could not remove my destination " << dest_ << " for stream type " << stream_
00190 << " from rc_visard: " << e.what() << endl;
00191 }
00192 }
00193
00194 protected:
00195 TrackedDataReceiver(const string& ip_address, unsigned int& port, const string& stream,
00196 shared_ptr<RemoteInterface> creator)
00197 : DataReceiver(ip_address, port), dest_(ip_address + ":" + to_string(port)), stream_(stream), creator_(creator)
00198 {
00199 }
00200
00201 string dest_, stream_;
00202 shared_ptr<RemoteInterface> creator_;
00203 };
00204
00205
00206 map<string, RemoteInterface::Ptr> RemoteInterface::remote_interfaces_ = map<string, RemoteInterface::Ptr>();
00207
00208 RemoteInterface::Ptr RemoteInterface::create(const string& rc_visard_inet_addrs, unsigned int requests_timeout)
00209 {
00210
00211 auto found = RemoteInterface::remote_interfaces_.find(rc_visard_inet_addrs);
00212 if (found != RemoteInterface::remote_interfaces_.end())
00213 {
00214 return found->second;
00215 }
00216
00217
00218 auto new_remote_interface = Ptr(new RemoteInterface(rc_visard_inet_addrs, requests_timeout));
00219 RemoteInterface::remote_interfaces_[rc_visard_inet_addrs] = new_remote_interface;
00220
00221 return new_remote_interface;
00222 }
00223
00224 RemoteInterface::RemoteInterface(const string& rc_visard_ip, unsigned int requests_timeout)
00225 : visard_addrs_(rc_visard_ip), initialized_(false), visard_version_(0.0),
00226 base_url_("http://" + visard_addrs_ + "/api/v1"), timeout_curl_(requests_timeout)
00227 {
00228 req_streams_.clear();
00229 protobuf_map_.clear();
00230
00231
00232 if (!isValidIPAddress(rc_visard_ip))
00233 {
00234 throw invalid_argument("Given IP address is not a valid address: " + rc_visard_ip);
00235 }
00236 }
00237
00238 RemoteInterface::~RemoteInterface()
00239 {
00240 try {
00241 cleanUpRequestedStreams();
00242 } catch (exception& e) {
00243 cerr << "[RemoteInterface::~RemoteInterface] Could not clean up all previously requested streams: "
00244 << e.what() << endl;
00245 }
00246 for (const auto& s : req_streams_)
00247 {
00248 if (s.second.size() > 0)
00249 {
00250 cerr << "[RemoteInterface::~RemoteInterface] Could not stop all previously requested"
00251 " streams of type "
00252 << s.first << " on rc_visard. Please check "
00253 "device manually"
00254 " ("
00255 << base_url_ << "/datastreams/" << s.first << ")"
00256 " for not containing any of the following legacy streams and"
00257 " delete them otherwise, e.g. using the swagger UI ("
00258 << "http://" + visard_addrs_ + "/api/swagger/)"
00259 << ": " << toString(s.second) << endl;
00260 }
00261 }
00262 }
00263
00264 bool RemoteInterface::checkSystemReady()
00265 {
00266 initialized_=false;
00267 visard_version_=0.0;
00268 req_streams_.clear();
00269 protobuf_map_.clear();
00270 avail_streams_.clear();
00271
00272 cpr::Url url = cpr::Url{ base_url_ + "/system"};
00273 auto response = cprGetWithRetry(url, cpr::Timeout{ timeout_curl_ });
00274 if (response.status_code == 502)
00275 {
00276 return false;
00277 }
00278 handleCPRResponse(response);
00279
00280
00281 auto get_system = cprGetWithRetry(cpr::Url{ base_url_ + "/system" },
00282 cpr::Timeout{ timeout_curl_ });
00283 handleCPRResponse(get_system);
00284 auto j = json::parse(get_system.text);
00285 if (!j["ready"])
00286 {
00287 return false;
00288 }
00289
00290
00291 string version = j["firmware"]["active_image"]["image_version"];
00292 std::smatch match;
00293 if (std::regex_search(version, match, std::regex("v(\\d+).(\\d+).(\\d+)")))
00294 {
00295 visard_version_ = stof(match[0].str().substr(1,3));
00296 }
00297
00298
00299 auto get_streams = cprGetWithRetry(cpr::Url{ base_url_ + "/datastreams" },
00300 cpr::Timeout{ timeout_curl_ });
00301 handleCPRResponse(get_streams);
00302
00303
00304 j = json::parse(get_streams.text);
00305 for (const auto& stream : j)
00306 {
00307 avail_streams_.push_back(stream["name"]);
00308 protobuf_map_[stream["name"]] = stream["protobuf"];
00309 }
00310
00311
00312 initialized_ = true;
00313 return true;
00314 }
00315
00316 string RemoteInterface::getState(const std::string& node) {
00317 cpr::Url url = cpr::Url{ base_url_ + "/nodes/" + node + "/status"};
00318 auto response = cprGetWithRetry(url, cpr::Timeout{ timeout_curl_ });
00319 handleCPRResponse(response);
00320 try
00321 {
00322 auto j = json::parse(response.text);
00323 return j["values"]["state"];
00324 } catch (std::domain_error &e) {
00325
00326 return State::UNKNOWN;
00327 }
00328 }
00329
00330 string RemoteInterface::getDynamicsState()
00331 {
00332 return getState("rc_dynamics");
00333 }
00334
00335 string RemoteInterface::getSlamState()
00336 {
00337 return getState("rc_slam");
00338 }
00339
00340 string RemoteInterface::getStereoInsState()
00341 {
00342 return getState("rc_stereo_ins");
00343 }
00344
00345 std::string RemoteInterface::callDynamicsService(std::string service_name)
00346 {
00347 cpr::Url url = cpr::Url{ base_url_ + "/nodes/rc_dynamics/services/" + service_name };
00348 auto response = cprPutWithRetry(url, cpr::Timeout{ timeout_curl_ });
00349 handleCPRResponse(response);
00350 auto j = json::parse(response.text);
00351 std::string entered_state;
00352 bool accepted = true;
00353
00354 try
00355 {
00356 entered_state = j["response"]["current_state"].get<std::string>();
00357 if (entered_state != State::IDLE and entered_state != State::RUNNING and entered_state != State::FATAL and
00358 entered_state != State::WAITING_FOR_INS and entered_state != State::WAITING_FOR_INS_AND_SLAM and
00359 entered_state != State::WAITING_FOR_SLAM and entered_state != State::RUNNING_WITH_SLAM)
00360 {
00361
00362 throw InvalidState(entered_state);
00363 }
00364
00365 accepted = j["response"]["accepted"].get<bool>();
00366 }
00367 catch (std::logic_error&)
00368 {
00369
00370
00371 try
00372 {
00373 entered_state = std::to_string(j["response"]["enteredState"].get<int>());
00374 }
00375 catch (std::logic_error&)
00376 {
00377
00378 cerr << "Logic error when parsing the response of a service call to rc_dynamics!\n";
00379 cerr << "Service called: " << url << "\n";
00380 cerr << "Response:"
00381 << "\n";
00382 cerr << response.text << "\n";
00383 throw;
00384 }
00385 }
00386
00387 if (!accepted)
00388 {
00389 throw NotAccepted(service_name);
00390 }
00391
00392 return entered_state;
00393 }
00394
00395 std::string RemoteInterface::restart()
00396 {
00397 return callDynamicsService("restart");
00398 }
00399 std::string RemoteInterface::restartSlam()
00400 {
00401 return callDynamicsService("restart_slam");
00402 }
00403 std::string RemoteInterface::start()
00404 {
00405 return callDynamicsService("start");
00406 }
00407 std::string RemoteInterface::startSlam()
00408 {
00409 return callDynamicsService("start_slam");
00410 }
00411 std::string RemoteInterface::stop()
00412 {
00413 return callDynamicsService("stop");
00414 }
00415 std::string RemoteInterface::stopSlam()
00416 {
00417 return callDynamicsService("stop_slam");
00418 }
00419
00420 std::string RemoteInterface::resetSlam()
00421 {
00422 std::string service_name = "reset";
00423 cpr::Url url = cpr::Url{ base_url_ + "/nodes/rc_slam/services/" + service_name };
00424 auto response = cprPutWithRetry(url, cpr::Timeout{ timeout_curl_ });
00425 handleCPRResponse(response);
00426 auto j = json::parse(response.text);
00427 std::string entered_state;
00428 bool accepted = true;
00429
00430 try
00431 {
00432 entered_state = j["response"]["current_state"].get<std::string>();
00433 std::vector<std::string> valid_states = { "IDLE", "RUNNING", "FATAL", "WAITING_FOR_DATA",
00434 "RESTARTING", "RESETTING", "HALTED" };
00435 if (std::count(valid_states.begin(), valid_states.end(), entered_state) == 0)
00436 {
00437
00438 throw InvalidState(entered_state);
00439 }
00440
00441 accepted = j["response"]["accepted"].get<bool>();
00442 }
00443 catch (std::logic_error& json_exception)
00444 {
00445
00446
00447 try
00448 {
00449 entered_state = std::to_string(j["response"]["enteredState"].get<int>());
00450 }
00451 catch (std::logic_error& json_exception)
00452 {
00453
00454 cerr << "Logic error when parsing the response of a service call to rc_dynamics!\n";
00455 cerr << "Service called: " << url << "\n";
00456 cerr << "Response:"
00457 << "\n";
00458 cerr << response.text << "\n";
00459 throw;
00460 }
00461 }
00462
00463 if (!accepted)
00464 {
00465 throw NotAccepted(service_name);
00466 }
00467
00468 return entered_state;
00469 }
00470
00471 RemoteInterface::ReturnCode RemoteInterface::callSlamService(std::string service_name, unsigned int timeout_ms)
00472 {
00473 cpr::Url url = cpr::Url{ base_url_ + "/nodes/rc_slam/services/" + service_name };
00474 auto response = cprPutWithRetry(url, cpr::Timeout{ (int32_t)timeout_ms });
00475 handleCPRResponse(response);
00476 auto j = json::parse(response.text);
00477
00478 ReturnCode return_code;
00479
00480 try
00481 {
00482 return_code.value = j["response"]["return_code"]["value"].get<int>();
00483 return_code.message = j["response"]["return_code"]["message"];
00484 }
00485 catch (std::logic_error& json_exception)
00486 {
00487
00488 cerr << "Logic error when parsing the response of a service call to rc_dynamics!\n";
00489 cerr << "Service called: " << url << "\n";
00490 cerr << "Response:"
00491 << "\n";
00492 cerr << response.text << "\n";
00493 throw;
00494 }
00495
00496 return return_code;
00497 }
00498
00499 RemoteInterface::ReturnCode RemoteInterface::saveSlamMap(unsigned int timeout_ms)
00500 {
00501 return callSlamService("save_map", timeout_ms);
00502 }
00503 RemoteInterface::ReturnCode RemoteInterface::loadSlamMap(unsigned int timeout_ms)
00504 {
00505 return callSlamService("load_map", timeout_ms);
00506 }
00507 RemoteInterface::ReturnCode RemoteInterface::removeSlamMap(unsigned int timeout_ms)
00508 {
00509 return callSlamService("remove_map", timeout_ms);
00510 }
00511
00512 list<string> RemoteInterface::getAvailableStreams()
00513 {
00514 if (!initialized_ && !checkSystemReady())
00515 {
00516 throw std::runtime_error("RemoteInterface not properly initialized or rc_visard is not ready. "
00517 "Please initialize with method RemoteInterface::checkSystemReady()!");
00518 }
00519 return avail_streams_;
00520 }
00521
00522 string RemoteInterface::getPbMsgTypeOfStream(const string& stream)
00523 {
00524 checkStreamTypeAvailable(stream);
00525 return protobuf_map_[stream];
00526 }
00527
00528 list<string> RemoteInterface::getDestinationsOfStream(const string& stream)
00529 {
00530 checkStreamTypeAvailable(stream);
00531
00532 list<string> destinations;
00533
00534
00535 cpr::Url url = cpr::Url{ base_url_ + "/datastreams/" + stream };
00536 auto get = cprGetWithRetry(url, cpr::Timeout{ timeout_curl_ });
00537 handleCPRResponse(get);
00538
00539
00540 auto j = json::parse(get.text);
00541 for (auto dest : j["destinations"])
00542 {
00543 destinations.push_back(dest.get<string>());
00544 }
00545 return destinations;
00546 }
00547
00548 void RemoteInterface::addDestinationToStream(const string& stream, const string& destination)
00549 {
00550 checkStreamTypeAvailable(stream);
00551
00552
00553 json js_args;
00554 js_args["destination"] = json::array();
00555 js_args["destination"].push_back(destination);
00556 cpr::Url url = cpr::Url{ base_url_ + "/datastreams/" + stream };
00557 auto put = cprPutWithRetry(url, cpr::Timeout{ timeout_curl_ }, cpr::Body{ js_args.dump() });
00558 if (put.status_code == 403)
00559 {
00560 throw TooManyStreamDestinations(json::parse(put.text)["message"].get<string>());
00561 }
00562 handleCPRResponse(put);
00563
00564
00565 req_streams_[stream].push_back(destination);
00566 }
00567
00568 void RemoteInterface::deleteDestinationFromStream(const string& stream, const string& destination)
00569 {
00570 checkStreamTypeAvailable(stream);
00571
00572
00573 json js_args;
00574 js_args["destination"] = json::array();
00575 js_args["destination"].push_back(destination);
00576 cpr::Url url = cpr::Url{ base_url_ + "/datastreams/" + stream };
00577 auto del = cprDeleteWithRetry(url, cpr::Timeout{ timeout_curl_ }, cpr::Body{ js_args.dump() });
00578 handleCPRResponse(del);
00579
00580
00581 auto& destinations = req_streams_[stream];
00582 auto found = find(destinations.begin(), destinations.end(), destination);
00583 if (found != destinations.end())
00584 destinations.erase(found);
00585 }
00586
00587 void RemoteInterface::deleteDestinationsFromStream(const string& stream, const list<string>& destinations)
00588 {
00589 checkStreamTypeAvailable(stream);
00590
00591
00592 if (visard_version_ >= 1.600001) {
00593
00594
00595 json js_destinations = json::array();
00596 for (const auto& dest: destinations)
00597 {
00598 js_destinations.push_back(dest);
00599 }
00600 json js_args;
00601 js_args["destination"] = js_destinations;
00602 cpr::Url url = cpr::Url{ base_url_ + "/datastreams/" + stream };
00603 auto del = cprDeleteWithRetry(url, cpr::Timeout{ timeout_curl_ }, cpr::Body{ js_args.dump()});
00604 handleCPRResponse(del);
00605
00606
00607 } else {
00608 for (const auto& dest : destinations)
00609 {
00610
00611 json js_args;
00612 js_args["destination"] = json::array();
00613 js_args["destination"].push_back(dest);
00614 cpr::Url url = cpr::Url{ base_url_ + "/datastreams/" + stream };
00615 auto del = cprDeleteWithRetry(url, cpr::Timeout{ timeout_curl_ }, cpr::Body{ js_args.dump() });
00616 handleCPRResponse(del);
00617 }
00618 }
00619
00620
00621 auto& reqDestinations = req_streams_[stream];
00622 for (auto& destination : destinations)
00623 {
00624 auto found = find(reqDestinations.begin(), reqDestinations.end(), destination);
00625 if (found != reqDestinations.end())
00626 {
00627 reqDestinations.erase(found);
00628 }
00629 }
00630 }
00631
00632 namespace
00633 {
00634 roboception::msgs::Trajectory toProtobufTrajectory(const json js)
00635 {
00636
00637
00638
00639 roboception::msgs::Trajectory pb_traj;
00640
00641 json::const_iterator js_it;
00642 if ((js_it = js.find("parent")) != js.end())
00643 {
00644 pb_traj.set_parent(js_it.value());
00645 }
00646 if ((js_it = js.find("name")) != js.end())
00647 {
00648 pb_traj.set_name(js_it.value());
00649 }
00650 if ((js_it = js.find("producer")) != js.end())
00651 {
00652 pb_traj.set_producer(js_it.value());
00653 }
00654 if ((js_it = js.find("timestamp")) != js.end())
00655 {
00656 pb_traj.mutable_timestamp()->set_sec(js_it.value()["sec"]);
00657 pb_traj.mutable_timestamp()->set_nsec(js_it.value()["nsec"]);
00658 }
00659 for (const auto& js_pose : js["poses"])
00660 {
00661 auto pb_pose = pb_traj.add_poses();
00662 auto pb_time = pb_pose->mutable_timestamp();
00663 pb_time->set_sec(js_pose["timestamp"]["sec"]);
00664 pb_time->set_nsec(js_pose["timestamp"]["nsec"]);
00665 auto pb_position = pb_pose->mutable_pose()->mutable_position();
00666 pb_position->set_x(js_pose["pose"]["position"]["x"]);
00667 pb_position->set_y(js_pose["pose"]["position"]["y"]);
00668 pb_position->set_z(js_pose["pose"]["position"]["z"]);
00669 auto pb_orientation = pb_pose->mutable_pose()->mutable_orientation();
00670 pb_orientation->set_x(js_pose["pose"]["orientation"]["x"]);
00671 pb_orientation->set_y(js_pose["pose"]["orientation"]["y"]);
00672 pb_orientation->set_z(js_pose["pose"]["orientation"]["z"]);
00673 pb_orientation->set_w(js_pose["pose"]["orientation"]["w"]);
00674 }
00675 return pb_traj;
00676 }
00677 }
00678
00679 roboception::msgs::Trajectory RemoteInterface::getSlamTrajectory(const TrajectoryTime& start, const TrajectoryTime& end, unsigned int timeout_ms)
00680 {
00681
00682 json js_args, js_time, js_start_time, js_end_time;
00683 js_start_time["sec"] = start.getSec();
00684 js_start_time["nsec"] = start.getNsec();
00685 js_end_time["sec"] = end.getSec();
00686 js_end_time["nsec"] = end.getNsec();
00687 js_args["args"]["start_time"] = js_start_time;
00688 js_args["args"]["end_time"] = js_end_time;
00689 if (start.isRelative())
00690 js_args["args"]["start_time_relative"] = true;
00691 if (end.isRelative())
00692 js_args["args"]["end_time_relative"] = true;
00693
00694
00695 cpr::Url url = cpr::Url{ base_url_ + "/nodes/rc_slam/services/get_trajectory" };
00696 auto get = cprPutWithRetry(url, cpr::Timeout{ (int32_t)timeout_ms }, cpr::Body{ js_args.dump() });
00697 handleCPRResponse(get);
00698
00699 auto js = json::parse(get.text)["response"]["trajectory"];
00700 return toProtobufTrajectory(js);
00701 }
00702
00703 DataReceiver::Ptr RemoteInterface::createReceiverForStream(const string& stream, const string& dest_interface,
00704 unsigned int dest_port)
00705 {
00706 checkStreamTypeAvailable(stream);
00707
00708
00709 string dest_address;
00710 if (!getThisHostsIP(dest_address, visard_addrs_, dest_interface))
00711 {
00712 stringstream msg;
00713 msg << "Could not infer a valid IP address "
00714 "for this host as the destination of the stream! "
00715 "Given network interface specification was '"
00716 << dest_interface << "'.";
00717 throw invalid_argument(msg.str());
00718 }
00719
00720
00721 DataReceiver::Ptr receiver = TrackedDataReceiver::create(dest_address, dest_port, stream, shared_from_this());
00722
00723
00724 string destination = dest_address + ":" + to_string(dest_port);
00725 addDestinationToStream(stream, destination);
00726
00727
00728 unsigned int initial_timeOut = 5000;
00729 receiver->setTimeout(initial_timeOut);
00730 if (!receiver->receive(protobuf_map_[stream]))
00731 {
00732
00733 string current_state = getDynamicsState();
00734 std::vector<std::string> valid_states = { "RUNNING", "RUNNING_WITH_SLAM" };
00735 if (std::count(valid_states.begin(), valid_states.end(), current_state) == 0)
00736 {
00737 throw DynamicsNotRunning(current_state);
00738 }
00739
00740
00741 throw UnexpectedReceiveTimeout(initial_timeOut);
00742 }
00743
00744
00745 receiver->setTimeout(100);
00746 return receiver;
00747 }
00748
00749 void RemoteInterface::cleanUpRequestedStreams()
00750 {
00751
00752 for (auto const& s : req_streams_)
00753 {
00754 if (!s.second.empty())
00755 {
00756 deleteDestinationsFromStream(s.first, s.second);
00757 }
00758 }
00759 }
00760
00761 void RemoteInterface::checkStreamTypeAvailable(const string& stream)
00762 {
00763 if (!initialized_ && !checkSystemReady())
00764 {
00765 throw std::runtime_error("RemoteInterface not properly initialized or rc_visard is not ready. "
00766 "Please initialize with method RemoteInterface::checkSystemReady()!");
00767 }
00768 auto found = find(avail_streams_.begin(), avail_streams_.end(), stream);
00769 if (found == avail_streams_.end())
00770 {
00771 stringstream msg;
00772 msg << "Stream of type '" << stream << "' is not available on rc_visard " << visard_addrs_;
00773 throw invalid_argument(msg.str());
00774 }
00775 }
00776 }
00777 }