Class RTPSWriter

Inheritance Relationships

Base Types

Derived Types

Class Documentation

class RTPSWriter : public eprosima::fastrtps::rtps::Endpoint, public eprosima::fastdds::statistics::StatisticsWriterImpl

Class RTPSWriter, manages the sending of data to the readers. Is always associated with a HistoryCache.

Subclassed by eprosima::fastrtps::rtps::StatefulWriter, eprosima::fastrtps::rtps::StatelessWriter

Public Functions

template<typename T>
inline CacheChange_t *new_change(T &data, ChangeKind_t changeKind, InstanceHandle_t handle = c_InstanceHandle_Unknown)

Create a new change based with the provided changeKind.

Parameters:
  • data – Data of the change.

  • changeKind – The type of change.

  • handle – InstanceHandle to assign.

Returns:

Pointer to the CacheChange or nullptr if incorrect.

RTPS_DllAPI CacheChange_t * new_change (const std::function< uint32_t()> &dataCdrSerializedSize, ChangeKind_t changeKind, InstanceHandle_t handle=c_InstanceHandle_Unknown)
RTPS_DllAPI CacheChange_t * new_change (ChangeKind_t changeKind, InstanceHandle_t handle=c_InstanceHandle_Unknown)
RTPS_DllAPI bool release_change (CacheChange_t *change)

Release a change when it is not being used anymore.

Parameters:

change – Pointer to the cache change to be released.

Returns:

whether the operation succeeded or not

Pre:

  • change is not nullptr

  • change points to a cache change obtained from a call to this->new_change

Post:

memory pointed to by change is not accessed

virtual RTPS_DllAPI bool matched_reader_add (const ReaderProxyData &data)=0

Add a matched reader.

Parameters:

data – Pointer to the ReaderProxyData object added.

Returns:

True if added.

virtual RTPS_DllAPI bool matched_reader_remove (const GUID_t &reader_guid)=0

Remove a matched reader.

Parameters:

reader_guid – GUID of the reader to remove.

Returns:

True if removed.

virtual RTPS_DllAPI bool matched_reader_is_matched (const GUID_t &reader_guid)=0

Tells us if a specific Reader is matched against this writer.

Parameters:

reader_guid – GUID of the reader to check.

Returns:

True if it was matched.

virtual RTPS_DllAPI void reader_data_filter (fastdds::rtps::IReaderDataFilter *filter)=0

Set a content filter to perform content filtering on this writer.

This method sets a content filter that will be used to check whether a cache change is relevant for a reader or not.

Parameters:

filter – The content filter to use on this writer. May be nullptr to remove the content filter (i.e. treat all samples as relevant).

virtual RTPS_DllAPI const fastdds::rtps::IReaderDataFilter * reader_data_filter () const =0

Get the content filter used to perform content filtering on this writer.

Returns:

The content filter used on this writer.

inline virtual RTPS_DllAPI bool has_been_fully_delivered (const SequenceNumber_t &seq_num) const

Check if a specific change has been delivered to the transport layer of every matched remote RTPSReader at least once.

Parameters:

seq_num – Sequence number of the change to check.

Returns:

true if delivered. False otherwise.

inline virtual RTPS_DllAPI bool is_acked_by_all (const CacheChange_t *) const

Check if a specific change has been acknowledged by all Readers. Is only useful in reliable Writer. In BE Writers returns false when pending to be sent.

Returns:

True if acknowledged by all.

inline virtual RTPS_DllAPI bool wait_for_all_acked (const Duration_t &)

Waits until all changes were acknowledged or max_wait.

Returns:

True if all were acknowledged.

virtual RTPS_DllAPI void updateAttributes (const WriterAttributes &att)=0

Update the Attributes of the Writer.

Parameters:

att – New attributes

RTPS_DllAPI SequenceNumber_t get_seq_num_min ()

Get Min Seq Num in History.

Returns:

Minimum sequence number in history

RTPS_DllAPI SequenceNumber_t get_seq_num_max ()

Get Max Seq Num in History.

Returns:

Maximum sequence number in history

RTPS_DllAPI uint32_t getTypeMaxSerialized ()

Get maximum size of the serialized type

Returns:

Maximum size of the serialized type

uint32_t getMaxDataSize()

Get maximum size of the data.

uint32_t calculateMaxDataSize(uint32_t length)

Calculates the maximum size of the data.

inline RTPS_DllAPI WriterListener * getListener ()

Get listener

Returns:

Listener

inline RTPS_DllAPI bool set_listener (WriterListener *listener)
inline RTPS_DllAPI bool isAsync () const

Get the publication mode

Returns:

publication mode

RTPS_DllAPI bool remove_older_changes (unsigned int max=0)

Remove an specified max number of changes

Parameters:

max – Maximum number of changes to remove.

Returns:

at least one change has been removed

inline virtual RTPS_DllAPI bool get_disable_positive_acks () const

Returns if disable positive ACKs QoS is enabled.

Returns:

Best effort writers always return false. Reliable writers override this method.

virtual bool try_remove_change(const std::chrono::steady_clock::time_point &max_blocking_time_point, std::unique_lock<RecursiveTimedMutex> &lock) = 0

Tries to remove a change waiting a maximum of the provided microseconds.

Parameters:
  • max_blocking_time_point – Maximum time to wait for.

  • lock – Lock of the Change list.

Returns:

at least one change has been removed

virtual bool wait_for_acknowledgement(const SequenceNumber_t &seq, const std::chrono::steady_clock::time_point &max_blocking_time_point, std::unique_lock<RecursiveTimedMutex> &lock) = 0

Waits till a change has been acknowledged.

Parameters:
  • seq – Sequence number to wait for acknowledgement.

  • max_blocking_time_point – Maximum time to wait for.

  • lock – Lock of the Change list.

Returns:

true when change was acknowledged, false when timeout is reached.

inline RTPSParticipantImpl *getRTPSParticipant() const

Get RTPS participant

Returns:

RTPS participant

inline void set_separate_sending(bool enable)

Enable or disable sending data to readers separately NOTE: This will only work for synchronous writers

Parameters:

enable – If separate sending should be enabled

inline bool get_separate_sending() const

Inform if data is sent to readers separately

Returns:

true if separate sending is enabled

inline virtual bool process_acknack(const GUID_t &writer_guid, const GUID_t &reader_guid, uint32_t ack_count, const SequenceNumberSet_t &sn_set, bool final_flag, bool &result, fastdds::rtps::VendorId_t origin_vendor_id = c_VendorId_Unknown)

Process an incoming ACKNACK submessage.

Parameters:
  • writer_guid[in] GUID of the writer the submessage is directed to.

  • reader_guid[in] GUID of the reader originating the submessage.

  • ack_count[in] Count field of the submessage.

  • sn_set[in] Sequence number bitmap field of the submessage.

  • final_flag[in] Final flag field of the submessage.

  • result[out] true if the writer could process the submessage. Only valid when returned value is true.

  • origin_vendor_id[in] VendorId of the source participant from which the message was received

Returns:

true when the submessage was destinated to this writer, false otherwise.

inline virtual bool process_nack_frag(const GUID_t &writer_guid, const GUID_t &reader_guid, uint32_t ack_count, const SequenceNumber_t &seq_num, const FragmentNumberSet_t fragments_state, bool &result, fastdds::rtps::VendorId_t origin_vendor_id = c_VendorId_Unknown)

Process an incoming NACKFRAG submessage.

Parameters:
  • writer_guid[in] GUID of the writer the submessage is directed to.

  • reader_guid[in] GUID of the reader originating the submessage.

  • ack_count[in] Count field of the submessage.

  • seq_num[in] Sequence number field of the submessage.

  • fragments_state[in] Fragment number bitmap field of the submessage.

  • result[out] true if the writer could process the submessage. Only valid when returned value is true.

  • origin_vendor_id[in] VendorId of the source participant from which the message was received

Returns:

true when the submessage was destinated to this writer, false otherwise.

const LivelinessQosPolicyKind &get_liveliness_kind() const

A method to retrieve the liveliness kind.

Returns:

Liveliness kind

const Duration_t &get_liveliness_lease_duration() const

A method to retrieve the liveliness lease duration.

Returns:

Lease duration

const Duration_t &get_liveliness_announcement_period() const

A method to return the liveliness announcement period.

Returns:

The announcement period

bool is_datasharing_compatible() const
Returns:

Whether the writer is data sharing compatible or not

virtual DeliveryRetCode deliver_sample_nts(CacheChange_t *cache_change, RTPSMessageGroup &group, LocatorSelectorSender &locator_selector, const std::chrono::time_point<std::chrono::steady_clock> &max_blocking_time) = 0

Tells writer the sample can be sent to the network. This function should be used by a fastdds::rtps::FlowController.

Note

Must be non-thread safe.

Parameters:
  • cache_change – Pointer to the CacheChange_t that represents the sample which can be sent.

  • groupRTPSMessageGroup reference uses for generating the RTPS message.

  • locator_selectorRTPSMessageSenderInterface reference uses for selecting locators. The reference has to be a member of this RTPSWriter object.

  • max_blocking_time – Future timepoint where blocking send should end.

Returns:

Return code.

virtual LocatorSelectorSender &get_general_locator_selector() = 0
virtual LocatorSelectorSender &get_async_locator_selector() = 0
virtual bool send_nts(CDRMessage_t *message, const LocatorSelectorSender &locator_selector, std::chrono::steady_clock::time_point &max_blocking_time_point) const

Send a message through this interface.

Parameters:
  • message – Pointer to the buffer with the message already serialized.

  • locator_selectorRTPSMessageSenderInterface reference uses for selecting locators. The reference has to be a member of this RTPSWriter object.

  • max_blocking_time_point – Future timepoint where blocking send should end.

Public Members

LivelinessLostStatus liveliness_lost_status_

Liveliness lost status of this writer.

Protected Functions

RTPSWriter(RTPSParticipantImpl *impl, const GUID_t &guid, const WriterAttributes &att, fastdds::rtps::FlowController *flow_controller, WriterHistory *hist, WriterListener *listen = nullptr)
RTPSWriter(RTPSParticipantImpl *impl, const GUID_t &guid, const WriterAttributes &att, const std::shared_ptr<IPayloadPool> &payload_pool, fastdds::rtps::FlowController *flow_controller, WriterHistory *hist, WriterListener *listen = nullptr)
RTPSWriter(RTPSParticipantImpl *impl, const GUID_t &guid, const WriterAttributes &att, const std::shared_ptr<IPayloadPool> &payload_pool, const std::shared_ptr<IChangePool> &change_pool, fastdds::rtps::FlowController *flow_controller, WriterHistory *hist, WriterListener *listen = nullptr)
virtual ~RTPSWriter()
void add_guid(LocatorSelectorSender &locator_selector, const GUID_t &remote_guid)
void compute_selected_guids(LocatorSelectorSender &locator_selector)
void update_cached_info_nts(LocatorSelectorSender &locator_selector)
virtual void unsent_change_added_to_history(CacheChange_t *change, const std::chrono::time_point<std::chrono::steady_clock> &max_blocking_time) = 0

Add a change to the unsent list.

Parameters:
  • change – Pointer to the change to add.

  • max_blocking_time[in] Maximum time this method has to complete the task.

virtual bool change_removed_by_history(CacheChange_t *a_change, const std::chrono::time_point<std::chrono::steady_clock> &max_blocking_time) = 0

Indicate the writer that a change has been removed by the history due to some HistoryQos requirement.

Parameters:
  • a_change – Pointer to the change that is going to be removed.

  • max_blocking_time[in] Maximum time this method has to complete the task.

Returns:

True if removed correctly.

bool is_datasharing_compatible_with(const ReaderProxyData &rdata) const
bool is_pool_initialized() const
template<typename Functor>
inline bool send_data_or_fragments(RTPSMessageGroup &group, CacheChange_t *change, bool inline_qos, Functor sent_fun)
void add_statistics_sent_submessage(CacheChange_t *change, size_t num_locators)
void deinit()

Protected Attributes

bool m_pushMode = true

Is the data sent directly or announced by HB and THEN sent to the ones who ask for it?.

fastdds::rtps::FlowController *flow_controller_

Flow controller.

WriterHistory *mp_history = nullptr

WriterHistory.

WriterListener *mp_listener = nullptr

Listener.

bool is_async_ = false

Asynchronous publication activated.

bool m_separateSendingEnabled = false

Separate sending activated.

LivelinessQosPolicyKind liveliness_kind_

The liveliness kind of this writer.

Duration_t liveliness_lease_duration_

The liveliness lease duration of this writer.

Duration_t liveliness_announcement_period_

The liveliness announcement period.

Friends

friend class fastdds::dds::DataWriterImpl