Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017 #include "cartographer/sensor/internal/ordered_multi_queue.h"
00018
00019 #include <algorithm>
00020 #include <sstream>
00021 #include <vector>
00022
00023 #include "absl/memory/memory.h"
00024 #include "glog/logging.h"
00025
00026 namespace cartographer {
00027 namespace sensor {
00028
00029 namespace {
00030
00031
00032
00033 const int kMaxQueueSize = 500;
00034
00035 }
00036
00037 inline std::ostream& operator<<(std::ostream& out, const QueueKey& key) {
00038 return out << '(' << key.trajectory_id << ", " << key.sensor_id << ')';
00039 }
00040
00041 OrderedMultiQueue::OrderedMultiQueue() {}
00042
00043 OrderedMultiQueue::~OrderedMultiQueue() {
00044 for (auto& entry : queues_) {
00045 CHECK(entry.second.finished);
00046 }
00047 }
00048
00049 void OrderedMultiQueue::AddQueue(const QueueKey& queue_key, Callback callback) {
00050 CHECK_EQ(queues_.count(queue_key), 0);
00051 queues_[queue_key].callback = std::move(callback);
00052 }
00053
00054 void OrderedMultiQueue::MarkQueueAsFinished(const QueueKey& queue_key) {
00055 auto it = queues_.find(queue_key);
00056 CHECK(it != queues_.end()) << "Did not find '" << queue_key << "'.";
00057 auto& queue = it->second;
00058 CHECK(!queue.finished);
00059 queue.finished = true;
00060 Dispatch();
00061 }
00062
00063 void OrderedMultiQueue::Add(const QueueKey& queue_key,
00064 std::unique_ptr<Data> data) {
00065 auto it = queues_.find(queue_key);
00066 if (it == queues_.end()) {
00067 LOG_EVERY_N(WARNING, 1000)
00068 << "Ignored data for queue: '" << queue_key << "'";
00069 return;
00070 }
00071 it->second.queue.Push(std::move(data));
00072 Dispatch();
00073 }
00074
00075 void OrderedMultiQueue::Flush() {
00076 std::vector<QueueKey> unfinished_queues;
00077 for (auto& entry : queues_) {
00078 if (!entry.second.finished) {
00079 unfinished_queues.push_back(entry.first);
00080 }
00081 }
00082 for (auto& unfinished_queue : unfinished_queues) {
00083 MarkQueueAsFinished(unfinished_queue);
00084 }
00085 }
00086
00087 QueueKey OrderedMultiQueue::GetBlocker() const {
00088 CHECK(!queues_.empty());
00089 return blocker_;
00090 }
00091
00092 void OrderedMultiQueue::Dispatch() {
00093 while (true) {
00094 const Data* next_data = nullptr;
00095 Queue* next_queue = nullptr;
00096 QueueKey next_queue_key;
00097 for (auto it = queues_.begin(); it != queues_.end();) {
00098 const auto* data = it->second.queue.Peek<Data>();
00099 if (data == nullptr) {
00100 if (it->second.finished) {
00101 queues_.erase(it++);
00102 continue;
00103 }
00104 CannotMakeProgress(it->first);
00105 return;
00106 }
00107 if (next_data == nullptr || data->GetTime() < next_data->GetTime()) {
00108 next_data = data;
00109 next_queue = &it->second;
00110 next_queue_key = it->first;
00111 }
00112 CHECK_LE(last_dispatched_time_, next_data->GetTime())
00113 << "Non-sorted data added to queue: '" << it->first << "'";
00114 ++it;
00115 }
00116 if (next_data == nullptr) {
00117 CHECK(queues_.empty());
00118 return;
00119 }
00120
00121
00122
00123 const common::Time common_start_time =
00124 GetCommonStartTime(next_queue_key.trajectory_id);
00125
00126 if (next_data->GetTime() >= common_start_time) {
00127
00128 last_dispatched_time_ = next_data->GetTime();
00129 next_queue->callback(next_queue->queue.Pop());
00130 } else if (next_queue->queue.Size() < 2) {
00131 if (!next_queue->finished) {
00132
00133 CannotMakeProgress(next_queue_key);
00134 return;
00135 }
00136 last_dispatched_time_ = next_data->GetTime();
00137 next_queue->callback(next_queue->queue.Pop());
00138 } else {
00139
00140
00141
00142 std::unique_ptr<Data> next_data_owner = next_queue->queue.Pop();
00143 if (next_queue->queue.Peek<Data>()->GetTime() > common_start_time) {
00144 last_dispatched_time_ = next_data->GetTime();
00145 next_queue->callback(std::move(next_data_owner));
00146 }
00147 }
00148 }
00149 }
00150
00151 void OrderedMultiQueue::CannotMakeProgress(const QueueKey& queue_key) {
00152 blocker_ = queue_key;
00153 for (auto& entry : queues_) {
00154 if (entry.second.queue.Size() > kMaxQueueSize) {
00155 LOG_EVERY_N(WARNING, 60) << "Queue waiting for data: " << queue_key;
00156 return;
00157 }
00158 }
00159 }
00160
00161 common::Time OrderedMultiQueue::GetCommonStartTime(const int trajectory_id) {
00162 auto emplace_result = common_start_time_per_trajectory_.emplace(
00163 trajectory_id, common::Time::min());
00164 common::Time& common_start_time = emplace_result.first->second;
00165 if (emplace_result.second) {
00166 for (auto& entry : queues_) {
00167 if (entry.first.trajectory_id == trajectory_id) {
00168 common_start_time = std::max(
00169 common_start_time, entry.second.queue.Peek<Data>()->GetTime());
00170 }
00171 }
00172 LOG(INFO) << "All sensor data for trajectory " << trajectory_id
00173 << " is available starting at '" << common_start_time << "'.";
00174 }
00175 return common_start_time;
00176 }
00177
00178 }
00179 }