22 #include "async_grpc/client.h" 29 #include "glog/logging.h" 30 #include "grpc++/grpc++.h" 38 constexpr
int kConnectionTimeoutInSecond = 10;
41 class LocalTrajectoryUploader :
public LocalTrajectoryUploaderInterface {
43 LocalTrajectoryUploader(
const std::string &uplink_server_address,
44 int batch_size,
bool enable_ssl_encryption);
45 ~LocalTrajectoryUploader();
52 void Shutdown() final;
55 int local_trajectory_id, const std::set<
SensorId> &expected_sensor_ids,
56 const mapping::proto::TrajectoryBuilderOptions &trajectory_options) final;
57 void FinishTrajectory(
int local_trajectory_id) final;
58 void EnqueueSensorData(std::unique_ptr<proto::SensorData> sensor_data) final;
60 SensorId GetLocalSlamResultSensorId(
int local_trajectory_id) const final {
62 "local_slam_result_" + std::to_string(local_trajectory_id)};
66 void ProcessSendQueue();
67 void TranslateTrajectoryId(proto::SensorMetadata *sensor_metadata);
72 common::BlockingQueue<std::unique_ptr<proto::SensorData>>
send_queue_;
77 LocalTrajectoryUploader::LocalTrajectoryUploader(
78 const std::string &uplink_server_address,
int batch_size,
79 bool enable_ssl_encryption)
81 uplink_server_address,
83 ? ::grpc::SslCredentials(::grpc::SslCredentialsOptions())
84 : ::grpc::InsecureChannelCredentials())),
86 std::chrono::system_clock::time_point deadline(
87 std::chrono::system_clock::now() +
88 std::chrono::seconds(kConnectionTimeoutInSecond));
89 LOG(INFO) <<
"Connecting to uplink " << uplink_server_address;
91 LOG(FATAL) <<
"Failed to connect to " << uplink_server_address;
95 LocalTrajectoryUploader::~LocalTrajectoryUploader() {}
97 void LocalTrajectoryUploader::Start() {
101 make_unique<std::thread>([
this]() { this->ProcessSendQueue(); });
104 void LocalTrajectoryUploader::Shutdown() {
111 void LocalTrajectoryUploader::ProcessSendQueue() {
112 LOG(INFO) <<
"Starting uploader thread.";
113 proto::AddSensorDataBatchRequest batch_request;
115 auto sensor_data =
send_queue_.PopWithTimeout(kPopTimeout);
117 proto::SensorData *added_sensor_data = batch_request.add_sensor_data();
118 *added_sensor_data = *sensor_data;
119 TranslateTrajectoryId(added_sensor_data->mutable_sensor_metadata());
123 if (added_sensor_data->has_local_slam_result_data()) {
124 for (mapping::proto::Submap &mutable_submap :
125 *added_sensor_data->mutable_local_slam_result_data()
126 ->mutable_submaps()) {
127 mutable_submap.mutable_submap_id()->set_trajectory_id(
128 added_sensor_data->sensor_metadata().trajectory_id());
132 if (batch_request.sensor_data_size() ==
batch_size_) {
133 async_grpc::Client<handlers::AddSensorDataBatchSignature> client(
136 CHECK(client.Write(batch_request));
137 batch_request.clear_sensor_data();
143 void LocalTrajectoryUploader::TranslateTrajectoryId(
144 proto::SensorMetadata *sensor_metadata) {
145 int cloud_trajectory_id =
147 sensor_metadata->set_trajectory_id(cloud_trajectory_id);
150 void LocalTrajectoryUploader::AddTrajectory(
151 int local_trajectory_id,
const std::set<SensorId> &expected_sensor_ids,
152 const mapping::proto::TrajectoryBuilderOptions &trajectory_options) {
153 proto::AddTrajectoryRequest request;
154 *request.mutable_trajectory_builder_options() = trajectory_options;
155 for (
const SensorId &sensor_id : expected_sensor_ids) {
161 *request.add_expected_sensor_ids() =
163 async_grpc::Client<handlers::AddTrajectorySignature> client(
client_channel_);
164 CHECK(client.Write(request));
167 client.response().trajectory_id();
170 void LocalTrajectoryUploader::FinishTrajectory(
int local_trajectory_id) {
172 int cloud_trajectory_id =
174 proto::FinishTrajectoryRequest request;
175 request.set_trajectory_id(cloud_trajectory_id);
176 async_grpc::Client<handlers::FinishTrajectorySignature> client(
178 CHECK(client.Write(request));
181 void LocalTrajectoryUploader::EnqueueSensorData(
182 std::unique_ptr<proto::SensorData> sensor_data) {
189 const std::string &uplink_server_address,
int batch_size,
190 bool enable_ssl_encryption) {
191 return make_unique<LocalTrajectoryUploader>(uplink_server_address, batch_size,
192 enable_ssl_encryption);
common::Duration FromMilliseconds(const int64 milliseconds)
_Unique_if< T >::_Single_object make_unique(Args &&... args)
std::unique_ptr< LocalTrajectoryUploaderInterface > CreateLocalTrajectoryUploader(const std::string &uplink_server_address, int batch_size, bool enable_ssl_encryption)
Duration FromSeconds(const double seconds)
UniversalTimeScaleClock::duration Duration
std::map< int, int > local_to_cloud_trajectory_id_map_
common::BlockingQueue< std::unique_ptr< proto::SensorData > > send_queue_
std::unique_ptr< std::thread > upload_thread_
proto::SensorId ToProto(const mapping::TrajectoryBuilderInterface::SensorId &sensor_id)
std::shared_ptr<::grpc::Channel > client_channel_