Program Listing for File rmw_wait.hpp

Return to documentation for file (/tmp/ws/src/rmw_gurumdds/rmw_gurumdds_shared_cpp/include/rmw_gurumdds_shared_cpp/rmw_wait.hpp)

// Copyright 2019 GurumNetworks, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#ifndef RMW_GURUMDDS_SHARED_CPP__RMW_WAIT_HPP_
#define RMW_GURUMDDS_SHARED_CPP__RMW_WAIT_HPP_

#include <chrono>
#include <unordered_map>
#include <unordered_set>
#include <utility>

#include "rmw/allocators.h"
#include "rmw/error_handling.h"
#include "rmw/rmw.h"
#include "rmw/impl/cpp/macros.hpp"

#include "rmw_gurumdds_shared_cpp/rmw_common.hpp"
#include "rmw_gurumdds_shared_cpp/types.hpp"
#include "rmw_gurumdds_shared_cpp/dds_include.hpp"
#include "rmw_gurumdds_shared_cpp/event_converter.hpp"

#define CHECK_ATTACH(ret) \
  if (ret == dds_RETCODE_OK) { \
    continue; \
  } else if (ret == dds_RETCODE_OUT_OF_RESOURCES) { \
    RMW_SET_ERROR_MSG("failed to attach condition to wait set: out of resources"); \
    return RMW_RET_ERROR; \
  } else if (ret == dds_RETCODE_BAD_PARAMETER) { \
    RMW_SET_ERROR_MSG("failed to attach condition to wait set: condition pointer was invalid"); \
    return RMW_RET_ERROR; \
  } else { \
    RMW_SET_ERROR_MSG("failed to attach condition to wait set"); \
    return RMW_RET_ERROR; \
  }

rmw_ret_t
__gather_event_conditions(
  rmw_events_t * events,
  std::unordered_set<dds_StatusCondition *> & status_conditions)
{
  RMW_CHECK_ARGUMENT_FOR_NULL(events, RMW_RET_INVALID_ARGUMENT);
  std::unordered_map<dds_StatusCondition *, dds_StatusMask> status_map;

  for (size_t i = 0; i < events->event_count; i++) {
    auto now = static_cast<rmw_event_t *>(events->events[i]);
    RMW_CHECK_ARGUMENT_FOR_NULL(events, RMW_RET_INVALID_ARGUMENT);

    auto event_info = static_cast<GurumddsEventInfo *>(now->data);
    if (event_info == nullptr) {
      RMW_SET_ERROR_MSG("event handle is null");
      return RMW_RET_ERROR;
    }

    dds_StatusCondition * status_condition = event_info->get_statuscondition();
    if (status_condition == nullptr) {
      RMW_SET_ERROR_MSG("failed to get status condition");
      return RMW_RET_ERROR;
    }

    if (is_event_supported(now->event_type)) {
      auto map_pair = status_map.insert(std::make_pair(status_condition, 0));
      auto it = map_pair.first;
      status_map[status_condition] = get_status_kind_from_rmw(now->event_type) | it->second;
    } else {
      RMW_SET_ERROR_MSG_WITH_FORMAT_STRING("unsupported event: %d", now->event_type);
    }
  }

  for (auto & map_pair : status_map) {
    dds_StatusCondition_set_enabled_statuses(map_pair.first, map_pair.second);
    status_conditions.insert(map_pair.first);
  }

  return RMW_RET_OK;
}

rmw_ret_t
__handle_active_event_conditions(rmw_events_t * events)
{
  if (events == nullptr) {
    return RMW_RET_OK;
  }

  for (size_t i = 0; i < events->event_count; i++) {
    auto now = static_cast<rmw_event_t *>(events->events[i]);
    RMW_CHECK_ARGUMENT_FOR_NULL(events, RMW_RET_INVALID_ARGUMENT);

    auto event_info = static_cast<GurumddsEventInfo *>(now->data);
    if (event_info == nullptr) {
      RMW_SET_ERROR_MSG("event handle is null");
      return RMW_RET_ERROR;
    }

    dds_StatusMask mask = event_info->get_status_changes();
    bool is_active = false;

    if (is_event_supported(now->event_type)) {
      is_active = ((mask & get_status_kind_from_rmw(now->event_type)) != 0);
    }

    if (!is_active) {
      events->events[i] = nullptr;
    }
  }

  return RMW_RET_OK;
}

rmw_ret_t __detach_condition(
  dds_WaitSet * dds_wait_set,
  dds_Condition * condition)
{
  dds_ReturnCode_t dds_return_code = dds_WaitSet_detach_condition(dds_wait_set, condition);
  rmw_ret_t from_dds = check_dds_ret_code(dds_return_code);
  if (from_dds != RMW_RET_OK) {
    RMW_SET_ERROR_MSG("failed to detach condition from wait set");
    return from_dds;
  }

  return RMW_RET_OK;
}

template<typename SubscriberInfo, typename ServiceInfo, typename ClientInfo>
rmw_ret_t
shared__rmw_wait(
  const char * implementation_identifier,
  rmw_subscriptions_t * subscriptions,
  rmw_guard_conditions_t * guard_conditions,
  rmw_services_t * services,
  rmw_clients_t * clients,
  rmw_events_t * events,
  rmw_wait_set_t * wait_set,
  const rmw_time_t * wait_timeout)
{
  (void)events;
  struct atexit_t
  {
    ~atexit_t()
    {
      if (wait_set == nullptr) {
        RMW_SET_ERROR_MSG("wait set handle is null");
        return;
      }

      RMW_CHECK_TYPE_IDENTIFIERS_MATCH(
        wait set handle, wait_set->implementation_identifier,
        implementation_identifier, return )

      GurumddsWaitSetInfo * wait_set_info = static_cast<GurumddsWaitSetInfo *>(wait_set->data);
      if (wait_set_info == nullptr) {
        RMW_SET_ERROR_MSG("WaitSet implementation struct is null");
        return;
      }

      dds_WaitSet * dds_wait_set = static_cast<dds_WaitSet *>(wait_set_info->wait_set);
      if (dds_wait_set == nullptr) {
        RMW_SET_ERROR_MSG("DDS wait set handle is null");
        return;
      }

      dds_ConditionSeq * attached_conditions =
        static_cast<dds_ConditionSeq *>(wait_set_info->attached_conditions);
      if (attached_conditions == nullptr) {
        RMW_SET_ERROR_MSG("DDS condition sequence handle is null");
        return;
      }

      dds_ReturnCode_t ret = dds_WaitSet_get_conditions(dds_wait_set, attached_conditions);
      if (ret != dds_RETCODE_OK) {
        RMW_SET_ERROR_MSG("failed to get attached conditions for wait set");
        return;
      }

      for (uint32_t i = 0; i < dds_ConditionSeq_length(attached_conditions); ++i) {
        ret = dds_WaitSet_detach_condition(
          dds_wait_set, dds_ConditionSeq_get(attached_conditions, i));
        if (ret != dds_RETCODE_OK) {
          RMW_SET_ERROR_MSG("failed to detach condition from wait set");
        }
      }

      while (dds_ConditionSeq_length(attached_conditions) > 0) {
        dds_ConditionSeq_remove(attached_conditions, 0);
      }
    }
    rmw_wait_set_t * wait_set = nullptr;
    const char * implementation_identifier = nullptr;
  } atexit;

  atexit.wait_set = wait_set;
  atexit.implementation_identifier = implementation_identifier;

  RMW_CHECK_ARGUMENT_FOR_NULL(wait_set, RMW_RET_INVALID_ARGUMENT);
  RMW_CHECK_TYPE_IDENTIFIERS_MATCH(
    wait set handle, wait_set->implementation_identifier,
    implementation_identifier, return RMW_RET_INCORRECT_RMW_IMPLEMENTATION);

  GurumddsWaitSetInfo * wait_set_info = static_cast<GurumddsWaitSetInfo *>(wait_set->data);
  if (wait_set_info == nullptr) {
    RMW_SET_ERROR_MSG("WaitSet implementation struct is null");
    return RMW_RET_ERROR;
  }

  dds_WaitSet * dds_wait_set = static_cast<dds_WaitSet *>(wait_set_info->wait_set);
  if (dds_wait_set == nullptr) {
    RMW_SET_ERROR_MSG("DDS wait set handle is null");
    return RMW_RET_ERROR;
  }

  dds_ConditionSeq * active_conditions =
    static_cast<dds_ConditionSeq *>(wait_set_info->active_conditions);
  if (active_conditions == nullptr) {
    RMW_SET_ERROR_MSG("DDS condition sequence handle is null");
    return RMW_RET_ERROR;
  }

  if (subscriptions != nullptr) {
    for (size_t i = 0; i < subscriptions->subscriber_count; ++i) {
      SubscriberInfo * subscriber_info =
        static_cast<SubscriberInfo *>(subscriptions->subscribers[i]);
      if (subscriber_info == nullptr) {
        RMW_SET_ERROR_MSG("subscriber info handle is null");
        return RMW_RET_ERROR;
      }

      dds_ReadCondition * read_condition = subscriber_info->read_condition;
      if (read_condition == nullptr) {
        RMW_SET_ERROR_MSG("read condition handle is null");
        return RMW_RET_ERROR;
      }

      dds_ReturnCode_t ret = dds_WaitSet_attach_condition(
        dds_wait_set, reinterpret_cast<dds_Condition *>(read_condition));
      CHECK_ATTACH(ret);
    }
  }

  std::unordered_set<dds_StatusCondition *> status_conditions;

  rmw_ret_t ret_code = __gather_event_conditions(events, status_conditions);
  if (ret_code != RMW_RET_OK) {
    return ret_code;
  }

  for (auto status_condition : status_conditions) {
    dds_ReturnCode_t ret = dds_WaitSet_attach_condition(
      dds_wait_set,
      reinterpret_cast<dds_Condition *>(status_condition));
    CHECK_ATTACH(ret);
  }

  if (guard_conditions != nullptr) {
    for (size_t i = 0; i < guard_conditions->guard_condition_count; ++i) {
      dds_GuardCondition * guard_condition =
        static_cast<dds_GuardCondition *>(guard_conditions->guard_conditions[i]);
      if (guard_condition == nullptr) {
        RMW_SET_ERROR_MSG("guard condition handle is null");
        return RMW_RET_ERROR;
      }

      dds_ReturnCode_t ret = dds_WaitSet_attach_condition(
        dds_wait_set, reinterpret_cast<dds_Condition *>(guard_condition));
      CHECK_ATTACH(ret);
    }
  }

  if (services != nullptr) {
    for (size_t i = 0; i < services->service_count; ++i) {
      ServiceInfo * service_info = static_cast<ServiceInfo *>(services->services[i]);
      if (service_info == nullptr) {
        RMW_SET_ERROR_MSG("service info handle is null");
        return RMW_RET_ERROR;
      }

      dds_ReadCondition * read_condition = service_info->read_condition;
      if (read_condition == nullptr) {
        RMW_SET_ERROR_MSG("read condition handle is null");
        return RMW_RET_ERROR;
      }

      dds_ReturnCode_t ret = dds_WaitSet_attach_condition(
        dds_wait_set, reinterpret_cast<dds_Condition *>(read_condition));
      CHECK_ATTACH(ret);
    }
  }

  if (clients != nullptr) {
    for (size_t i = 0; i < clients->client_count; ++i) {
      ClientInfo * client_info = static_cast<ClientInfo *>(clients->clients[i]);
      if (client_info == nullptr) {
        RMW_SET_ERROR_MSG("client info handle is null");
        return RMW_RET_ERROR;
      }

      dds_ReadCondition * read_condition = client_info->read_condition;
      if (read_condition == nullptr) {
        RMW_SET_ERROR_MSG("read condition handle is null");
        return RMW_RET_ERROR;
      }

      dds_ReturnCode_t ret = dds_WaitSet_attach_condition(
        dds_wait_set, reinterpret_cast<dds_Condition *>(read_condition));
      CHECK_ATTACH(ret);
    }
  }

  rmw_ret_t rret = RMW_RET_OK;

  const char * env_name = "RMW_GURUMDDS_WAIT_USE_POLLING";
  char * env_value = nullptr;
  bool use_polling = false;

  env_value = getenv(env_name);
  if (env_value != nullptr) {
    use_polling = (strcmp(env_value, "1") == 0);
  }

  if (!use_polling) {  // Default: use dds_WaitSet_wait()
    dds_Duration_t timeout;
    if (wait_timeout == nullptr) {
      timeout.sec = dds_DURATION_INFINITE_SEC;
      timeout.nanosec = dds_DURATION_ZERO_NSEC;
    } else {
      timeout.sec = static_cast<int32_t>(wait_timeout->sec);
      timeout.nanosec = static_cast<uint32_t>(wait_timeout->nsec);
    }

    dds_ReturnCode_t status = dds_WaitSet_wait(dds_wait_set, active_conditions, &timeout);
    if (status != dds_RETCODE_OK && status != dds_RETCODE_TIMEOUT) {
      RMW_SET_ERROR_MSG("failed to wait on wait set");
      return RMW_RET_ERROR;
    }

    if (status == dds_RETCODE_TIMEOUT) {
      rret = RMW_RET_TIMEOUT;
    }
  } else {  // use polilng
    uint64_t sec, nsec;
    bool inf = false;
    if (wait_timeout != nullptr) {
      sec = wait_timeout->sec;
      nsec = wait_timeout->nsec;
      inf = false;
    } else {
      sec = 0;
      nsec = 0;
      inf = true;
    }
    auto t = std::chrono::steady_clock::now() +
      std::chrono::nanoseconds(sec * 1000000000ULL + nsec);
    bool triggered = false;

    while (dds_ConditionSeq_length(active_conditions) > 0) {
      dds_ConditionSeq_remove(active_conditions, 0);
    }

    dds_ConditionSeq * conds = dds_ConditionSeq_create(8);
    dds_WaitSet_get_conditions(dds_wait_set, conds);

    for (uint32_t i = 0; i < dds_ConditionSeq_length(conds); ++i) {
      dds_Condition * cond = dds_ConditionSeq_get(conds, i);
      if (cond == NULL) {
        continue;
      }

      if (dds_Condition_get_trigger_value(cond) == true) {
        dds_ConditionSeq_add(active_conditions, cond);
        triggered = true;
      }
    }

    for (uint32_t i = 0; (inf || std::chrono::steady_clock::now() <= t) && !triggered; ++i) {
      if (i >= dds_ConditionSeq_length(conds)) {
        i = 0;
      }

      dds_Condition * cond = dds_ConditionSeq_get(conds, i);
      if (cond == NULL) {
        continue;
      }

      if (dds_Condition_get_trigger_value(cond) == true) {
        dds_ConditionSeq_add(active_conditions, cond);
        triggered = true;
      }
    }
    dds_ConditionSeq_delete(conds);

    if (!triggered) {
      rret = RMW_RET_TIMEOUT;
    }
  }

  if (subscriptions != nullptr) {
    for (size_t i = 0; i < subscriptions->subscriber_count; ++i) {
      SubscriberInfo * subscriber_info =
        static_cast<SubscriberInfo *>(subscriptions->subscribers[i]);
      if (subscriber_info == nullptr) {
        RMW_SET_ERROR_MSG("subscriber info handle is null");
        return RMW_RET_ERROR;
      }

      dds_ReadCondition * read_condition = subscriber_info->read_condition;
      if (!read_condition) {
        RMW_SET_ERROR_MSG("read condition handle is null");
        return RMW_RET_ERROR;
      }

      uint32_t j = 0;
      for (; j < dds_ConditionSeq_length(active_conditions); ++j) {
        if (
          dds_ConditionSeq_get(active_conditions, j) ==
          reinterpret_cast<dds_Condition *>(read_condition))
        {
          break;
        }
      }

      if (j >= dds_ConditionSeq_length(active_conditions)) {
        subscriptions->subscribers[i] = 0;
      }

      rmw_ret_t rmw_ret_code = __detach_condition(
        dds_wait_set, reinterpret_cast<dds_Condition *>(read_condition));
      if (rmw_ret_code != RMW_RET_OK) {
        return rmw_ret_code;
      }
    }
  }

  if (guard_conditions != nullptr) {
    for (size_t i = 0; i < guard_conditions->guard_condition_count; ++i) {
      dds_Condition * condition =
        static_cast<dds_Condition *>(guard_conditions->guard_conditions[i]);
      if (condition == nullptr) {
        RMW_SET_ERROR_MSG("condition handle is null");
        return RMW_RET_ERROR;
      }

      uint32_t j = 0;
      for (; j < dds_ConditionSeq_length(active_conditions); ++j) {
        if (dds_ConditionSeq_get(active_conditions, j) == condition) {
          dds_GuardCondition * guard = reinterpret_cast<dds_GuardCondition *>(condition);
          dds_ReturnCode_t ret = dds_GuardCondition_set_trigger_value(guard, false);
          if (ret != dds_RETCODE_OK) {
            RMW_SET_ERROR_MSG("failed to set trigger value");
            return RMW_RET_ERROR;
          }
          break;
        }
      }

      if (j >= dds_ConditionSeq_length(active_conditions)) {
        guard_conditions->guard_conditions[i] = 0;
      }

      rmw_ret_t rmw_ret_code = __detach_condition(dds_wait_set, condition);
      if (rmw_ret_code != RMW_RET_OK) {
        return rmw_ret_code;
      }
    }
  }

  if (services != nullptr) {
    for (size_t i = 0; i < services->service_count; ++i) {
      ServiceInfo * service_info = static_cast<ServiceInfo *>(services->services[i]);
      if (service_info == nullptr) {
        RMW_SET_ERROR_MSG("service info handle is null");
        return RMW_RET_ERROR;
      }

      dds_ReadCondition * read_condition = service_info->read_condition;
      if (read_condition == nullptr) {
        RMW_SET_ERROR_MSG("read condition handle is null");
        return RMW_RET_ERROR;
      }

      uint32_t j = 0;
      for (; j < dds_ConditionSeq_length(active_conditions); ++j) {
        if (
          dds_ConditionSeq_get(active_conditions, j) ==
          reinterpret_cast<dds_Condition *>(read_condition))
        {
          break;
        }
      }

      if (j >= dds_ConditionSeq_length(active_conditions)) {
        services->services[i] = 0;
      }

      rmw_ret_t rmw_ret_code = __detach_condition(
        dds_wait_set, reinterpret_cast<dds_Condition *>(read_condition));
      if (rmw_ret_code != RMW_RET_OK) {
        return rmw_ret_code;
      }
    }
  }

  if (clients != nullptr) {
    for (size_t i = 0; i < clients->client_count; ++i) {
      ClientInfo * client_info = static_cast<ClientInfo *>(clients->clients[i]);
      if (client_info == nullptr) {
        RMW_SET_ERROR_MSG("client info handle is null");
        return RMW_RET_ERROR;
      }

      dds_ReadCondition * read_condition = client_info->read_condition;
      if (read_condition == nullptr) {
        RMW_SET_ERROR_MSG("read condition handle is null");
        return RMW_RET_ERROR;
      }

      uint32_t j = 0;
      for (; j < dds_ConditionSeq_length(active_conditions); ++j) {
        if (
          dds_ConditionSeq_get(active_conditions, j) ==
          reinterpret_cast<dds_Condition *>(read_condition))
        {
          break;
        }
      }

      if (j >= dds_ConditionSeq_length(active_conditions)) {
        clients->clients[i] = 0;
      }

      rmw_ret_t rmw_ret_code = __detach_condition(
        dds_wait_set, reinterpret_cast<dds_Condition *>(read_condition));
      if (rmw_ret_code != RMW_RET_OK) {
        return rmw_ret_code;
      }
    }
  }

  {
    rmw_ret_t rmw_ret_code = __handle_active_event_conditions(events);
    if (rmw_ret_code != RMW_RET_OK) {
      return rmw_ret_code;
    }
  }

  return rret;
}

#endif  // RMW_GURUMDDS_SHARED_CPP__RMW_WAIT_HPP_