Program Listing for File timers_manager.hpp

Return to documentation for file (include/rclcpp/experimental/timers_manager.hpp)

// Copyright 2023 iRobot Corporation.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#ifndef RCLCPP__EXPERIMENTAL__TIMERS_MANAGER_HPP_
#define RCLCPP__EXPERIMENTAL__TIMERS_MANAGER_HPP_

#include <algorithm>
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <functional>
#include <memory>
#include <mutex>
#include <optional>
#include <thread>
#include <utility>
#include <vector>
#include "rclcpp/context.hpp"
#include "rclcpp/timer.hpp"

namespace rclcpp
{
namespace experimental
{

class TimersManager
{
public:
  // NOTE(review): macro is defined outside this file; presumably it provides the usual
  // smart pointer aliases for this class while leaving it non-copyable -- confirm.
  RCLCPP_SMART_PTR_DEFINITIONS_NOT_COPYABLE(TimersManager)


  /// Construct a TimersManager.
  /**
   * Only the declaration is visible in this header; the notes below are derived from the
   * signature and from the comments on the corresponding private members.
   *
   * \param context parent context, stored in `context_`, which is described as being used
   *   to understand if ROS is still active.
   * \param on_ready_callback optional callback (defaults to `nullptr`), stored in
   *   `on_ready_callback_`, which is described as the callback to be called when a timer
   *   is ready. It receives the raw timer pointer and an opaque `std::shared_ptr<void>`.
   *   NOTE(review): the behavior when this is left as `nullptr` is defined in the
   *   implementation file -- confirm there.
   */
  RCLCPP_PUBLIC
  TimersManager(
    std::shared_ptr<rclcpp::Context> context,
    std::function<void(const rclcpp::TimerBase *,
    const std::shared_ptr<void> &)> on_ready_callback = nullptr);

  /// Destructor.
  // NOTE(review): presumably stops the timers thread before destruction -- confirm in the
  // implementation file.
  RCLCPP_PUBLIC
  ~TimersManager();

  /// Add a timer to be managed.
  /**
   * \param timer the timer to add, passed as a shared pointer. The manager's storage
   *   (`weak_timers_heap_`) is documented as having weak ownership.
   */
  RCLCPP_PUBLIC
  void add_timer(rclcpp::TimerBase::SharedPtr timer);

  /// Remove a previously added timer.
  /**
   * \param timer the timer to remove.
   */
  RCLCPP_PUBLIC
  void remove_timer(rclcpp::TimerBase::SharedPtr timer);

  /// Remove all the managed timers.
  // NOTE(review): inferred from the name and from `WeakTimersHeap::clear()` -- confirm.
  RCLCPP_PUBLIC
  void clear();

  /// Start the timers manager.
  // NOTE(review): presumably spawns `timers_thread_` and sets `running_` -- confirm in the
  // implementation file.
  RCLCPP_PUBLIC
  void start();

  /// Stop the timers manager.
  // Access to stop() is documented as being protected by `stop_mutex_`.
  // NOTE(review): presumably joins `timers_thread_` and clears `running_` -- confirm.
  RCLCPP_PUBLIC
  void stop();

  /// Get the number of timers that are currently ready.
  /**
   * \return the number of ready timers.
   *   NOTE(review): presumably forwards to `TimersHeap::get_number_ready_timers()`, which
   *   counts timers whose `is_ready()` returns true -- confirm.
   */
  RCLCPP_PUBLIC
  size_t get_number_ready_timers();

  /// Execute the timer at the head of the heap.
  /**
   * \return a boolean result.
   *   NOTE(review): presumably true if the head timer was ready and has been executed,
   *   false otherwise -- confirm in the implementation file.
   */
  RCLCPP_PUBLIC
  bool execute_head_timer();

  /// Execute the timer identified by the given raw pointer.
  /**
   * \param timer_id raw pointer identifying the timer (see `WeakTimersHeap::get_timer()`,
   *   which looks timers up by raw pointer).
   * \param data opaque data associated with the timer; same type as the second argument
   *   of the on-ready callback.
   */
  RCLCPP_PUBLIC
  void execute_ready_timer(const rclcpp::TimerBase * timer_id, const std::shared_ptr<void> & data);

  /// Get the timeout associated with the head timer.
  /**
   * \return an optional duration in nanoseconds.
   *   NOTE(review): presumably the time until the first timer in the heap expires; the
   *   meaning of an empty optional is defined in the implementation file -- confirm.
   */
  RCLCPP_PUBLIC
  std::optional<std::chrono::nanoseconds> get_head_timeout();

private:
  // NOTE(review): macro is defined outside this file; by its name it disables copying of
  // this class -- confirm.
  RCLCPP_DISABLE_COPY(TimersManager)

  // Owning and non-owning handles to a timer, used by the two heap helpers below.
  using TimerPtr = rclcpp::TimerBase::SharedPtr;
  using WeakTimerPtr = rclcpp::TimerBase::WeakPtr;

  // Forward declaration
  // Needed because WeakTimersHeap, defined first, names TimersHeap in its member
  // function signatures.
  class TimersHeap;

  class WeakTimersHeap
  {
public:
    bool add_timer(TimerPtr timer)
    {
      TimersHeap locked_heap = this->validate_and_lock();
      bool added = locked_heap.add_timer(std::move(timer));

      if (added) {
        // Re-create the weak heap with the new timer added
        this->store(locked_heap);
      }

      return added;
    }

    bool remove_timer(TimerPtr timer)
    {
      TimersHeap locked_heap = this->validate_and_lock();
      bool removed = locked_heap.remove_timer(std::move(timer));

      if (removed) {
        // Re-create the weak heap with the timer removed
        this->store(locked_heap);
      }

      return removed;
    }

    TimerPtr get_timer(const rclcpp::TimerBase * timer_id)
    {
      for (auto & weak_timer : weak_heap_) {
        auto timer = weak_timer.lock();
        if (timer.get() == timer_id) {
          return timer;
        }
      }
      return nullptr;
    }

    const WeakTimerPtr & front() const
    {
      return weak_heap_.front();
    }

    bool empty() const
    {
      return weak_heap_.empty();
    }

    TimersHeap validate_and_lock()
    {
      TimersHeap locked_heap;
      bool any_timer_destroyed = false;

      for (auto weak_timer : weak_heap_) {
        auto timer = weak_timer.lock();
        if (timer) {
          // This timer is valid, so add it to the locked heap
          // Note: we access friend private `owned_heap_` member field.
          locked_heap.owned_heap_.push_back(std::move(timer));
        } else {
          // This timer went out of scope, so we don't add it to locked heap
          // and we mark the corresponding flag.
          // It's not needed to erase it from weak heap, as we are going to re-heapify.
          // Note: we can't exit from the loop here, as we need to find all valid timers.
          any_timer_destroyed = true;
        }
      }

      // If a timer has gone out of scope, then the remaining elements do not represent
      // a valid heap anymore. We need to re-heapify the timers heap.
      if (any_timer_destroyed) {
        locked_heap.heapify();
        // Re-create the weak heap now that elements have been heapified again
        this->store(locked_heap);
      }

      return locked_heap;
    }

    void store(const TimersHeap & heap)
    {
      weak_heap_.clear();
      // Note: we access friend private `owned_heap_` member field.
      for (auto t : heap.owned_heap_) {
        weak_heap_.push_back(t);
      }
    }

    void clear()
    {
      weak_heap_.clear();
    }

private:
    std::vector<WeakTimerPtr> weak_heap_;
  };

  class TimersHeap
  {
public:
    bool add_timer(TimerPtr timer)
    {
      // Nothing to do if the timer is already stored here
      auto it = std::find(owned_heap_.begin(), owned_heap_.end(), timer);
      if (it != owned_heap_.end()) {
        return false;
      }

      owned_heap_.push_back(std::move(timer));
      std::push_heap(owned_heap_.begin(), owned_heap_.end(), timer_greater);

      return true;
    }

    bool remove_timer(TimerPtr timer)
    {
      // Nothing to do if the timer is not stored here
      auto it = std::find(owned_heap_.begin(), owned_heap_.end(), timer);
      if (it == owned_heap_.end()) {
        return false;
      }

      owned_heap_.erase(it);
      this->heapify();

      return true;
    }

    TimerPtr & front()
    {
      return owned_heap_.front();
    }

    const TimerPtr & front() const
    {
      return owned_heap_.front();
    }

    bool empty() const
    {
      return owned_heap_.empty();
    }

    size_t size() const
    {
      return owned_heap_.size();
    }

    size_t get_number_ready_timers() const
    {
      size_t ready_timers = 0;

      for (TimerPtr t : owned_heap_) {
        if (t->is_ready()) {
          ready_timers++;
        }
      }

      return ready_timers;
    }

    void heapify_root()
    {
      // The following code is a more efficient version than doing
      // pop_heap, pop_back, push_back, push_heap
      // as it removes the need for the last push_heap

      // Push the modified element (i.e. the current root) at the bottom of the heap
      owned_heap_.push_back(owned_heap_[0]);
      // Exchange first and last-1 elements and reheapify
      std::pop_heap(owned_heap_.begin(), owned_heap_.end(), timer_greater);
      // Remove last element
      owned_heap_.pop_back();
    }

    void heapify()
    {
      std::make_heap(owned_heap_.begin(), owned_heap_.end(), timer_greater);
    }

    void clear_timers_on_reset_callbacks()
    {
      for (TimerPtr & t : owned_heap_) {
        t->clear_on_reset_callback();
      }
    }

    friend TimersHeap WeakTimersHeap::validate_and_lock();

    friend void WeakTimersHeap::store(const TimersHeap & heap);

private:
    static bool timer_greater(TimerPtr a, TimerPtr b)
    {
      // TODO(alsora): this can cause an error if timers are using different clocks
      return a->time_until_trigger() > b->time_until_trigger();
    }

    std::vector<TimerPtr> owned_heap_;
  };

  // NOTE(review): implementation not visible in this header; presumably the task executed
  // by `timers_thread_` -- confirm.
  void run_timers();

  // NOTE(review): presumably the implementation behind `get_head_timeout()`; the `_unsafe`
  // suffix suggests the caller must already hold `timers_mutex_` -- confirm.
  std::optional<std::chrono::nanoseconds> get_head_timeout_unsafe();

  // NOTE(review): presumably executes all the timers that are currently ready; the
  // `_unsafe` suffix suggests the caller must already hold `timers_mutex_` -- confirm.
  void execute_ready_timers_unsafe();

  // Callback to be called when timer is ready
  std::function<void(const rclcpp::TimerBase *,
    const std::shared_ptr<void> &)> on_ready_callback_ = nullptr;

  // Thread used to run the timers execution task
  std::thread timers_thread_;
  // Protects access to timers
  std::mutex timers_mutex_;
  // Protects access to stop()
  std::mutex stop_mutex_;
  // Notifies the timers thread whenever timers are added/removed
  std::condition_variable timers_cv_;
  // Flag used as predicate by timers_cv_ that denotes one or more timers being added/removed
  bool timers_updated_ {false};
  // Indicates whether the timers thread is currently running or not
  std::atomic<bool> running_ {false};
  // Parent context used to understand if ROS is still active
  std::shared_ptr<rclcpp::Context> context_;
  // Timers heap storage with weak ownership
  WeakTimersHeap weak_timers_heap_;
};

}  // namespace experimental
}  // namespace rclcpp

#endif  // RCLCPP__EXPERIMENTAL__TIMERS_MANAGER_HPP_