Program Listing for File lock_free_queue.hpp

Return to documentation for file (include/realtime_tools/lock_free_queue.hpp)

// Copyright 2025 PAL Robotics S.L.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.


#ifndef REALTIME_TOOLS__LOCK_FREE_QUEUE_HPP_
#define REALTIME_TOOLS__LOCK_FREE_QUEUE_HPP_

#include <chrono>
#include <mutex>
#include <type_traits>
#include <utility>

#include <boost/lockfree/queue.hpp>
#include <boost/lockfree/spsc_queue.hpp>
#include <boost/lockfree/stack.hpp>

namespace
{
// Trait to check if the capacity is set
template <typename T>
struct has_capacity : std::false_type
{
};

template <typename T, std::size_t Capacity>
struct has_capacity<boost::lockfree::spsc_queue<T, boost::lockfree::capacity<Capacity>>>
: std::true_type
{
};

template <typename T, std::size_t Capacity>
struct has_capacity<boost::lockfree::queue<T, boost::lockfree::capacity<Capacity>>> : std::true_type
{
};

template <typename T, std::size_t Capacity, bool FixedSize>
struct has_capacity<boost::lockfree::queue<
  T, boost::lockfree::capacity<Capacity>, boost::lockfree::fixed_sized<FixedSize>>> : std::true_type
{
};

// Traits to check if the queue is spsc_queue
template <typename T>
struct is_spsc_queue : std::false_type
{
};

template <typename T>
struct is_spsc_queue<boost::lockfree::spsc_queue<T>> : std::true_type
{
};

template <typename T, std::size_t Capacity>
struct is_spsc_queue<boost::lockfree::spsc_queue<T, boost::lockfree::capacity<Capacity>>>
: std::true_type
{
};

// Traits to get the capacity of the queue
// Default case: no capacity
template <typename T>
struct get_boost_lockfree_queue_capacity
{
  static constexpr std::size_t value = 0;  // Default to 0 if capacity is not defined
};

// Specialization for queues with capacity
template <typename T, std::size_t Capacity>
struct get_boost_lockfree_queue_capacity<
  boost::lockfree::spsc_queue<T, boost::lockfree::capacity<Capacity>>>
{
  static constexpr std::size_t value = Capacity;
};

// Specialization for queues with capacity
template <typename T, std::size_t Capacity>
struct get_boost_lockfree_queue_capacity<
  boost::lockfree::queue<T, boost::lockfree::capacity<Capacity>>>
{
  static constexpr std::size_t value = Capacity;
};

// Specialization for queues with capacity
template <typename T, std::size_t Capacity, bool FixedSize>
struct get_boost_lockfree_queue_capacity<boost::lockfree::queue<
  T, boost::lockfree::capacity<Capacity>, boost::lockfree::fixed_sized<FixedSize>>>
{
  static constexpr std::size_t value = Capacity;
};

}  // namespace

namespace realtime_tools
{
template <typename DataType, typename LockFreeContainer>
class LockFreeQueueBase
{
public:
  using T = DataType;

  template <
    bool HasCapacity = has_capacity<LockFreeContainer>::value,
    typename std::enable_if_t<HasCapacity, int> = 0>
  LockFreeQueueBase() : capacity_(get_boost_lockfree_queue_capacity<LockFreeContainer>::value)
  {
  }

  template <
    bool HasCapacity = has_capacity<LockFreeContainer>::value,
    typename std::enable_if_t<!HasCapacity, int> = 1>
  explicit LockFreeQueueBase(std::size_t capacity) : data_queue_(capacity), capacity_(capacity)
  {
  }

  virtual ~LockFreeQueueBase() = default;

  [[nodiscard]] bool pop(T & data) { return data_queue_.pop(data); }

  template <typename U>
  [[nodiscard]] std::enable_if_t<std::is_convertible_v<T, U>, bool> pop(U & data)
  {
    return data_queue_.pop(data);
  }

  [[nodiscard]] bool push(const T & data)
  {
    if constexpr (is_spsc_queue<LockFreeContainer>::value) {
      return data_queue_.push(data);
    } else {
      return data_queue_.bounded_push(data);
    }
  }

  template <typename U>
  [[nodiscard]] std::enable_if_t<std::is_convertible_v<T, U>, bool> push(const U & data)
  {
    if constexpr (is_spsc_queue<LockFreeContainer>::value) {
      return data_queue_.push(data);
    } else {
      return data_queue_.bounded_push(data);
    }
  }

  template <typename U>
  [[nodiscard]] std::enable_if_t<std::is_convertible_v<T, U>, bool> bounded_push(const U & data)
  {
    if (!push(data)) {
      T dummy;
      data_queue_.pop(dummy);
      return push(data);
    }
    return true;
  }

  [[nodiscard]] bool get_latest(T & data)
  {
    return data_queue_.consume_all([&data](const T & d) { data = d; }) > 0;
  }

  bool empty() const
  {
    if constexpr (is_spsc_queue<LockFreeContainer>::value) {
      return data_queue_.read_available() == 0;
    } else {
      return data_queue_.empty();
    }
  }

  size_t capacity() const { return capacity_; }

  template <
    bool IsSPSCQueue = is_spsc_queue<LockFreeContainer>::value,
    typename std::enable_if_t<IsSPSCQueue, int> = 0>
  std::size_t size() const
  {
    return data_queue_.read_available();
  }

  bool is_lock_free() const
  {
    return (is_spsc_queue<LockFreeContainer>::value) || data_queue_.is_lock_free();
  }

  const LockFreeContainer & get_lockfree_container() const { return data_queue_; }

  LockFreeContainer & get_lockfree_container() { return data_queue_; }

private:
  LockFreeContainer data_queue_;
  std::size_t capacity_;
};  // class

template <class DataType, std::size_t Capacity = 0>
using LockFreeSPSCQueue = std::conditional_t<
  Capacity == 0, LockFreeQueueBase<DataType, boost::lockfree::spsc_queue<DataType>>,
  LockFreeQueueBase<
    DataType, boost::lockfree::spsc_queue<DataType, boost::lockfree::capacity<Capacity>>>>;

template <class DataType, std::size_t Capacity = 0, bool FixedSize = true>
using LockFreeMPMCQueue = std::conditional_t<
  Capacity == 0,
  LockFreeQueueBase<
    DataType, boost::lockfree::queue<DataType, boost::lockfree::fixed_sized<FixedSize>>>,
  LockFreeQueueBase<
    DataType,
    boost::lockfree::queue<
      DataType, boost::lockfree::capacity<Capacity>, boost::lockfree::fixed_sized<FixedSize>>>>;

}  // namespace realtime_tools
#endif  // REALTIME_TOOLS__LOCK_FREE_QUEUE_HPP_