Class RTPSWriter

Inheritance Relationships

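Base Types

  • public eprosima::fastrtps::rtps::Endpoint

  • public eprosima::fastdds::statistics::StatisticsWriterImpl

Derived Types

  • eprosima::fastrtps::rtps::StatefulWriter

  • eprosima::fastrtps::rtps::StatelessWriter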

Class Documentation

class RTPSWriter : public eprosima::fastrtps::rtps::Endpoint, public eprosima::fastdds::statistics::StatisticsWriterImpl

Manages the sending of data to the readers. An RTPSWriter is always associated with a HistoryCache.

Subclassed by eprosima::fastrtps::rtps::StatefulWriter, eprosima::fastrtps::rtps::StatelessWriter

Public Functions

template<typename T>
inline CacheChange_t *new_change(T &data, ChangeKind_t changeKind, InstanceHandle_t handle = c_InstanceHandle_Unknown)

Create a new change with the provided changeKind.

Parameters
  • data – Data of the change.

  • changeKind – The type of change.

  • handle – InstanceHandle to assign.

Returns

Pointer to the CacheChange, or nullptr if the change could not be created.

RTPS_DllAPI CacheChange_t * new_change (const std::function< uint32_t()> &dataCdrSerializedSize, ChangeKind_t changeKind, InstanceHandle_t handle=c_InstanceHandle_Unknown)
RTPS_DllAPI CacheChange_t * new_change (ChangeKind_t changeKind, InstanceHandle_t handle=c_InstanceHandle_Unknown)
RTPS_DllAPI bool release_change (CacheChange_t *change)

Release a change when it is not being used anymore.

Parameters

change – Pointer to the cache change to be released.

Returns

whether the operation succeeded or not

Pre

  • change is not nullptr

  • change points to a cache change obtained from a call to this->new_change

Post

memory pointed to by change is not accessed
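
The ownership contract described by the pre- and postconditions above can be illustrated with a minimal, self-contained sketch. It does not use the Fast DDS headers: SketchChange and SketchWriter are hypothetical stand-ins for CacheChange_t and RTPSWriter, and the pool logic is an assumption made purely for illustration.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <vector>

// Hypothetical stand-in for CacheChange_t; not the Fast DDS type.
struct SketchChange
{
    int kind = 0;
};

// Hypothetical stand-in for a writer that owns the changes it hands out.
class SketchWriter
{
public:

    // Mirrors new_change: the returned change stays owned by this writer.
    SketchChange* new_change(int kind)
    {
        owned_.push_back(std::unique_ptr<SketchChange>(new SketchChange()));
        owned_.back()->kind = kind;
        return owned_.back().get();
    }

    // Mirrors release_change: succeeds only for changes created by this writer.
    bool release_change(SketchChange* change)
    {
        assert(change != nullptr); // precondition: change is not nullptr
        for (auto it = owned_.begin(); it != owned_.end(); ++it)
        {
            if (it->get() == change)
            {
                owned_.erase(it); // memory is freed; the caller must not access it again
                return true;
            }
        }
        return false; // the change was not obtained from this writer
    }

    // Number of changes handed out and not yet released.
    std::size_t outstanding() const
    {
        return owned_.size();
    }

private:

    std::vector<std::unique_ptr<SketchChange>> owned_;
};
```

In this sketch, releasing a change through a writer other than the one that created it returns false, and a released pointer must be treated as dangling.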

virtual RTPS_DllAPI bool matched_reader_add (const ReaderProxyData &data)=0

Add a matched reader.

Parameters

data – Reference to the ReaderProxyData object to add.

Returns

True if added.

virtual RTPS_DllAPI bool matched_reader_remove (const GUID_t &reader_guid)=0

Remove a matched reader.

Parameters

reader_guid – GUID of the reader to remove.

Returns

True if removed.

virtual RTPS_DllAPI bool matched_reader_is_matched (const GUID_t &reader_guid)=0

Check whether a specific reader is matched against this writer.

Parameters

reader_guid – GUID of the reader to check.

Returns

True if it was matched.

virtual RTPS_DllAPI void reader_data_filter (fastdds::rtps::IReaderDataFilter *filter)=0

Set a content filter to perform content filtering on this writer.

This method sets a content filter that will be used to check whether a cache change is relevant for a reader or not.

Parameters

filter – The content filter to use on this writer. May be nullptr to remove the content filter (i.e. treat all samples as relevant).
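
As a rough illustration of how such a filter could be consulted, the following self-contained sketch uses hypothetical stand-in types (DemoChange, DemoGuid, DemoReaderDataFilter, MaxSizeFilter, FilteringWriterSketch) rather than the Fast DDS classes. The is_relevant method name and the size-based rule are assumptions for this example; only the "nullptr means every sample is relevant" behaviour comes from the description above.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical stand-ins; not the Fast DDS types.
struct DemoChange
{
    std::uint32_t payload_size = 0;
};

struct DemoGuid
{
    std::uint32_t id = 0;
};

// Assumed filter interface: decides whether a change is relevant for a reader.
class DemoReaderDataFilter
{
public:

    virtual ~DemoReaderDataFilter() = default;

    virtual bool is_relevant(const DemoChange& change, const DemoGuid& reader) const = 0;
};

// Example filter: only changes up to a size limit are relevant.
class MaxSizeFilter : public DemoReaderDataFilter
{
public:

    explicit MaxSizeFilter(std::uint32_t limit)
        : limit_(limit)
    {
    }

    bool is_relevant(const DemoChange& change, const DemoGuid&) const override
    {
        return change.payload_size <= limit_;
    }

private:

    std::uint32_t limit_;
};

// Hypothetical writer exposing the setter/getter pair described above.
class FilteringWriterSketch
{
public:

    void reader_data_filter(DemoReaderDataFilter* filter)
    {
        filter_ = filter;
    }

    const DemoReaderDataFilter* reader_data_filter() const
    {
        return filter_;
    }

    // With no filter installed, every sample is treated as relevant.
    bool should_send(const DemoChange& change, const DemoGuid& reader) const
    {
        return filter_ == nullptr || filter_->is_relevant(change, reader);
    }

private:

    DemoReaderDataFilter* filter_ = nullptr; // not owned by the writer in this sketch
};
```

The writer in this sketch stores a non-owning pointer, so the filter object has to outlive the writer or be removed by passing nullptr first.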

virtual RTPS_DllAPI const fastdds::rtps::IReaderDataFilter * reader_data_filter () const =0

Get the content filter used to perform content filtering on this writer.

Returns

The content filter used on this writer.

inline virtual RTPS_DllAPI bool is_acked_by_all (const CacheChange_t *) const

Check whether a specific change has been acknowledged by all readers. This is only useful in reliable writers. In best-effort (BE) writers it returns false when the change is pending to be sent.

Returns

True if acknowledged by all.

inline virtual RTPS_DllAPI bool wait_for_all_acked (const Duration_t &)

Wait until all changes have been acknowledged or max_wait has elapsed.

Returns

True if all were acknowledged.

virtual RTPS_DllAPI void updateAttributes (const WriterAttributes &att)=0

Update the Attributes of the Writer.

Parameters

att – New attributes

RTPS_DllAPI SequenceNumber_t get_seq_num_min ()

Get the minimum sequence number in the history.

Returns

Minimum sequence number in history

RTPS_DllAPI SequenceNumber_t get_seq_num_max ()

Get the maximum sequence number in the history.

Returns

Maximum sequence number in history

RTPS_DllAPI uint32_t getTypeMaxSerialized ()

Get maximum size of the serialized type

Returns

Maximum size of the serialized type

uint32_t getMaxDataSize()

Get maximum size of the data.

uint32_t calculateMaxDataSize(uint32_t length)

Calculates the maximum size of the data.

inline RTPS_DllAPI WriterListener * getListener ()

Get listener

Returns

Listener

inline RTPS_DllAPI bool set_listener (WriterListener *listener)
inline RTPS_DllAPI bool isAsync () const

Check whether the publication mode is asynchronous.

Returns

true if asynchronous publication is activated

RTPS_DllAPI bool remove_older_changes (unsigned int max=0)

Remove a specified maximum number of changes.

Parameters

max – Maximum number of changes to remove.

Returns

true if at least one change has been removed

virtual bool try_remove_change(const std::chrono::steady_clock::time_point &max_blocking_time_point, std::unique_lock<RecursiveTimedMutex> &lock) = 0

Try to remove a change, waiting at most until the provided time point.

Parameters
  • max_blocking_time_point – Time point until which to wait.

  • lock – Lock of the Change list.

Returns

true if at least one change has been removed

virtual bool wait_for_acknowledgement(const SequenceNumber_t &seq, const std::chrono::steady_clock::time_point &max_blocking_time_point, std::unique_lock<RecursiveTimedMutex> &lock) = 0

Wait until a change has been acknowledged.

Parameters
  • seq – Sequence number to wait for acknowledgement.

  • max_blocking_time_point – Maximum time to wait for.

  • lock – Lock of the Change list.

Returns

true when change was acknowledged, false when timeout is reached.
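
The deadline-based waiting described above can be sketched with standard C++ primitives. This is a simplified, hypothetical model (AckTrackerSketch is not a Fast DDS class): it tracks a single highest acknowledged sequence number and uses its own std::mutex, whereas the real method receives the caller's lock on the change list.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <cstdint>
#include <mutex>

// Hypothetical tracker of the highest acknowledged sequence number.
class AckTrackerSketch
{
public:

    using time_point = std::chrono::steady_clock::time_point;

    // Called when an acknowledgement covering everything up to seq arrives.
    void acknowledge_up_to(std::uint64_t seq)
    {
        {
            std::lock_guard<std::mutex> guard(mutex_);
            if (seq > highest_acked_)
            {
                highest_acked_ = seq;
            }
        }
        cv_.notify_all();
    }

    // Blocks until seq is acknowledged or the deadline passes.
    // Returns true when acknowledged, false when the timeout is reached.
    bool wait_for_acknowledgement(std::uint64_t seq, const time_point& deadline)
    {
        std::unique_lock<std::mutex> lock(mutex_);
        return cv_.wait_until(lock, deadline, [&]
                       {
                           return highest_acked_ >= seq;
                       });
    }

private:

    std::mutex mutex_;
    std::condition_variable cv_;
    std::uint64_t highest_acked_ = 0;
};
```

Passing an absolute steady_clock time point rather than a duration lets several nested calls share one deadline without each of them restarting the timeout.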

inline RTPSParticipantImpl *getRTPSParticipant() const

Get RTPS participant

Returns

RTPS participant

inline void set_separate_sending(bool enable)

Enable or disable sending data to readers separately. Note: this only works for synchronous writers.

Parameters

enable – If separate sending should be enabled

inline bool get_separate_sending() const

Inform if data is sent to readers separately

Returns

true if separate sending is enabled

inline virtual bool process_acknack(const GUID_t &writer_guid, const GUID_t &reader_guid, uint32_t ack_count, const SequenceNumberSet_t &sn_set, bool final_flag, bool &result)

Process an incoming ACKNACK submessage.

Parameters
  • writer_guid – [in] GUID of the writer the submessage is directed to.

  • reader_guid – [in] GUID of the reader originating the submessage.

  • ack_count – [in] Count field of the submessage.

  • sn_set – [in] Sequence number bitmap field of the submessage.

  • final_flag – [in] Final flag field of the submessage.

  • result – [out] true if the writer could process the submessage. Only valid when the returned value is true.

Returns

true when the submessage was addressed to this writer, false otherwise.
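
The split between the return value and the result output parameter can be easy to misread, so here is a minimal sketch of the convention. The types are hypothetical (WriterSketch and dispatch_acknack are not part of Fast DDS), GUIDs are reduced to integers, and the rule that a non-increasing count is rejected is an assumption made for the example.

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

// Hypothetical writer; GUIDs are simplified to plain integers.
struct WriterSketch
{
    std::uint32_t guid = 0;
    std::uint32_t last_count = 0;

    // Returns true when the submessage is addressed to this writer.
    // result is meaningful only when the return value is true.
    bool process_acknack(std::uint32_t writer_guid, std::uint32_t ack_count, bool& result)
    {
        if (writer_guid != guid)
        {
            return false; // not for this writer; result is left untouched
        }
        // Assumption for this sketch: a count that does not increase is not processed.
        result = ack_count > last_count;
        if (result)
        {
            last_count = ack_count;
        }
        return true;
    }
};

// Offers the submessage to each writer until one of them claims it.
bool dispatch_acknack(
        std::vector<WriterSketch>& writers,
        std::uint32_t writer_guid,
        std::uint32_t ack_count,
        bool& processed)
{
    for (WriterSketch& writer : writers)
    {
        bool result = false;
        if (writer.process_acknack(writer_guid, ack_count, result))
        {
            processed = result;
            return true;
        }
    }
    return false; // no writer with that GUID
}
```

A caller can stop iterating as soon as one writer returns true, and should read processed only in that case.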

inline virtual bool process_nack_frag(const GUID_t &writer_guid, const GUID_t &reader_guid, uint32_t ack_count, const SequenceNumber_t &seq_num, const FragmentNumberSet_t fragments_state, bool &result)

Process an incoming NACKFRAG submessage.

Parameters
  • writer_guid – [in] GUID of the writer the submessage is directed to.

  • reader_guid – [in] GUID of the reader originating the submessage.

  • ack_count – [in] Count field of the submessage.

  • seq_num – [in] Sequence number field of the submessage.

  • fragments_state – [in] Fragment number bitmap field of the submessage.

  • result – [out] true if the writer could process the submessage. Only valid when the returned value is true.

Returns

true when the submessage was addressed to this writer, false otherwise.

const LivelinessQosPolicyKind &get_liveliness_kind() const

A method to retrieve the liveliness kind.

Returns

Liveliness kind

const Duration_t &get_liveliness_lease_duration() const

A method to retrieve the liveliness lease duration.

Returns

Lease duration

const Duration_t &get_liveliness_announcement_period() const

A method to return the liveliness announcement period.

Returns

The announcement period

bool is_datasharing_compatible() const

Returns

Whether the writer is data sharing compatible or not

virtual DeliveryRetCode deliver_sample_nts(CacheChange_t *cache_change, RTPSMessageGroup &group, LocatorSelectorSender &locator_selector, const std::chrono::time_point<std::chrono::steady_clock> &max_blocking_time) = 0

Tell the writer that the sample can be sent to the network. This function should be used by a fastdds::rtps::FlowController.

Note

This method is not thread-safe.

Parameters
  • cache_change – Pointer to the CacheChange_t that represents the sample which can be sent.

  • group – RTPSMessageGroup reference used for generating the RTPS message.

  • locator_selector – RTPSMessageSenderInterface reference used for selecting locators. The reference has to be a member of this RTPSWriter object.

  • max_blocking_time – Future timepoint where blocking send should end.

Returns

Return code.

virtual LocatorSelectorSender &get_general_locator_selector() = 0
virtual LocatorSelectorSender &get_async_locator_selector() = 0
virtual bool send_nts(CDRMessage_t *message, const LocatorSelectorSender &locator_selector, std::chrono::steady_clock::time_point &max_blocking_time_point) const

Send a message through this interface.

Parameters
  • message – Pointer to the buffer with the message already serialized.

  • locator_selector – RTPSMessageSenderInterface reference used for selecting locators. The reference has to be a member of this RTPSWriter object.

  • max_blocking_time_point – Future timepoint where blocking send should end.

Public Members

LivelinessLostStatus liveliness_lost_status_

Liveliness lost status of this writer.

Protected Functions

RTPSWriter(RTPSParticipantImpl *impl, const GUID_t &guid, const WriterAttributes &att, fastdds::rtps::FlowController *flow_controller, WriterHistory *hist, WriterListener *listen = nullptr)
RTPSWriter(RTPSParticipantImpl *impl, const GUID_t &guid, const WriterAttributes &att, const std::shared_ptr<IPayloadPool> &payload_pool, fastdds::rtps::FlowController *flow_controller, WriterHistory *hist, WriterListener *listen = nullptr)
RTPSWriter(RTPSParticipantImpl *impl, const GUID_t &guid, const WriterAttributes &att, const std::shared_ptr<IPayloadPool> &payload_pool, const std::shared_ptr<IChangePool> &change_pool, fastdds::rtps::FlowController *flow_controller, WriterHistory *hist, WriterListener *listen = nullptr)
virtual ~RTPSWriter()
void add_guid(LocatorSelectorSender &locator_selector, const GUID_t &remote_guid)
void compute_selected_guids(LocatorSelectorSender &locator_selector)
void update_cached_info_nts(LocatorSelectorSender &locator_selector)
virtual void unsent_change_added_to_history(CacheChange_t *change, const std::chrono::time_point<std::chrono::steady_clock> &max_blocking_time) = 0

Add a change to the unsent list.

Parameters
  • change – Pointer to the change to add.

  • max_blocking_time – Future timepoint where blocking should end.

virtual bool change_removed_by_history(CacheChange_t *a_change) = 0

Indicate to the writer that a change has been removed by the history due to some HistoryQos requirement.

Parameters

a_change – Pointer to the change that is going to be removed.

Returns

True if removed correctly.

bool is_datasharing_compatible_with(const ReaderProxyData &rdata) const
bool is_pool_initialized() const
template<typename Functor>
inline bool send_data_or_fragments(RTPSMessageGroup &group, CacheChange_t *change, bool inline_qos, Functor sent_fun)
void add_statistics_sent_submessage(CacheChange_t *change, size_t num_locators)
void deinit()

Protected Attributes

bool m_pushMode = true

Whether the data is sent directly (true), or announced by heartbeat (HB) and then sent only to the readers that ask for it (false).

fastdds::rtps::FlowController *flow_controller_

Flow controller.

WriterHistory *mp_history = nullptr

WriterHistory.

WriterListener *mp_listener = nullptr

Listener.

bool is_async_ = false

Asynchronous publication activated.

bool m_separateSendingEnabled = false

Separate sending activated.

LivelinessQosPolicyKind liveliness_kind_

The liveliness kind of this writer.

Duration_t liveliness_lease_duration_

The liveliness lease duration of this writer.

Duration_t liveliness_announcement_period_

The liveliness announcement period.

Friends

friend class fastdds::dds::DataWriterImpl