Class StatefulWriter

Inheritance Relationships

Base Type

Derived Type

Class Documentation

class StatefulWriter : public eprosima::fastrtps::rtps::RTPSWriter

Class StatefulWriter, specialization of RTPSWriter that maintains information of each matched Reader.

Subclassed by eprosima::fastrtps::rtps::StatefulPersistentWriter

Public Functions

virtual ~StatefulWriter()


virtual void unsent_change_added_to_history(CacheChange_t *p, const std::chrono::time_point<std::chrono::steady_clock> &max_blocking_time) override

Add a specific change to all ReaderLocators.

  • p – Pointer to the change.

  • max_blocking_time

virtual bool change_removed_by_history(CacheChange_t *a_change) override

Indicate the writer that a change has been removed by the history due to some HistoryQos requirement.


a_change – Pointer to the change that is going to be removed.


True if removed correctly.

bool intraprocess_delivery(CacheChange_t *change, ReaderProxy *reader_proxy)

Sends a change directly to a intraprocess reader.

inline bool intraprocess_gap(ReaderProxy *reader_proxy, const SequenceNumber_t &seq_num)
bool intraprocess_gap(ReaderProxy *reader_proxy, const SequenceNumber_t &first_seq, const SequenceNumber_t &last_seq)
bool intraprocess_heartbeat(ReaderProxy *reader_proxy, bool liveliness = false)
inline void incrementHBCount()

Increment the HB count.

virtual bool matched_reader_add(const ReaderProxyData &data) override

Add a matched reader.


data – Pointer to the ReaderProxyData object added.


True if added.

virtual bool matched_reader_remove(const GUID_t &reader_guid) override

Remove a matched reader.


reader_guid – GUID of the reader to remove.


True if removed.

virtual bool matched_reader_is_matched(const GUID_t &reader_guid) override

Tells us if a specific Reader is matched against this writer


reader_guid – GUID of the reader to check.


True if it was matched.

virtual bool is_acked_by_all(const CacheChange_t *a_change) const override

Check if a specific change has been acknowledged by all Readers. Is only useful in reliable Writer. In BE Writers returns false when pending to be sent.


True if acknowledged by all.

template<typename Function>
inline Function for_each_reader_proxy(Function f) const
virtual bool wait_for_all_acked(const Duration_t &max_wait) override

Waits until all changes were acknowledged or max_wait.


True if all were acknowledged.

bool all_readers_updated()
virtual bool try_remove_change(const std::chrono::steady_clock::time_point &max_blocking_time_point, std::unique_lock<RecursiveTimedMutex> &lock) override

Remove the change with the minimum SequenceNumber


True if removed.

virtual bool wait_for_acknowledgement(const SequenceNumber_t &seq, const std::chrono::steady_clock::time_point &max_blocking_time_point, std::unique_lock<RecursiveTimedMutex> &lock) override

Waits till a change has been acknowledged.

  • seq – Sequence number to wait for acknowledgement.

  • max_blocking_time_point – Maximum time to wait for.

  • lock – Lock of the Change list.


true when change was acknowledged, false when timeout is reached.

virtual void updateAttributes(const WriterAttributes &att) override

Update the Attributes of the Writer.


att – New attributes

bool matched_reader_lookup(GUID_t &readerGuid, ReaderProxy **RP)

Find a Reader Proxy in this writer.

  • readerGuid[in] The GUID_t of the reader.

  • RP[out] Pointer to pointer to return the ReaderProxy.


True if correct.

inline Count_t getHeartbeatCount() const

Get count of heartbeats


count of heartbeats

inline RTPSParticipantImpl *getRTPSParticipant() const

Get the RTPS participant


RTPS participant

inline size_t getMatchedReadersSize() const

Get the number of matched readers


Number of the matched readers

inline bool get_disable_positive_acks() const

Returns true if disable positive ACKs QoS is enabled.


True if positive acks are disabled, false otherwise

void updateTimes(const WriterTimes &times)

Update the WriterTimes attributes of all associated ReaderProxy.


timesWriterTimes parameter.

SequenceNumber_t next_sequence_number() const
bool send_periodic_heartbeat(bool final = false, bool liveliness = false)

Sends a periodic heartbeat.

  • final – Final flag

  • liveliness – Liveliness flag


True on success

void send_heartbeat_to_nts(ReaderProxy &remoteReaderProxy, bool liveliness = false, bool force = false)

Sends a heartbeat to a remote reader.


This function is non thread-safe.

void perform_nack_response()
void perform_nack_supression(const GUID_t &reader_guid)
virtual bool process_acknack(const GUID_t &writer_guid, const GUID_t &reader_guid, uint32_t ack_count, const SequenceNumberSet_t &sn_set, bool final_flag, bool &result) override

Process an incoming ACKNACK submessage.

  • writer_guid[in] GUID of the writer the submessage is directed to.

  • reader_guid[in] GUID of the reader originating the submessage.

  • ack_count[in] Count field of the submessage.

  • sn_set[in] Sequence number bitmap field of the submessage.

  • final_flag[in] Final flag field of the submessage.

  • result[out] true if the writer could process the submessage. Only valid when returned value is true.


true when the submessage was destinated to this writer, false otherwise.

virtual bool process_nack_frag(const GUID_t &writer_guid, const GUID_t &reader_guid, uint32_t ack_count, const SequenceNumber_t &seq_num, const FragmentNumberSet_t fragments_state, bool &result) override

Process an incoming NACKFRAG submessage.

  • writer_guid[in] GUID of the writer the submessage is directed to.

  • reader_guid[in] GUID of the reader originating the submessage.

  • ack_count[in] Count field of the submessage.

  • seq_num[in] Sequence number field of the submessage.

  • fragments_state[in] Sequence number field of the submessage.

  • result[out] true if the writer could process the submessage. Only valid when returned value is true.


true when the submessage was destinated to this writer, false otherwise.

virtual void reader_data_filter(fastdds::rtps::IReaderDataFilter *filter) final

Set a content filter to perform content filtering on this writer.

This method sets a content filter that will be used to check whether a cache change is relevant for a reader or not.


filter – The content filter to use on this writer. May be nullptr to remove the content filter (i.e. treat all samples as relevant).

virtual const fastdds::rtps::IReaderDataFilter *reader_data_filter() const final

Get the content filter used to perform content filtering on this writer.


The content filter used on this writer.

virtual DeliveryRetCode deliver_sample_nts(CacheChange_t *cache_change, RTPSMessageGroup &group, LocatorSelectorSender &locator_selector, const std::chrono::time_point<std::chrono::steady_clock> &max_blocking_time) override

Tells writer the sample can be sent to the network. This function should be used by a fastdds::rtps::FlowController.


Must be non-thread safe.

  • cache_change – Pointer to the CacheChange_t that represents the sample which can be sent.

  • groupRTPSMessageGroup reference uses for generating the RTPS message.

  • locator_selectorRTPSMessageSenderInterface reference uses for selecting locators. The reference has to be a member of this RTPSWriter object.

  • max_blocking_time – Future timepoint where blocking send should end.


Return code.

inline virtual LocatorSelectorSender &get_general_locator_selector() override
inline virtual LocatorSelectorSender &get_async_locator_selector() override

Protected Functions

StatefulWriter(RTPSParticipantImpl *impl, const GUID_t &guid, const WriterAttributes &att, fastdds::rtps::FlowController *flow_controller, WriterHistory *hist, WriterListener *listen = nullptr)


StatefulWriter(RTPSParticipantImpl *impl, const GUID_t &guid, const WriterAttributes &att, const std::shared_ptr<IPayloadPool> &payload_pool, fastdds::rtps::FlowController *flow_controller, WriterHistory *hist, WriterListener *listen = nullptr)
StatefulWriter(RTPSParticipantImpl *impl, const GUID_t &guid, const WriterAttributes &att, const std::shared_ptr<IPayloadPool> &payload_pool, const std::shared_ptr<IChangePool> &change_pool, fastdds::rtps::FlowController *flow_controller, WriterHistory *hist, WriterListener *listen = nullptr)
void rebuild_status_after_load()
virtual void print_inconsistent_acknack(const GUID_t &writer_guid, const GUID_t &reader_guid, const SequenceNumber_t &min_requested_sequence_number, const SequenceNumber_t &max_requested_sequence_number, const SequenceNumber_t &next_sequence_number)