map_builder_server.cc
Go to the documentation of this file.
00001 /*
00002  * Copyright 2017 The Cartographer Authors
00003  *
00004  * Licensed under the Apache License, Version 2.0 (the "License");
00005  * you may not use this file except in compliance with the License.
00006  * You may obtain a copy of the License at
00007  *
00008  *      http://www.apache.org/licenses/LICENSE-2.0
00009  *
00010  * Unless required by applicable law or agreed to in writing, software
00011  * distributed under the License is distributed on an "AS IS" BASIS,
00012  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
00013  * See the License for the specific language governing permissions and
00014  * limitations under the License.
00015  */
00016 
00017 #include "cartographer/cloud/internal/map_builder_server.h"
00018 
00019 #include "cartographer/cloud/internal/handlers/add_fixed_frame_pose_data_handler.h"
00020 #include "cartographer/cloud/internal/handlers/add_imu_data_handler.h"
00021 #include "cartographer/cloud/internal/handlers/add_landmark_data_handler.h"
00022 #include "cartographer/cloud/internal/handlers/add_odometry_data_handler.h"
00023 #include "cartographer/cloud/internal/handlers/add_rangefinder_data_handler.h"
00024 #include "cartographer/cloud/internal/handlers/add_sensor_data_batch_handler.h"
00025 #include "cartographer/cloud/internal/handlers/add_trajectory_handler.h"
00026 #include "cartographer/cloud/internal/handlers/delete_trajectory_handler.h"
00027 #include "cartographer/cloud/internal/handlers/finish_trajectory_handler.h"
00028 #include "cartographer/cloud/internal/handlers/get_all_submap_poses.h"
00029 #include "cartographer/cloud/internal/handlers/get_constraints_handler.h"
00030 #include "cartographer/cloud/internal/handlers/get_landmark_poses_handler.h"
00031 #include "cartographer/cloud/internal/handlers/get_local_to_global_transform_handler.h"
00032 #include "cartographer/cloud/internal/handlers/get_submap_handler.h"
00033 #include "cartographer/cloud/internal/handlers/get_trajectory_node_poses_handler.h"
00034 #include "cartographer/cloud/internal/handlers/get_trajectory_states_handler.h"
00035 #include "cartographer/cloud/internal/handlers/is_trajectory_finished_handler.h"
00036 #include "cartographer/cloud/internal/handlers/is_trajectory_frozen_handler.h"
00037 #include "cartographer/cloud/internal/handlers/load_state_from_file_handler.h"
00038 #include "cartographer/cloud/internal/handlers/load_state_handler.h"
00039 #include "cartographer/cloud/internal/handlers/receive_global_slam_optimizations_handler.h"
00040 #include "cartographer/cloud/internal/handlers/receive_local_slam_results_handler.h"
00041 #include "cartographer/cloud/internal/handlers/run_final_optimization_handler.h"
00042 #include "cartographer/cloud/internal/handlers/set_landmark_pose_handler.h"
00043 #include "cartographer/cloud/internal/handlers/write_state_handler.h"
00044 #include "cartographer/cloud/internal/handlers/write_state_to_file_handler.h"
00045 #include "cartographer/cloud/internal/sensor/serialization.h"
00046 #include "glog/logging.h"
00047 
00048 namespace cartographer {
00049 namespace cloud {
00050 namespace {
00051 
00052 static auto* kIncomingDataQueueMetric = metrics::Gauge::Null();
00053 constexpr int kMaxMessageSize = 100 * 1024 * 1024;  // 100 MB
00054 const common::Duration kPopTimeout = common::FromMilliseconds(100);
00055 
00056 }  // namespace
00057 
00058 MapBuilderServer::MapBuilderServer(
00059     const proto::MapBuilderServerOptions& map_builder_server_options,
00060     std::unique_ptr<mapping::MapBuilderInterface> map_builder)
00061     : map_builder_(std::move(map_builder)) {
00062   async_grpc::Server::Builder server_builder;
00063   server_builder.SetServerAddress(map_builder_server_options.server_address());
00064   server_builder.SetNumGrpcThreads(
00065       map_builder_server_options.num_grpc_threads());
00066   server_builder.SetNumEventThreads(
00067       map_builder_server_options.num_event_threads());
00068   server_builder.SetMaxSendMessageSize(kMaxMessageSize);
00069   server_builder.SetMaxReceiveMessageSize(kMaxMessageSize);
00070   if (!map_builder_server_options.uplink_server_address().empty()) {
00071     local_trajectory_uploader_ = CreateLocalTrajectoryUploader(
00072         map_builder_server_options.uplink_server_address(),
00073         map_builder_server_options.upload_batch_size(),
00074         map_builder_server_options.enable_ssl_encryption(),
00075         map_builder_server_options.enable_google_auth());
00076   }
00077   server_builder.RegisterHandler<handlers::AddTrajectoryHandler>();
00078   server_builder.RegisterHandler<handlers::AddOdometryDataHandler>();
00079   server_builder.RegisterHandler<handlers::AddImuDataHandler>();
00080   server_builder.RegisterHandler<handlers::AddRangefinderDataHandler>();
00081   server_builder.RegisterHandler<handlers::AddFixedFramePoseDataHandler>();
00082   server_builder.RegisterHandler<handlers::AddLandmarkDataHandler>();
00083   server_builder.RegisterHandler<handlers::AddSensorDataBatchHandler>();
00084   server_builder.RegisterHandler<handlers::FinishTrajectoryHandler>();
00085   server_builder.RegisterHandler<handlers::DeleteTrajectoryHandler>();
00086   server_builder
00087       .RegisterHandler<handlers::ReceiveGlobalSlamOptimizationsHandler>();
00088   server_builder.RegisterHandler<handlers::ReceiveLocalSlamResultsHandler>();
00089   server_builder.RegisterHandler<handlers::GetSubmapHandler>();
00090   server_builder.RegisterHandler<handlers::GetTrajectoryNodePosesHandler>();
00091   server_builder.RegisterHandler<handlers::GetTrajectoryStatesHandler>();
00092   server_builder.RegisterHandler<handlers::GetLandmarkPosesHandler>();
00093   server_builder.RegisterHandler<handlers::GetAllSubmapPosesHandler>();
00094   server_builder.RegisterHandler<handlers::GetLocalToGlobalTransformHandler>();
00095   server_builder.RegisterHandler<handlers::GetConstraintsHandler>();
00096   server_builder.RegisterHandler<handlers::IsTrajectoryFinishedHandler>();
00097   server_builder.RegisterHandler<handlers::IsTrajectoryFrozenHandler>();
00098   server_builder.RegisterHandler<handlers::LoadStateHandler>();
00099   server_builder.RegisterHandler<handlers::LoadStateFromFileHandler>();
00100   server_builder.RegisterHandler<handlers::RunFinalOptimizationHandler>();
00101   server_builder.RegisterHandler<handlers::WriteStateHandler>();
00102   server_builder.RegisterHandler<handlers::WriteStateToFileHandler>();
00103   server_builder.RegisterHandler<handlers::SetLandmarkPoseHandler>();
00104   grpc_server_ = server_builder.Build();
00105   if (map_builder_server_options.map_builder_options()
00106           .use_trajectory_builder_2d()) {
00107     grpc_server_->SetExecutionContext(
00108         absl::make_unique<MapBuilderContext<mapping::Submap2D>>(this));
00109   } else if (map_builder_server_options.map_builder_options()
00110                  .use_trajectory_builder_3d()) {
00111     grpc_server_->SetExecutionContext(
00112         absl::make_unique<MapBuilderContext<mapping::Submap3D>>(this));
00113   } else {
00114     LOG(FATAL)
00115         << "Set either use_trajectory_builder_2d or use_trajectory_builder_3d";
00116   }
00117   map_builder_->pose_graph()->SetGlobalSlamOptimizationCallback(
00118       [this](const std::map<int, mapping::SubmapId>& last_optimized_submap_ids,
00119              const std::map<int, mapping::NodeId>& last_optimized_node_ids) {
00120         OnGlobalSlamOptimizations(last_optimized_submap_ids,
00121                                   last_optimized_node_ids);
00122       });
00123 }
00124 
00125 void MapBuilderServer::Start() {
00126   shutting_down_ = false;
00127   if (local_trajectory_uploader_) {
00128     local_trajectory_uploader_->Start();
00129   }
00130   StartSlamThread();
00131   grpc_server_->Start();
00132 }
00133 
00134 void MapBuilderServer::WaitForShutdown() {
00135   grpc_server_->WaitForShutdown();
00136   if (slam_thread_) {
00137     slam_thread_->join();
00138   }
00139   if (local_trajectory_uploader_) {
00140     local_trajectory_uploader_->Shutdown();
00141   }
00142 }
00143 
00144 void MapBuilderServer::Shutdown() {
00145   shutting_down_ = true;
00146   grpc_server_->Shutdown();
00147   if (slam_thread_) {
00148     slam_thread_->join();
00149     slam_thread_.reset();
00150   }
00151   if (local_trajectory_uploader_) {
00152     local_trajectory_uploader_->Shutdown();
00153     local_trajectory_uploader_.reset();
00154   }
00155 }
00156 
00157 void MapBuilderServer::ProcessSensorDataQueue() {
00158   LOG(INFO) << "Starting SLAM thread.";
00159   while (!shutting_down_) {
00160     kIncomingDataQueueMetric->Set(incoming_data_queue_.Size());
00161     std::unique_ptr<MapBuilderContextInterface::Data> sensor_data =
00162         incoming_data_queue_.PopWithTimeout(kPopTimeout);
00163     if (sensor_data) {
00164       grpc_server_->GetContext<MapBuilderContextInterface>()
00165           ->AddSensorDataToTrajectory(*sensor_data);
00166     }
00167   }
00168 }
00169 
00170 void MapBuilderServer::StartSlamThread() {
00171   CHECK(!slam_thread_);
00172 
00173   // Start the SLAM processing thread.
00174   slam_thread_ = absl::make_unique<std::thread>(
00175       [this]() { this->ProcessSensorDataQueue(); });
00176 }
00177 
00178 void MapBuilderServer::OnLocalSlamResult(
00179     int trajectory_id, const std::string client_id, common::Time time,
00180     transform::Rigid3d local_pose, sensor::RangeData range_data,
00181     std::unique_ptr<const mapping::TrajectoryBuilderInterface::InsertionResult>
00182         insertion_result) {
00183   auto shared_range_data =
00184       std::make_shared<sensor::RangeData>(std::move(range_data));
00185 
00186   // If there is an uplink server and a submap insertion happened, enqueue this
00187   // local SLAM result for uploading.
00188   if (insertion_result &&
00189       grpc_server_->GetUnsynchronizedContext<MapBuilderContextInterface>()
00190           ->local_trajectory_uploader()) {
00191     auto sensor_data = absl::make_unique<proto::SensorData>();
00192     auto sensor_id =
00193         grpc_server_->GetUnsynchronizedContext<MapBuilderContextInterface>()
00194             ->local_trajectory_uploader()
00195             ->GetLocalSlamResultSensorId(trajectory_id);
00196     CreateSensorDataForLocalSlamResult(sensor_id.id, trajectory_id, client_id,
00197                                        time, starting_submap_index_,
00198                                        *insertion_result, sensor_data.get());
00199     // TODO(cschuet): Make this more robust.
00200     if (insertion_result->insertion_submaps.front()->insertion_finished()) {
00201       ++starting_submap_index_;
00202     }
00203     grpc_server_->GetUnsynchronizedContext<MapBuilderContextInterface>()
00204         ->local_trajectory_uploader()
00205         ->EnqueueSensorData(std::move(sensor_data));
00206   }
00207 
00208   absl::MutexLock locker(&subscriptions_lock_);
00209   for (auto& entry : local_slam_subscriptions_[trajectory_id]) {
00210     auto copy_of_insertion_result =
00211         insertion_result
00212             ? absl::make_unique<
00213                   const mapping::TrajectoryBuilderInterface::InsertionResult>(
00214                   *insertion_result)
00215             : nullptr;
00216     MapBuilderContextInterface::LocalSlamSubscriptionCallback callback =
00217         entry.second;
00218     if (!callback(
00219             absl::make_unique<MapBuilderContextInterface::LocalSlamResult>(
00220                 MapBuilderContextInterface::LocalSlamResult{
00221                     trajectory_id, time, local_pose, shared_range_data,
00222                     std::move(copy_of_insertion_result)}))) {
00223       LOG(INFO) << "Removing subscription with index: " << entry.first;
00224       CHECK_EQ(local_slam_subscriptions_[trajectory_id].erase(entry.first), 1u);
00225     }
00226   }
00227 }
00228 
00229 void MapBuilderServer::OnGlobalSlamOptimizations(
00230     const std::map<int, mapping::SubmapId>& last_optimized_submap_ids,
00231     const std::map<int, mapping::NodeId>& last_optimized_node_ids) {
00232   absl::MutexLock locker(&subscriptions_lock_);
00233   for (auto& entry : global_slam_subscriptions_) {
00234     if (!entry.second(last_optimized_submap_ids, last_optimized_node_ids)) {
00235       LOG(INFO) << "Removing subscription with index: " << entry.first;
00236       CHECK_EQ(global_slam_subscriptions_.erase(entry.first), 1u);
00237     }
00238   }
00239 }
00240 
00241 MapBuilderContextInterface::LocalSlamSubscriptionId
00242 MapBuilderServer::SubscribeLocalSlamResults(
00243     int trajectory_id,
00244     MapBuilderContextInterface::LocalSlamSubscriptionCallback callback) {
00245   absl::MutexLock locker(&subscriptions_lock_);
00246   local_slam_subscriptions_[trajectory_id].emplace(current_subscription_index_,
00247                                                    callback);
00248   return MapBuilderContextInterface::LocalSlamSubscriptionId{
00249       trajectory_id, current_subscription_index_++};
00250 }
00251 
00252 void MapBuilderServer::UnsubscribeLocalSlamResults(
00253     const MapBuilderContextInterface::LocalSlamSubscriptionId&
00254         subscription_id) {
00255   absl::MutexLock locker(&subscriptions_lock_);
00256   CHECK_EQ(local_slam_subscriptions_[subscription_id.trajectory_id].erase(
00257                subscription_id.subscription_index),
00258            1u);
00259 }
00260 
00261 int MapBuilderServer::SubscribeGlobalSlamOptimizations(
00262     MapBuilderContextInterface::GlobalSlamOptimizationCallback callback) {
00263   absl::MutexLock locker(&subscriptions_lock_);
00264   global_slam_subscriptions_.emplace(current_subscription_index_, callback);
00265   return current_subscription_index_++;
00266 }
00267 
00268 void MapBuilderServer::UnsubscribeGlobalSlamOptimizations(
00269     int subscription_index) {
00270   absl::MutexLock locker(&subscriptions_lock_);
00271   CHECK_EQ(global_slam_subscriptions_.erase(subscription_index), 1u);
00272 }
00273 
00274 void MapBuilderServer::NotifyFinishTrajectory(int trajectory_id) {
00275   absl::MutexLock locker(&subscriptions_lock_);
00276   for (auto& entry : local_slam_subscriptions_[trajectory_id]) {
00277     MapBuilderContextInterface::LocalSlamSubscriptionCallback callback =
00278         entry.second;
00279     // 'nullptr' signals subscribers that the trajectory finished.
00280     callback(nullptr);
00281   }
00282 }
00283 
00284 void MapBuilderServer::WaitUntilIdle() {
00285   incoming_data_queue_.WaitUntilEmpty();
00286   map_builder_->pose_graph()->RunFinalOptimization();
00287 }
00288 
00289 void MapBuilderServer::RegisterMetrics(metrics::FamilyFactory* factory) {
00290   auto* queue_length = factory->NewGaugeFamily(
00291       "cloud_internal_map_builder_server_incoming_data_queue_length",
00292       "Incoming SLAM Data Queue length");
00293   kIncomingDataQueueMetric = queue_length->Add({});
00294 }
00295 
00296 }  // namespace cloud
00297 }  // namespace cartographer


cartographer
Author(s): The Cartographer Authors
autogenerated on Thu May 9 2019 02:27:35