Program Listing for File dynamic_storage.hpp

Return to documentation for file (include/rclcpp/wait_set_policies/dynamic_storage.hpp)

// Copyright 2020 Open Source Robotics Foundation, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#ifndef RCLCPP__WAIT_SET_POLICIES__DYNAMIC_STORAGE_HPP_
#define RCLCPP__WAIT_SET_POLICIES__DYNAMIC_STORAGE_HPP_

#include <algorithm>
#include <memory>
#include <utility>
#include <vector>

#include "rclcpp/client.hpp"
#include "rclcpp/guard_condition.hpp"
#include "rclcpp/macros.hpp"
#include "rclcpp/service.hpp"
#include "rclcpp/subscription_base.hpp"
#include "rclcpp/subscription_wait_set_mask.hpp"
#include "rclcpp/timer.hpp"
#include "rclcpp/visibility_control.hpp"
#include "rclcpp/wait_set_policies/detail/storage_policy_common.hpp"
#include "rclcpp/waitable.hpp"

namespace rclcpp
{
namespace wait_set_policies
{

class DynamicStorage : public rclcpp::wait_set_policies::detail::StoragePolicyCommon<false>
{
protected:
  using is_mutable = std::true_type;

  class SubscriptionEntry
  {
    // (wjwwood): indent of 'public:' is weird, I know. uncrustify is dumb.

public:
    std::shared_ptr<rclcpp::SubscriptionBase> subscription;
    rclcpp::SubscriptionWaitSetMask mask;

    SubscriptionEntry(
      std::shared_ptr<rclcpp::SubscriptionBase> subscription_in = nullptr,
      const rclcpp::SubscriptionWaitSetMask & mask_in = {})
    : subscription(std::move(subscription_in)),
      mask(mask_in)
    {}

    void
    reset() noexcept
    {
      subscription.reset();
    }
  };
  class WeakSubscriptionEntry
  {
public:
    std::weak_ptr<rclcpp::SubscriptionBase> subscription;
    rclcpp::SubscriptionWaitSetMask mask;

    explicit WeakSubscriptionEntry(
      const std::shared_ptr<rclcpp::SubscriptionBase> & subscription_in,
      const rclcpp::SubscriptionWaitSetMask & mask_in) noexcept
    : subscription(subscription_in),
      mask(mask_in)
    {}

    explicit WeakSubscriptionEntry(const SubscriptionEntry & other)
    : subscription(other.subscription),
      mask(other.mask)
    {}

    std::shared_ptr<rclcpp::SubscriptionBase>
    lock() const
    {
      return subscription.lock();
    }

    bool
    expired() const noexcept
    {
      return subscription.expired();
    }
  };
  using SequenceOfWeakSubscriptions = std::vector<WeakSubscriptionEntry>;
  using SubscriptionsIterable = std::vector<SubscriptionEntry>;

  using SequenceOfWeakGuardConditions = std::vector<std::weak_ptr<rclcpp::GuardCondition>>;
  using GuardConditionsIterable = std::vector<std::shared_ptr<rclcpp::GuardCondition>>;

  using SequenceOfWeakTimers = std::vector<std::weak_ptr<rclcpp::TimerBase>>;
  using TimersIterable = std::vector<std::shared_ptr<rclcpp::TimerBase>>;

  using SequenceOfWeakClients = std::vector<std::weak_ptr<rclcpp::ClientBase>>;
  using ClientsIterable = std::vector<std::shared_ptr<rclcpp::ClientBase>>;

  using SequenceOfWeakServices = std::vector<std::weak_ptr<rclcpp::ServiceBase>>;
  using ServicesIterable = std::vector<std::shared_ptr<rclcpp::ServiceBase>>;

  class WaitableEntry
  {
public:
    std::shared_ptr<rclcpp::Waitable> waitable;
    std::shared_ptr<void> associated_entity;

    WaitableEntry(
      std::shared_ptr<rclcpp::Waitable> waitable_in = nullptr,
      std::shared_ptr<void> associated_entity_in = nullptr) noexcept
    : waitable(std::move(waitable_in)),
      associated_entity(std::move(associated_entity_in))
    {}

    void
    reset() noexcept
    {
      waitable.reset();
      associated_entity.reset();
    }
  };
  class WeakWaitableEntry
  {
public:
    std::weak_ptr<rclcpp::Waitable> waitable;
    std::weak_ptr<void> associated_entity;

    explicit WeakWaitableEntry(
      const std::shared_ptr<rclcpp::Waitable> & waitable_in,
      const std::shared_ptr<void> & associated_entity_in) noexcept
    : waitable(waitable_in),
      associated_entity(associated_entity_in)
    {}

    explicit WeakWaitableEntry(const WaitableEntry & other)
    : waitable(other.waitable),
      associated_entity(other.associated_entity)
    {}

    std::shared_ptr<rclcpp::Waitable>
    lock() const
    {
      return waitable.lock();
    }

    bool
    expired() const noexcept
    {
      return waitable.expired();
    }
  };
  using SequenceOfWeakWaitables = std::vector<WeakWaitableEntry>;
  using WaitablesIterable = std::vector<WaitableEntry>;

  template<class ArrayOfExtraGuardConditions>
  explicit
  DynamicStorage(
    const SubscriptionsIterable & subscriptions,
    const GuardConditionsIterable & guard_conditions,
    const ArrayOfExtraGuardConditions & extra_guard_conditions,
    const TimersIterable & timers,
    const ClientsIterable & clients,
    const ServicesIterable & services,
    const WaitablesIterable & waitables,
    rclcpp::Context::SharedPtr context
  )
  : StoragePolicyCommon(
      subscriptions,
      guard_conditions,
      extra_guard_conditions,
      timers,
      clients,
      services,
      waitables,
      context),
    subscriptions_(subscriptions.cbegin(), subscriptions.cend()),
    shared_subscriptions_(subscriptions_.size()),
    guard_conditions_(guard_conditions.cbegin(), guard_conditions.cend()),
    shared_guard_conditions_(guard_conditions_.size()),
    timers_(timers.cbegin(), timers.cend()),
    shared_timers_(timers_.size()),
    clients_(clients.cbegin(), clients.cend()),
    shared_clients_(clients_.size()),
    services_(services.cbegin(), services.cend()),
    shared_services_(services_.size()),
    waitables_(waitables.cbegin(), waitables.cend()),
    shared_waitables_(waitables_.size())
  {}

  ~DynamicStorage() = default;

  template<class ArrayOfExtraGuardConditions>
  void
  storage_rebuild_rcl_wait_set(const ArrayOfExtraGuardConditions & extra_guard_conditions)
  {
    this->storage_rebuild_rcl_wait_set_with_sets(
      subscriptions_,
      guard_conditions_,
      extra_guard_conditions,
      timers_,
      clients_,
      services_,
      waitables_
    );
  }

  template<class EntityT, class SequenceOfEntitiesT>
  static
  bool
  storage_has_entity(const EntityT & entity, const SequenceOfEntitiesT & entities)
  {
    return std::any_of(
      entities.cbegin(),
      entities.cend(),
      [&entity](const auto & inner) {return &entity == inner.lock().get();});
  }

  template<class EntityT, class SequenceOfEntitiesT>
  static
  auto
  storage_find_entity(const EntityT & entity, const SequenceOfEntitiesT & entities)
  {
    return std::find_if(
      entities.cbegin(),
      entities.cend(),
      [&entity](const auto & inner) {return &entity == inner.lock().get();});
  }

  void
  storage_add_subscription(std::shared_ptr<rclcpp::SubscriptionBase> && subscription)
  {
    if (this->storage_has_entity(*subscription, subscriptions_)) {
      throw std::runtime_error("subscription already in wait set");
    }
    WeakSubscriptionEntry weak_entry{std::move(subscription), {}};
    subscriptions_.push_back(std::move(weak_entry));
    this->storage_flag_for_resize();
  }

  void
  storage_remove_subscription(std::shared_ptr<rclcpp::SubscriptionBase> && subscription)
  {
    auto it = this->storage_find_entity(*subscription, subscriptions_);
    if (subscriptions_.cend() == it) {
      throw std::runtime_error("subscription not in wait set");
    }
    subscriptions_.erase(it);
    this->storage_flag_for_resize();
  }

  void
  storage_add_guard_condition(std::shared_ptr<rclcpp::GuardCondition> && guard_condition)
  {
    if (this->storage_has_entity(*guard_condition, guard_conditions_)) {
      throw std::runtime_error("guard_condition already in wait set");
    }
    guard_conditions_.push_back(std::move(guard_condition));
    this->storage_flag_for_resize();
  }

  void
  storage_remove_guard_condition(std::shared_ptr<rclcpp::GuardCondition> && guard_condition)
  {
    auto it = this->storage_find_entity(*guard_condition, guard_conditions_);
    if (guard_conditions_.cend() == it) {
      throw std::runtime_error("guard_condition not in wait set");
    }
    guard_conditions_.erase(it);
    this->storage_flag_for_resize();
  }

  void
  storage_add_timer(std::shared_ptr<rclcpp::TimerBase> && timer)
  {
    if (this->storage_has_entity(*timer, timers_)) {
      throw std::runtime_error("timer already in wait set");
    }
    timers_.push_back(std::move(timer));
    this->storage_flag_for_resize();
  }

  void
  storage_remove_timer(std::shared_ptr<rclcpp::TimerBase> && timer)
  {
    auto it = this->storage_find_entity(*timer, timers_);
    if (timers_.cend() == it) {
      throw std::runtime_error("timer not in wait set");
    }
    timers_.erase(it);
    this->storage_flag_for_resize();
  }

  void
  storage_add_client(std::shared_ptr<rclcpp::ClientBase> && client)
  {
    if (this->storage_has_entity(*client, clients_)) {
      throw std::runtime_error("client already in wait set");
    }
    clients_.push_back(std::move(client));
    this->storage_flag_for_resize();
  }

  void
  storage_remove_client(std::shared_ptr<rclcpp::ClientBase> && client)
  {
    auto it = this->storage_find_entity(*client, clients_);
    if (clients_.cend() == it) {
      throw std::runtime_error("client not in wait set");
    }
    clients_.erase(it);
    this->storage_flag_for_resize();
  }

  void
  storage_add_service(std::shared_ptr<rclcpp::ServiceBase> && service)
  {
    if (this->storage_has_entity(*service, services_)) {
      throw std::runtime_error("service already in wait set");
    }
    services_.push_back(std::move(service));
    this->storage_flag_for_resize();
  }

  void
  storage_remove_service(std::shared_ptr<rclcpp::ServiceBase> && service)
  {
    auto it = this->storage_find_entity(*service, services_);
    if (services_.cend() == it) {
      throw std::runtime_error("service not in wait set");
    }
    services_.erase(it);
    this->storage_flag_for_resize();
  }

  void
  storage_add_waitable(
    std::shared_ptr<rclcpp::Waitable> && waitable,
    std::shared_ptr<void> && associated_entity)
  {
    if (this->storage_has_entity(*waitable, waitables_)) {
      throw std::runtime_error("waitable already in wait set");
    }
    WeakWaitableEntry weak_entry(std::move(waitable), std::move(associated_entity));
    waitables_.push_back(std::move(weak_entry));
    this->storage_flag_for_resize();
  }

  void
  storage_remove_waitable(std::shared_ptr<rclcpp::Waitable> && waitable)
  {
    auto it = this->storage_find_entity(*waitable, waitables_);
    if (waitables_.cend() == it) {
      throw std::runtime_error("waitable not in wait set");
    }
    waitables_.erase(it);
    this->storage_flag_for_resize();
  }

  // this is noexcept because:
  //   - std::weak_ptr::expired is noexcept
  //   - the erase-remove idiom is noexcept, since we're not using the ExecutionPolicy version
  //   - std::vector::erase is noexcept if the assignment operator of T is also
  //     - and, the operator= for std::weak_ptr is noexcept
  void
  storage_prune_deleted_entities() noexcept
  {
    // reusable (templated) lambda for removal predicate
    auto p =
      [](const auto & weak_ptr) {
        // remove entries which have expired
        return weak_ptr.expired();
      };
    // remove guard conditions which have been deleted
    guard_conditions_.erase(
      std::remove_if(guard_conditions_.begin(), guard_conditions_.end(), p),
      guard_conditions_.end());
    timers_.erase(std::remove_if(timers_.begin(), timers_.end(), p), timers_.end());
    clients_.erase(std::remove_if(clients_.begin(), clients_.end(), p), clients_.end());
    services_.erase(std::remove_if(services_.begin(), services_.end(), p), services_.end());
    waitables_.erase(std::remove_if(waitables_.begin(), waitables_.end(), p), waitables_.end());
  }

  void
  storage_acquire_ownerships()
  {
    if (++ownership_reference_counter_ > 1) {
      // Avoid redundant locking.
      return;
    }
    // Setup common locking function.
    auto lock_all = [](const auto & weak_ptrs, auto & shared_ptrs) {
        shared_ptrs.resize(weak_ptrs.size());
        size_t index = 0;
        for (const auto & weak_ptr : weak_ptrs) {
          shared_ptrs[index++] = weak_ptr.lock();
        }
      };
    // Lock all the weak pointers and hold them until released.
    lock_all(guard_conditions_, shared_guard_conditions_);
    lock_all(timers_, shared_timers_);
    lock_all(clients_, shared_clients_);
    lock_all(services_, shared_services_);

    // We need a specialized version of this for waitables.
    auto lock_all_waitables = [](const auto & weak_ptrs, auto & shared_ptrs) {
        shared_ptrs.resize(weak_ptrs.size());
        size_t index = 0;
        for (const auto & weak_ptr : weak_ptrs) {
          shared_ptrs[index++] = WaitableEntry{
            weak_ptr.waitable.lock(),
            weak_ptr.associated_entity.lock()};
        }
      };
    lock_all_waitables(waitables_, shared_waitables_);
  }

  void
  storage_release_ownerships()
  {
    if (--ownership_reference_counter_ > 0) {
      // Avoid releasing ownership until reference count is 0.
      return;
    }
    // "Unlock" all shared pointers by resetting them.
    auto reset_all = [](auto & shared_ptrs) {
        for (auto & shared_ptr : shared_ptrs) {
          shared_ptr.reset();
        }
      };
    reset_all(shared_guard_conditions_);
    reset_all(shared_timers_);
    reset_all(shared_clients_);
    reset_all(shared_services_);
    reset_all(shared_waitables_);
  }

  size_t ownership_reference_counter_ = 0;

  SequenceOfWeakSubscriptions subscriptions_;
  SubscriptionsIterable shared_subscriptions_;

  SequenceOfWeakGuardConditions guard_conditions_;
  GuardConditionsIterable shared_guard_conditions_;

  SequenceOfWeakTimers timers_;
  TimersIterable shared_timers_;

  SequenceOfWeakClients clients_;
  ClientsIterable shared_clients_;

  SequenceOfWeakServices services_;
  ServicesIterable shared_services_;

  SequenceOfWeakWaitables waitables_;
  WaitablesIterable shared_waitables_;
};

}  // namespace wait_set_policies
}  // namespace rclcpp

#endif  // RCLCPP__WAIT_SET_POLICIES__DYNAMIC_STORAGE_HPP_