Class ReaderProxy

Class Documentation

class ReaderProxy

ReaderProxy class that helps to keep the state of a specific Reader with respect to the RTPSWriter.

Public Functions

~ReaderProxy()
ReaderProxy(const WriterTimes &times, const RemoteLocatorsAllocationAttributes &loc_alloc, StatefulWriter *writer)

Constructor.

void start(const ReaderProxyData &reader_attributes, bool is_datasharing = false)

Activate this proxy associating it to a remote reader.

Parameters:
  • reader_attributes – ReaderProxyData of the reader for which to keep state.

  • is_datasharing – Whether the reader is datasharing-compatible with the writer.

bool update(const ReaderProxyData &reader_attributes)

Update information about the remote reader.

Parameters:

reader_attributes – ReaderProxyData with updated information of the reader.

Returns:

true if data was modified, false otherwise.

void stop()

Disable this proxy.

void add_change(const ChangeForReader_t &change, bool is_relevant, bool restart_nack_supression)

Called when a change is added to the writer’s history.

Parameters:
  • change – Information regarding the change added.

  • is_relevant – Whether the change is relevant for this remote reader.

  • restart_nack_supression – Whether the nack-suppression event should be restarted.

void add_change(const ChangeForReader_t &change, bool is_relevant, bool restart_nack_supression, const std::chrono::time_point<std::chrono::steady_clock> &max_blocking_time)
bool has_changes() const

Check if there are changes pending for this reader.

Returns:

true when there are pending changes, false otherwise.

bool change_is_acked(const SequenceNumber_t &seq_num) const

Check if a specific change has already been acknowledged by this reader.

Parameters:

seq_num – Sequence number of the change to be checked.

Returns:

true when the change is irrelevant or has already been acknowledged, false otherwise.

bool change_is_unsent(const SequenceNumber_t &seq_num, FragmentNumber_t &next_unsent_frag, SequenceNumber_t &gap_seq, const SequenceNumber_t &min_seq, bool &need_reactivate_periodic_heartbeat) const

Check if a specific change is marked to be sent to this reader.

Parameters:
  • seq_num – [in] Sequence number of the change to be checked.

  • next_unsent_frag – [out] Next fragment to be sent.

  • gap_seq – [out] On the first delivery of seq_num (which should be relevant), the first sequence number of the gap [first, seq_num); otherwise SequenceNumber_t::unknown().

  • min_seq – [in] Minimum sequence number managed by the history. It may be SequenceNumber_t::unknown() if the history is empty.

  • need_reactivate_periodic_heartbeat – [out] Indicates whether the periodic heartbeat event has to be restarted.

Returns:

true if the change is marked to be sent, false otherwise.

void acked_changes_set(const SequenceNumber_t &seq_num)

Mark all changes up to the one indicated by seq_num as Acknowledged. For instance, when seq_num is 30, changes 1-29 are marked as acknowledged.

Parameters:

seq_num – Sequence number of the first change not to be marked as acknowledged.
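
The exclusive upper bound described above can be illustrated with a toy model. This is a minimal sketch and not the Fast DDS implementation: `ToyAckProxy`, its `std::map` storage, the plain `uint64_t` sequence numbers and the `ToyChangeStatus` enum are hypothetical stand-ins for the real ReaderProxy internals.

```cpp
#include <cassert>
#include <cstdint>
#include <map>

// Hypothetical stand-in for the per-reader status of a change.
enum class ToyChangeStatus { UNACKNOWLEDGED, ACKNOWLEDGED };

// Toy proxy: maps a sequence number to the status of that change.
struct ToyAckProxy
{
    std::map<uint64_t, ToyChangeStatus> changes;

    // Mark every change strictly below seq_num as acknowledged.
    // seq_num itself is the first change left untouched.
    void acked_changes_set(uint64_t seq_num)
    {
        for (auto it = changes.begin();
                it != changes.end() && it->first < seq_num; ++it)
        {
            it->second = ToyChangeStatus::ACKNOWLEDGED;
        }
    }
};
```

In this sketch, calling `acked_changes_set(30)` on a proxy holding changes 1 to 31 leaves changes 30 and 31 as UNACKNOWLEDGED and marks 1 to 29 as ACKNOWLEDGED.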

bool requested_changes_set(const SequenceNumberSet_t &seq_num_set, RTPSGapBuilder &gap_builder, const SequenceNumber_t &min_seq_in_history)

Mark all changes in the set as requested.

Parameters:
  • seq_num_set – Bitmap of sequence numbers.

  • gap_builder – RTPSGapBuilder reference used for adding each requested change that is irrelevant for the requester.

  • min_seq_in_history – [in] Minimum SequenceNumber_t in the writer’s history. If the writer’s history is empty, SequenceNumber_t::unknown() is expected.

Returns:

true if at least one change has been marked as REQUESTED, false otherwise.

bool process_initial_acknack(const std::function<void(ChangeForReader_t &change)> &func)

Process a preemptive acknack.

Parameters:

func – Functor called for each change moved to UNSENT status, if the requester is a local reader.

Returns:

true if a heartbeat should be sent, false otherwise.

void from_unsent_to_status(const SequenceNumber_t &seq_num, ChangeForReaderStatus_t status, bool restart_nack_supression, bool delivered = true)

Set a change to a particular status (if present in the ReaderProxy).

Parameters:
  • seq_num – Sequence number of the change to update.

  • status – Status to apply.

  • restart_nack_supression – Whether the nack-suppression event should be restarted.

  • delivered – true if the change could be delivered to its addressees, false otherwise.

bool mark_fragment_as_sent_for_change(const SequenceNumber_t &seq_num, FragmentNumber_t frag_num, bool &was_last_fragment)

Mark a particular fragment as sent.

Parameters:
  • seq_num – [in] Sequence number of the change to update.

  • frag_num – [in] Fragment number to mark as sent.

  • was_last_fragment – [out] Indicates whether the fragment was the last one pending.

Returns:

true when the change was found, false otherwise.
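
The interplay between the return value and the was_last_fragment output can be sketched with a toy tracker. This is an illustration under stated assumptions, not the library code: `ToyFragmentTracker`, the `unsent_fragments` map and the plain integer types are hypothetical, and the toy simply reports whether any fragment remains pending after the call.

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <set>

// Toy tracker: sequence number -> fragment numbers still waiting to be sent.
struct ToyFragmentTracker
{
    std::map<uint64_t, std::set<uint32_t>> unsent_fragments;

    // Returns false when the change is not tracked at all.
    // Otherwise removes frag_num from the pending set and reports
    // through was_last_fragment whether no fragment remains pending.
    bool mark_fragment_as_sent_for_change(
            uint64_t seq_num,
            uint32_t frag_num,
            bool& was_last_fragment)
    {
        was_last_fragment = false;
        auto it = unsent_fragments.find(seq_num);
        if (it == unsent_fragments.end())
        {
            return false;
        }
        it->second.erase(frag_num);
        was_last_fragment = it->second.empty();
        return true;
    }
};
```

A caller in this sketch would keep sending fragments of the same change until the function sets was_last_fragment to true, and would treat a false return value as "nothing to update".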

bool perform_nack_supression()

Turns all UNDERWAY changes into UNACKNOWLEDGED.

Returns:

true if at least one change changed its status, false otherwise.

uint32_t perform_acknack_response(const std::function<void(ChangeForReader_t &change)> &func)

Turns all REQUESTED changes into UNSENT.

Parameters:

func – Function executed for each change which changes its status.

Returns:

the number of changes that changed their status.
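
The two status transitions documented for perform_nack_supression and perform_acknack_response can be sketched as follows. This is a simplified model and not the Fast DDS implementation: `ToyStatus`, `ToyChange` and `ToyReaderProxy` are hypothetical types that only reuse the status names mentioned in this page.

```cpp
#include <cassert>
#include <cstdint>
#include <functional>
#include <vector>

// Hypothetical status enum reusing the names from the documentation.
enum class ToyStatus { UNSENT, REQUESTED, UNACKNOWLEDGED, ACKNOWLEDGED, UNDERWAY };

struct ToyChange
{
    uint64_t seq_num;
    ToyStatus status;
};

struct ToyReaderProxy
{
    std::vector<ToyChange> changes;

    // UNDERWAY -> UNACKNOWLEDGED. Returns true if any change was updated.
    bool perform_nack_supression()
    {
        bool modified = false;
        for (ToyChange& c : changes)
        {
            if (c.status == ToyStatus::UNDERWAY)
            {
                c.status = ToyStatus::UNACKNOWLEDGED;
                modified = true;
            }
        }
        return modified;
    }

    // REQUESTED -> UNSENT. Calls func (if set) for each updated change
    // and returns how many changes were updated.
    uint32_t perform_acknack_response(const std::function<void(ToyChange&)>& func)
    {
        uint32_t count = 0;
        for (ToyChange& c : changes)
        {
            if (c.status == ToyStatus::REQUESTED)
            {
                c.status = ToyStatus::UNSENT;
                if (func)
                {
                    func(c);
                }
                ++count;
            }
        }
        return count;
    }
};
```

The callback gives the caller a hook to act on each change that becomes UNSENT again, for example to queue it for resending; what the real writer does in that callback is outside the scope of this sketch.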

void change_has_been_removed(const SequenceNumber_t &seq_num)

Call this to report that a change was removed from the history.

Parameters:

seq_num – Sequence number of the removed change.

bool has_unacknowledged(const SequenceNumber_t &first_seq_in_history) const

Check whether there is some UNACKNOWLEDGED change.

Parameters:

first_seq_in_history – Minimum sequence number in the writer history.

Returns:

true if there is some UNACKNOWLEDGED change, false otherwise.

inline const GUID_t &guid() const

Get the GUID of the reader represented by this proxy.

Returns:

the GUID of the reader represented by this proxy.

inline DurabilityKind_t durability_kind() const

Get the durability of the reader represented by this proxy.

Returns:

the durability of the reader represented by this proxy.

inline bool expects_inline_qos() const

Check if the reader represented by this proxy expects inline QoS to be received.

Returns:

true if the reader represented by this proxy expects inline QoS to be received.

inline bool is_reliable() const

Check if the reader represented by this proxy is reliable.

Returns:

true if the reader represented by this proxy is reliable.

inline bool disable_positive_acks() const
inline bool is_remote_and_reliable() const

Check if the reader represented by this proxy is remote and reliable.

Returns:

true if the reader represented by this proxy is remote and reliable.

inline bool is_local_reader()

Check if the reader is on the same process.

Returns:

true if the reader is on the same process.

inline RTPSReader *local_reader()

Get the local reader on the same process (if any).

Returns:

The local reader on the same process.

inline bool check_and_set_acknack_count(uint32_t acknack_count)

Called when an ACKNACK is received to set a new value for the minimum count accepted for following received ACKNACKs.

Parameters:

acknack_count – The count of the received ACKNACK.

Returns:

true if the internal count changed (i.e. the received ACKNACK is accepted).
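
The "minimum count accepted" idea can be sketched with a small filter that rejects stale or duplicated ACKNACKs. This is a hedged illustration only: `ToyAcknackFilter`, the `min_accepted_count_` member, the "greater than or equal" comparison and the lack of wraparound handling are all assumptions, not details confirmed by this page.

```cpp
#include <cassert>
#include <cstdint>

// Toy filter: accepts an ACKNACK only if its count has not been seen yet.
class ToyAcknackFilter
{
public:

    // Accept the ACKNACK when its count reaches the current minimum, then
    // raise the minimum so the same or an older count is rejected later.
    // Assumption: counter wraparound is ignored in this sketch.
    bool check_and_set_acknack_count(uint32_t acknack_count)
    {
        if (acknack_count >= min_accepted_count_)
        {
            min_accepted_count_ = acknack_count + 1;
            return true;
        }
        return false;
    }

private:

    uint32_t min_accepted_count_ = 0;
};
```

With this rule, a retransmitted ACKNACK carrying an already processed count returns false, so the caller can skip processing it a second time.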

bool process_nack_frag(const GUID_t &reader_guid, uint32_t nack_count, const SequenceNumber_t &seq_num, const FragmentNumberSet_t &fragments_state)

Process an incoming NACKFRAG submessage.

Parameters:
  • reader_guid – Destination guid of the submessage.

  • nack_count – Counter field of the submessage.

  • seq_num – Sequence number field of the submessage.

  • fragments_state – Bitmap indicating the requested fragments.

Returns:

true if a change was modified, false otherwise.

bool rtps_is_relevant(CacheChange_t *change) const

Filter a CacheChange_t using the StatefulWriter’s IReaderDataFilter.

Parameters:

change – The CacheChange_t to filter.

Returns:

true if the change is relevant, false otherwise.

inline SequenceNumber_t changes_low_mark() const

Get the highest fully acknowledged sequence number.

Returns:

the highest fully acknowledged sequence number.

void update_nack_supression_interval(const Duration_t &interval)

Change the interval of the nack-suppression event.

Parameters:

interval – Time from data sending to acknack processing.

inline LocatorSelectorEntry *general_locator_selector_entry()
inline LocatorSelectorEntry *async_locator_selector_entry()
inline RTPSMessageSenderInterface *message_sender()
inline bool is_datasharing_reader() const
inline IDataSharingNotifier *datasharing_notifier()
inline const IDataSharingNotifier *datasharing_notifier() const
inline void datasharing_notify()
inline size_t locators_size() const
inline bool active() const
inline void active(bool active)
bool has_been_delivered(const SequenceNumber_t &seq_number, bool &found) const

Check if the sequence number given has been delivered at least once to the transport layer.

Parameters:
  • seq_number – Sequence number of the change to check.

  • found – Whether the sequence number was found in the list of changes pending to be sent or acknowledged. This flag distinguishes a change that was not found from one that was found but has not been delivered yet.

Returns:

true if the change has been delivered, false otherwise.
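
The combination of the return value and the found flag yields three distinguishable cases, sketched below. This is a toy model under stated assumptions: `ToyDeliveryTracker`, `DeliveryState` and `classify` are hypothetical names, and returning false for a change that is not found is a choice made for this sketch rather than documented behavior.

```cpp
#include <cassert>
#include <cstdint>
#include <map>

// Toy tracker: sequence number -> whether the change was handed to the
// transport layer at least once.
struct ToyDeliveryTracker
{
    std::map<uint64_t, bool> pending;

    bool has_been_delivered(uint64_t seq_number, bool& found) const
    {
        auto it = pending.find(seq_number);
        found = (it != pending.end());
        // Assumption of this sketch: a change that is not found reports false.
        return found && it->second;
    }
};

// Hypothetical helper showing how a caller can interpret both outputs.
enum class DeliveryState { NotTracked, PendingDelivery, Delivered };

inline DeliveryState classify(const ToyDeliveryTracker& tracker, uint64_t seq_number)
{
    bool found = false;
    const bool delivered = tracker.has_been_delivered(seq_number, found);
    if (!found)
    {
        return DeliveryState::NotTracked;
    }
    return delivered ? DeliveryState::Delivered : DeliveryState::PendingDelivery;
}
```

Without the found flag, the "not tracked" and "tracked but not yet delivered" cases would both collapse into a single false result.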