Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017 #include "cartographer/cloud/internal/map_builder_server.h"
00018
00019 #include "cartographer/cloud/internal/handlers/add_fixed_frame_pose_data_handler.h"
00020 #include "cartographer/cloud/internal/handlers/add_imu_data_handler.h"
00021 #include "cartographer/cloud/internal/handlers/add_landmark_data_handler.h"
00022 #include "cartographer/cloud/internal/handlers/add_odometry_data_handler.h"
00023 #include "cartographer/cloud/internal/handlers/add_rangefinder_data_handler.h"
00024 #include "cartographer/cloud/internal/handlers/add_sensor_data_batch_handler.h"
00025 #include "cartographer/cloud/internal/handlers/add_trajectory_handler.h"
00026 #include "cartographer/cloud/internal/handlers/delete_trajectory_handler.h"
00027 #include "cartographer/cloud/internal/handlers/finish_trajectory_handler.h"
00028 #include "cartographer/cloud/internal/handlers/get_all_submap_poses.h"
00029 #include "cartographer/cloud/internal/handlers/get_constraints_handler.h"
00030 #include "cartographer/cloud/internal/handlers/get_landmark_poses_handler.h"
00031 #include "cartographer/cloud/internal/handlers/get_local_to_global_transform_handler.h"
00032 #include "cartographer/cloud/internal/handlers/get_submap_handler.h"
00033 #include "cartographer/cloud/internal/handlers/get_trajectory_node_poses_handler.h"
00034 #include "cartographer/cloud/internal/handlers/get_trajectory_states_handler.h"
00035 #include "cartographer/cloud/internal/handlers/is_trajectory_finished_handler.h"
00036 #include "cartographer/cloud/internal/handlers/is_trajectory_frozen_handler.h"
00037 #include "cartographer/cloud/internal/handlers/load_state_from_file_handler.h"
00038 #include "cartographer/cloud/internal/handlers/load_state_handler.h"
00039 #include "cartographer/cloud/internal/handlers/receive_global_slam_optimizations_handler.h"
00040 #include "cartographer/cloud/internal/handlers/receive_local_slam_results_handler.h"
00041 #include "cartographer/cloud/internal/handlers/run_final_optimization_handler.h"
00042 #include "cartographer/cloud/internal/handlers/set_landmark_pose_handler.h"
00043 #include "cartographer/cloud/internal/handlers/write_state_handler.h"
00044 #include "cartographer/cloud/internal/handlers/write_state_to_file_handler.h"
00045 #include "cartographer/cloud/internal/sensor/serialization.h"
00046 #include "glog/logging.h"
00047
00048 namespace cartographer {
00049 namespace cloud {
00050 namespace {
00051
00052 static auto* kIncomingDataQueueMetric = metrics::Gauge::Null();
00053 constexpr int kMaxMessageSize = 100 * 1024 * 1024;
00054 const common::Duration kPopTimeout = common::FromMilliseconds(100);
00055
00056 }
00057
00058 MapBuilderServer::MapBuilderServer(
00059 const proto::MapBuilderServerOptions& map_builder_server_options,
00060 std::unique_ptr<mapping::MapBuilderInterface> map_builder)
00061 : map_builder_(std::move(map_builder)) {
00062 async_grpc::Server::Builder server_builder;
00063 server_builder.SetServerAddress(map_builder_server_options.server_address());
00064 server_builder.SetNumGrpcThreads(
00065 map_builder_server_options.num_grpc_threads());
00066 server_builder.SetNumEventThreads(
00067 map_builder_server_options.num_event_threads());
00068 server_builder.SetMaxSendMessageSize(kMaxMessageSize);
00069 server_builder.SetMaxReceiveMessageSize(kMaxMessageSize);
00070 if (!map_builder_server_options.uplink_server_address().empty()) {
00071 local_trajectory_uploader_ = CreateLocalTrajectoryUploader(
00072 map_builder_server_options.uplink_server_address(),
00073 map_builder_server_options.upload_batch_size(),
00074 map_builder_server_options.enable_ssl_encryption(),
00075 map_builder_server_options.enable_google_auth());
00076 }
00077 server_builder.RegisterHandler<handlers::AddTrajectoryHandler>();
00078 server_builder.RegisterHandler<handlers::AddOdometryDataHandler>();
00079 server_builder.RegisterHandler<handlers::AddImuDataHandler>();
00080 server_builder.RegisterHandler<handlers::AddRangefinderDataHandler>();
00081 server_builder.RegisterHandler<handlers::AddFixedFramePoseDataHandler>();
00082 server_builder.RegisterHandler<handlers::AddLandmarkDataHandler>();
00083 server_builder.RegisterHandler<handlers::AddSensorDataBatchHandler>();
00084 server_builder.RegisterHandler<handlers::FinishTrajectoryHandler>();
00085 server_builder.RegisterHandler<handlers::DeleteTrajectoryHandler>();
00086 server_builder
00087 .RegisterHandler<handlers::ReceiveGlobalSlamOptimizationsHandler>();
00088 server_builder.RegisterHandler<handlers::ReceiveLocalSlamResultsHandler>();
00089 server_builder.RegisterHandler<handlers::GetSubmapHandler>();
00090 server_builder.RegisterHandler<handlers::GetTrajectoryNodePosesHandler>();
00091 server_builder.RegisterHandler<handlers::GetTrajectoryStatesHandler>();
00092 server_builder.RegisterHandler<handlers::GetLandmarkPosesHandler>();
00093 server_builder.RegisterHandler<handlers::GetAllSubmapPosesHandler>();
00094 server_builder.RegisterHandler<handlers::GetLocalToGlobalTransformHandler>();
00095 server_builder.RegisterHandler<handlers::GetConstraintsHandler>();
00096 server_builder.RegisterHandler<handlers::IsTrajectoryFinishedHandler>();
00097 server_builder.RegisterHandler<handlers::IsTrajectoryFrozenHandler>();
00098 server_builder.RegisterHandler<handlers::LoadStateHandler>();
00099 server_builder.RegisterHandler<handlers::LoadStateFromFileHandler>();
00100 server_builder.RegisterHandler<handlers::RunFinalOptimizationHandler>();
00101 server_builder.RegisterHandler<handlers::WriteStateHandler>();
00102 server_builder.RegisterHandler<handlers::WriteStateToFileHandler>();
00103 server_builder.RegisterHandler<handlers::SetLandmarkPoseHandler>();
00104 grpc_server_ = server_builder.Build();
00105 if (map_builder_server_options.map_builder_options()
00106 .use_trajectory_builder_2d()) {
00107 grpc_server_->SetExecutionContext(
00108 absl::make_unique<MapBuilderContext<mapping::Submap2D>>(this));
00109 } else if (map_builder_server_options.map_builder_options()
00110 .use_trajectory_builder_3d()) {
00111 grpc_server_->SetExecutionContext(
00112 absl::make_unique<MapBuilderContext<mapping::Submap3D>>(this));
00113 } else {
00114 LOG(FATAL)
00115 << "Set either use_trajectory_builder_2d or use_trajectory_builder_3d";
00116 }
00117 map_builder_->pose_graph()->SetGlobalSlamOptimizationCallback(
00118 [this](const std::map<int, mapping::SubmapId>& last_optimized_submap_ids,
00119 const std::map<int, mapping::NodeId>& last_optimized_node_ids) {
00120 OnGlobalSlamOptimizations(last_optimized_submap_ids,
00121 last_optimized_node_ids);
00122 });
00123 }
00124
00125 void MapBuilderServer::Start() {
00126 shutting_down_ = false;
00127 if (local_trajectory_uploader_) {
00128 local_trajectory_uploader_->Start();
00129 }
00130 StartSlamThread();
00131 grpc_server_->Start();
00132 }
00133
00134 void MapBuilderServer::WaitForShutdown() {
00135 grpc_server_->WaitForShutdown();
00136 if (slam_thread_) {
00137 slam_thread_->join();
00138 }
00139 if (local_trajectory_uploader_) {
00140 local_trajectory_uploader_->Shutdown();
00141 }
00142 }
00143
00144 void MapBuilderServer::Shutdown() {
00145 shutting_down_ = true;
00146 grpc_server_->Shutdown();
00147 if (slam_thread_) {
00148 slam_thread_->join();
00149 slam_thread_.reset();
00150 }
00151 if (local_trajectory_uploader_) {
00152 local_trajectory_uploader_->Shutdown();
00153 local_trajectory_uploader_.reset();
00154 }
00155 }
00156
00157 void MapBuilderServer::ProcessSensorDataQueue() {
00158 LOG(INFO) << "Starting SLAM thread.";
00159 while (!shutting_down_) {
00160 kIncomingDataQueueMetric->Set(incoming_data_queue_.Size());
00161 std::unique_ptr<MapBuilderContextInterface::Data> sensor_data =
00162 incoming_data_queue_.PopWithTimeout(kPopTimeout);
00163 if (sensor_data) {
00164 grpc_server_->GetContext<MapBuilderContextInterface>()
00165 ->AddSensorDataToTrajectory(*sensor_data);
00166 }
00167 }
00168 }
00169
00170 void MapBuilderServer::StartSlamThread() {
00171 CHECK(!slam_thread_);
00172
00173
00174 slam_thread_ = absl::make_unique<std::thread>(
00175 [this]() { this->ProcessSensorDataQueue(); });
00176 }
00177
00178 void MapBuilderServer::OnLocalSlamResult(
00179 int trajectory_id, const std::string client_id, common::Time time,
00180 transform::Rigid3d local_pose, sensor::RangeData range_data,
00181 std::unique_ptr<const mapping::TrajectoryBuilderInterface::InsertionResult>
00182 insertion_result) {
00183 auto shared_range_data =
00184 std::make_shared<sensor::RangeData>(std::move(range_data));
00185
00186
00187
00188 if (insertion_result &&
00189 grpc_server_->GetUnsynchronizedContext<MapBuilderContextInterface>()
00190 ->local_trajectory_uploader()) {
00191 auto sensor_data = absl::make_unique<proto::SensorData>();
00192 auto sensor_id =
00193 grpc_server_->GetUnsynchronizedContext<MapBuilderContextInterface>()
00194 ->local_trajectory_uploader()
00195 ->GetLocalSlamResultSensorId(trajectory_id);
00196 CreateSensorDataForLocalSlamResult(sensor_id.id, trajectory_id, client_id,
00197 time, starting_submap_index_,
00198 *insertion_result, sensor_data.get());
00199
00200 if (insertion_result->insertion_submaps.front()->insertion_finished()) {
00201 ++starting_submap_index_;
00202 }
00203 grpc_server_->GetUnsynchronizedContext<MapBuilderContextInterface>()
00204 ->local_trajectory_uploader()
00205 ->EnqueueSensorData(std::move(sensor_data));
00206 }
00207
00208 absl::MutexLock locker(&subscriptions_lock_);
00209 for (auto& entry : local_slam_subscriptions_[trajectory_id]) {
00210 auto copy_of_insertion_result =
00211 insertion_result
00212 ? absl::make_unique<
00213 const mapping::TrajectoryBuilderInterface::InsertionResult>(
00214 *insertion_result)
00215 : nullptr;
00216 MapBuilderContextInterface::LocalSlamSubscriptionCallback callback =
00217 entry.second;
00218 if (!callback(
00219 absl::make_unique<MapBuilderContextInterface::LocalSlamResult>(
00220 MapBuilderContextInterface::LocalSlamResult{
00221 trajectory_id, time, local_pose, shared_range_data,
00222 std::move(copy_of_insertion_result)}))) {
00223 LOG(INFO) << "Removing subscription with index: " << entry.first;
00224 CHECK_EQ(local_slam_subscriptions_[trajectory_id].erase(entry.first), 1u);
00225 }
00226 }
00227 }
00228
00229 void MapBuilderServer::OnGlobalSlamOptimizations(
00230 const std::map<int, mapping::SubmapId>& last_optimized_submap_ids,
00231 const std::map<int, mapping::NodeId>& last_optimized_node_ids) {
00232 absl::MutexLock locker(&subscriptions_lock_);
00233 for (auto& entry : global_slam_subscriptions_) {
00234 if (!entry.second(last_optimized_submap_ids, last_optimized_node_ids)) {
00235 LOG(INFO) << "Removing subscription with index: " << entry.first;
00236 CHECK_EQ(global_slam_subscriptions_.erase(entry.first), 1u);
00237 }
00238 }
00239 }
00240
00241 MapBuilderContextInterface::LocalSlamSubscriptionId
00242 MapBuilderServer::SubscribeLocalSlamResults(
00243 int trajectory_id,
00244 MapBuilderContextInterface::LocalSlamSubscriptionCallback callback) {
00245 absl::MutexLock locker(&subscriptions_lock_);
00246 local_slam_subscriptions_[trajectory_id].emplace(current_subscription_index_,
00247 callback);
00248 return MapBuilderContextInterface::LocalSlamSubscriptionId{
00249 trajectory_id, current_subscription_index_++};
00250 }
00251
00252 void MapBuilderServer::UnsubscribeLocalSlamResults(
00253 const MapBuilderContextInterface::LocalSlamSubscriptionId&
00254 subscription_id) {
00255 absl::MutexLock locker(&subscriptions_lock_);
00256 CHECK_EQ(local_slam_subscriptions_[subscription_id.trajectory_id].erase(
00257 subscription_id.subscription_index),
00258 1u);
00259 }
00260
00261 int MapBuilderServer::SubscribeGlobalSlamOptimizations(
00262 MapBuilderContextInterface::GlobalSlamOptimizationCallback callback) {
00263 absl::MutexLock locker(&subscriptions_lock_);
00264 global_slam_subscriptions_.emplace(current_subscription_index_, callback);
00265 return current_subscription_index_++;
00266 }
00267
00268 void MapBuilderServer::UnsubscribeGlobalSlamOptimizations(
00269 int subscription_index) {
00270 absl::MutexLock locker(&subscriptions_lock_);
00271 CHECK_EQ(global_slam_subscriptions_.erase(subscription_index), 1u);
00272 }
00273
00274 void MapBuilderServer::NotifyFinishTrajectory(int trajectory_id) {
00275 absl::MutexLock locker(&subscriptions_lock_);
00276 for (auto& entry : local_slam_subscriptions_[trajectory_id]) {
00277 MapBuilderContextInterface::LocalSlamSubscriptionCallback callback =
00278 entry.second;
00279
00280 callback(nullptr);
00281 }
00282 }
00283
00284 void MapBuilderServer::WaitUntilIdle() {
00285 incoming_data_queue_.WaitUntilEmpty();
00286 map_builder_->pose_graph()->RunFinalOptimization();
00287 }
00288
00289 void MapBuilderServer::RegisterMetrics(metrics::FamilyFactory* factory) {
00290 auto* queue_length = factory->NewGaugeFamily(
00291 "cloud_internal_map_builder_server_incoming_data_queue_length",
00292 "Incoming SLAM Data Queue length");
00293 kIncomingDataQueueMetric = queue_length->Add({});
00294 }
00295
00296 }
00297 }