ordered_multi_queue.h
Go to the documentation of this file.
00001 /*
00002  * Copyright 2016 The Cartographer Authors
00003  *
00004  * Licensed under the Apache License, Version 2.0 (the "License");
00005  * you may not use this file except in compliance with the License.
00006  * You may obtain a copy of the License at
00007  *
00008  *      http://www.apache.org/licenses/LICENSE-2.0
00009  *
00010  * Unless required by applicable law or agreed to in writing, software
00011  * distributed under the License is distributed on an "AS IS" BASIS,
00012  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
00013  * See the License for the specific language governing permissions and
00014  * limitations under the License.
00015  */
00016 
00017 #ifndef CARTOGRAPHER_SENSOR_INTERNAL_ORDERED_MULTI_QUEUE_H_
00018 #define CARTOGRAPHER_SENSOR_INTERNAL_ORDERED_MULTI_QUEUE_H_
00019 
00020 #include <functional>
00021 #include <map>
00022 #include <memory>
00023 #include <string>
00024 #include <tuple>
00025 
00026 #include "cartographer/common/blocking_queue.h"
00027 #include "cartographer/common/port.h"
00028 #include "cartographer/common/time.h"
00029 #include "cartographer/sensor/internal/dispatchable.h"
00030 
00031 namespace cartographer {
00032 namespace sensor {
00033 
// Identifies a single sensor queue by the trajectory it belongs to and the id
// of the sensor feeding it.
struct QueueKey {
  int trajectory_id;
  std::string sensor_id;

  // Strict weak ordering: keys compare by 'trajectory_id' first and by
  // 'sensor_id' second, which makes QueueKey usable as a std::map key.
  bool operator<(const QueueKey& rhs) const {
    const auto lhs_fields = std::tie(trajectory_id, sensor_id);
    const auto rhs_fields = std::tie(rhs.trajectory_id, rhs.sensor_id);
    return lhs_fields < rhs_fields;
  }
};
00043 
00044 // Maintains multiple queues of sorted sensor data and dispatches it in merge
00045 // sorted order. It will wait to see at least one value for each unfinished
00046 // queue before dispatching the next time ordered value across all queues.
00047 //
00048 // This class is thread-compatible.
00049 class OrderedMultiQueue {
00050  public:
00051   using Callback = std::function<void(std::unique_ptr<Data>)>;
00052 
00053   OrderedMultiQueue();
00054   OrderedMultiQueue(OrderedMultiQueue&& queue) = default;
00055 
00056   ~OrderedMultiQueue();
00057 
00058   // Adds a new queue with key 'queue_key' which must not already exist.
00059   // 'callback' will be called whenever data from this queue can be dispatched.
00060   void AddQueue(const QueueKey& queue_key, Callback callback);
00061 
00062   // Marks a queue as finished, i.e. no further data can be added. The queue
00063   // will be removed once the last piece of data from it has been dispatched.
00064   void MarkQueueAsFinished(const QueueKey& queue_key);
00065 
00066   // Adds 'data' to a queue with the given 'queue_key'. Data must be added
00067   // sorted per queue.
00068   void Add(const QueueKey& queue_key, std::unique_ptr<Data> data);
00069 
00070   // Dispatches all remaining values in sorted order and removes the underlying
00071   // queues.
00072   void Flush();
00073 
00074   // Must only be called if at least one unfinished queue exists. Returns the
00075   // key of a queue that needs more data before the OrderedMultiQueue can
00076   // dispatch data.
00077   QueueKey GetBlocker() const;
00078 
00079  private:
00080   struct Queue {
00081     common::BlockingQueue<std::unique_ptr<Data>> queue;
00082     Callback callback;
00083     bool finished = false;
00084   };
00085 
00086   void Dispatch();
00087   void CannotMakeProgress(const QueueKey& queue_key);
00088   common::Time GetCommonStartTime(int trajectory_id);
00089 
00090   // Used to verify that values are dispatched in sorted order.
00091   common::Time last_dispatched_time_ = common::Time::min();
00092 
00093   std::map<int, common::Time> common_start_time_per_trajectory_;
00094   std::map<QueueKey, Queue> queues_;
00095   QueueKey blocker_;
00096 };
00097 
00098 }  // namespace sensor
00099 }  // namespace cartographer
00100 
00101 #endif  // CARTOGRAPHER_SENSOR_INTERNAL_ORDERED_MULTI_QUEUE_H_


cartographer
Author(s): The Cartographer Authors
autogenerated on Thu May 9 2019 02:27:35