Class StatefulWriter
Defined in File StatefulWriter.h
Inheritance Relationships
Base Type
public eprosima::fastrtps::rtps::RTPSWriter
(Class RTPSWriter)
Derived Type
public eprosima::fastrtps::rtps::StatefulPersistentWriter
(Class StatefulPersistentWriter)
Class Documentation
-
class StatefulWriter : public eprosima::fastrtps::rtps::RTPSWriter
Class StatefulWriter, specialization of RTPSWriter that maintains information of each matched Reader.
Subclassed by eprosima::fastrtps::rtps::StatefulPersistentWriter
Public Functions
-
virtual ~StatefulWriter()
Destructor.
-
virtual void unsent_change_added_to_history(CacheChange_t *p, const std::chrono::time_point<std::chrono::steady_clock> &max_blocking_time) override
Add a specific change to all ReaderLocators.
- Parameters
p – Pointer to the change.
max_blocking_time –
-
virtual bool change_removed_by_history(CacheChange_t *a_change) override
Indicate the writer that a change has been removed by the history due to some HistoryQos requirement.
- Parameters
a_change – Pointer to the change that is going to be removed.
- Returns
True if removed correctly.
-
bool intraprocess_delivery(CacheChange_t *change, ReaderProxy *reader_proxy)
Sends a change directly to a intraprocess reader.
-
inline bool intraprocess_gap(ReaderProxy *reader_proxy, const SequenceNumber_t &seq_num)
-
bool intraprocess_gap(ReaderProxy *reader_proxy, const SequenceNumber_t &first_seq, const SequenceNumber_t &last_seq)
-
bool intraprocess_heartbeat(ReaderProxy *reader_proxy, bool liveliness = false)
-
inline void incrementHBCount()
Increment the HB count.
-
virtual bool matched_reader_add(const ReaderProxyData &data) override
Add a matched reader.
- Parameters
data – Pointer to the ReaderProxyData object added.
- Returns
True if added.
-
virtual bool matched_reader_remove(const GUID_t &reader_guid) override
Remove a matched reader.
- Parameters
reader_guid – GUID of the reader to remove.
- Returns
True if removed.
-
virtual bool matched_reader_is_matched(const GUID_t &reader_guid) override
Tells us if a specific Reader is matched against this writer
- Parameters
reader_guid – GUID of the reader to check.
- Returns
True if it was matched.
-
virtual bool is_acked_by_all(const CacheChange_t *a_change) const override
Check if a specific change has been acknowledged by all Readers. Is only useful in reliable Writer. In BE Writers returns false when pending to be sent.
- Returns
True if acknowledged by all.
-
virtual bool wait_for_all_acked(const Duration_t &max_wait) override
Waits until all changes were acknowledged or max_wait.
- Returns
True if all were acknowledged.
-
bool all_readers_updated()
-
virtual bool try_remove_change(const std::chrono::steady_clock::time_point &max_blocking_time_point, std::unique_lock<RecursiveTimedMutex> &lock) override
Remove the change with the minimum SequenceNumber
- Returns
True if removed.
-
virtual bool wait_for_acknowledgement(const SequenceNumber_t &seq, const std::chrono::steady_clock::time_point &max_blocking_time_point, std::unique_lock<RecursiveTimedMutex> &lock) override
Waits till a change has been acknowledged.
- Parameters
seq – Sequence number to wait for acknowledgement.
max_blocking_time_point – Maximum time to wait for.
lock – Lock of the Change list.
- Returns
true when change was acknowledged, false when timeout is reached.
-
virtual void updateAttributes(const WriterAttributes &att) override
Update the Attributes of the Writer.
- Parameters
att – New attributes
-
bool matched_reader_lookup(GUID_t &readerGuid, ReaderProxy **RP)
Find a Reader Proxy in this writer.
- Parameters
readerGuid – [in] The GUID_t of the reader.
RP – [out] Pointer to pointer to return the ReaderProxy.
- Returns
True if correct.
-
inline RTPSParticipantImpl *getRTPSParticipant() const
Get the RTPS participant
- Returns
RTPS participant
-
inline size_t getMatchedReadersSize() const
Get the number of matched readers
- Returns
Number of the matched readers
-
inline bool get_disable_positive_acks() const
Returns true if disable positive ACKs QoS is enabled.
- Returns
True if positive acks are disabled, false otherwise
-
void updateTimes(const WriterTimes ×)
Update the WriterTimes attributes of all associated ReaderProxy.
- Parameters
times – WriterTimes parameter.
-
SequenceNumber_t next_sequence_number() const
-
bool send_periodic_heartbeat(bool final = false, bool liveliness = false)
Sends a periodic heartbeat.
- Parameters
final – Final flag
liveliness – Liveliness flag
- Returns
True on success
-
void send_heartbeat_to_nts(ReaderProxy &remoteReaderProxy, bool liveliness = false, bool force = false)
Sends a heartbeat to a remote reader.
Remark
This function is non thread-safe.
-
void perform_nack_response()
-
virtual bool process_acknack(const GUID_t &writer_guid, const GUID_t &reader_guid, uint32_t ack_count, const SequenceNumberSet_t &sn_set, bool final_flag, bool &result) override
Process an incoming ACKNACK submessage.
- Parameters
writer_guid – [in] GUID of the writer the submessage is directed to.
reader_guid – [in] GUID of the reader originating the submessage.
ack_count – [in] Count field of the submessage.
sn_set – [in] Sequence number bitmap field of the submessage.
final_flag – [in] Final flag field of the submessage.
result – [out] true if the writer could process the submessage. Only valid when returned value is true.
- Returns
true when the submessage was destinated to this writer, false otherwise.
-
virtual bool process_nack_frag(const GUID_t &writer_guid, const GUID_t &reader_guid, uint32_t ack_count, const SequenceNumber_t &seq_num, const FragmentNumberSet_t fragments_state, bool &result) override
Process an incoming NACKFRAG submessage.
- Parameters
writer_guid – [in] GUID of the writer the submessage is directed to.
reader_guid – [in] GUID of the reader originating the submessage.
ack_count – [in] Count field of the submessage.
seq_num – [in] Sequence number field of the submessage.
fragments_state – [in] Sequence number field of the submessage.
result – [out] true if the writer could process the submessage. Only valid when returned value is true.
- Returns
true when the submessage was destinated to this writer, false otherwise.
-
virtual void reader_data_filter(fastdds::rtps::IReaderDataFilter *filter) final
Set a content filter to perform content filtering on this writer.
This method sets a content filter that will be used to check whether a cache change is relevant for a reader or not.
- Parameters
filter – The content filter to use on this writer. May be
nullptr
to remove the content filter (i.e. treat all samples as relevant).
-
virtual const fastdds::rtps::IReaderDataFilter *reader_data_filter() const final
Get the content filter used to perform content filtering on this writer.
- Returns
The content filter used on this writer.
-
virtual DeliveryRetCode deliver_sample_nts(CacheChange_t *cache_change, RTPSMessageGroup &group, LocatorSelectorSender &locator_selector, const std::chrono::time_point<std::chrono::steady_clock> &max_blocking_time) override
Tells writer the sample can be sent to the network. This function should be used by a fastdds::rtps::FlowController.
Note
Must be non-thread safe.
- Parameters
cache_change – Pointer to the CacheChange_t that represents the sample which can be sent.
group – RTPSMessageGroup reference uses for generating the RTPS message.
locator_selector – RTPSMessageSenderInterface reference uses for selecting locators. The reference has to be a member of this RTPSWriter object.
max_blocking_time – Future timepoint where blocking send should end.
- Returns
Return code.
-
inline virtual LocatorSelectorSender &get_general_locator_selector() override
-
inline virtual LocatorSelectorSender &get_async_locator_selector() override
Protected Functions
-
StatefulWriter(RTPSParticipantImpl *impl, const GUID_t &guid, const WriterAttributes &att, fastdds::rtps::FlowController *flow_controller, WriterHistory *hist, WriterListener *listen = nullptr)
Constructor.
-
void rebuild_status_after_load()
-
virtual void print_inconsistent_acknack(const GUID_t &writer_guid, const GUID_t &reader_guid, const SequenceNumber_t &min_requested_sequence_number, const SequenceNumber_t &max_requested_sequence_number, const SequenceNumber_t &next_sequence_number)
-
virtual ~StatefulWriter()