local_trajectory_uploader.cc
Go to the documentation of this file.
00001 /*
00002  * Copyright 2018 The Cartographer Authors
00003  *
00004  * Licensed under the Apache License, Version 2.0 (the "License");
00005  * you may not use this file except in compliance with the License.
00006  * You may obtain a copy of the License at
00007  *
00008  *      http://www.apache.org/licenses/LICENSE-2.0
00009  *
00010  * Unless required by applicable law or agreed to in writing, software
00011  * distributed under the License is distributed on an "AS IS" BASIS,
00012  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
00013  * See the License for the specific language governing permissions and
00014  * limitations under the License.
00015  */
00016 
00017 #include "cartographer/cloud/internal/local_trajectory_uploader.h"
00018 
00019 #include <map>
00020 #include <thread>
00021 
00022 #include "absl/memory/memory.h"
00023 #include "async_grpc/client.h"
00024 #include "cartographer/cloud/internal/handlers/add_sensor_data_batch_handler.h"
00025 #include "cartographer/cloud/internal/handlers/add_trajectory_handler.h"
00026 #include "cartographer/cloud/internal/handlers/finish_trajectory_handler.h"
00027 #include "cartographer/cloud/internal/sensor/serialization.h"
00028 #include "cartographer/common/blocking_queue.h"
00029 #include "glog/logging.h"
00030 #include "grpc++/grpc++.h"
00031 
00032 namespace cartographer {
00033 namespace cloud {
00034 namespace {
00035 
00036 using absl::make_unique;
00037 
00038 constexpr int kConnectionTimeoutInSeconds = 10;
00039 constexpr int kConnectionRecoveryTimeoutInSeconds = 60;
00040 constexpr int kTokenRefreshIntervalInSeconds = 60;
00041 const common::Duration kPopTimeout = common::FromMilliseconds(100);
00042 
00043 // This defines the '::grpc::StatusCode's that are considered unrecoverable
00044 // errors and hence no retries will be attempted by the client.
00045 const std::set<::grpc::StatusCode> kUnrecoverableStatusCodes = {
00046     ::grpc::DEADLINE_EXCEEDED,
00047     ::grpc::NOT_FOUND,
00048     ::grpc::UNAVAILABLE,
00049     ::grpc::UNKNOWN,
00050 };
00051 
00052 bool IsNewSubmap(const mapping::proto::Submap& submap) {
00053   return (submap.has_submap_2d() && submap.submap_2d().num_range_data() == 1) ||
00054          (submap.has_submap_3d() && submap.submap_3d().num_range_data() == 1);
00055 }
00056 
00057 class LocalTrajectoryUploader : public LocalTrajectoryUploaderInterface {
00058  public:
00059   struct TrajectoryInfo {
00060     // nullopt if uplink has not yet responded to AddTrajectoryRequest.
00061     absl::optional<int> uplink_trajectory_id;
00062     const std::set<SensorId> expected_sensor_ids;
00063     const mapping::proto::TrajectoryBuilderOptions trajectory_options;
00064     const std::string client_id;
00065   };
00066 
00067  public:
00068   LocalTrajectoryUploader(const std::string& uplink_server_address,
00069                           int batch_size, bool enable_ssl_encryption,
00070                           bool enable_google_auth);
00071   ~LocalTrajectoryUploader();
00072 
00073   // Starts the upload thread.
00074   void Start() final;
00075 
00076   // Shuts down the upload thread. This method blocks until the shutdown is
00077   // complete.
00078   void Shutdown() final;
00079 
00080   grpc::Status AddTrajectory(
00081       const std::string& client_id, int local_trajectory_id,
00082       const std::set<SensorId>& expected_sensor_ids,
00083       const mapping::proto::TrajectoryBuilderOptions& trajectory_options) final;
00084   grpc::Status FinishTrajectory(const std::string& client_id,
00085                                 int local_trajectory_id) final;
00086   void EnqueueSensorData(std::unique_ptr<proto::SensorData> sensor_data) final;
00087   void TryRecovery();
00088 
00089   SensorId GetLocalSlamResultSensorId(int local_trajectory_id) const final {
00090     return SensorId{SensorId::SensorType::LOCAL_SLAM_RESULT,
00091                     "local_slam_result_" + std::to_string(local_trajectory_id)};
00092   }
00093 
00094  private:
00095   void ProcessSendQueue();
00096   // Returns 'false' for failure.
00097   bool TranslateTrajectoryId(proto::SensorMetadata* sensor_metadata);
00098   grpc::Status RegisterTrajectory(int local_trajectory_id);
00099 
00100   std::shared_ptr<::grpc::Channel> client_channel_;
00101   int batch_size_;
00102   std::map<int, TrajectoryInfo> local_trajectory_id_to_trajectory_info_;
00103   common::BlockingQueue<std::unique_ptr<proto::SensorData>> send_queue_;
00104   bool shutting_down_ = false;
00105   std::unique_ptr<std::thread> upload_thread_;
00106 };
00107 
00108 LocalTrajectoryUploader::LocalTrajectoryUploader(
00109     const std::string& uplink_server_address, int batch_size,
00110     bool enable_ssl_encryption, bool enable_google_auth)
00111     : batch_size_(batch_size) {
00112   auto channel_creds =
00113       enable_google_auth
00114           ? grpc::GoogleDefaultCredentials()
00115           : (enable_ssl_encryption
00116                  ? ::grpc::SslCredentials(::grpc::SslCredentialsOptions())
00117                  : ::grpc::InsecureChannelCredentials());
00118 
00119   client_channel_ = ::grpc::CreateChannel(uplink_server_address, channel_creds);
00120   std::chrono::system_clock::time_point deadline =
00121       std::chrono::system_clock::now() +
00122       std::chrono::seconds(kConnectionTimeoutInSeconds);
00123   LOG(INFO) << "Connecting to uplink " << uplink_server_address;
00124   if (!client_channel_->WaitForConnected(deadline)) {
00125     LOG(ERROR) << "Failed to connect to " << uplink_server_address;
00126   }
00127 }
00128 
00129 LocalTrajectoryUploader::~LocalTrajectoryUploader() {}
00130 
00131 void LocalTrajectoryUploader::Start() {
00132   CHECK(!shutting_down_);
00133   CHECK(!upload_thread_);
00134   upload_thread_ =
00135       make_unique<std::thread>([this]() { this->ProcessSendQueue(); });
00136 }
00137 
00138 void LocalTrajectoryUploader::Shutdown() {
00139   CHECK(!shutting_down_);
00140   CHECK(upload_thread_);
00141   shutting_down_ = true;
00142   upload_thread_->join();
00143 }
00144 
00145 void LocalTrajectoryUploader::TryRecovery() {
00146   if (client_channel_->GetState(false /* try_to_connect */) !=
00147       grpc_connectivity_state::GRPC_CHANNEL_READY) {
00148     LOG(INFO) << "Trying to re-connect to uplink...";
00149     std::chrono::system_clock::time_point deadline =
00150         std::chrono::system_clock::now() +
00151         std::chrono::seconds(kConnectionRecoveryTimeoutInSeconds);
00152     if (!client_channel_->WaitForConnected(deadline)) {
00153       LOG(ERROR) << "Failed to re-connect to uplink prior to recovery attempt.";
00154       return;
00155     }
00156   }
00157   LOG(INFO) << "Uplink channel ready, trying recovery.";
00158 
00159   // Wind the sensor_data_queue forward to the next new submap.
00160   LOG(INFO) << "LocalTrajectoryUploader tries to recover with next submap.";
00161   while (true) {
00162     if (shutting_down_) {
00163       return;
00164     }
00165     proto::SensorData* sensor_data =
00166         send_queue_.PeekWithTimeout<proto::SensorData>(kPopTimeout);
00167     if (sensor_data) {
00168       CHECK_GE(sensor_data->local_slam_result_data().submaps_size(), 0);
00169       if (sensor_data->sensor_data_case() ==
00170               proto::SensorData::kLocalSlamResultData &&
00171           sensor_data->local_slam_result_data().submaps_size() > 0 &&
00172           IsNewSubmap(sensor_data->local_slam_result_data().submaps(
00173               sensor_data->local_slam_result_data().submaps_size() - 1))) {
00174         break;
00175       } else {
00176         send_queue_.Pop();
00177       }
00178     }
00179   }
00180 
00181   // Because the trajectories may be interrupted on the uplink side, we can no
00182   // longer upload to those.
00183   for (auto& entry : local_trajectory_id_to_trajectory_info_) {
00184     entry.second.uplink_trajectory_id.reset();
00185   }
00186   // TODO(gaschler): If the uplink did not restart but only the connection was
00187   // interrupted, this leaks trajectories in the uplink.
00188 
00189   // Attempt to recreate the trajectories.
00190   for (const auto& entry : local_trajectory_id_to_trajectory_info_) {
00191     grpc::Status status = RegisterTrajectory(entry.first);
00192     if (!status.ok()) {
00193       LOG(ERROR) << "Failed to create trajectory. Aborting recovery attempt. "
00194                  << status.error_message();
00195       return;
00196     }
00197   }
00198   LOG(INFO) << "LocalTrajectoryUploader recovered.";
00199 }
00200 
00201 void LocalTrajectoryUploader::ProcessSendQueue() {
00202   LOG(INFO) << "Starting uploader thread.";
00203   proto::AddSensorDataBatchRequest batch_request;
00204   while (!shutting_down_) {
00205     auto sensor_data = send_queue_.PopWithTimeout(kPopTimeout);
00206     if (sensor_data) {
00207       if (!TranslateTrajectoryId(sensor_data->mutable_sensor_metadata())) {
00208         batch_request.clear_sensor_data();
00209         TryRecovery();
00210         continue;
00211       }
00212       proto::SensorData* added_sensor_data = batch_request.add_sensor_data();
00213       *added_sensor_data = *sensor_data;
00214 
00215       // A submap also holds a trajectory id that must be translated to uplink's
00216       // trajectory id.
00217       if (added_sensor_data->has_local_slam_result_data()) {
00218         for (mapping::proto::Submap& mutable_submap :
00219              *added_sensor_data->mutable_local_slam_result_data()
00220                   ->mutable_submaps()) {
00221           mutable_submap.mutable_submap_id()->set_trajectory_id(
00222               added_sensor_data->sensor_metadata().trajectory_id());
00223         }
00224       }
00225 
00226       if (batch_request.sensor_data_size() == batch_size_) {
00227         async_grpc::Client<handlers::AddSensorDataBatchSignature> client(
00228             client_channel_, common::FromSeconds(kConnectionTimeoutInSeconds),
00229             async_grpc::CreateUnlimitedConstantDelayStrategy(
00230                 common::FromSeconds(1), kUnrecoverableStatusCodes));
00231         if (client.Write(batch_request)) {
00232           LOG(INFO) << "Uploaded " << batch_request.ByteSize()
00233                     << " bytes of sensor data.";
00234           batch_request.clear_sensor_data();
00235           continue;
00236         }
00237         // Unrecoverable error occurred. Attempt recovery.
00238         batch_request.clear_sensor_data();
00239         TryRecovery();
00240       }
00241     }
00242   }
00243 }
00244 
00245 bool LocalTrajectoryUploader::TranslateTrajectoryId(
00246     proto::SensorMetadata* sensor_metadata) {
00247   auto it = local_trajectory_id_to_trajectory_info_.find(
00248       sensor_metadata->trajectory_id());
00249   if (it == local_trajectory_id_to_trajectory_info_.end()) {
00250     return false;
00251   }
00252   if (!it->second.uplink_trajectory_id.has_value()) {
00253     // Could not yet register trajectory with uplink server.
00254     return false;
00255   }
00256   int cloud_trajectory_id = it->second.uplink_trajectory_id.value();
00257   sensor_metadata->set_trajectory_id(cloud_trajectory_id);
00258   return true;
00259 }
00260 
00261 grpc::Status LocalTrajectoryUploader::AddTrajectory(
00262     const std::string& client_id, int local_trajectory_id,
00263     const std::set<SensorId>& expected_sensor_ids,
00264     const mapping::proto::TrajectoryBuilderOptions& trajectory_options) {
00265   CHECK_EQ(local_trajectory_id_to_trajectory_info_.count(local_trajectory_id),
00266            0);
00267   local_trajectory_id_to_trajectory_info_.emplace(
00268       local_trajectory_id,
00269       TrajectoryInfo{{}, expected_sensor_ids, trajectory_options, client_id});
00270   return RegisterTrajectory(local_trajectory_id);
00271 }
00272 
00273 grpc::Status LocalTrajectoryUploader::RegisterTrajectory(
00274     int local_trajectory_id) {
00275   TrajectoryInfo& trajectory_info =
00276       local_trajectory_id_to_trajectory_info_.at(local_trajectory_id);
00277   proto::AddTrajectoryRequest request;
00278   request.set_client_id(trajectory_info.client_id);
00279   *request.mutable_trajectory_builder_options() =
00280       trajectory_info.trajectory_options;
00281   for (const SensorId& sensor_id : trajectory_info.expected_sensor_ids) {
00282     // Range sensors are not forwarded, but combined into a LocalSlamResult.
00283     if (sensor_id.type != SensorId::SensorType::RANGE) {
00284       *request.add_expected_sensor_ids() = cloud::ToProto(sensor_id);
00285     }
00286   }
00287   *request.add_expected_sensor_ids() =
00288       cloud::ToProto(GetLocalSlamResultSensorId(local_trajectory_id));
00289   async_grpc::Client<handlers::AddTrajectorySignature> client(
00290       client_channel_, common::FromSeconds(kConnectionTimeoutInSeconds));
00291   ::grpc::Status status;
00292   if (!client.Write(request, &status)) {
00293     LOG(ERROR) << "Failed to register local_trajectory_id "
00294                << local_trajectory_id << " with uplink server. "
00295                << status.error_message();
00296     return status;
00297   }
00298   LOG(INFO) << "Created trajectory for client_id: " << trajectory_info.client_id
00299             << " local trajectory_id: " << local_trajectory_id
00300             << " uplink trajectory_id: " << client.response().trajectory_id();
00301   trajectory_info.uplink_trajectory_id = client.response().trajectory_id();
00302   return status;
00303 }
00304 
00305 grpc::Status LocalTrajectoryUploader::FinishTrajectory(
00306     const std::string& client_id, int local_trajectory_id) {
00307   auto it = local_trajectory_id_to_trajectory_info_.find(local_trajectory_id);
00308   if (it == local_trajectory_id_to_trajectory_info_.end()) {
00309     return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
00310                         "local_trajectory_id has not been"
00311                         " registered with AddTrajectory.");
00312   }
00313   auto cloud_trajectory_id = it->second.uplink_trajectory_id;
00314   if (!cloud_trajectory_id.has_value()) {
00315     return grpc::Status(
00316         grpc::StatusCode::UNAVAILABLE,
00317         "trajectory_id has not been created in uplink, ignoring.");
00318   }
00319   proto::FinishTrajectoryRequest request;
00320   request.set_client_id(client_id);
00321   request.set_trajectory_id(cloud_trajectory_id.value());
00322   async_grpc::Client<handlers::FinishTrajectorySignature> client(
00323       client_channel_, common::FromSeconds(kConnectionTimeoutInSeconds));
00324   grpc::Status status;
00325   client.Write(request, &status);
00326   return status;
00327 }
00328 
00329 void LocalTrajectoryUploader::EnqueueSensorData(
00330     std::unique_ptr<proto::SensorData> sensor_data) {
00331   send_queue_.Push(std::move(sensor_data));
00332 }
00333 
00334 }  // namespace
00335 
00336 std::unique_ptr<LocalTrajectoryUploaderInterface> CreateLocalTrajectoryUploader(
00337     const std::string& uplink_server_address, int batch_size,
00338     bool enable_ssl_encryption, bool enable_google_auth) {
00339   return make_unique<LocalTrajectoryUploader>(uplink_server_address, batch_size,
00340                                               enable_ssl_encryption,
00341                                               enable_google_auth);
00342 }
00343 
00344 }  // namespace cloud
00345 }  // namespace cartographer


cartographer
Author(s): The Cartographer Authors
autogenerated on Thu May 9 2019 02:27:35