Class StatefulWriter

Inheritance Relationships

Base Type

Derived Type

Class Documentation

class StatefulWriter : public eprosima::fastrtps::rtps::RTPSWriter

Class StatefulWriter, specialization of RTPSWriter that maintains information of each matched Reader.

Subclassed by eprosima::fastrtps::rtps::StatefulPersistentWriter

Public Functions

virtual ~StatefulWriter()

Destructor.

virtual void unsent_change_added_to_history(CacheChange_t *p, const std::chrono::time_point<std::chrono::steady_clock> &max_blocking_time) override

Add a specific change to all ReaderLocators.

Parameters:
  • p – Pointer to the change.

  • max_blocking_time[in] Maximum time this method has to complete the task.

virtual bool change_removed_by_history(CacheChange_t *a_change, const std::chrono::time_point<std::chrono::steady_clock> &max_blocking_time) override

Indicate the writer that a change has been removed by the history due to some HistoryQos requirement.

Parameters:
  • a_change – Pointer to the change that is going to be removed.

  • max_blocking_time[in] Maximum time this method has to complete the task.

Returns:

True if removed correctly.

bool intraprocess_delivery(CacheChange_t *change, ReaderProxy *reader_proxy)

Sends a change directly to a intraprocess reader.

inline bool intraprocess_gap(ReaderProxy *reader_proxy, const SequenceNumber_t &seq_num)
bool intraprocess_gap(ReaderProxy *reader_proxy, const SequenceNumber_t &first_seq, const SequenceNumber_t &last_seq)
bool intraprocess_heartbeat(ReaderProxy *reader_proxy, bool liveliness = false)
inline void incrementHBCount()

Increment the HB count.

virtual bool matched_reader_add(const ReaderProxyData &data) override

Add a matched reader.

Parameters:

data – Pointer to the ReaderProxyData object added.

Returns:

True if added.

virtual bool matched_reader_remove(const GUID_t &reader_guid) override

Remove a matched reader.

Parameters:

reader_guid – GUID of the reader to remove.

Returns:

True if removed.

virtual bool matched_reader_is_matched(const GUID_t &reader_guid) override

Tells us if a specific Reader is matched against this writer

Parameters:

reader_guid – GUID of the reader to check.

Returns:

True if it was matched.

virtual bool has_been_fully_delivered(const SequenceNumber_t &seq_num) const override

Check if a specific change has been delivered to the transport layer at least once for every matched remote RTPSReader.

Parameters:

seq_num – Sequence number of the change to check.

Returns:

true if delivered.

Returns:

false otherwise.

virtual bool is_acked_by_all(const CacheChange_t *a_change) const override

Check if a specific change has been acknowledged by all Readers. Is only useful in reliable Writer. In BE Writers returns false when pending to be sent.

Returns:

True if acknowledged by all.

template<typename Function>
inline Function for_each_reader_proxy(Function f) const
virtual bool wait_for_all_acked(const Duration_t &max_wait) override

Waits until all changes were acknowledged or max_wait.

Returns:

True if all were acknowledged.

bool all_readers_updated()
virtual bool try_remove_change(const std::chrono::steady_clock::time_point &max_blocking_time_point, std::unique_lock<RecursiveTimedMutex> &lock) override

Remove the change with the minimum SequenceNumber

Returns:

True if removed.

virtual bool wait_for_acknowledgement(const SequenceNumber_t &seq, const std::chrono::steady_clock::time_point &max_blocking_time_point, std::unique_lock<RecursiveTimedMutex> &lock) override

Waits till a change has been acknowledged.

Parameters:
  • seq – Sequence number to wait for acknowledgement.

  • max_blocking_time_point – Maximum time to wait for.

  • lock – Lock of the Change list.

Returns:

true when change was acknowledged, false when timeout is reached.

virtual void updateAttributes(const WriterAttributes &att) override

Update the Attributes of the Writer.

Parameters:

att – New attributes

bool matched_reader_lookup(GUID_t &readerGuid, ReaderProxy **RP)

Find a Reader Proxy in this writer.

Parameters:
  • readerGuid[in] The GUID_t of the reader.

  • RP[out] Pointer to pointer to return the ReaderProxy.

Returns:

True if correct.

inline Count_t getHeartbeatCount() const

Get count of heartbeats

Returns:

count of heartbeats

inline RTPSParticipantImpl *getRTPSParticipant() const

Get the RTPS participant

Returns:

RTPS participant

inline size_t getMatchedReadersSize() const

Get the number of matched readers

Returns:

Number of the matched readers

inline virtual bool get_disable_positive_acks() const override

Returns true if disable positive ACKs QoS is enabled.

Returns:

True if positive acks are disabled, false otherwise

void updateTimes(const WriterTimes &times)

Update the WriterTimes attributes of all associated ReaderProxy.

Parameters:

timesWriterTimes parameter.

void updatePositiveAcks(const WriterAttributes &att)

Update the period of the disable positive ACKs policy.

Parameters:

attWriterAttributes parameter.

SequenceNumber_t next_sequence_number() const
bool send_periodic_heartbeat(bool final = false, bool liveliness = false)

Sends a periodic heartbeat.

Parameters:
  • final – Final flag

  • liveliness – Liveliness flag

Returns:

True on success

void send_heartbeat_to_nts(ReaderProxy &remoteReaderProxy, bool liveliness = false, bool force = false)

Sends a heartbeat to a remote reader.

Remark

This function is non thread-safe.

void perform_nack_response()
void perform_nack_supression(const GUID_t &reader_guid)
virtual bool process_acknack(const GUID_t &writer_guid, const GUID_t &reader_guid, uint32_t ack_count, const SequenceNumberSet_t &sn_set, bool final_flag, bool &result, fastdds::rtps::VendorId_t origin_vendor_id = c_VendorId_Unknown) override

Process an incoming ACKNACK submessage.

Parameters:
  • writer_guid[in] GUID of the writer the submessage is directed to.

  • reader_guid[in] GUID of the reader originating the submessage.

  • ack_count[in] Count field of the submessage.

  • sn_set[in] Sequence number bitmap field of the submessage.

  • final_flag[in] Final flag field of the submessage.

  • result[out] true if the writer could process the submessage. Only valid when returned value is true.

  • origin_vendor_id[in] VendorId of the source participant from which the message was received

Returns:

true when the submessage was destinated to this writer, false otherwise.

virtual bool process_nack_frag(const GUID_t &writer_guid, const GUID_t &reader_guid, uint32_t ack_count, const SequenceNumber_t &seq_num, const FragmentNumberSet_t fragments_state, bool &result, fastdds::rtps::VendorId_t origin_vendor_id = c_VendorId_Unknown) override

Process an incoming NACKFRAG submessage.

Parameters:
  • writer_guid[in] GUID of the writer the submessage is directed to.

  • reader_guid[in] GUID of the reader originating the submessage.

  • ack_count[in] Count field of the submessage.

  • seq_num[in] Sequence number field of the submessage.

  • fragments_state[in] Sequence number field of the submessage.

  • result[out] true if the writer could process the submessage. Only valid when returned value is true.

  • origin_vendor_id[in] VendorId of the source participant from which the message was received

Returns:

true when the submessage was destinated to this writer, false otherwise.

virtual void reader_data_filter(fastdds::rtps::IReaderDataFilter *filter) final

Set a content filter to perform content filtering on this writer.

This method sets a content filter that will be used to check whether a cache change is relevant for a reader or not.

Parameters:

filter – The content filter to use on this writer. May be nullptr to remove the content filter (i.e. treat all samples as relevant).

virtual const fastdds::rtps::IReaderDataFilter *reader_data_filter() const final

Get the content filter used to perform content filtering on this writer.

Returns:

The content filter used on this writer.

virtual DeliveryRetCode deliver_sample_nts(CacheChange_t *cache_change, RTPSMessageGroup &group, LocatorSelectorSender &locator_selector, const std::chrono::time_point<std::chrono::steady_clock> &max_blocking_time) override

Tells writer the sample can be sent to the network. This function should be used by a fastdds::rtps::FlowController.

Note

Must be non-thread safe.

Parameters:
  • cache_change – Pointer to the CacheChange_t that represents the sample which can be sent.

  • groupRTPSMessageGroup reference uses for generating the RTPS message.

  • locator_selectorRTPSMessageSenderInterface reference uses for selecting locators. The reference has to be a member of this RTPSWriter object.

  • max_blocking_time – Future timepoint where blocking send should end.

Returns:

Return code.

inline virtual LocatorSelectorSender &get_general_locator_selector() override
inline virtual LocatorSelectorSender &get_async_locator_selector() override

Protected Functions

StatefulWriter(RTPSParticipantImpl *impl, const GUID_t &guid, const WriterAttributes &att, fastdds::rtps::FlowController *flow_controller, WriterHistory *hist, WriterListener *listen = nullptr)

Constructor.

StatefulWriter(RTPSParticipantImpl *impl, const GUID_t &guid, const WriterAttributes &att, const std::shared_ptr<IPayloadPool> &payload_pool, fastdds::rtps::FlowController *flow_controller, WriterHistory *hist, WriterListener *listen = nullptr)
StatefulWriter(RTPSParticipantImpl *impl, const GUID_t &guid, const WriterAttributes &att, const std::shared_ptr<IPayloadPool> &payload_pool, const std::shared_ptr<IChangePool> &change_pool, fastdds::rtps::FlowController *flow_controller, WriterHistory *hist, WriterListener *listen = nullptr)
void rebuild_status_after_load()
virtual void print_inconsistent_acknack(const GUID_t &writer_guid, const GUID_t &reader_guid, const SequenceNumber_t &min_requested_sequence_number, const SequenceNumber_t &max_requested_sequence_number, const SequenceNumber_t &next_sequence_number)