Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017 #include "cartographer/cloud/internal/local_trajectory_uploader.h"
00018
00019 #include <map>
00020 #include <thread>
00021
00022 #include "absl/memory/memory.h"
00023 #include "async_grpc/client.h"
00024 #include "cartographer/cloud/internal/handlers/add_sensor_data_batch_handler.h"
00025 #include "cartographer/cloud/internal/handlers/add_trajectory_handler.h"
00026 #include "cartographer/cloud/internal/handlers/finish_trajectory_handler.h"
00027 #include "cartographer/cloud/internal/sensor/serialization.h"
00028 #include "cartographer/common/blocking_queue.h"
00029 #include "glog/logging.h"
00030 #include "grpc++/grpc++.h"
00031
00032 namespace cartographer {
00033 namespace cloud {
00034 namespace {
00035
00036 using absl::make_unique;
00037
00038 constexpr int kConnectionTimeoutInSeconds = 10;
00039 constexpr int kConnectionRecoveryTimeoutInSeconds = 60;
00040 constexpr int kTokenRefreshIntervalInSeconds = 60;
00041 const common::Duration kPopTimeout = common::FromMilliseconds(100);
00042
00043
00044
00045 const std::set<::grpc::StatusCode> kUnrecoverableStatusCodes = {
00046 ::grpc::DEADLINE_EXCEEDED,
00047 ::grpc::NOT_FOUND,
00048 ::grpc::UNAVAILABLE,
00049 ::grpc::UNKNOWN,
00050 };
00051
00052 bool IsNewSubmap(const mapping::proto::Submap& submap) {
00053 return (submap.has_submap_2d() && submap.submap_2d().num_range_data() == 1) ||
00054 (submap.has_submap_3d() && submap.submap_3d().num_range_data() == 1);
00055 }
00056
00057 class LocalTrajectoryUploader : public LocalTrajectoryUploaderInterface {
00058 public:
00059 struct TrajectoryInfo {
00060
00061 absl::optional<int> uplink_trajectory_id;
00062 const std::set<SensorId> expected_sensor_ids;
00063 const mapping::proto::TrajectoryBuilderOptions trajectory_options;
00064 const std::string client_id;
00065 };
00066
00067 public:
00068 LocalTrajectoryUploader(const std::string& uplink_server_address,
00069 int batch_size, bool enable_ssl_encryption,
00070 bool enable_google_auth);
00071 ~LocalTrajectoryUploader();
00072
00073
00074 void Start() final;
00075
00076
00077
00078 void Shutdown() final;
00079
00080 grpc::Status AddTrajectory(
00081 const std::string& client_id, int local_trajectory_id,
00082 const std::set<SensorId>& expected_sensor_ids,
00083 const mapping::proto::TrajectoryBuilderOptions& trajectory_options) final;
00084 grpc::Status FinishTrajectory(const std::string& client_id,
00085 int local_trajectory_id) final;
00086 void EnqueueSensorData(std::unique_ptr<proto::SensorData> sensor_data) final;
00087 void TryRecovery();
00088
00089 SensorId GetLocalSlamResultSensorId(int local_trajectory_id) const final {
00090 return SensorId{SensorId::SensorType::LOCAL_SLAM_RESULT,
00091 "local_slam_result_" + std::to_string(local_trajectory_id)};
00092 }
00093
00094 private:
00095 void ProcessSendQueue();
00096
00097 bool TranslateTrajectoryId(proto::SensorMetadata* sensor_metadata);
00098 grpc::Status RegisterTrajectory(int local_trajectory_id);
00099
00100 std::shared_ptr<::grpc::Channel> client_channel_;
00101 int batch_size_;
00102 std::map<int, TrajectoryInfo> local_trajectory_id_to_trajectory_info_;
00103 common::BlockingQueue<std::unique_ptr<proto::SensorData>> send_queue_;
00104 bool shutting_down_ = false;
00105 std::unique_ptr<std::thread> upload_thread_;
00106 };
00107
00108 LocalTrajectoryUploader::LocalTrajectoryUploader(
00109 const std::string& uplink_server_address, int batch_size,
00110 bool enable_ssl_encryption, bool enable_google_auth)
00111 : batch_size_(batch_size) {
00112 auto channel_creds =
00113 enable_google_auth
00114 ? grpc::GoogleDefaultCredentials()
00115 : (enable_ssl_encryption
00116 ? ::grpc::SslCredentials(::grpc::SslCredentialsOptions())
00117 : ::grpc::InsecureChannelCredentials());
00118
00119 client_channel_ = ::grpc::CreateChannel(uplink_server_address, channel_creds);
00120 std::chrono::system_clock::time_point deadline =
00121 std::chrono::system_clock::now() +
00122 std::chrono::seconds(kConnectionTimeoutInSeconds);
00123 LOG(INFO) << "Connecting to uplink " << uplink_server_address;
00124 if (!client_channel_->WaitForConnected(deadline)) {
00125 LOG(ERROR) << "Failed to connect to " << uplink_server_address;
00126 }
00127 }
00128
00129 LocalTrajectoryUploader::~LocalTrajectoryUploader() {}
00130
00131 void LocalTrajectoryUploader::Start() {
00132 CHECK(!shutting_down_);
00133 CHECK(!upload_thread_);
00134 upload_thread_ =
00135 make_unique<std::thread>([this]() { this->ProcessSendQueue(); });
00136 }
00137
00138 void LocalTrajectoryUploader::Shutdown() {
00139 CHECK(!shutting_down_);
00140 CHECK(upload_thread_);
00141 shutting_down_ = true;
00142 upload_thread_->join();
00143 }
00144
00145 void LocalTrajectoryUploader::TryRecovery() {
00146 if (client_channel_->GetState(false ) !=
00147 grpc_connectivity_state::GRPC_CHANNEL_READY) {
00148 LOG(INFO) << "Trying to re-connect to uplink...";
00149 std::chrono::system_clock::time_point deadline =
00150 std::chrono::system_clock::now() +
00151 std::chrono::seconds(kConnectionRecoveryTimeoutInSeconds);
00152 if (!client_channel_->WaitForConnected(deadline)) {
00153 LOG(ERROR) << "Failed to re-connect to uplink prior to recovery attempt.";
00154 return;
00155 }
00156 }
00157 LOG(INFO) << "Uplink channel ready, trying recovery.";
00158
00159
00160 LOG(INFO) << "LocalTrajectoryUploader tries to recover with next submap.";
00161 while (true) {
00162 if (shutting_down_) {
00163 return;
00164 }
00165 proto::SensorData* sensor_data =
00166 send_queue_.PeekWithTimeout<proto::SensorData>(kPopTimeout);
00167 if (sensor_data) {
00168 CHECK_GE(sensor_data->local_slam_result_data().submaps_size(), 0);
00169 if (sensor_data->sensor_data_case() ==
00170 proto::SensorData::kLocalSlamResultData &&
00171 sensor_data->local_slam_result_data().submaps_size() > 0 &&
00172 IsNewSubmap(sensor_data->local_slam_result_data().submaps(
00173 sensor_data->local_slam_result_data().submaps_size() - 1))) {
00174 break;
00175 } else {
00176 send_queue_.Pop();
00177 }
00178 }
00179 }
00180
00181
00182
00183 for (auto& entry : local_trajectory_id_to_trajectory_info_) {
00184 entry.second.uplink_trajectory_id.reset();
00185 }
00186
00187
00188
00189
00190 for (const auto& entry : local_trajectory_id_to_trajectory_info_) {
00191 grpc::Status status = RegisterTrajectory(entry.first);
00192 if (!status.ok()) {
00193 LOG(ERROR) << "Failed to create trajectory. Aborting recovery attempt. "
00194 << status.error_message();
00195 return;
00196 }
00197 }
00198 LOG(INFO) << "LocalTrajectoryUploader recovered.";
00199 }
00200
00201 void LocalTrajectoryUploader::ProcessSendQueue() {
00202 LOG(INFO) << "Starting uploader thread.";
00203 proto::AddSensorDataBatchRequest batch_request;
00204 while (!shutting_down_) {
00205 auto sensor_data = send_queue_.PopWithTimeout(kPopTimeout);
00206 if (sensor_data) {
00207 if (!TranslateTrajectoryId(sensor_data->mutable_sensor_metadata())) {
00208 batch_request.clear_sensor_data();
00209 TryRecovery();
00210 continue;
00211 }
00212 proto::SensorData* added_sensor_data = batch_request.add_sensor_data();
00213 *added_sensor_data = *sensor_data;
00214
00215
00216
00217 if (added_sensor_data->has_local_slam_result_data()) {
00218 for (mapping::proto::Submap& mutable_submap :
00219 *added_sensor_data->mutable_local_slam_result_data()
00220 ->mutable_submaps()) {
00221 mutable_submap.mutable_submap_id()->set_trajectory_id(
00222 added_sensor_data->sensor_metadata().trajectory_id());
00223 }
00224 }
00225
00226 if (batch_request.sensor_data_size() == batch_size_) {
00227 async_grpc::Client<handlers::AddSensorDataBatchSignature> client(
00228 client_channel_, common::FromSeconds(kConnectionTimeoutInSeconds),
00229 async_grpc::CreateUnlimitedConstantDelayStrategy(
00230 common::FromSeconds(1), kUnrecoverableStatusCodes));
00231 if (client.Write(batch_request)) {
00232 LOG(INFO) << "Uploaded " << batch_request.ByteSize()
00233 << " bytes of sensor data.";
00234 batch_request.clear_sensor_data();
00235 continue;
00236 }
00237
00238 batch_request.clear_sensor_data();
00239 TryRecovery();
00240 }
00241 }
00242 }
00243 }
00244
00245 bool LocalTrajectoryUploader::TranslateTrajectoryId(
00246 proto::SensorMetadata* sensor_metadata) {
00247 auto it = local_trajectory_id_to_trajectory_info_.find(
00248 sensor_metadata->trajectory_id());
00249 if (it == local_trajectory_id_to_trajectory_info_.end()) {
00250 return false;
00251 }
00252 if (!it->second.uplink_trajectory_id.has_value()) {
00253
00254 return false;
00255 }
00256 int cloud_trajectory_id = it->second.uplink_trajectory_id.value();
00257 sensor_metadata->set_trajectory_id(cloud_trajectory_id);
00258 return true;
00259 }
00260
00261 grpc::Status LocalTrajectoryUploader::AddTrajectory(
00262 const std::string& client_id, int local_trajectory_id,
00263 const std::set<SensorId>& expected_sensor_ids,
00264 const mapping::proto::TrajectoryBuilderOptions& trajectory_options) {
00265 CHECK_EQ(local_trajectory_id_to_trajectory_info_.count(local_trajectory_id),
00266 0);
00267 local_trajectory_id_to_trajectory_info_.emplace(
00268 local_trajectory_id,
00269 TrajectoryInfo{{}, expected_sensor_ids, trajectory_options, client_id});
00270 return RegisterTrajectory(local_trajectory_id);
00271 }
00272
00273 grpc::Status LocalTrajectoryUploader::RegisterTrajectory(
00274 int local_trajectory_id) {
00275 TrajectoryInfo& trajectory_info =
00276 local_trajectory_id_to_trajectory_info_.at(local_trajectory_id);
00277 proto::AddTrajectoryRequest request;
00278 request.set_client_id(trajectory_info.client_id);
00279 *request.mutable_trajectory_builder_options() =
00280 trajectory_info.trajectory_options;
00281 for (const SensorId& sensor_id : trajectory_info.expected_sensor_ids) {
00282
00283 if (sensor_id.type != SensorId::SensorType::RANGE) {
00284 *request.add_expected_sensor_ids() = cloud::ToProto(sensor_id);
00285 }
00286 }
00287 *request.add_expected_sensor_ids() =
00288 cloud::ToProto(GetLocalSlamResultSensorId(local_trajectory_id));
00289 async_grpc::Client<handlers::AddTrajectorySignature> client(
00290 client_channel_, common::FromSeconds(kConnectionTimeoutInSeconds));
00291 ::grpc::Status status;
00292 if (!client.Write(request, &status)) {
00293 LOG(ERROR) << "Failed to register local_trajectory_id "
00294 << local_trajectory_id << " with uplink server. "
00295 << status.error_message();
00296 return status;
00297 }
00298 LOG(INFO) << "Created trajectory for client_id: " << trajectory_info.client_id
00299 << " local trajectory_id: " << local_trajectory_id
00300 << " uplink trajectory_id: " << client.response().trajectory_id();
00301 trajectory_info.uplink_trajectory_id = client.response().trajectory_id();
00302 return status;
00303 }
00304
00305 grpc::Status LocalTrajectoryUploader::FinishTrajectory(
00306 const std::string& client_id, int local_trajectory_id) {
00307 auto it = local_trajectory_id_to_trajectory_info_.find(local_trajectory_id);
00308 if (it == local_trajectory_id_to_trajectory_info_.end()) {
00309 return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
00310 "local_trajectory_id has not been"
00311 " registered with AddTrajectory.");
00312 }
00313 auto cloud_trajectory_id = it->second.uplink_trajectory_id;
00314 if (!cloud_trajectory_id.has_value()) {
00315 return grpc::Status(
00316 grpc::StatusCode::UNAVAILABLE,
00317 "trajectory_id has not been created in uplink, ignoring.");
00318 }
00319 proto::FinishTrajectoryRequest request;
00320 request.set_client_id(client_id);
00321 request.set_trajectory_id(cloud_trajectory_id.value());
00322 async_grpc::Client<handlers::FinishTrajectorySignature> client(
00323 client_channel_, common::FromSeconds(kConnectionTimeoutInSeconds));
00324 grpc::Status status;
00325 client.Write(request, &status);
00326 return status;
00327 }
00328
00329 void LocalTrajectoryUploader::EnqueueSensorData(
00330 std::unique_ptr<proto::SensorData> sensor_data) {
00331 send_queue_.Push(std::move(sensor_data));
00332 }
00333
00334 }
00335
00336 std::unique_ptr<LocalTrajectoryUploaderInterface> CreateLocalTrajectoryUploader(
00337 const std::string& uplink_server_address, int batch_size,
00338 bool enable_ssl_encryption, bool enable_google_auth) {
00339 return make_unique<LocalTrajectoryUploader>(uplink_server_address, batch_size,
00340 enable_ssl_encryption,
00341 enable_google_auth);
00342 }
00343
00344 }
00345 }