.. _program_listing_file_include_rmw_fastrtps_shared_cpp_custom_client_info.hpp: Program Listing for File custom_client_info.hpp =============================================== |exhale_lsh| :ref:`Return to documentation for file <file_include_rmw_fastrtps_shared_cpp_custom_client_info.hpp>` (``include/rmw_fastrtps_shared_cpp/custom_client_info.hpp``) .. |exhale_lsh| unicode:: U+021B0 .. UPWARDS ARROW WITH TIP LEFTWARDS .. code-block:: cpp // Copyright 2016-2018 Proyectos y Sistemas de Mantenimiento SL (eProsima). // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. 
#ifndef RMW_FASTRTPS_SHARED_CPP__CUSTOM_CLIENT_INFO_HPP_ #define RMW_FASTRTPS_SHARED_CPP__CUSTOM_CLIENT_INFO_HPP_ #include #include #include #include #include #include "fastcdr/FastBuffer.h" #include "fastdds/dds/core/status/PublicationMatchedStatus.hpp" #include "fastdds/dds/core/status/SubscriptionMatchedStatus.hpp" #include "fastdds/dds/publisher/DataWriter.hpp" #include "fastdds/dds/publisher/DataWriterListener.hpp" #include "fastdds/dds/subscriber/DataReader.hpp" #include "fastdds/dds/subscriber/DataReaderListener.hpp" #include "fastdds/dds/subscriber/SampleInfo.hpp" #include "fastdds/dds/subscriber/qos/DataReaderQos.hpp" #include "fastdds/dds/topic/TypeSupport.hpp" #include "fastdds/rtps/common/Guid.h" #include "fastdds/rtps/common/InstanceHandle.h" #include "fastdds/rtps/common/SampleIdentity.h" #include "rcpputils/thread_safety_annotations.hpp" #include "rmw/event_callback_type.h" #include "rmw_fastrtps_shared_cpp/TypeSupport.hpp" class ClientListener; class ClientPubListener; typedef struct CustomClientInfo { eprosima::fastdds::dds::TypeSupport request_type_support_{nullptr}; const void * request_type_support_impl_{nullptr}; eprosima::fastdds::dds::TypeSupport response_type_support_{nullptr}; const void * response_type_support_impl_{nullptr}; eprosima::fastdds::dds::DataReader * response_reader_{nullptr}; eprosima::fastdds::dds::DataWriter * request_writer_{nullptr}; std::string request_topic_; std::string response_topic_; ClientListener * listener_{nullptr}; eprosima::fastrtps::rtps::GUID_t writer_guid_; eprosima::fastrtps::rtps::GUID_t reader_guid_; const char * typesupport_identifier_{nullptr}; ClientPubListener * pub_listener_{nullptr}; std::atomic_size_t response_subscriber_matched_count_; std::atomic_size_t request_publisher_matched_count_; } CustomClientInfo; typedef struct CustomClientResponse { eprosima::fastrtps::rtps::SampleIdentity sample_identity_; std::unique_ptr buffer_; } CustomClientResponse; class ClientListener : public 
eprosima::fastdds::dds::DataReaderListener { public: explicit ClientListener( CustomClientInfo * info) : info_(info) { } void on_data_available( eprosima::fastdds::dds::DataReader *) { std::unique_lock lock_mutex(on_new_response_m_); if (on_new_response_cb_) { auto unread_responses = get_unread_responses(); if (0 < unread_responses) { on_new_response_cb_(user_data_, unread_responses); } } } void on_subscription_matched( eprosima::fastdds::dds::DataReader *, const eprosima::fastdds::dds::SubscriptionMatchedStatus & info) final { if (info_ == nullptr) { return; } if (info.current_count_change == 1) { publishers_.insert(eprosima::fastrtps::rtps::iHandle2GUID(info.last_publication_handle)); } else if (info.current_count_change == -1) { publishers_.erase(eprosima::fastrtps::rtps::iHandle2GUID(info.last_publication_handle)); } else { return; } info_->response_subscriber_matched_count_.store(publishers_.size()); } size_t get_unread_responses() { return info_->response_reader_->get_unread_count(true); } // Provide handlers to perform an action when a // new event from this listener has ocurred void set_on_new_response_callback( const void * user_data, rmw_event_callback_t callback) { if (callback) { auto unread_responses = get_unread_responses(); std::lock_guard lock_mutex(on_new_response_m_); if (0 < unread_responses) { callback(user_data, unread_responses); } user_data_ = user_data; on_new_response_cb_ = callback; eprosima::fastdds::dds::StatusMask status_mask = info_->response_reader_->get_status_mask(); status_mask |= eprosima::fastdds::dds::StatusMask::data_available(); info_->response_reader_->set_listener(this, status_mask); } else { std::lock_guard lock_mutex(on_new_response_m_); eprosima::fastdds::dds::StatusMask status_mask = info_->response_reader_->get_status_mask(); status_mask &= ~eprosima::fastdds::dds::StatusMask::data_available(); info_->response_reader_->set_listener(this, status_mask); user_data_ = nullptr; on_new_response_cb_ = nullptr; } } private: 
CustomClientInfo * info_; std::set publishers_; rmw_event_callback_t on_new_response_cb_{nullptr}; const void * user_data_{nullptr}; std::mutex on_new_response_m_; }; class ClientPubListener : public eprosima::fastdds::dds::DataWriterListener { public: explicit ClientPubListener( CustomClientInfo * info) : info_(info) { } void on_publication_matched( eprosima::fastdds::dds::DataWriter * /* writer */, const eprosima::fastdds::dds::PublicationMatchedStatus & info) final { if (info_ == nullptr) { return; } if (info.current_count_change == 1) { subscriptions_.insert(eprosima::fastrtps::rtps::iHandle2GUID(info.last_subscription_handle)); } else if (info.current_count_change == -1) { subscriptions_.erase(eprosima::fastrtps::rtps::iHandle2GUID(info.last_subscription_handle)); } else { return; } info_->request_publisher_matched_count_.store(subscriptions_.size()); } private: CustomClientInfo * info_; std::set subscriptions_; }; #endif // RMW_FASTRTPS_SHARED_CPP__CUSTOM_CLIENT_INFO_HPP_