Program Listing for File wait_set_template.hpp

Return to documentation for file (include/rclcpp/wait_set_template.hpp)

// Copyright 2020 Open Source Robotics Foundation, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#ifndef RCLCPP__WAIT_SET_TEMPLATE_HPP_
#define RCLCPP__WAIT_SET_TEMPLATE_HPP_

#include <chrono>
#include <memory>
#include <utility>

#include "rcl/wait.h"
#include "rcpputils/scope_exit.hpp"

#include "rclcpp/client.hpp"
#include "rclcpp/context.hpp"
#include "rclcpp/contexts/default_context.hpp"
#include "rclcpp/guard_condition.hpp"
#include "rclcpp/macros.hpp"
#include "rclcpp/service.hpp"
#include "rclcpp/subscription_base.hpp"
#include "rclcpp/subscription_wait_set_mask.hpp"
#include "rclcpp/timer.hpp"
#include "rclcpp/visibility_control.hpp"
#include "rclcpp/wait_result.hpp"
#include "rclcpp/waitable.hpp"

namespace rclcpp
{


template<class SynchronizationPolicy, class StoragePolicy>
class WaitSetTemplate final : private SynchronizationPolicy, private StoragePolicy
{
public:
  RCLCPP_SMART_PTR_DEFINITIONS_NOT_COPYABLE(WaitSetTemplate)

  using typename StoragePolicy::SubscriptionEntry;
  using typename StoragePolicy::WaitableEntry;


  explicit
  WaitSetTemplate(
    const typename StoragePolicy::SubscriptionsIterable & subscriptions = {},
    const typename StoragePolicy::GuardConditionsIterable & guard_conditions = {},
    const typename StoragePolicy::TimersIterable & timers = {},
    const typename StoragePolicy::ClientsIterable & clients = {},
    const typename StoragePolicy::ServicesIterable & services = {},
    const typename StoragePolicy::WaitablesIterable & waitables = {},
    rclcpp::Context::SharedPtr context =
    rclcpp::contexts::get_global_default_context())
  : SynchronizationPolicy(context),
    StoragePolicy(
      subscriptions,
      guard_conditions,
      // this method comes from the SynchronizationPolicy
      this->get_extra_guard_conditions(),
      timers,
      clients,
      services,
      waitables,
      context)
  {}


  const rcl_wait_set_t &
  get_rcl_wait_set() const
  {
    // this method comes from the StoragePolicy
    return this->storage_get_rcl_wait_set();
  }


  void
  add_subscription(
    std::shared_ptr<rclcpp::SubscriptionBase> subscription,
    rclcpp::SubscriptionWaitSetMask mask = {})
  {
    if (nullptr == subscription) {
      throw std::invalid_argument("subscription is nullptr");
    }
    // this method comes from the SynchronizationPolicy
    this->sync_add_subscription(
      std::move(subscription),
      mask,
      [this](
        std::shared_ptr<rclcpp::SubscriptionBase> && inner_subscription,
        const rclcpp::SubscriptionWaitSetMask & mask)
      {
        // These methods comes from the StoragePolicy, and may not exist for
        // fixed sized storage policies.
        // It will throw if the subscription has already been added.
        if (mask.include_subscription) {
          auto local_subscription = inner_subscription;
          bool already_in_use =
          local_subscription->exchange_in_use_by_wait_set_state(local_subscription.get(), true);
          if (already_in_use) {
            throw std::runtime_error("subscription already associated with a wait set");
          }
          this->storage_add_subscription(std::move(local_subscription));
        }
        if (mask.include_events) {
          for (auto key_event_pair : inner_subscription->get_event_handlers()) {
            auto event = key_event_pair.second;
            auto local_subscription = inner_subscription;
            bool already_in_use =
            local_subscription->exchange_in_use_by_wait_set_state(event.get(), true);
            if (already_in_use) {
              throw std::runtime_error("subscription event already associated with a wait set");
            }
            this->storage_add_waitable(std::move(event), std::move(local_subscription));
          }
        }
        if (mask.include_intra_process_waitable) {
          auto local_subscription = inner_subscription;
          auto waitable = inner_subscription->get_intra_process_waitable();
          if (nullptr != waitable) {
            bool already_in_use = local_subscription->exchange_in_use_by_wait_set_state(
              waitable.get(),
              true);
            if (already_in_use) {
              throw std::runtime_error(
                "subscription intra-process waitable already associated with a wait set");
            }
            this->storage_add_waitable(
              std::move(inner_subscription->get_intra_process_waitable()),
              std::move(local_subscription));
          }
        }
      });
  }


  void
  remove_subscription(
    std::shared_ptr<rclcpp::SubscriptionBase> subscription,
    rclcpp::SubscriptionWaitSetMask mask = {})
  {
    if (nullptr == subscription) {
      throw std::invalid_argument("subscription is nullptr");
    }
    // this method comes from the SynchronizationPolicy
    this->sync_remove_subscription(
      std::move(subscription),
      mask,
      [this](
        std::shared_ptr<rclcpp::SubscriptionBase> && inner_subscription,
        const rclcpp::SubscriptionWaitSetMask & mask)
      {
        // This method comes from the StoragePolicy, and it may not exist for
        // fixed sized storage policies.
        // It will throw if the subscription is not in the wait set.
        if (mask.include_subscription) {
          auto local_subscription = inner_subscription;
          local_subscription->exchange_in_use_by_wait_set_state(local_subscription.get(), false);
          this->storage_remove_subscription(std::move(local_subscription));
        }
        if (mask.include_events) {
          for (auto key_event_pair : inner_subscription->get_event_handlers()) {
            auto event = key_event_pair.second;
            auto local_subscription = inner_subscription;
            local_subscription->exchange_in_use_by_wait_set_state(event.get(), false);
            this->storage_remove_waitable(std::move(event));
          }
        }
        if (mask.include_intra_process_waitable) {
          auto local_waitable = inner_subscription->get_intra_process_waitable();
          if (nullptr != local_waitable) {
            // This is the case when intra process is enabled for the subscription.
            inner_subscription->exchange_in_use_by_wait_set_state(local_waitable.get(), false);
            this->storage_remove_waitable(std::move(local_waitable));
          }
        }
      });
  }


  void
  add_guard_condition(std::shared_ptr<rclcpp::GuardCondition> guard_condition)
  {
    if (nullptr == guard_condition) {
      throw std::invalid_argument("guard_condition is nullptr");
    }
    // this method comes from the SynchronizationPolicy
    this->sync_add_guard_condition(
      std::move(guard_condition),
      [this](std::shared_ptr<rclcpp::GuardCondition> && inner_guard_condition) {
        bool already_in_use = inner_guard_condition->exchange_in_use_by_wait_set_state(true);
        if (already_in_use) {
          throw std::runtime_error("guard condition already in use by another wait set");
        }
        // This method comes from the StoragePolicy, and it may not exist for
        // fixed sized storage policies.
        // It will throw if the guard condition has already been added.
        this->storage_add_guard_condition(std::move(inner_guard_condition));
      });
  }


  void
  remove_guard_condition(std::shared_ptr<rclcpp::GuardCondition> guard_condition)
  {
    if (nullptr == guard_condition) {
      throw std::invalid_argument("guard_condition is nullptr");
    }
    // this method comes from the SynchronizationPolicy
    this->sync_remove_guard_condition(
      std::move(guard_condition),
      [this](std::shared_ptr<rclcpp::GuardCondition> && inner_guard_condition) {
        inner_guard_condition->exchange_in_use_by_wait_set_state(false);
        // This method comes from the StoragePolicy, and it may not exist for
        // fixed sized storage policies.
        // It will throw if the guard condition is not in the wait set.
        this->storage_remove_guard_condition(std::move(inner_guard_condition));
      });
  }


  void
  add_timer(std::shared_ptr<rclcpp::TimerBase> timer)
  {
    if (nullptr == timer) {
      throw std::invalid_argument("timer is nullptr");
    }
    // this method comes from the SynchronizationPolicy
    this->sync_add_timer(
      std::move(timer),
      [this](std::shared_ptr<rclcpp::TimerBase> && inner_timer) {
        bool already_in_use = inner_timer->exchange_in_use_by_wait_set_state(true);
        if (already_in_use) {
          throw std::runtime_error("timer already in use by another wait set");
        }
        // This method comes from the StoragePolicy, and it may not exist for
        // fixed sized storage policies.
        // It will throw if the timer has already been added.
        this->storage_add_timer(std::move(inner_timer));
      });
  }


  void
  remove_timer(std::shared_ptr<rclcpp::TimerBase> timer)
  {
    if (nullptr == timer) {
      throw std::invalid_argument("timer is nullptr");
    }
    // this method comes from the SynchronizationPolicy
    this->sync_remove_timer(
      std::move(timer),
      [this](std::shared_ptr<rclcpp::TimerBase> && inner_timer) {
        inner_timer->exchange_in_use_by_wait_set_state(false);
        // This method comes from the StoragePolicy, and it may not exist for
        // fixed sized storage policies.
        // It will throw if the timer is not in the wait set.
        this->storage_remove_timer(std::move(inner_timer));
      });
  }


  void
  add_client(std::shared_ptr<rclcpp::ClientBase> client)
  {
    if (nullptr == client) {
      throw std::invalid_argument("client is nullptr");
    }
    // this method comes from the SynchronizationPolicy
    this->sync_add_client(
      std::move(client),
      [this](std::shared_ptr<rclcpp::ClientBase> && inner_client) {
        bool already_in_use = inner_client->exchange_in_use_by_wait_set_state(true);
        if (already_in_use) {
          throw std::runtime_error("client already in use by another wait set");
        }
        // This method comes from the StoragePolicy, and it may not exist for
        // fixed sized storage policies.
        // It will throw if the client has already been added.
        this->storage_add_client(std::move(inner_client));
      });
  }


  void
  remove_client(std::shared_ptr<rclcpp::ClientBase> client)
  {
    if (nullptr == client) {
      throw std::invalid_argument("client is nullptr");
    }
    // this method comes from the SynchronizationPolicy
    this->sync_remove_client(
      std::move(client),
      [this](std::shared_ptr<rclcpp::ClientBase> && inner_client) {
        inner_client->exchange_in_use_by_wait_set_state(false);
        // This method comes from the StoragePolicy, and it may not exist for
        // fixed sized storage policies.
        // It will throw if the client is not in the wait set.
        this->storage_remove_client(std::move(inner_client));
      });
  }


  void
  add_service(std::shared_ptr<rclcpp::ServiceBase> service)
  {
    if (nullptr == service) {
      throw std::invalid_argument("service is nullptr");
    }
    // this method comes from the SynchronizationPolicy
    this->sync_add_service(
      std::move(service),
      [this](std::shared_ptr<rclcpp::ServiceBase> && inner_service) {
        bool already_in_use = inner_service->exchange_in_use_by_wait_set_state(true);
        if (already_in_use) {
          throw std::runtime_error("service already in use by another wait set");
        }
        // This method comes from the StoragePolicy, and it may not exist for
        // fixed sized storage policies.
        // It will throw if the service has already been added.
        this->storage_add_service(std::move(inner_service));
      });
  }


  void
  remove_service(std::shared_ptr<rclcpp::ServiceBase> service)
  {
    if (nullptr == service) {
      throw std::invalid_argument("service is nullptr");
    }
    // this method comes from the SynchronizationPolicy
    this->sync_remove_service(
      std::move(service),
      [this](std::shared_ptr<rclcpp::ServiceBase> && inner_service) {
        inner_service->exchange_in_use_by_wait_set_state(false);
        // This method comes from the StoragePolicy, and it may not exist for
        // fixed sized storage policies.
        // It will throw if the service is not in the wait set.
        this->storage_remove_service(std::move(inner_service));
      });
  }


  void
  add_waitable(
    std::shared_ptr<rclcpp::Waitable> waitable,
    std::shared_ptr<void> associated_entity = nullptr)
  {
    if (nullptr == waitable) {
      throw std::invalid_argument("waitable is nullptr");
    }
    // this method comes from the SynchronizationPolicy
    this->sync_add_waitable(
      std::move(waitable),
      std::move(associated_entity),
      [this](
        std::shared_ptr<rclcpp::Waitable> && inner_waitable,
        std::shared_ptr<void> && associated_entity)
      {
        bool already_in_use = inner_waitable->exchange_in_use_by_wait_set_state(true);
        if (already_in_use) {
          throw std::runtime_error("waitable already in use by another wait set");
        }
        // This method comes from the StoragePolicy, and it may not exist for
        // fixed sized storage policies.
        // It will throw if the waitable has already been added.
        this->storage_add_waitable(std::move(inner_waitable), std::move(associated_entity));
      });
  }


  void
  remove_waitable(std::shared_ptr<rclcpp::Waitable> waitable)
  {
    if (nullptr == waitable) {
      throw std::invalid_argument("waitable is nullptr");
    }
    // this method comes from the SynchronizationPolicy
    this->sync_remove_waitable(
      std::move(waitable),
      [this](std::shared_ptr<rclcpp::Waitable> && inner_waitable) {
        inner_waitable->exchange_in_use_by_wait_set_state(false);
        // This method comes from the StoragePolicy, and it may not exist for
        // fixed sized storage policies.
        // It will throw if the waitable is not in the wait set.
        this->storage_remove_waitable(std::move(inner_waitable));
      });
  }


  void
  prune_deleted_entities()
  {
    // this method comes from the SynchronizationPolicy
    this->sync_prune_deleted_entities(
      [this]() {
        // This method comes from the StoragePolicy, and it may not exist for
        // fixed sized storage policies.
        this->storage_prune_deleted_entities();
      });
  }


  template<class Rep = int64_t, class Period = std::milli>
  RCUTILS_WARN_UNUSED
  WaitResult<WaitSetTemplate>
  wait(std::chrono::duration<Rep, Period> time_to_wait = std::chrono::duration<Rep, Period>(-1))
  {
    auto time_to_wait_ns = std::chrono::duration_cast<std::chrono::nanoseconds>(time_to_wait);

    // ensure the ownership of the entities in the wait set is shared for the duration of wait
    this->storage_acquire_ownerships();
    RCPPUTILS_SCOPE_EXIT({this->storage_release_ownerships();});

    // this method comes from the SynchronizationPolicy
    return this->template sync_wait<WaitResult<WaitSetTemplate>>(
      // pass along the time_to_wait duration as nanoseconds
      time_to_wait_ns,
      // this method provides the ability to rebuild the wait set, if needed
      [this]() {
        // This method comes from the StoragePolicy
        this->storage_rebuild_rcl_wait_set(
          // This method comes from the SynchronizationPolicy
          this->get_extra_guard_conditions()
        );
      },
      // this method provides access to the rcl wait set
      [this]() -> rcl_wait_set_t & {
        // This method comes from the StoragePolicy
        return this->storage_get_rcl_wait_set();
      },
      // this method provides a way to create the WaitResult
      [this](WaitResultKind wait_result_kind) -> WaitResult<WaitSetTemplate> {
        // convert the result into a WaitResult
        switch (wait_result_kind) {
          case WaitResultKind::Ready:
            return WaitResult<WaitSetTemplate>::from_ready_wait_result_kind(*this);
          case WaitResultKind::Timeout:
            return WaitResult<WaitSetTemplate>::from_timeout_wait_result_kind();
          case WaitResultKind::Empty:
            return WaitResult<WaitSetTemplate>::from_empty_wait_result_kind();
          default:
            auto msg = "unknown WaitResultKind with value: " + std::to_string(wait_result_kind);
            throw std::runtime_error(msg);
        }
      }
    );
  }

private:
  // Add WaitResult type as a friend so it can call private methods for
  // acquiring and releasing resources as the WaitResult is initialized and
  // destructed, respectively.
  friend WaitResult<WaitSetTemplate>;


  void
  wait_result_acquire()
  {
    if (wait_result_holding_) {
      throw std::runtime_error("wait_result_acquire() called while already holding");
    }
    wait_result_holding_ = true;
    // this method comes from the SynchronizationPolicy
    this->sync_wait_result_acquire();
    // this method comes from the StoragePolicy
    this->storage_acquire_ownerships();
  }


  void
  wait_result_release()
  {
    if (!wait_result_holding_) {
      throw std::runtime_error("wait_result_release() called while not holding");
    }
    wait_result_holding_ = false;
    // this method comes from the StoragePolicy
    this->storage_release_ownerships();
    // this method comes from the SynchronizationPolicy
    this->sync_wait_result_release();
  }

  bool wait_result_holding_ = false;
};

}  // namespace rclcpp

#endif  // RCLCPP__WAIT_SET_TEMPLATE_HPP_