Class StatefulWriter

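Inheritance Relationships

Base Type
  • public eprosima::fastrtps::rtps::RTPSWriter

Derived Type
  • public eprosima::fastrtps::rtps::StatefulPersistentWriter
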
Class Documentation

class StatefulWriter : public eprosima::fastrtps::rtps::RTPSWriter

Class StatefulWriter, a specialization of RTPSWriter that maintains information about each matched Reader.

Subclassed by eprosima::fastrtps::rtps::StatefulPersistentWriter

Public Functions

virtual ~StatefulWriter()

Destructor.

virtual void unsent_change_added_to_history(CacheChange_t *p, const std::chrono::time_point<std::chrono::steady_clock> &max_blocking_time) override

Add a specific change to all ReaderLocators.

Parameters
  • p – Pointer to the change.

  • max_blocking_time – Time point until which this operation may block.

virtual bool change_removed_by_history(CacheChange_t *a_change) override

Indicate to the writer that a change has been removed by the history due to some HistoryQos requirement.

Parameters

a_change – Pointer to the change that is going to be removed.

Returns

True if removed correctly.

bool intraprocess_delivery(CacheChange_t *change, ReaderProxy *reader_proxy)

Sends a change directly to an intraprocess reader.

inline bool intraprocess_gap(ReaderProxy *reader_proxy, const SequenceNumber_t &seq_num)
bool intraprocess_gap(ReaderProxy *reader_proxy, const SequenceNumber_t &first_seq, const SequenceNumber_t &last_seq)
bool intraprocess_heartbeat(ReaderProxy *reader_proxy, bool liveliness = false)
inline void incrementHBCount()

Increment the HB count.

virtual bool matched_reader_add(const ReaderProxyData &data) override

Add a matched reader.

Parameters

data – Reference to the ReaderProxyData object to add.

Returns

True if added.

virtual bool matched_reader_remove(const GUID_t &reader_guid) override

Remove a matched reader.

Parameters

reader_guid – GUID of the reader to remove.

Returns

True if removed.

virtual bool matched_reader_is_matched(const GUID_t &reader_guid) override

Check whether a specific Reader is matched against this writer.

Parameters

reader_guid – GUID of the reader to check.

Returns

True if it was matched.

virtual bool is_acked_by_all(const CacheChange_t *a_change) const override

Check if a specific change has been acknowledged by all Readers. This is only useful for reliable Writers. For best-effort (BE) Writers, it returns false while the change is pending to be sent.

Returns

True if acknowledged by all.
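
The "acknowledged by all" check can be illustrated with a small standalone model. This is only a sketch under stated assumptions: `AckTracker`, `SeqNum`, and the string reader identifiers are hypothetical stand-ins and are not part of the library, and the real StatefulWriter keeps this state in its ReaderProxy objects.

```cpp
#include <cstdint>
#include <map>
#include <string>

// Hypothetical stand-in for a sequence number (not the library's SequenceNumber_t).
using SeqNum = std::uint64_t;

// Toy model: for each matched reader, remember the highest sequence
// number that reader has acknowledged so far.
class AckTracker
{
public:

    // Register a matched reader; nothing is acknowledged yet.
    void add_reader(const std::string& reader_id)
    {
        highest_acked_.emplace(reader_id, 0);
    }

    // Record that reader_id acknowledged everything up to and including seq.
    void acknowledge(const std::string& reader_id, SeqNum seq)
    {
        auto it = highest_acked_.find(reader_id);
        if (it != highest_acked_.end() && seq > it->second)
        {
            it->second = seq;
        }
    }

    // A change is acknowledged by all only if no matched reader is still behind it.
    bool is_acked_by_all(SeqNum seq) const
    {
        for (const auto& entry : highest_acked_)
        {
            if (entry.second < seq)
            {
                return false;
            }
        }
        return true;
    }

private:

    std::map<std::string, SeqNum> highest_acked_;
};
```

In this model a single slow reader is enough to keep a change from counting as acknowledged, which is why the check only carries meaning for reliable writers.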

template<typename Function>
inline Function for_each_reader_proxy(Function f) const
virtual bool wait_for_all_acked(const Duration_t &max_wait) override

Waits until all changes have been acknowledged or max_wait has elapsed.

Returns

True if all were acknowledged.

bool all_readers_updated()
virtual bool try_remove_change(const std::chrono::steady_clock::time_point &max_blocking_time_point, std::unique_lock<RecursiveTimedMutex> &lock) override

Remove the change with the minimum SequenceNumber.

Returns

True if removed.

virtual bool wait_for_acknowledgement(const SequenceNumber_t &seq, const std::chrono::steady_clock::time_point &max_blocking_time_point, std::unique_lock<RecursiveTimedMutex> &lock) override

Waits until a change has been acknowledged.

Parameters
  • seq – Sequence number to wait for acknowledgement.

  • max_blocking_time_point – Time point after which the wait is abandoned.

  • lock – Lock of the Change list.

Returns

true when change was acknowledged, false when timeout is reached.
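
The "true when acknowledged, false on timeout" contract resembles a deadline-bounded wait on a condition variable. The sketch below assumes that reading. `AckWaiter` is a hypothetical class, and unlike the real function it owns its mutex rather than receiving an already held `std::unique_lock<RecursiveTimedMutex>` from the caller.

```cpp
#include <chrono>
#include <condition_variable>
#include <cstdint>
#include <mutex>

// Hypothetical model of waiting for an acknowledgement up to a deadline.
class AckWaiter
{
public:

    // Called when an acknowledgement arrives: everything up to seq is acknowledged.
    void acknowledge(std::uint64_t seq)
    {
        {
            std::lock_guard<std::mutex> guard(mutex_);
            if (seq > highest_acked_)
            {
                highest_acked_ = seq;
            }
        }
        cv_.notify_all();
    }

    // Returns true if seq is acknowledged before the deadline, false on timeout.
    bool wait_for_acknowledgement(
            std::uint64_t seq,
            const std::chrono::steady_clock::time_point& deadline)
    {
        std::unique_lock<std::mutex> lock(mutex_);
        // wait_until with a predicate returns the predicate's value once the
        // deadline passes, so spurious wakeups are handled for us.
        return cv_.wait_until(lock, deadline, [this, seq]
                       {
                           return highest_acked_ >= seq;
                       });
    }

private:

    std::mutex mutex_;
    std::condition_variable cv_;
    std::uint64_t highest_acked_ = 0;
};
```

Using an absolute `steady_clock` time point rather than a duration lets several blocking steps share one deadline without recomputing the remaining time.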

virtual void updateAttributes(const WriterAttributes &att) override

Update the Attributes of the Writer.

Parameters

att – New attributes

bool matched_reader_lookup(GUID_t &readerGuid, ReaderProxy **RP)

Find a Reader Proxy in this writer.

Parameters
  • readerGuid – [in] The GUID_t of the reader.

  • RP – [out] Pointer to pointer to return the ReaderProxy.

Returns

True if the ReaderProxy was found.

inline Count_t getHeartbeatCount() const

Get count of heartbeats

Returns

count of heartbeats

inline RTPSParticipantImpl *getRTPSParticipant() const

Get the RTPS participant

Returns

RTPS participant

inline size_t getMatchedReadersSize() const

Get the number of matched readers

Returns

Number of the matched readers

inline bool get_disable_positive_acks() const

Returns true if disable positive ACKs QoS is enabled.

Returns

True if positive acks are disabled, false otherwise

void updateTimes(const WriterTimes &times)

Update the WriterTimes attributes of all associated ReaderProxy.

Parameters

times – WriterTimes parameter.

SequenceNumber_t next_sequence_number() const
bool send_periodic_heartbeat(bool final = false, bool liveliness = false)

Sends a periodic heartbeat.

Parameters
  • final – Final flag

  • liveliness – Liveliness flag

Returns

True on success

void send_heartbeat_to_nts(ReaderProxy &remoteReaderProxy, bool liveliness = false, bool force = false)

Sends a heartbeat to a remote reader.

Remark

This function is not thread-safe.

void perform_nack_response()
void perform_nack_supression(const GUID_t &reader_guid)
virtual bool process_acknack(const GUID_t &writer_guid, const GUID_t &reader_guid, uint32_t ack_count, const SequenceNumberSet_t &sn_set, bool final_flag, bool &result) override

Process an incoming ACKNACK submessage.

Parameters
  • writer_guid – [in] GUID of the writer the submessage is directed to.

  • reader_guid – [in] GUID of the reader originating the submessage.

  • ack_count – [in] Count field of the submessage.

  • sn_set – [in] Sequence number bitmap field of the submessage.

  • final_flag – [in] Final flag field of the submessage.

  • result – [out] true if the writer could process the submessage. Only valid when the returned value is true.

Returns

true when the submessage was addressed to this writer, false otherwise.
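
The split between the return value (was the submessage for this writer?) and the `result` output parameter (could it be processed?) can be sketched as follows. `AcknackTarget` and its integer identifiers are hypothetical. Treating a non-increasing `ack_count` as a duplicate is an assumption made for illustration, not a statement about the library's internals.

```cpp
#include <cstdint>

// Hypothetical identifier type standing in for GUID_t.
using Id = std::uint32_t;

// Toy writer that models only the two-level outcome of ACKNACK processing.
class AcknackTarget
{
public:

    explicit AcknackTarget(Id own_id)
        : own_id_(own_id)
    {
    }

    // Return value: whether the submessage targets this writer.
    // result: whether it was processed; only meaningful when true is returned.
    bool process_acknack(
            Id writer_id,
            Id reader_id,
            std::uint32_t ack_count,
            bool& result)
    {
        if (writer_id != own_id_)
        {
            // Not for us: leave result untouched so another writer can be tried.
            return false;
        }

        (void)reader_id; // This toy model tracks a single reader only.

        // Assumption: a count that does not increase marks a duplicate.
        result = ack_count > last_ack_count_;
        if (result)
        {
            last_ack_count_ = ack_count;
        }
        return true;
    }

private:

    Id own_id_;
    std::uint32_t last_ack_count_ = 0;
};
```

A caller that dispatches one submessage across several writers can stop at the first writer that returns true and only then inspect `result`.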

virtual bool process_nack_frag(const GUID_t &writer_guid, const GUID_t &reader_guid, uint32_t ack_count, const SequenceNumber_t &seq_num, const FragmentNumberSet_t fragments_state, bool &result) override

Process an incoming NACKFRAG submessage.

Parameters
  • writer_guid – [in] GUID of the writer the submessage is directed to.

  • reader_guid – [in] GUID of the reader originating the submessage.

  • ack_count – [in] Count field of the submessage.

  • seq_num – [in] Sequence number field of the submessage.

  • fragments_state – [in] Fragment number bitmap field of the submessage.

  • result – [out] true if the writer could process the submessage. Only valid when the returned value is true.

Returns

true when the submessage was addressed to this writer, false otherwise.

virtual void reader_data_filter(fastdds::rtps::IReaderDataFilter *filter) final

Set a content filter to perform content filtering on this writer.

This method sets a content filter that will be used to check whether a cache change is relevant for a reader or not.

Parameters

filter – The content filter to use on this writer. May be nullptr to remove the content filter (i.e. treat all samples as relevant).
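
How a null filter falls back to "all samples are relevant" can be sketched with stand-in types. `Sample`, `IToyFilter`, `FilteringWriter`, `should_deliver`, and `PositiveOnlyFilter` are all hypothetical names introduced for this example; they only mirror the setter/getter pair documented here and are not the library's IReaderDataFilter interface.

```cpp
#include <cstdint>
#include <string>

// Hypothetical sample type standing in for a cache change.
struct Sample
{
    std::int32_t value;
};

// Hypothetical filter interface: decides whether a sample matters to a reader.
class IToyFilter
{
public:

    virtual ~IToyFilter() = default;

    virtual bool is_relevant(
            const Sample& sample,
            const std::string& reader_id) const = 0;
};

// Toy writer holding an optional, non-owning filter pointer.
class FilteringWriter
{
public:

    // Set the filter; nullptr removes it.
    void reader_data_filter(const IToyFilter* filter)
    {
        filter_ = filter;
    }

    // Get the filter currently in use (may be nullptr).
    const IToyFilter* reader_data_filter() const
    {
        return filter_;
    }

    // With no filter installed, every sample is treated as relevant.
    bool should_deliver(
            const Sample& sample,
            const std::string& reader_id) const
    {
        return filter_ == nullptr || filter_->is_relevant(sample, reader_id);
    }

private:

    const IToyFilter* filter_ = nullptr;
};

// Example filter: only samples with a positive value are relevant.
class PositiveOnlyFilter : public IToyFilter
{
public:

    bool is_relevant(
            const Sample& sample,
            const std::string& /*reader_id*/) const override
    {
        return sample.value > 0;
    }
};
```

Because the writer in this sketch does not own the filter, the filter object has to outlive the writer or be removed with `nullptr` first.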

virtual const fastdds::rtps::IReaderDataFilter *reader_data_filter() const final

Get the content filter used to perform content filtering on this writer.

Returns

The content filter used on this writer.

virtual DeliveryRetCode deliver_sample_nts(CacheChange_t *cache_change, RTPSMessageGroup &group, LocatorSelectorSender &locator_selector, const std::chrono::time_point<std::chrono::steady_clock> &max_blocking_time) override

Tells the writer that the sample can be sent to the network. This function should be used by a fastdds::rtps::FlowController.

Note

This function is not thread-safe.

Parameters
  • cache_change – Pointer to the CacheChange_t that represents the sample which can be sent.

  • group – RTPSMessageGroup reference used for generating the RTPS message.

  • locator_selector – RTPSMessageSenderInterface reference used for selecting locators. The reference has to be a member of this RTPSWriter object.

  • max_blocking_time – Future timepoint where blocking send should end.

Returns

Return code.

inline virtual LocatorSelectorSender &get_general_locator_selector() override
inline virtual LocatorSelectorSender &get_async_locator_selector() override

Protected Functions

StatefulWriter(RTPSParticipantImpl *impl, const GUID_t &guid, const WriterAttributes &att, fastdds::rtps::FlowController *flow_controller, WriterHistory *hist, WriterListener *listen = nullptr)

Constructor.

StatefulWriter(RTPSParticipantImpl *impl, const GUID_t &guid, const WriterAttributes &att, const std::shared_ptr<IPayloadPool> &payload_pool, fastdds::rtps::FlowController *flow_controller, WriterHistory *hist, WriterListener *listen = nullptr)
StatefulWriter(RTPSParticipantImpl *impl, const GUID_t &guid, const WriterAttributes &att, const std::shared_ptr<IPayloadPool> &payload_pool, const std::shared_ptr<IChangePool> &change_pool, fastdds::rtps::FlowController *flow_controller, WriterHistory *hist, WriterListener *listen = nullptr)
void rebuild_status_after_load()
virtual void print_inconsistent_acknack(const GUID_t &writer_guid, const GUID_t &reader_guid, const SequenceNumber_t &min_requested_sequence_number, const SequenceNumber_t &max_requested_sequence_number, const SequenceNumber_t &next_sequence_number)