00001 /* 00002 * Copyright 2016 The Cartographer Authors 00003 * 00004 * Licensed under the Apache License, Version 2.0 (the "License"); 00005 * you may not use this file except in compliance with the License. 00006 * You may obtain a copy of the License at 00007 * 00008 * http://www.apache.org/licenses/LICENSE-2.0 00009 * 00010 * Unless required by applicable law or agreed to in writing, software 00011 * distributed under the License is distributed on an "AS IS" BASIS, 00012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 00013 * See the License for the specific language governing permissions and 00014 * limitations under the License. 00015 */ 00016 00017 #ifndef CARTOGRAPHER_SENSOR_INTERNAL_ORDERED_MULTI_QUEUE_H_ 00018 #define CARTOGRAPHER_SENSOR_INTERNAL_ORDERED_MULTI_QUEUE_H_ 00019 00020 #include <functional> 00021 #include <map> 00022 #include <memory> 00023 #include <string> 00024 #include <tuple> 00025 00026 #include "cartographer/common/blocking_queue.h" 00027 #include "cartographer/common/port.h" 00028 #include "cartographer/common/time.h" 00029 #include "cartographer/sensor/internal/dispatchable.h" 00030 00031 namespace cartographer { 00032 namespace sensor { 00033 00034 struct QueueKey { 00035 int trajectory_id; 00036 std::string sensor_id; 00037 00038 bool operator<(const QueueKey& other) const { 00039 return std::forward_as_tuple(trajectory_id, sensor_id) < 00040 std::forward_as_tuple(other.trajectory_id, other.sensor_id); 00041 } 00042 }; 00043 00044 // Maintains multiple queues of sorted sensor data and dispatches it in merge 00045 // sorted order. It will wait to see at least one value for each unfinished 00046 // queue before dispatching the next time ordered value across all queues. 00047 // 00048 // This class is thread-compatible. 00049 class OrderedMultiQueue { 00050 public: 00051 using Callback = std::function<void(std::unique_ptr<Data>)>; 00052 00053 OrderedMultiQueue(); 00054 OrderedMultiQueue(OrderedMultiQueue&& queue) = default; 00055 00056 ~OrderedMultiQueue(); 00057 00058 // Adds a new queue with key 'queue_key' which must not already exist. 00059 // 'callback' will be called whenever data from this queue can be dispatched. 00060 void AddQueue(const QueueKey& queue_key, Callback callback); 00061 00062 // Marks a queue as finished, i.e. no further data can be added. The queue 00063 // will be removed once the last piece of data from it has been dispatched. 00064 void MarkQueueAsFinished(const QueueKey& queue_key); 00065 00066 // Adds 'data' to a queue with the given 'queue_key'. Data must be added 00067 // sorted per queue. 00068 void Add(const QueueKey& queue_key, std::unique_ptr<Data> data); 00069 00070 // Dispatches all remaining values in sorted order and removes the underlying 00071 // queues. 00072 void Flush(); 00073 00074 // Must only be called if at least one unfinished queue exists. Returns the 00075 // key of a queue that needs more data before the OrderedMultiQueue can 00076 // dispatch data. 00077 QueueKey GetBlocker() const; 00078 00079 private: 00080 struct Queue { 00081 common::BlockingQueue<std::unique_ptr<Data>> queue; 00082 Callback callback; 00083 bool finished = false; 00084 }; 00085 00086 void Dispatch(); 00087 void CannotMakeProgress(const QueueKey& queue_key); 00088 common::Time GetCommonStartTime(int trajectory_id); 00089 00090 // Used to verify that values are dispatched in sorted order. 00091 common::Time last_dispatched_time_ = common::Time::min(); 00092 00093 std::map<int, common::Time> common_start_time_per_trajectory_; 00094 std::map<QueueKey, Queue> queues_; 00095 QueueKey blocker_; 00096 }; 00097 00098 } // namespace sensor 00099 } // namespace cartographer 00100 00101 #endif // CARTOGRAPHER_SENSOR_INTERNAL_ORDERED_MULTI_QUEUE_H_