Program Listing for File RTPSWriter.h
↰ Return to documentation for file (/tmp/ws/src/fastrtps/include/fastdds/rtps/writer/RTPSWriter.h
)
// Copyright 2016 Proyectos y Sistemas de Mantenimiento SL (eProsima).
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef _FASTDDS_RTPS_RTPSWRITER_H_
#define _FASTDDS_RTPS_RTPSWRITER_H_
#include <chrono>
#include <functional>
#include <memory>
#include <mutex>
#include <vector>
#include <fastdds/rtps/Endpoint.h>
#include <fastdds/rtps/attributes/HistoryAttributes.h>
#include <fastdds/rtps/attributes/WriterAttributes.h>
#include <fastdds/rtps/builtin/data/ReaderProxyData.h>
#include <fastdds/rtps/interfaces/IReaderDataFilter.hpp>
#include <fastdds/rtps/messages/RTPSMessageGroup.h>
#include "DeliveryRetCode.hpp"
#include "LocatorSelectorSender.hpp"
#include <fastrtps/qos/LivelinessLostStatus.h>
#include <fastdds/statistics/rtps/StatisticsCommon.hpp>
namespace eprosima {
namespace fastdds {
namespace rtps {
class FlowController;
} // namespace rtps
namespace dds {
class DataWriterImpl;
} // namespace dds
} // namespace fastdds
namespace fastrtps {
namespace rtps {
class WriterListener;
class WriterHistory;
class DataSharingNotifier;
struct CacheChange_t;
class RTPSWriter
: public Endpoint
, public fastdds::statistics::StatisticsWriterImpl
{
friend class WriterHistory;
friend class RTPSParticipantImpl;
friend class RTPSMessageGroup;
friend class fastdds::dds::DataWriterImpl;
protected:
RTPSWriter(
RTPSParticipantImpl* impl,
const GUID_t& guid,
const WriterAttributes& att,
fastdds::rtps::FlowController* flow_controller,
WriterHistory* hist,
WriterListener* listen = nullptr);
RTPSWriter(
RTPSParticipantImpl* impl,
const GUID_t& guid,
const WriterAttributes& att,
const std::shared_ptr<IPayloadPool>& payload_pool,
fastdds::rtps::FlowController* flow_controller,
WriterHistory* hist,
WriterListener* listen = nullptr);
RTPSWriter(
RTPSParticipantImpl* impl,
const GUID_t& guid,
const WriterAttributes& att,
const std::shared_ptr<IPayloadPool>& payload_pool,
const std::shared_ptr<IChangePool>& change_pool,
fastdds::rtps::FlowController* flow_controller,
WriterHistory* hist,
WriterListener* listen = nullptr);
virtual ~RTPSWriter();
public:
template<typename T>
CacheChange_t* new_change(
T& data,
ChangeKind_t changeKind,
InstanceHandle_t handle = c_InstanceHandle_Unknown)
{
return new_change([data]() -> uint32_t
{
return (uint32_t)T::getCdrSerializedSize(data);
}, changeKind, handle);
}
RTPS_DllAPI CacheChange_t* new_change(
const std::function<uint32_t()>& dataCdrSerializedSize,
ChangeKind_t changeKind,
InstanceHandle_t handle = c_InstanceHandle_Unknown);
RTPS_DllAPI CacheChange_t* new_change(
ChangeKind_t changeKind,
InstanceHandle_t handle = c_InstanceHandle_Unknown);
RTPS_DllAPI bool release_change(
CacheChange_t* change);
RTPS_DllAPI virtual bool matched_reader_add(
const ReaderProxyData& data) = 0;
RTPS_DllAPI virtual bool matched_reader_remove(
const GUID_t& reader_guid) = 0;
RTPS_DllAPI virtual bool matched_reader_is_matched(
const GUID_t& reader_guid) = 0;
RTPS_DllAPI virtual void reader_data_filter(
fastdds::rtps::IReaderDataFilter* filter) = 0;
RTPS_DllAPI virtual const fastdds::rtps::IReaderDataFilter* reader_data_filter() const = 0;
RTPS_DllAPI virtual bool is_acked_by_all(
const CacheChange_t* /*a_change*/) const
{
return false;
}
RTPS_DllAPI virtual bool wait_for_all_acked(
const Duration_t& /*max_wait*/)
{
return true;
}
RTPS_DllAPI virtual void updateAttributes(
const WriterAttributes& att) = 0;
RTPS_DllAPI SequenceNumber_t get_seq_num_min();
RTPS_DllAPI SequenceNumber_t get_seq_num_max();
RTPS_DllAPI uint32_t getTypeMaxSerialized();
uint32_t getMaxDataSize();
uint32_t calculateMaxDataSize(
uint32_t length);
RTPS_DllAPI inline WriterListener* getListener()
{
return mp_listener;
}
RTPS_DllAPI inline bool set_listener(
WriterListener* listener)
{
mp_listener = listener;
return true;
}
RTPS_DllAPI inline bool isAsync() const
{
return is_async_;
}
RTPS_DllAPI bool remove_older_changes(
unsigned int max = 0);
virtual bool try_remove_change(
const std::chrono::steady_clock::time_point& max_blocking_time_point,
std::unique_lock<RecursiveTimedMutex>& lock) = 0;
virtual bool wait_for_acknowledgement(
const SequenceNumber_t& seq,
const std::chrono::steady_clock::time_point& max_blocking_time_point,
std::unique_lock<RecursiveTimedMutex>& lock) = 0;
#ifdef FASTDDS_STATISTICS
RTPS_DllAPI bool add_statistics_listener(
std::shared_ptr<fastdds::statistics::IListener> listener);
RTPS_DllAPI bool remove_statistics_listener(
std::shared_ptr<fastdds::statistics::IListener> listener);
RTPS_DllAPI void set_enabled_statistics_writers_mask(
uint32_t enabled_writers);
#endif // FASTDDS_STATISTICS
inline RTPSParticipantImpl* getRTPSParticipant() const
{
return mp_RTPSParticipant;
}
void set_separate_sending (
bool enable)
{
m_separateSendingEnabled = enable;
}
bool get_separate_sending () const
{
return m_separateSendingEnabled;
}
virtual bool process_acknack(
const GUID_t& writer_guid,
const GUID_t& reader_guid,
uint32_t ack_count,
const SequenceNumberSet_t& sn_set,
bool final_flag,
bool& result)
{
(void)reader_guid; (void)ack_count; (void)sn_set; (void)final_flag;
result = false;
return writer_guid == m_guid;
}
virtual bool process_nack_frag(
const GUID_t& writer_guid,
const GUID_t& reader_guid,
uint32_t ack_count,
const SequenceNumber_t& seq_num,
const FragmentNumberSet_t fragments_state,
bool& result)
{
(void)reader_guid; (void)ack_count; (void)seq_num; (void)fragments_state;
result = false;
return writer_guid == m_guid;
}
const LivelinessQosPolicyKind& get_liveliness_kind() const;
const Duration_t& get_liveliness_lease_duration() const;
const Duration_t& get_liveliness_announcement_period() const;
LivelinessLostStatus liveliness_lost_status_;
bool is_datasharing_compatible() const;
virtual DeliveryRetCode deliver_sample_nts(
CacheChange_t* cache_change,
RTPSMessageGroup& group,
LocatorSelectorSender& locator_selector,
const std::chrono::time_point<std::chrono::steady_clock>& max_blocking_time) = 0;
virtual LocatorSelectorSender& get_general_locator_selector() = 0;
virtual LocatorSelectorSender& get_async_locator_selector() = 0;
virtual bool send_nts(
CDRMessage_t* message,
const LocatorSelectorSender& locator_selector,
std::chrono::steady_clock::time_point& max_blocking_time_point) const;
protected:
bool m_pushMode = true;
fastdds::rtps::FlowController* flow_controller_;
WriterHistory* mp_history = nullptr;
WriterListener* mp_listener = nullptr;
bool is_async_ = false;
bool m_separateSendingEnabled = false;
LivelinessQosPolicyKind liveliness_kind_;
Duration_t liveliness_lease_duration_;
Duration_t liveliness_announcement_period_;
void add_guid(
LocatorSelectorSender& locator_selector,
const GUID_t& remote_guid);
void compute_selected_guids(
LocatorSelectorSender& locator_selector);
void update_cached_info_nts(
LocatorSelectorSender& locator_selector);
virtual void unsent_change_added_to_history(
CacheChange_t* change,
const std::chrono::time_point<std::chrono::steady_clock>& max_blocking_time) = 0;
virtual bool change_removed_by_history(
CacheChange_t* a_change) = 0;
bool is_datasharing_compatible_with(
const ReaderProxyData& rdata) const;
bool is_pool_initialized() const;
template<typename Functor>
bool send_data_or_fragments(
RTPSMessageGroup& group,
CacheChange_t* change,
bool inline_qos,
Functor sent_fun)
{
bool sent_ok = true;
uint32_t n_fragments = change->getFragmentCount();
if (n_fragments > 0)
{
for (FragmentNumber_t frag = 1; frag <= n_fragments; frag++)
{
sent_ok &= group.add_data_frag(*change, frag, inline_qos);
if (sent_ok)
{
sent_fun(change, frag);
}
else
{
EPROSIMA_LOG_ERROR(RTPS_WRITER,
"Error sending fragment (" << change->sequenceNumber << ", " << frag << ")");
break;
}
}
}
else
{
sent_ok = group.add_data(*change, inline_qos);
if (sent_ok)
{
sent_fun(change, 0);
}
else
{
EPROSIMA_LOG_ERROR(RTPS_WRITER, "Error sending change " << change->sequenceNumber);
}
}
return sent_ok;
}
void add_statistics_sent_submessage(
CacheChange_t* change,
size_t num_locators);
void deinit();
private:
RecursiveTimedMutex& get_mutex()
{
return mp_mutex;
}
RTPSWriter& operator =(
const RTPSWriter&) = delete;
void init(
const std::shared_ptr<IPayloadPool>& payload_pool,
const std::shared_ptr<IChangePool>& change_pool,
const WriterAttributes& att);
RTPSWriter* next_[2] = { nullptr, nullptr };
};
} /* namespace rtps */
} /* namespace fastrtps */
} /* namespace eprosima */
#endif /* _FASTDDS_RTPS_RTPSWRITER_H_ */