map_builder_server.cc
Go to the documentation of this file.
1 /*
2  * Copyright 2017 The Cartographer Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
18 
42 #include "glog/logging.h"
43 
44 namespace cartographer {
45 namespace cloud {
46 namespace {
47 
48 static auto* kIncomingDataQueueMetric = metrics::Gauge::Null();
49 const common::Duration kPopTimeout = common::FromMilliseconds(100);
50 
51 } // namespace
52 
54  const proto::MapBuilderServerOptions& map_builder_server_options,
55  std::unique_ptr<mapping::MapBuilderInterface> map_builder)
56  : map_builder_(std::move(map_builder)) {
57  async_grpc::Server::Builder server_builder;
58  server_builder.SetServerAddress(map_builder_server_options.server_address());
59  server_builder.SetNumGrpcThreads(
60  map_builder_server_options.num_grpc_threads());
61  server_builder.SetNumEventThreads(
62  map_builder_server_options.num_event_threads());
63  if (!map_builder_server_options.uplink_server_address().empty()) {
65  map_builder_server_options.uplink_server_address(),
66  map_builder_server_options.upload_batch_size(),
67  map_builder_server_options.enable_ssl_encryption());
68  }
69  server_builder.RegisterHandler<handlers::AddTrajectoryHandler>();
70  server_builder.RegisterHandler<handlers::AddOdometryDataHandler>();
71  server_builder.RegisterHandler<handlers::AddImuDataHandler>();
72  server_builder.RegisterHandler<handlers::AddRangefinderDataHandler>();
73  server_builder.RegisterHandler<handlers::AddFixedFramePoseDataHandler>();
74  server_builder.RegisterHandler<handlers::AddLandmarkDataHandler>();
75  server_builder.RegisterHandler<handlers::AddSensorDataBatchHandler>();
76  server_builder.RegisterHandler<handlers::FinishTrajectoryHandler>();
77  server_builder
78  .RegisterHandler<handlers::ReceiveGlobalSlamOptimizationsHandler>();
79  server_builder.RegisterHandler<handlers::ReceiveLocalSlamResultsHandler>();
80  server_builder.RegisterHandler<handlers::GetSubmapHandler>();
81  server_builder.RegisterHandler<handlers::GetTrajectoryNodePosesHandler>();
82  server_builder.RegisterHandler<handlers::GetLandmarkPosesHandler>();
83  server_builder.RegisterHandler<handlers::GetAllSubmapPosesHandler>();
84  server_builder.RegisterHandler<handlers::GetLocalToGlobalTransformHandler>();
85  server_builder.RegisterHandler<handlers::GetConstraintsHandler>();
86  server_builder.RegisterHandler<handlers::IsTrajectoryFinishedHandler>();
87  server_builder.RegisterHandler<handlers::IsTrajectoryFrozenHandler>();
88  server_builder.RegisterHandler<handlers::LoadStateHandler>();
89  server_builder.RegisterHandler<handlers::RunFinalOptimizationHandler>();
90  server_builder.RegisterHandler<handlers::WriteStateHandler>();
91  server_builder.RegisterHandler<handlers::SetLandmarkPoseHandler>();
92  grpc_server_ = server_builder.Build();
93  if (map_builder_server_options.map_builder_options()
94  .use_trajectory_builder_2d()) {
95  grpc_server_->SetExecutionContext(
97  } else if (map_builder_server_options.map_builder_options()
98  .use_trajectory_builder_3d()) {
99  grpc_server_->SetExecutionContext(
101  } else {
102  LOG(FATAL)
103  << "Set either use_trajectory_builder_2d or use_trajectory_builder_3d";
104  }
105  map_builder_->pose_graph()->SetGlobalSlamOptimizationCallback(
107  std::placeholders::_1, std::placeholders::_2));
108 }
109 
111  shutting_down_ = false;
114  }
115  StartSlamThread();
116  grpc_server_->Start();
117 }
118 
120  grpc_server_->WaitForShutdown();
121  if (slam_thread_) {
122  slam_thread_->join();
123  }
125  local_trajectory_uploader_->Shutdown();
126  }
127 }
128 
130  shutting_down_ = true;
131  grpc_server_->Shutdown();
132  if (slam_thread_) {
133  slam_thread_->join();
134  slam_thread_.reset();
135  }
137  local_trajectory_uploader_->Shutdown();
139  }
140 }
141 
143  LOG(INFO) << "Starting SLAM thread.";
144  while (!shutting_down_) {
145  kIncomingDataQueueMetric->Set(incoming_data_queue_.Size());
146  std::unique_ptr<MapBuilderContextInterface::Data> sensor_data =
147  incoming_data_queue_.PopWithTimeout(kPopTimeout);
148  if (sensor_data) {
150  ->AddSensorDataToTrajectory(*sensor_data);
151  }
152  }
153 }
154 
156  CHECK(!slam_thread_);
157 
158  // Start the SLAM processing thread.
159  slam_thread_ = common::make_unique<std::thread>(
160  [this]() { this->ProcessSensorDataQueue(); });
161 }
162 
164  int trajectory_id, common::Time time, transform::Rigid3d local_pose,
165  sensor::RangeData range_data,
166  std::unique_ptr<const mapping::TrajectoryBuilderInterface::InsertionResult>
167  insertion_result) {
168  auto shared_range_data =
169  std::make_shared<sensor::RangeData>(std::move(range_data));
170 
171  // If there is an uplink server and a submap insertion happened, enqueue this
172  // local SLAM result for uploading.
173  if (insertion_result &&
174  grpc_server_->GetUnsynchronizedContext<MapBuilderContextInterface>()
176  auto sensor_data = common::make_unique<proto::SensorData>();
177  auto sensor_id =
178  grpc_server_->GetUnsynchronizedContext<MapBuilderContextInterface>()
179  ->local_trajectory_uploader()
180  ->GetLocalSlamResultSensorId(trajectory_id);
181  CreateSensorDataForLocalSlamResult(sensor_id.id, trajectory_id, time,
183  *insertion_result, sensor_data.get());
184  // TODO(cschuet): Make this more robust.
185  if (insertion_result->insertion_submaps.front()->finished()) {
187  }
188  grpc_server_->GetUnsynchronizedContext<MapBuilderContextInterface>()
189  ->local_trajectory_uploader()
190  ->EnqueueSensorData(std::move(sensor_data));
191  }
192 
194  for (auto& entry : local_slam_subscriptions_[trajectory_id]) {
195  auto copy_of_insertion_result =
196  insertion_result
199  *insertion_result)
200  : nullptr;
202  entry.second;
203  if (!callback(
204  common::make_unique<MapBuilderContextInterface::LocalSlamResult>(
206  trajectory_id, time, local_pose, shared_range_data,
207  std::move(copy_of_insertion_result)}))) {
208  LOG(INFO) << "Removing subscription with index: " << entry.first;
209  CHECK_EQ(local_slam_subscriptions_[trajectory_id].erase(entry.first), 1u);
210  }
211  }
212 }
213 
215  const std::map<int, mapping::SubmapId>& last_optimized_submap_ids,
216  const std::map<int, mapping::NodeId>& last_optimized_node_ids) {
218  for (auto& entry : global_slam_subscriptions_) {
219  if (!entry.second(last_optimized_submap_ids, last_optimized_node_ids)) {
220  LOG(INFO) << "Removing subscription with index: " << entry.first;
221  CHECK_EQ(global_slam_subscriptions_.erase(entry.first), 1u);
222  }
223  }
224 }
225 
228  int trajectory_id,
231  local_slam_subscriptions_[trajectory_id].emplace(current_subscription_index_,
232  callback);
234  trajectory_id, current_subscription_index_++};
235 }
236 
239  subscription_id) {
241  CHECK_EQ(local_slam_subscriptions_[subscription_id.trajectory_id].erase(
242  subscription_id.subscription_index),
243  1u);
244 }
245 
249  global_slam_subscriptions_.emplace(current_subscription_index_, callback);
251 }
252 
254  int subscription_index) {
256  CHECK_EQ(global_slam_subscriptions_.erase(subscription_index), 1u);
257 }
258 
261  for (auto& entry : local_slam_subscriptions_[trajectory_id]) {
263  entry.second;
264  // 'nullptr' signals subscribers that the trajectory finished.
265  callback(nullptr);
266  }
267 }
268 
270  incoming_data_queue_.WaitUntilEmpty();
271  map_builder_->pose_graph()->RunFinalOptimization();
272 }
273 
275  auto* queue_length = factory->NewGaugeFamily(
276  "cloud_internal_map_builder_server_incoming_data_queue_length",
277  "Incoming SLAM Data Queue length");
278  kIncomingDataQueueMetric = queue_length->Add({});
279 }
280 
281 } // namespace cloud
282 } // namespace cartographer
common::Duration FromMilliseconds(const int64 milliseconds)
Definition: time.cc:43
void OnGlobalSlamOptimizations(const std::map< int, mapping::SubmapId > &last_optimized_submap_ids, const std::map< int, mapping::NodeId > &last_optimized_node_ids)
std::unique_ptr< mapping::MapBuilderInterface > map_builder_
virtual Family< Gauge > * NewGaugeFamily(const std::string &name, const std::string &description)=0
_Unique_if< T >::_Single_object make_unique(Args &&... args)
Definition: make_unique.h:46
MapBuilderContextInterface::LocalSlamSubscriptionId SubscribeLocalSlamResults(int trajectory_id, MapBuilderContextInterface::LocalSlamSubscriptionCallback callback)
std::function< bool(std::unique_ptr< LocalSlamResult >)> LocalSlamSubscriptionCallback
void NotifyFinishTrajectory(int trajectory_id)
std::function< bool(const std::map< int, mapping::SubmapId > &, const std::map< int, mapping::NodeId > &)> GlobalSlamOptimizationCallback
std::unique_ptr< MapBuilderInterface > map_builder_
std::unique_ptr< async_grpc::Server > grpc_server_
virtual void EnqueueSensorData(int trajectory_id, std::unique_ptr< sensor::Data > data)=0
UniversalTimeScaleClock::time_point Time
Definition: time.h:44
void OnLocalSlamResult(int trajectory_id, common::Time time, transform::Rigid3d local_pose, sensor::RangeData range_data, std::unique_ptr< const mapping::TrajectoryBuilderInterface::InsertionResult > insertion_result)
std::unique_ptr< LocalTrajectoryUploaderInterface > CreateLocalTrajectoryUploader(const std::string &uplink_server_address, int batch_size, bool enable_ssl_encryption)
MapBuilderServer(const proto::MapBuilderServerOptions &map_builder_server_options, std::unique_ptr< mapping::MapBuilderInterface > map_builder)
static time_point time
UniversalTimeScaleClock::duration Duration
Definition: time.h:43
std::unique_ptr< LocalTrajectoryUploaderInterface > local_trajectory_uploader_
int SubscribeGlobalSlamOptimizations(MapBuilderContextInterface::GlobalSlamOptimizationCallback callback)
std::unique_ptr< std::thread > slam_thread_
void UnsubscribeLocalSlamResults(const MapBuilderContextInterface::LocalSlamSubscriptionId &subscription_id)
common::BlockingQueue< std::unique_ptr< MapBuilderContextInterface::Data > > incoming_data_queue_
static void RegisterMetrics(metrics::FamilyFactory *family_factory)
void CreateSensorDataForLocalSlamResult(const std::string &sensor_id, int trajectory_id, common::Time time, int starting_submap_index, const mapping::TrajectoryBuilderInterface::InsertionResult &insertion_result, proto::SensorData *proto)
Mutex::Locker MutexLocker
Definition: mutex.h:95
virtual LocalTrajectoryUploaderInterface * local_trajectory_uploader()=0
void UnsubscribeGlobalSlamOptimizations(int subscription_index)
static Gauge * Null()
Definition: gauge.cc:36


cartographer
Author(s): The Cartographer Authors
autogenerated on Mon Feb 28 2022 22:00:58