local_trajectory_uploader.cc
Go to the documentation of this file.
1 /*
2  * Copyright 2018 The Cartographer Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
18 
19 #include <map>
20 #include <thread>
21 
22 #include "async_grpc/client.h"
29 #include "glog/logging.h"
30 #include "grpc++/grpc++.h"
31 
32 namespace cartographer {
33 namespace cloud {
34 namespace {
35 
37 
38 constexpr int kConnectionTimeoutInSecond = 10;
39 const common::Duration kPopTimeout = common::FromMilliseconds(100);
40 
41 class LocalTrajectoryUploader : public LocalTrajectoryUploaderInterface {
42  public:
43  LocalTrajectoryUploader(const std::string &uplink_server_address,
44  int batch_size, bool enable_ssl_encryption);
45  ~LocalTrajectoryUploader();
46 
47  // Starts the upload thread.
48  void Start() final;
49 
50  // Shuts down the upload thread. This method blocks until the shutdown is
51  // complete.
52  void Shutdown() final;
53 
54  void AddTrajectory(
55  int local_trajectory_id, const std::set<SensorId> &expected_sensor_ids,
56  const mapping::proto::TrajectoryBuilderOptions &trajectory_options) final;
57  void FinishTrajectory(int local_trajectory_id) final;
58  void EnqueueSensorData(std::unique_ptr<proto::SensorData> sensor_data) final;
59 
60  SensorId GetLocalSlamResultSensorId(int local_trajectory_id) const final {
62  "local_slam_result_" + std::to_string(local_trajectory_id)};
63  }
64 
65  private:
66  void ProcessSendQueue();
67  void TranslateTrajectoryId(proto::SensorMetadata *sensor_metadata);
68 
69  std::shared_ptr<::grpc::Channel> client_channel_;
72  common::BlockingQueue<std::unique_ptr<proto::SensorData>> send_queue_;
73  bool shutting_down_ = false;
74  std::unique_ptr<std::thread> upload_thread_;
75 };
76 
77 LocalTrajectoryUploader::LocalTrajectoryUploader(
78  const std::string &uplink_server_address, int batch_size,
79  bool enable_ssl_encryption)
80  : client_channel_(::grpc::CreateChannel(
81  uplink_server_address,
82  enable_ssl_encryption
83  ? ::grpc::SslCredentials(::grpc::SslCredentialsOptions())
84  : ::grpc::InsecureChannelCredentials())),
85  batch_size_(batch_size) {
86  std::chrono::system_clock::time_point deadline(
87  std::chrono::system_clock::now() +
88  std::chrono::seconds(kConnectionTimeoutInSecond));
89  LOG(INFO) << "Connecting to uplink " << uplink_server_address;
90  if (!client_channel_->WaitForConnected(deadline)) {
91  LOG(FATAL) << "Failed to connect to " << uplink_server_address;
92  }
93 }
94 
95 LocalTrajectoryUploader::~LocalTrajectoryUploader() {}
96 
97 void LocalTrajectoryUploader::Start() {
98  CHECK(!shutting_down_);
99  CHECK(!upload_thread_);
101  make_unique<std::thread>([this]() { this->ProcessSendQueue(); });
102 }
103 
104 void LocalTrajectoryUploader::Shutdown() {
105  CHECK(!shutting_down_);
106  CHECK(upload_thread_);
107  shutting_down_ = true;
108  upload_thread_->join();
109 }
110 
111 void LocalTrajectoryUploader::ProcessSendQueue() {
112  LOG(INFO) << "Starting uploader thread.";
113  proto::AddSensorDataBatchRequest batch_request;
114  while (!shutting_down_) {
115  auto sensor_data = send_queue_.PopWithTimeout(kPopTimeout);
116  if (sensor_data) {
117  proto::SensorData *added_sensor_data = batch_request.add_sensor_data();
118  *added_sensor_data = *sensor_data;
119  TranslateTrajectoryId(added_sensor_data->mutable_sensor_metadata());
120 
121  // A submap also holds a trajectory id that must be translated to uplink's
122  // trajectory id.
123  if (added_sensor_data->has_local_slam_result_data()) {
124  for (mapping::proto::Submap &mutable_submap :
125  *added_sensor_data->mutable_local_slam_result_data()
126  ->mutable_submaps()) {
127  mutable_submap.mutable_submap_id()->set_trajectory_id(
128  added_sensor_data->sensor_metadata().trajectory_id());
129  }
130  }
131 
132  if (batch_request.sensor_data_size() == batch_size_) {
133  async_grpc::Client<handlers::AddSensorDataBatchSignature> client(
134  client_channel_, async_grpc::CreateUnlimitedConstantDelayStrategy(
135  common::FromSeconds(1)));
136  CHECK(client.Write(batch_request));
137  batch_request.clear_sensor_data();
138  }
139  }
140  }
141 }
142 
143 void LocalTrajectoryUploader::TranslateTrajectoryId(
144  proto::SensorMetadata *sensor_metadata) {
145  int cloud_trajectory_id =
146  local_to_cloud_trajectory_id_map_.at(sensor_metadata->trajectory_id());
147  sensor_metadata->set_trajectory_id(cloud_trajectory_id);
148 }
149 
150 void LocalTrajectoryUploader::AddTrajectory(
151  int local_trajectory_id, const std::set<SensorId> &expected_sensor_ids,
152  const mapping::proto::TrajectoryBuilderOptions &trajectory_options) {
153  proto::AddTrajectoryRequest request;
154  *request.mutable_trajectory_builder_options() = trajectory_options;
155  for (const SensorId &sensor_id : expected_sensor_ids) {
156  // Range sensors are not forwarded, but combined into a LocalSlamResult.
157  if (sensor_id.type != SensorId::SensorType::RANGE) {
158  *request.add_expected_sensor_ids() = cloud::ToProto(sensor_id);
159  }
160  }
161  *request.add_expected_sensor_ids() =
162  cloud::ToProto(GetLocalSlamResultSensorId(local_trajectory_id));
163  async_grpc::Client<handlers::AddTrajectorySignature> client(client_channel_);
164  CHECK(client.Write(request));
165  CHECK_EQ(local_to_cloud_trajectory_id_map_.count(local_trajectory_id), 0);
166  local_to_cloud_trajectory_id_map_[local_trajectory_id] =
167  client.response().trajectory_id();
168 }
169 
170 void LocalTrajectoryUploader::FinishTrajectory(int local_trajectory_id) {
171  CHECK_EQ(local_to_cloud_trajectory_id_map_.count(local_trajectory_id), 1);
172  int cloud_trajectory_id =
173  local_to_cloud_trajectory_id_map_[local_trajectory_id];
174  proto::FinishTrajectoryRequest request;
175  request.set_trajectory_id(cloud_trajectory_id);
176  async_grpc::Client<handlers::FinishTrajectorySignature> client(
178  CHECK(client.Write(request));
179 }
180 
181 void LocalTrajectoryUploader::EnqueueSensorData(
182  std::unique_ptr<proto::SensorData> sensor_data) {
183  send_queue_.Push(std::move(sensor_data));
184 }
185 
186 } // namespace
187 
188 std::unique_ptr<LocalTrajectoryUploaderInterface> CreateLocalTrajectoryUploader(
189  const std::string &uplink_server_address, int batch_size,
190  bool enable_ssl_encryption) {
191  return make_unique<LocalTrajectoryUploader>(uplink_server_address, batch_size,
192  enable_ssl_encryption);
193 }
194 
195 } // namespace cloud
196 } // namespace cartographer
common::Duration FromMilliseconds(const int64 milliseconds)
Definition: time.cc:43
_Unique_if< T >::_Single_object make_unique(Args &&... args)
Definition: make_unique.h:46
std::unique_ptr< LocalTrajectoryUploaderInterface > CreateLocalTrajectoryUploader(const std::string &uplink_server_address, int batch_size, bool enable_ssl_encryption)
Duration FromSeconds(const double seconds)
Definition: time.cc:24
bool shutting_down_
UniversalTimeScaleClock::duration Duration
Definition: time.h:43
std::map< int, int > local_to_cloud_trajectory_id_map_
common::BlockingQueue< std::unique_ptr< proto::SensorData > > send_queue_
std::unique_ptr< std::thread > upload_thread_
proto::SensorId ToProto(const mapping::TrajectoryBuilderInterface::SensorId &sensor_id)
std::shared_ptr<::grpc::Channel > client_channel_


cartographer
Author(s): The Cartographer Authors
autogenerated on Mon Feb 28 2022 22:00:58