Class StatefulReader

Inheritance Relationships

Base Type

  • eprosima::fastrtps::rtps::RTPSReader

Derived Type

  • eprosima::fastrtps::rtps::StatefulPersistentReader

Class Documentation

class StatefulReader : public eprosima::fastrtps::rtps::RTPSReader

Class StatefulReader, specialization of RTPSReader that stores the state of the matched writers.

Subclassed by eprosima::fastrtps::rtps::StatefulPersistentReader

Public Functions

virtual ~StatefulReader()
virtual bool matched_writer_add(const WriterProxyData &wdata) override

Add a matched writer represented by its attributes.

Parameters:

wdata – Attributes of the writer to add.

Returns:

True if correctly added.

virtual bool matched_writer_remove(const GUID_t &writer_guid, bool removed_by_lease = false) override

Remove a WriterProxyData from the matched writers.

Parameters:
  • writer_guid – GUID of the writer to remove.

  • removed_by_lease – true if the writer was removed due to lease duration.

Returns:

True if correctly removed.

virtual bool matched_writer_is_matched(const GUID_t &writer_guid) override

Tells us if a specific Writer is matched against this reader.

Parameters:

writer_guid – GUID of the writer to check.

Returns:

True if it is matched.

bool matched_writer_lookup(const GUID_t &writerGUID, WriterProxy **WP)

Look for a specific WriterProxy.

Parameters:
  • writerGUID – GUID_t of the writer we are looking for.

  • WP – Pointer to pointer to a WriterProxy.

Returns:

True if found.
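
The matched-writer functions above follow a simple bookkeeping pattern. The following is a minimal sketch of that pattern, not Fast DDS code: Guid, WriterStub and MiniReader are hypothetical stand-ins for GUID_t, WriterProxy and StatefulReader, and the real matched_writer_add takes a WriterProxyData rather than a bare GUID. It mainly illustrates how a pointer-to-pointer output parameter such as WP is typically filled in.

```cpp
#include <cstdint>
#include <map>

// Hypothetical stand-ins; these are NOT the Fast DDS types.
using Guid = std::uint64_t;

struct WriterStub
{
    Guid guid;
};

class MiniReader
{
public:

    // Returns true if the writer was not matched yet and has been added.
    bool matched_writer_add(Guid guid)
    {
        return proxies_.emplace(guid, WriterStub{guid}).second;
    }

    // Returns true if a matched writer with this GUID was removed.
    bool matched_writer_remove(Guid guid)
    {
        return proxies_.erase(guid) == 1;
    }

    bool matched_writer_is_matched(Guid guid) const
    {
        return proxies_.count(guid) != 0;
    }

    // On success, *wp points at the stored proxy.
    // Returns false (leaving *wp untouched) if the writer is unknown or wp is null.
    bool matched_writer_lookup(Guid guid, WriterStub** wp)
    {
        auto it = proxies_.find(guid);
        if (it == proxies_.end() || wp == nullptr)
        {
            return false;
        }
        *wp = &it->second;
        return true;
    }

private:

    std::map<Guid, WriterStub> proxies_;
};
```

The caller declares a plain pointer, passes its address, and only dereferences it when the call returns true.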

virtual bool processDataMsg(CacheChange_t *change) override

Processes a new DATA message.

Parameters:

change – Pointer to the CacheChange_t.

Returns:

true if the reader accepts messages.

virtual bool processDataFragMsg(CacheChange_t *change, uint32_t sampleSize, uint32_t fragmentStartingNum, uint16_t fragmentsInSubmessage) override

Processes a new DATA FRAG message.

Parameters:
  • change – Pointer to the CacheChange_t.

  • sampleSize – Size of the complete, assembled message.

  • fragmentStartingNum – Starting fragment number of this particular message.

  • fragmentsInSubmessage – Number of fragments in this particular message.

Returns:

true if the reader accepts messages.
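
The relation between sampleSize, fragmentStartingNum and fragmentsInSubmessage can be illustrated with a little arithmetic. The sketch below assumes a fixed, non-zero fragment size (fragment_size, a hypothetical parameter that does not appear in the signature above) and 1-based fragment numbers as in the RTPS specification. FragmentTracker is an illustrative helper, not part of Fast DDS.

```cpp
#include <cstdint>
#include <vector>

// Illustrative helper (not a Fast DDS type): tracks which fragments of one sample have arrived.
class FragmentTracker
{
public:

    // fragment_size must be non-zero.
    FragmentTracker(std::uint32_t sample_size, std::uint32_t fragment_size)
        // Ceiling division: the last fragment may be shorter than fragment_size.
        : received_((sample_size + fragment_size - 1) / fragment_size, false)
    {
    }

    std::uint32_t total_fragments() const
    {
        return static_cast<std::uint32_t>(received_.size());
    }

    // Marks fragments [starting_num, starting_num + count - 1] (1-based) as received.
    // Returns false if the range is empty or does not fit in the sample.
    bool add(std::uint32_t starting_num, std::uint16_t count)
    {
        const std::uint32_t total = total_fragments();
        const std::uint32_t n = count;
        if (starting_num == 0 || n == 0 || starting_num > total ||
                n > total - starting_num + 1)
        {
            return false;
        }
        for (std::uint32_t i = 0; i < n; ++i)
        {
            received_[starting_num - 1 + i] = true;
        }
        return true;
    }

    // True once every fragment of the sample has been seen.
    bool complete() const
    {
        for (bool r : received_)
        {
            if (!r)
            {
                return false;
            }
        }
        return true;
    }

private:

    std::vector<bool> received_;
};
```

With a 2500-byte sample and 1000-byte fragments, the sample spans three fragments, and a submessage with fragmentStartingNum 1 and fragmentsInSubmessage 2 covers the first two of them.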

virtual bool processHeartbeatMsg(const GUID_t &writerGUID, uint32_t hbCount, const SequenceNumber_t &firstSN, const SequenceNumber_t &lastSN, bool finalFlag, bool livelinessFlag, fastdds::rtps::VendorId_t origin_vendor_id = c_VendorId_Unknown) override

Processes a new HEARTBEAT message.

Returns:

true if the reader accepts messages.
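
For orientation, a reliable reader typically reacts to a HEARTBEAT by working out which sequence numbers in the announced range it still lacks, and then requests them in an ACKNACK. The sketch below shows only that computation, using plain 64-bit integers in place of SequenceNumber_t. missing_changes is a hypothetical helper and does not reflect how StatefulReader stores its state internally.

```cpp
#include <cstdint>
#include <set>
#include <vector>

// Hypothetical helper: sequence numbers in [first_sn, last_sn] that are not in `received`.
// An empty range (last_sn < first_sn) yields an empty result.
std::vector<std::int64_t> missing_changes(
        std::int64_t first_sn,
        std::int64_t last_sn,
        const std::set<std::int64_t>& received)
{
    std::vector<std::int64_t> missing;
    for (std::int64_t sn = first_sn; sn <= last_sn; ++sn)
    {
        if (received.count(sn) == 0)
        {
            missing.push_back(sn);
        }
    }
    return missing;
}
```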

virtual bool processGapMsg(const GUID_t &writerGUID, const SequenceNumber_t &gapStart, const SequenceNumberSet_t &gapList, fastdds::rtps::VendorId_t origin_vendor_id = c_VendorId_Unknown) override

Processes a new GAP message.

Parameters:
  • writerGUID

  • gapStart

  • gapList

  • origin_vendor_id

Returns:

true if the reader accepts messages.
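
In the RTPS wire protocol, a GAP submessage declares as irrelevant every sequence number from gapStart up to (but not including) the base of gapList, plus each sequence number explicitly listed in gapList. The sketch below expands that into a flat list. It models the sequence number set as a base plus a std::set, which is a simplification made for readability and not the SequenceNumberSet_t representation. GapList and irrelevant_changes are hypothetical names.

```cpp
#include <cstdint>
#include <set>
#include <vector>

// Simplified stand-in for SequenceNumberSet_t (an assumption, not the real type).
struct GapList
{
    std::int64_t base;
    std::set<std::int64_t> members;  // each member is expected to be >= base
};

// Expands a GAP into the sequence numbers the reader should stop waiting for.
std::vector<std::int64_t> irrelevant_changes(
        std::int64_t gap_start,
        const GapList& gap_list)
{
    std::vector<std::int64_t> out;
    // Contiguous part: [gap_start, gap_list.base - 1].
    for (std::int64_t sn = gap_start; sn < gap_list.base; ++sn)
    {
        out.push_back(sn);
    }
    // Explicitly listed part.
    out.insert(out.end(), gap_list.members.begin(), gap_list.members.end());
    return out;
}
```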

virtual bool change_removed_by_history(CacheChange_t *change, WriterProxy *prox = nullptr) override

Method to indicate to the reader that some change has been removed due to HistoryQos requirements.

Parameters:
  • change – Pointer to the CacheChange_t.

  • prox – Pointer to the WriterProxy.

Returns:

True if correctly removed.

bool change_received(CacheChange_t *a_change, WriterProxy *prox, size_t unknown_missing_changes_up_to)

This method is called when a new change is received. This method calls the received_change of the History and depending on the implementation performs different actions.

Parameters:
  • a_change – Pointer to the change to add.

  • prox – Pointer to the WriterProxy that adds the Change.

  • unknown_missing_changes_up_to – The number of changes from the same writer with a lower sequence number that could potentially be received in the future.

Returns:

True if added.

inline RTPSParticipantImpl *getRTPSParticipant() const

Get the RTPS participant

Returns:

Associated RTPS participant

ResourceEvent &getEventResource() const

Get reference to the associated RTPS participant’s ResourceEvent

Returns:

Reference to the associated RTPS participant’s ResourceEvent

virtual bool nextUnreadCache(CacheChange_t **change, WriterProxy **wpout = nullptr) override

Read the next unread CacheChange_t from the history

Parameters:
  • change – Pointer to pointer of CacheChange_t

  • wpout – Pointer to pointer to the matched writer proxy

Returns:

True if read.

virtual bool nextUntakenCache(CacheChange_t **change, WriterProxy **wpout = nullptr) override

Take the next CacheChange_t from the history.

Parameters:
  • change – Pointer to pointer of CacheChange_t

  • wpout – Pointer to pointer to the matched writer proxy

Returns:

True if taken.

bool updateTimes(const ReaderTimes &times)

Update the times parameters of the Reader.

Parameters:

times – ReaderTimes reference.

Returns:

True if correctly updated.

inline ReaderTimes &getTimes()

Returns:

Reference to the ReaderTimes.

inline size_t getMatchedWritersSize() const

Get the number of matched writers

Returns:

Number of matched writers

virtual bool isInCleanState() override

Returns whether there is a clean state with all Writers. This occurs when the Reader has received all samples sent by the Writers; in other words, its WriterProxies are up to date.

Returns:

True if there is a clean state with all Writers.
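
As a rough illustration of the clean-state condition described above, the sketch below summarizes each matched writer with two counters and reports a clean state when no writer has announced a change that the reader has not received. ProxyState and its fields are hypothetical; the real WriterProxy tracks this information differently.

```cpp
#include <algorithm>
#include <cstdint>
#include <vector>

// Hypothetical per-writer summary (not the WriterProxy API).
struct ProxyState
{
    std::int64_t last_announced;  // highest sequence number the writer has announced
    std::int64_t last_received;   // highest sequence number received with no gaps before it
};

// True when every writer proxy is up to date, i.e. nothing announced is still missing.
// A reader with no matched writers is trivially in a clean state in this model.
bool is_in_clean_state(const std::vector<ProxyState>& proxies)
{
    return std::all_of(proxies.begin(), proxies.end(),
                   [](const ProxyState& p)
                   {
                       return p.last_received >= p.last_announced;
                   });
}
```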

void send_acknack(const WriterProxy *writer, const SequenceNumberSet_t &sns, RTPSMessageSenderInterface *sender, bool is_final)

Sends an acknack message from this reader.

Parameters:
  • writer – Pointer to the info of the remote writer.

  • sns – Sequence number bitmap with the acknack information.

  • sender – Message sender interface.

  • is_final – Value for final flag.

void send_acknack(const WriterProxy *writer, RTPSMessageSenderInterface *sender, bool heartbeat_was_final)

Sends an acknack message from this reader in response to a heartbeat.

Parameters:
  • writer – Pointer to the proxy representing the writer to send the acknack to.

  • sender – Message sender interface.

  • heartbeat_was_final – Final flag of the last received heartbeat.

bool send_sync_nts(CDRMessage_t *message, const Locators &locators_begin, const Locators &locators_end, std::chrono::steady_clock::time_point &max_blocking_time_point)

Use the participant of this reader to send a message to certain locators.

Parameters:
  • message – Message to be sent.

  • locators_begin – Destination locators iterator begin.

  • locators_end – Destination locators iterator end.

  • max_blocking_time_point – Future time point where any blocking should end.
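
The max_blocking_time_point argument is an absolute deadline on std::chrono::steady_clock rather than a duration. A minimal sketch of how such a deadline could be derived from a timeout and checked follows. make_deadline and deadline_passed are hypothetical helpers, not Fast DDS functions.

```cpp
#include <chrono>

using SteadyTimePoint = std::chrono::steady_clock::time_point;

// Hypothetical helper: turn a relative timeout into an absolute deadline.
SteadyTimePoint make_deadline(std::chrono::milliseconds timeout)
{
    return std::chrono::steady_clock::now() + timeout;
}

// Hypothetical helper: true once the deadline has been reached.
bool deadline_passed(const SteadyTimePoint& deadline)
{
    return std::chrono::steady_clock::now() >= deadline;
}
```

Because the parameter is a non-const reference, the deadline has to be stored in a named variable before it is passed; a temporary such as the direct result of make_deadline cannot bind to it.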

virtual void assert_writer_liveliness(const GUID_t &writer) override

Assert the liveliness of a matched writer.

Parameters:

writer – GUID of the writer to assert.

virtual bool begin_sample_access_nts(CacheChange_t *change, WriterProxy *&wp, bool &is_future_change) override

Called just before a change is going to be deserialized.

Parameters:
  • change – [in] Pointer to the change being accessed.

  • wp – [out] Writer proxy the change belongs to.

  • is_future_change – [out] Whether the change is in the future (i.e. there are earlier unreceived changes from the same writer).

Returns:

Whether the change is still valid or not.

virtual void end_sample_access_nts(CacheChange_t *change, WriterProxy *&wp, bool mark_as_read) override

Called after the change has been deserialized.

Parameters:
  • change – [in] Pointer to the change being accessed.

  • wp – [in] Writer proxy the change belongs to.

  • mark_as_read – [in] Whether the change should be marked as read or not.
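
begin_sample_access_nts and end_sample_access_nts are meant to bracket the deserialization of a single change. The sketch below shows that bracketing against a hypothetical FakeReader that mirrors the two signatures. Change, WriterStub, FakeReader and read_one are illustrative names rather than Fast DDS types, and any locking the caller may need is left out.

```cpp
#include <string>

// Illustrative stand-ins, not Fast DDS types.
struct Change
{
    std::string payload;
    bool valid = true;
    bool read = false;
};

struct WriterStub
{
};

struct FakeReader
{
    WriterStub proxy;

    // Reports the owning proxy and whether the change may be accessed.
    bool begin_sample_access_nts(Change* change, WriterStub*& wp, bool& is_future_change)
    {
        wp = &proxy;
        is_future_change = false;  // this fake never has earlier missing changes
        return change != nullptr && change->valid;
    }

    void end_sample_access_nts(Change* change, WriterStub*& /*wp*/, bool mark_as_read)
    {
        if (mark_as_read)
        {
            change->read = true;
        }
    }
};

// Copies the payload out only while access to the change is open.
// Returns false if the change is no longer valid or is a future change.
bool read_one(FakeReader& reader, Change* change, std::string& out)
{
    WriterStub* wp = nullptr;
    bool is_future_change = false;
    if (!reader.begin_sample_access_nts(change, wp, is_future_change))
    {
        return false;  // access was never opened, so there is nothing to close
    }
    const bool deliver = !is_future_change;
    if (deliver)
    {
        out = change->payload;  // stands in for deserialization
    }
    reader.end_sample_access_nts(change, wp, deliver);
    return deliver;
}
```

In this sketch the end call is made whenever the begin call succeeded, and the change is marked as read only if it was actually delivered.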

virtual void change_read_by_user(CacheChange_t *change, WriterProxy *writer, bool mark_as_read = true) override

Called when the user has retrieved a change from the history.

Parameters:
  • change – Pointer to the change to ACK

  • writer – Writer proxy of the change.

  • mark_as_read – Whether the change should be marked as read or not

Protected Functions

StatefulReader(RTPSParticipantImpl *pimpl, const GUID_t &guid, const ReaderAttributes &att, ReaderHistory *hist, ReaderListener *listen = nullptr)
StatefulReader(RTPSParticipantImpl *pimpl, const GUID_t &guid, const ReaderAttributes &att, const std::shared_ptr<IPayloadPool> &payload_pool, ReaderHistory *hist, ReaderListener *listen = nullptr)
StatefulReader(RTPSParticipantImpl *pimpl, const GUID_t &guid, const ReaderAttributes &att, const std::shared_ptr<IPayloadPool> &payload_pool, const std::shared_ptr<IChangePool> &change_pool, ReaderHistory *hist, ReaderListener *listen = nullptr)

Friends

friend class RTPSParticipantImpl