Program Listing for File custom_client_info.hpp
↰ Return to documentation for file (include/rmw_fastrtps_shared_cpp/custom_client_info.hpp)
// Copyright 2016-2018 Proyectos y Sistemas de Mantenimiento SL (eProsima).
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef RMW_FASTRTPS_SHARED_CPP__CUSTOM_CLIENT_INFO_HPP_
#define RMW_FASTRTPS_SHARED_CPP__CUSTOM_CLIENT_INFO_HPP_
#include <atomic>
#include <memory>
#include <mutex>
#include <set>
#include <string>
#include <utility>
#include "fastcdr/FastBuffer.h"
#include "fastdds/dds/core/status/PublicationMatchedStatus.hpp"
#include "fastdds/dds/core/status/SubscriptionMatchedStatus.hpp"
#include "fastdds/dds/publisher/DataWriter.hpp"
#include "fastdds/dds/publisher/DataWriterListener.hpp"
#include "fastdds/dds/subscriber/DataReader.hpp"
#include "fastdds/dds/subscriber/DataReaderListener.hpp"
#include "fastdds/dds/subscriber/SampleInfo.hpp"
#include "fastdds/dds/subscriber/qos/DataReaderQos.hpp"
#include "fastdds/dds/topic/TypeSupport.hpp"
#include "fastdds/rtps/common/Guid.h"
#include "fastdds/rtps/common/InstanceHandle.h"
#include "fastdds/rtps/common/SampleIdentity.h"
#include "rcpputils/thread_safety_annotations.hpp"
#include "rmw/event_callback_type.h"
#include "rmw_fastrtps_shared_cpp/TypeSupport.hpp"
class ClientListener;
class ClientPubListener;
/// Aggregate of the state kept for one client.
/**
 * Holds the Fast DDS type supports, the request writer / response reader
 * pointers, the topic names, the two listeners and the matched-endpoint
 * counters that those listeners update.
 *
 * All pointer members default to nullptr and both counters default to zero,
 * so a default-constructed instance is in a well-defined "nothing matched,
 * nothing attached" state.
 */
typedef struct CustomClientInfo
{
  eprosima::fastdds::dds::TypeSupport request_type_support_{nullptr};
  const void * request_type_support_impl_{nullptr};
  eprosima::fastdds::dds::TypeSupport response_type_support_{nullptr};
  const void * response_type_support_impl_{nullptr};
  eprosima::fastdds::dds::DataReader * response_reader_{nullptr};
  eprosima::fastdds::dds::DataWriter * request_writer_{nullptr};
  std::string request_topic_;
  std::string response_topic_;
  ClientListener * listener_{nullptr};
  eprosima::fastrtps::rtps::GUID_t writer_guid_;
  eprosima::fastrtps::rtps::GUID_t reader_guid_;
  const char * typesupport_identifier_{nullptr};
  ClientPubListener * pub_listener_{nullptr};
  // Written by ClientListener::on_subscription_matched with the number of
  // distinct publishers currently matched to the response reader.
  // Explicitly zero-initialized: before C++20 a default-constructed
  // std::atomic holds an indeterminate value.
  std::atomic_size_t response_subscriber_matched_count_{0};
  // Written by ClientPubListener::on_publication_matched with the number of
  // distinct subscriptions currently matched to the request writer.
  std::atomic_size_t request_publisher_matched_count_{0};
} CustomClientInfo;
/// One received response: the sample identity it carries plus the
/// uniquely-owned buffer holding its payload.
struct CustomClientResponse
{
  /// Identity of the sample associated with this response.
  eprosima::fastrtps::rtps::SampleIdentity sample_identity_;
  /// Buffer owned by this response (released when the response is destroyed).
  std::unique_ptr<eprosima::fastcdr::FastBuffer> buffer_;
};
class ClientListener : public eprosima::fastdds::dds::DataReaderListener
{
public:
explicit ClientListener(
CustomClientInfo * info)
: info_(info)
{
}
void
on_data_available(
eprosima::fastdds::dds::DataReader *)
{
std::unique_lock<std::mutex> lock_mutex(on_new_response_m_);
if (on_new_response_cb_) {
auto unread_responses = get_unread_responses();
if (0 < unread_responses) {
on_new_response_cb_(user_data_, unread_responses);
}
}
}
void on_subscription_matched(
eprosima::fastdds::dds::DataReader *,
const eprosima::fastdds::dds::SubscriptionMatchedStatus & info) final
{
if (info_ == nullptr) {
return;
}
if (info.current_count_change == 1) {
publishers_.insert(eprosima::fastrtps::rtps::iHandle2GUID(info.last_publication_handle));
} else if (info.current_count_change == -1) {
publishers_.erase(eprosima::fastrtps::rtps::iHandle2GUID(info.last_publication_handle));
} else {
return;
}
info_->response_subscriber_matched_count_.store(publishers_.size());
}
size_t get_unread_responses()
{
return info_->response_reader_->get_unread_count(true);
}
// Provide handlers to perform an action when a
// new event from this listener has ocurred
void
set_on_new_response_callback(
const void * user_data,
rmw_event_callback_t callback)
{
if (callback) {
auto unread_responses = get_unread_responses();
std::lock_guard<std::mutex> lock_mutex(on_new_response_m_);
if (0 < unread_responses) {
callback(user_data, unread_responses);
}
user_data_ = user_data;
on_new_response_cb_ = callback;
eprosima::fastdds::dds::StatusMask status_mask = info_->response_reader_->get_status_mask();
status_mask |= eprosima::fastdds::dds::StatusMask::data_available();
info_->response_reader_->set_listener(this, status_mask);
} else {
std::lock_guard<std::mutex> lock_mutex(on_new_response_m_);
eprosima::fastdds::dds::StatusMask status_mask = info_->response_reader_->get_status_mask();
status_mask &= ~eprosima::fastdds::dds::StatusMask::data_available();
info_->response_reader_->set_listener(this, status_mask);
user_data_ = nullptr;
on_new_response_cb_ = nullptr;
}
}
private:
CustomClientInfo * info_;
std::set<eprosima::fastrtps::rtps::GUID_t> publishers_;
rmw_event_callback_t on_new_response_cb_{nullptr};
const void * user_data_{nullptr};
std::mutex on_new_response_m_;
};
/// DataWriter listener for a client's request writer.
/**
 * Maintains the set of matched subscription GUIDs and mirrors its size into
 * CustomClientInfo::request_publisher_matched_count_.
 */
class ClientPubListener : public eprosima::fastdds::dds::DataWriterListener
{
public:
  /// \param info client state whose matched count is updated; may be nullptr,
  ///   in which case matching events are ignored.
  explicit ClientPubListener(CustomClientInfo * info)
  : info_(info)
  {
  }

  /// Record a newly matched or unmatched subscription and publish the new count.
  void on_publication_matched(
    eprosima::fastdds::dds::DataWriter * /* writer */,
    const eprosima::fastdds::dds::PublicationMatchedStatus & info) final
  {
    if (nullptr == info_) {
      return;
    }

    switch (info.current_count_change) {
      case 1:
        subscriptions_.insert(
          eprosima::fastrtps::rtps::iHandle2GUID(info.last_subscription_handle));
        break;
      case -1:
        subscriptions_.erase(
          eprosima::fastrtps::rtps::iHandle2GUID(info.last_subscription_handle));
        break;
      default:
        // Only single match / unmatch events are tracked.
        return;
    }

    info_->request_publisher_matched_count_.store(subscriptions_.size());
  }

private:
  CustomClientInfo * info_;
  std::set<eprosima::fastrtps::rtps::GUID_t> subscriptions_;
};
#endif // RMW_FASTRTPS_SHARED_CPP__CUSTOM_CLIENT_INFO_HPP_