ordered_multi_queue.cc
Go to the documentation of this file.
1 /*
2  * Copyright 2016 The Cartographer Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
18 
19 #include <algorithm>
20 #include <sstream>
21 #include <vector>
22 
24 #include "glog/logging.h"
25 
26 namespace cartographer {
27 namespace sensor {
28 
namespace {

// Size threshold for a single queue: once any queue holds more than this many
// items, the queue that is being waited on gets reported in the log.
constexpr int kMaxQueueSize = 500;

}  // namespace
36 
37 inline std::ostream& operator<<(std::ostream& out, const QueueKey& key) {
38  return out << '(' << key.trajectory_id << ", " << key.sensor_id << ')';
39 }
40 
42 
// NOTE(review): the signature line of this definition is missing from the
// listing; presumably this is the OrderedMultiQueue destructor - confirm
// against the original file.
// Verifies (via CHECK) that every queue still held in 'queues_' has been
// marked as finished.
44  for (auto& entry : queues_) {
45  CHECK(entry.second.finished);
46  }
47 }
48 
49 void OrderedMultiQueue::AddQueue(const QueueKey& queue_key, Callback callback) {
50  CHECK_EQ(queues_.count(queue_key), 0);
51  queues_[queue_key].callback = std::move(callback);
52 }
53 
55  auto it = queues_.find(queue_key);
56  CHECK(it != queues_.end()) << "Did not find '" << queue_key << "'.";
57  auto& queue = it->second;
58  CHECK(!queue.finished);
59  queue.finished = true;
60  Dispatch();
61 }
62 
63 void OrderedMultiQueue::Add(const QueueKey& queue_key,
64  std::unique_ptr<Data> data) {
65  auto it = queues_.find(queue_key);
66  if (it == queues_.end()) {
67  LOG_EVERY_N(WARNING, 1000)
68  << "Ignored data for queue: '" << queue_key << "'";
69  return;
70  }
71  it->second.queue.Push(std::move(data));
72  Dispatch();
73 }
74 
// NOTE(review): the signature line of this definition is missing from the
// listing; the function name cannot be determined from here - confirm
// against the original file.
// Marks every queue that is not yet finished as finished.
76  std::vector<QueueKey> unfinished_queues;
// Collect the keys first instead of finishing queues while iterating:
// MarkQueueAsFinished() runs Dispatch(), which may erase entries from
// 'queues_'.
77  for (auto& entry : queues_) {
78  if (!entry.second.finished) {
79  unfinished_queues.push_back(entry.first);
80  }
81  }
82  for (auto& unfinished_queue : unfinished_queues) {
83  MarkQueueAsFinished(unfinished_queue);
84  }
85 }
86 
// NOTE(review): the signature line of this definition is missing from the
// listing; the function name and return type cannot be determined from here -
// confirm against the original file.
// Returns 'blocker_', the queue key that is recorded when dispatching cannot
// make progress. CHECK-fails if no queues are registered.
88  CHECK(!queues_.empty());
89  return blocker_;
90 }
91 
93  while (true) {
94  const Data* next_data = nullptr;
95  Queue* next_queue = nullptr;
96  QueueKey next_queue_key;
97  for (auto it = queues_.begin(); it != queues_.end();) {
98  const auto* data = it->second.queue.Peek<Data>();
99  if (data == nullptr) {
100  if (it->second.finished) {
101  queues_.erase(it++);
102  continue;
103  }
104  CannotMakeProgress(it->first);
105  return;
106  }
107  if (next_data == nullptr || data->GetTime() < next_data->GetTime()) {
108  next_data = data;
109  next_queue = &it->second;
110  next_queue_key = it->first;
111  }
112  CHECK_LE(last_dispatched_time_, next_data->GetTime())
113  << "Non-sorted data added to queue: '" << it->first << "'";
114  ++it;
115  }
116  if (next_data == nullptr) {
117  CHECK(queues_.empty());
118  return;
119  }
120 
121  // If we haven't dispatched any data for this trajectory yet, fast forward
122  // all queues of this trajectory until a common start time has been reached.
123  const common::Time common_start_time =
124  GetCommonStartTime(next_queue_key.trajectory_id);
125 
126  if (next_data->GetTime() >= common_start_time) {
127  // Happy case, we are beyond the 'common_start_time' already.
128  last_dispatched_time_ = next_data->GetTime();
129  next_queue->callback(next_queue->queue.Pop());
130  } else if (next_queue->queue.Size() < 2) {
131  if (!next_queue->finished) {
132  // We cannot decide whether to drop or dispatch this yet.
133  CannotMakeProgress(next_queue_key);
134  return;
135  }
136  last_dispatched_time_ = next_data->GetTime();
137  next_queue->callback(next_queue->queue.Pop());
138  } else {
139  // We take a peek at the time after next data. If it also is not beyond
140  // 'common_start_time' we drop 'next_data', otherwise we just found the
141  // first packet to dispatch from this queue.
142  std::unique_ptr<Data> next_data_owner = next_queue->queue.Pop();
143  if (next_queue->queue.Peek<Data>()->GetTime() > common_start_time) {
144  last_dispatched_time_ = next_data->GetTime();
145  next_queue->callback(std::move(next_data_owner));
146  }
147  }
148  }
149 }
150 
152  blocker_ = queue_key;
153  for (auto& entry : queues_) {
154  if (entry.second.queue.Size() > kMaxQueueSize) {
155  LOG_EVERY_N(WARNING, 60) << "Queue waiting for data: " << queue_key;
156  return;
157  }
158  }
159 }
160 
162  auto emplace_result = common_start_time_per_trajectory_.emplace(
163  trajectory_id, common::Time::min());
164  common::Time& common_start_time = emplace_result.first->second;
165  if (emplace_result.second) {
166  for (auto& entry : queues_) {
167  if (entry.first.trajectory_id == trajectory_id) {
168  common_start_time = std::max(
169  common_start_time, entry.second.queue.Peek<Data>()->GetTime());
170  }
171  }
172  LOG(INFO) << "All sensor data for trajectory " << trajectory_id
173  << " is available starting at '" << common_start_time << "'.";
174  }
175  return common_start_time;
176 }
177 
178 } // namespace sensor
179 } // namespace cartographer
std::function< void(std::unique_ptr< Data >)> Callback
UniversalTimeScaleClock::time_point Time
Definition: time.h:44
void AddQueue(const QueueKey &queue_key, Callback callback)
void MarkQueueAsFinished(const QueueKey &queue_key)
std::ostream & operator<<(std::ostream &out, const QueueKey &key)
void Add(const QueueKey &queue_key, std::unique_ptr< Data > data)
common::Time GetCommonStartTime(int trajectory_id)
common::BlockingQueue< std::unique_ptr< Data > > queue
virtual common::Time GetTime() const =0
void CannotMakeProgress(const QueueKey &queue_key)
std::map< int, common::Time > common_start_time_per_trajectory_


cartographer
Author(s): The Cartographer Authors
autogenerated on Mon Feb 28 2022 22:00:58