24 #include "glog/logging.h" 33 const int kMaxQueueSize = 500;
45 CHECK(entry.second.finished);
50 CHECK_EQ(
queues_.count(queue_key), 0);
51 queues_[queue_key].callback = std::move(callback);
55 auto it =
queues_.find(queue_key);
56 CHECK(it !=
queues_.end()) <<
"Did not find '" << queue_key <<
"'.";
57 auto& queue = it->second;
58 CHECK(!queue.finished);
59 queue.finished =
true;
64 std::unique_ptr<Data> data) {
65 auto it =
queues_.find(queue_key);
67 LOG_EVERY_N(WARNING, 1000)
68 <<
"Ignored data for queue: '" << queue_key <<
"'";
71 it->second.queue.Push(std::move(data));
76 std::vector<QueueKey> unfinished_queues;
78 if (!entry.second.finished) {
79 unfinished_queues.push_back(entry.first);
82 for (
auto& unfinished_queue : unfinished_queues) {
94 const Data* next_data =
nullptr;
95 Queue* next_queue =
nullptr;
98 const auto* data = it->second.queue.Peek<
Data>();
99 if (data ==
nullptr) {
100 if (it->second.finished) {
107 if (next_data ==
nullptr || data->
GetTime() < next_data->
GetTime()) {
109 next_queue = &it->second;
110 next_queue_key = it->first;
113 <<
"Non-sorted data added to queue: '" << it->first <<
"'";
116 if (next_data ==
nullptr) {
126 if (next_data->
GetTime() >= common_start_time) {
130 }
else if (next_queue->
queue.Size() < 2) {
142 std::unique_ptr<Data> next_data_owner = next_queue->
queue.Pop();
145 next_queue->
callback(std::move(next_data_owner));
154 if (entry.second.queue.Size() > kMaxQueueSize) {
155 LOG_EVERY_N(WARNING, 60) <<
"Queue waiting for data: " << queue_key;
163 trajectory_id, common::Time::min());
164 common::Time& common_start_time = emplace_result.first->second;
165 if (emplace_result.second) {
167 if (entry.first.trajectory_id == trajectory_id) {
168 common_start_time = std::max(
169 common_start_time, entry.second.queue.Peek<
Data>()->
GetTime());
172 LOG(INFO) <<
"All sensor data for trajectory " << trajectory_id
173 <<
" is available starting at '" << common_start_time <<
"'.";
175 return common_start_time;
std::function< void(std::unique_ptr< Data >)> Callback
QueueKey GetBlocker() const
UniversalTimeScaleClock::time_point Time
common::Time last_dispatched_time_
void AddQueue(const QueueKey &queue_key, Callback callback)
void MarkQueueAsFinished(const QueueKey &queue_key)
std::ostream & operator<<(std::ostream &out, const QueueKey &key)
void Add(const QueueKey &queue_key, std::unique_ptr< Data > data)
std::map< QueueKey, Queue > queues_
common::Time GetCommonStartTime(int trajectory_id)
common::BlockingQueue< std::unique_ptr< Data > > queue
virtual common::Time GetTime() const =0
void CannotMakeProgress(const QueueKey &queue_key)
std::map< int, common::Time > common_start_time_per_trajectory_