ordered_multi_queue.cc
Go to the documentation of this file.
00001 /*
00002  * Copyright 2016 The Cartographer Authors
00003  *
00004  * Licensed under the Apache License, Version 2.0 (the "License");
00005  * you may not use this file except in compliance with the License.
00006  * You may obtain a copy of the License at
00007  *
00008  *      http://www.apache.org/licenses/LICENSE-2.0
00009  *
00010  * Unless required by applicable law or agreed to in writing, software
00011  * distributed under the License is distributed on an "AS IS" BASIS,
00012  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
00013  * See the License for the specific language governing permissions and
00014  * limitations under the License.
00015  */
00016 
00017 #include "cartographer/sensor/internal/ordered_multi_queue.h"
00018 
00019 #include <algorithm>
00020 #include <sstream>
00021 #include <vector>
00022 
00023 #include "absl/memory/memory.h"
00024 #include "glog/logging.h"
00025 
00026 namespace cartographer {
00027 namespace sensor {
00028 
00029 namespace {
00030 
00031 // Number of items that can be queued up before we log which queues are waiting
00032 // for data.
00033 const int kMaxQueueSize = 500;
00034 
00035 }  // namespace
00036 
00037 inline std::ostream& operator<<(std::ostream& out, const QueueKey& key) {
00038   return out << '(' << key.trajectory_id << ", " << key.sensor_id << ')';
00039 }
00040 
00041 OrderedMultiQueue::OrderedMultiQueue() {}
00042 
00043 OrderedMultiQueue::~OrderedMultiQueue() {
00044   for (auto& entry : queues_) {
00045     CHECK(entry.second.finished);
00046   }
00047 }
00048 
00049 void OrderedMultiQueue::AddQueue(const QueueKey& queue_key, Callback callback) {
00050   CHECK_EQ(queues_.count(queue_key), 0);
00051   queues_[queue_key].callback = std::move(callback);
00052 }
00053 
00054 void OrderedMultiQueue::MarkQueueAsFinished(const QueueKey& queue_key) {
00055   auto it = queues_.find(queue_key);
00056   CHECK(it != queues_.end()) << "Did not find '" << queue_key << "'.";
00057   auto& queue = it->second;
00058   CHECK(!queue.finished);
00059   queue.finished = true;
00060   Dispatch();
00061 }
00062 
00063 void OrderedMultiQueue::Add(const QueueKey& queue_key,
00064                             std::unique_ptr<Data> data) {
00065   auto it = queues_.find(queue_key);
00066   if (it == queues_.end()) {
00067     LOG_EVERY_N(WARNING, 1000)
00068         << "Ignored data for queue: '" << queue_key << "'";
00069     return;
00070   }
00071   it->second.queue.Push(std::move(data));
00072   Dispatch();
00073 }
00074 
00075 void OrderedMultiQueue::Flush() {
00076   std::vector<QueueKey> unfinished_queues;
00077   for (auto& entry : queues_) {
00078     if (!entry.second.finished) {
00079       unfinished_queues.push_back(entry.first);
00080     }
00081   }
00082   for (auto& unfinished_queue : unfinished_queues) {
00083     MarkQueueAsFinished(unfinished_queue);
00084   }
00085 }
00086 
00087 QueueKey OrderedMultiQueue::GetBlocker() const {
00088   CHECK(!queues_.empty());
00089   return blocker_;
00090 }
00091 
00092 void OrderedMultiQueue::Dispatch() {
00093   while (true) {
00094     const Data* next_data = nullptr;
00095     Queue* next_queue = nullptr;
00096     QueueKey next_queue_key;
00097     for (auto it = queues_.begin(); it != queues_.end();) {
00098       const auto* data = it->second.queue.Peek<Data>();
00099       if (data == nullptr) {
00100         if (it->second.finished) {
00101           queues_.erase(it++);
00102           continue;
00103         }
00104         CannotMakeProgress(it->first);
00105         return;
00106       }
00107       if (next_data == nullptr || data->GetTime() < next_data->GetTime()) {
00108         next_data = data;
00109         next_queue = &it->second;
00110         next_queue_key = it->first;
00111       }
00112       CHECK_LE(last_dispatched_time_, next_data->GetTime())
00113           << "Non-sorted data added to queue: '" << it->first << "'";
00114       ++it;
00115     }
00116     if (next_data == nullptr) {
00117       CHECK(queues_.empty());
00118       return;
00119     }
00120 
00121     // If we haven't dispatched any data for this trajectory yet, fast forward
00122     // all queues of this trajectory until a common start time has been reached.
00123     const common::Time common_start_time =
00124         GetCommonStartTime(next_queue_key.trajectory_id);
00125 
00126     if (next_data->GetTime() >= common_start_time) {
00127       // Happy case, we are beyond the 'common_start_time' already.
00128       last_dispatched_time_ = next_data->GetTime();
00129       next_queue->callback(next_queue->queue.Pop());
00130     } else if (next_queue->queue.Size() < 2) {
00131       if (!next_queue->finished) {
00132         // We cannot decide whether to drop or dispatch this yet.
00133         CannotMakeProgress(next_queue_key);
00134         return;
00135       }
00136       last_dispatched_time_ = next_data->GetTime();
00137       next_queue->callback(next_queue->queue.Pop());
00138     } else {
00139       // We take a peek at the time after next data. If it also is not beyond
00140       // 'common_start_time' we drop 'next_data', otherwise we just found the
00141       // first packet to dispatch from this queue.
00142       std::unique_ptr<Data> next_data_owner = next_queue->queue.Pop();
00143       if (next_queue->queue.Peek<Data>()->GetTime() > common_start_time) {
00144         last_dispatched_time_ = next_data->GetTime();
00145         next_queue->callback(std::move(next_data_owner));
00146       }
00147     }
00148   }
00149 }
00150 
00151 void OrderedMultiQueue::CannotMakeProgress(const QueueKey& queue_key) {
00152   blocker_ = queue_key;
00153   for (auto& entry : queues_) {
00154     if (entry.second.queue.Size() > kMaxQueueSize) {
00155       LOG_EVERY_N(WARNING, 60) << "Queue waiting for data: " << queue_key;
00156       return;
00157     }
00158   }
00159 }
00160 
00161 common::Time OrderedMultiQueue::GetCommonStartTime(const int trajectory_id) {
00162   auto emplace_result = common_start_time_per_trajectory_.emplace(
00163       trajectory_id, common::Time::min());
00164   common::Time& common_start_time = emplace_result.first->second;
00165   if (emplace_result.second) {
00166     for (auto& entry : queues_) {
00167       if (entry.first.trajectory_id == trajectory_id) {
00168         common_start_time = std::max(
00169             common_start_time, entry.second.queue.Peek<Data>()->GetTime());
00170       }
00171     }
00172     LOG(INFO) << "All sensor data for trajectory " << trajectory_id
00173               << " is available starting at '" << common_start_time << "'.";
00174   }
00175   return common_start_time;
00176 }
00177 
00178 }  // namespace sensor
00179 }  // namespace cartographer


cartographer
Author(s): The Cartographer Authors
autogenerated on Thu May 9 2019 02:27:35