Class RTPSReader

Inheritance Relationships

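Base Types
  • public eprosima::fastrtps::rtps::Endpoint

  • public eprosima::fastdds::statistics::StatisticsReaderImpl

Derived Types
  • public eprosima::fastrtps::rtps::StatefulReader

  • public eprosima::fastrtps::rtps::StatelessReader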

Class Documentation

class RTPSReader : public eprosima::fastrtps::rtps::Endpoint, public eprosima::fastdds::statistics::StatisticsReaderImpl

Class RTPSReader manages the reception of data from its matched writers.

Subclassed by eprosima::fastrtps::rtps::StatefulReader, eprosima::fastrtps::rtps::StatelessReader

Public Functions

virtual RTPS_DllAPI bool matched_writer_add (const WriterProxyData &wdata)=0

Add a matched writer represented by its attributes.

Parameters:

wdata – Attributes of the writer to add.

Returns:

True if correctly added.

virtual RTPS_DllAPI bool matched_writer_remove (const GUID_t &writer_guid, bool removed_by_lease=false)=0

Remove a writer, identified by its GUID, from the matched writers.

Parameters:
  • writer_guid – GUID of the writer to remove.

  • removed_by_lease – Whether the writer is being unmatched due to a participant drop.

Returns:

True if correctly removed.

virtual RTPS_DllAPI bool matched_writer_is_matched (const GUID_t &writer_guid)=0

Checks whether a specific writer is matched with this reader.

Parameters:

writer_guid – GUID of the writer to check.

Returns:

True if it is matched.
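
The three matched-writer functions above form a small add / remove / query contract. The following is a minimal sketch of that contract, not the Fast DDS implementation. `Guid` and `MatchedWriters` are hypothetical stand-ins introduced for illustration (the real functions take `WriterProxyData` and `GUID_t`), and returning false for an already matched writer is an assumption of the sketch.

```cpp
#include <array>
#include <cassert>
#include <cstdint>
#include <set>

// Hypothetical stand-in for GUID_t (12-byte prefix plus 4-byte entity id).
using Guid = std::array<std::uint8_t, 16>;

// Hypothetical stand-in for a concrete reader's matched-writer bookkeeping.
class MatchedWriters
{
public:

    // Mirrors matched_writer_add: true only if the writer was newly added.
    bool add(const Guid& writer_guid)
    {
        return writers_.insert(writer_guid).second;
    }

    // Mirrors matched_writer_remove: true only if the writer was matched before.
    bool remove(const Guid& writer_guid)
    {
        return writers_.erase(writer_guid) == 1;
    }

    // Mirrors matched_writer_is_matched.
    bool is_matched(const Guid& writer_guid) const
    {
        return writers_.count(writer_guid) == 1;
    }

private:

    std::set<Guid> writers_;
};

// Usage: one add / query / remove cycle for a single writer.
inline void matched_writers_example()
{
    MatchedWriters matched;
    Guid writer{};
    writer[15] = 0x02;

    assert(matched.add(writer));
    assert(!matched.add(writer));      // second add of the same GUID is rejected
    assert(matched.is_matched(writer));
    assert(matched.remove(writer));
    assert(!matched.is_matched(writer));
}
```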

virtual RTPS_DllAPI bool processDataMsg (CacheChange_t *change)=0

Processes a new DATA message. The message must previously have been accepted by the function acceptMsgDirectedTo.

Parameters:

change – Pointer to the CacheChange_t.

Returns:

True if the reader accepts the message.

virtual RTPS_DllAPI bool processDataFragMsg (CacheChange_t *change, uint32_t sampleSize, uint32_t fragmentStartingNum, uint16_t fragmentsInSubmessage)=0

Processes a new DATA FRAG message.

Parameters:
  • change – Pointer to the CacheChange_t.

  • sampleSize – Size of the complete, assembled message.

  • fragmentStartingNum – Number of the first fragment carried in this particular message.

  • fragmentsInSubmessage – Number of fragments in this particular message.

Returns:

True if the reader accepts the message.
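
To make the relationship between sampleSize, fragmentStartingNum and fragmentsInSubmessage concrete, here is a small arithmetic sketch. It is not taken from Fast DDS. The `fragment_size` parameter and the `ByteRange` type are assumptions of the sketch (processDataFragMsg does not receive the fragment size as an argument), and fragment numbers are treated as 1-based, as in the RTPS wire protocol.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>

// Byte range [begin, end) of the assembled sample (hypothetical helper type).
struct ByteRange
{
    std::uint32_t begin;
    std::uint32_t end;
};

// Number of fragments needed for a sample of sample_size bytes.
inline std::uint32_t total_fragments(std::uint32_t sample_size, std::uint16_t fragment_size)
{
    assert(fragment_size > 0);
    return sample_size / fragment_size + (sample_size % fragment_size != 0 ? 1u : 0u);
}

// Bytes of the assembled sample covered by one DATA_FRAG submessage.
inline ByteRange covered_bytes(
        std::uint32_t sample_size,
        std::uint16_t fragment_size,
        std::uint32_t fragment_starting_num,
        std::uint16_t fragments_in_submessage)
{
    assert(fragment_size > 0 && fragment_starting_num >= 1);

    // Fragment numbers start at 1, so fragment N begins at byte (N - 1) * fragment_size.
    const std::uint64_t begin =
            static_cast<std::uint64_t>(fragment_starting_num - 1u) * fragment_size;
    const std::uint64_t end =
            begin + static_cast<std::uint64_t>(fragments_in_submessage) * fragment_size;

    // The last fragment may be shorter than fragment_size, so clamp to the sample size.
    const std::uint64_t limit = sample_size;
    return ByteRange{
        static_cast<std::uint32_t>(std::min(begin, limit)),
        static_cast<std::uint32_t>(std::min(end, limit))};
}
```

For a 2500-byte sample split into 1000-byte fragments, a submessage with fragmentStartingNum 3 and fragmentsInSubmessage 1 covers bytes 2000 to 2499.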

virtual RTPS_DllAPI bool processHeartbeatMsg (const GUID_t &writerGUID, uint32_t hbCount, const SequenceNumber_t &firstSN, const SequenceNumber_t &lastSN, bool finalFlag, bool livelinessFlag, fastdds::rtps::VendorId_t origin_vendor_id=c_VendorId_Unknown)=0

Processes a new HEARTBEAT message.

Parameters:
  • writerGUID

  • hbCount

  • firstSN

  • lastSN

  • finalFlag

  • livelinessFlag

  • origin_vendor_id

Returns:

True if the reader accepts the message.
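
A HEARTBEAT announces the range of sequence numbers [firstSN, lastSN] that the writer has available. As a rough illustration of what a reliable reader can derive from it, the sketch below lists the sequence numbers in that range that have not been received yet. `SeqNum` and `missing_changes` are hypothetical names; this is not how StatefulReader is implemented.

```cpp
#include <cassert>
#include <cstdint>
#include <set>
#include <vector>

using SeqNum = std::uint64_t;  // hypothetical stand-in for SequenceNumber_t

// Sequence numbers in [first_sn, last_sn] that are not in the received set.
inline std::vector<SeqNum> missing_changes(
        const std::set<SeqNum>& received,
        SeqNum first_sn,
        SeqNum last_sn)
{
    std::vector<SeqNum> missing;

    // An empty range means the writer has nothing available.
    if (last_sn < first_sn)
    {
        return missing;
    }

    for (SeqNum sn = first_sn;; ++sn)
    {
        if (received.count(sn) == 0)
        {
            missing.push_back(sn);
        }
        if (sn == last_sn)
        {
            break;  // explicit exit avoids overflow when last_sn is the maximum value
        }
    }
    return missing;
}
```

With changes 1, 2 and 4 received and a heartbeat announcing [1, 5], the reader is missing 3 and 5.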

virtual RTPS_DllAPI bool processGapMsg (const GUID_t &writerGUID, const SequenceNumber_t &gapStart, const SequenceNumberSet_t &gapList, fastdds::rtps::VendorId_t origin_vendor_id=c_VendorId_Unknown)=0

Processes a new GAP message.

Parameters:
  • writerGUID

  • gapStart

  • gapList

  • origin_vendor_id

Returns:

True if the reader accepts the message.
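
In the RTPS wire protocol, a GAP marks as irrelevant every sequence number from gapStart up to (but not including) the base of gapList, plus every sequence number explicitly present in gapList. The sketch below expands that into a flat list. `SeqNum` and `SeqNumSet` are simplified, hypothetical stand-ins for `SequenceNumber_t` and `SequenceNumberSet_t`, whose real interfaces differ.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

using SeqNum = std::uint64_t;  // hypothetical stand-in for SequenceNumber_t

// Hypothetical stand-in for SequenceNumberSet_t: a base plus a bitmap,
// where bits[i] == true means that (base + i) belongs to the set.
struct SeqNumSet
{
    SeqNum base;
    std::vector<bool> bits;
};

// Expands a GAP into the list of sequence numbers it declares irrelevant.
inline std::vector<SeqNum> irrelevant_changes(SeqNum gap_start, const SeqNumSet& gap_list)
{
    std::vector<SeqNum> irrelevant;

    // Contiguous part: gap_start <= sn < gap_list.base.
    for (SeqNum sn = gap_start; sn < gap_list.base; ++sn)
    {
        irrelevant.push_back(sn);
    }

    // Explicit part: every bit set in the bitmap.
    for (std::size_t i = 0; i < gap_list.bits.size(); ++i)
    {
        if (gap_list.bits[i])
        {
            irrelevant.push_back(gap_list.base + i);
        }
    }
    return irrelevant;
}
```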

virtual RTPS_DllAPI bool change_removed_by_history (CacheChange_t *change, WriterProxy *prox=nullptr)=0

Notifies the reader that a change has been removed due to HistoryQos requirements.

Parameters:
  • change – Pointer to the CacheChange_t.

  • prox – Pointer to the WriterProxy.

Returns:

True if correctly removed.

RTPS_DllAPI ReaderListener * getListener () const

Get the associated listener, or the secondary attached listener if it is of compound type.

Returns:

Pointer to the associated reader listener.

RTPS_DllAPI bool setListener (ReaderListener *target)

Switch the ReaderListener kind for the Reader. If the RTPSReader does not belong to the built-in protocols it switches out the old one. If it belongs to the built-in protocols, it sets the new ReaderListener callbacks to be called after the built-in ReaderListener ones.

Parameters:

target – Pointer to the ReaderListener to attach.

Returns:

True if correctly set.
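
The two behaviours described for setListener (replace the listener on a user reader, chain it after the built-in one on a built-in reader) can be pictured with the toy model below. Every type in it (`Listener`, `NamedListener`, `ReaderSketch`) is a hypothetical stand-in written for this illustration; none of it is Fast DDS code.

```cpp
#include <cassert>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for ReaderListener.
struct Listener
{
    virtual ~Listener() = default;
    virtual void on_data(std::vector<std::string>& log) = 0;
};

// Listener that records its own name when called.
struct NamedListener : Listener
{
    explicit NamedListener(std::string n)
        : name(std::move(n))
    {
    }

    void on_data(std::vector<std::string>& log) override
    {
        log.push_back(name);
    }

    std::string name;
};

// Hypothetical reader model holding a primary and an optional secondary listener.
class ReaderSketch
{
public:

    ReaderSketch(Listener* initial, bool builtin)
        : primary_(initial)
        , builtin_(builtin)
    {
    }

    bool set_listener(Listener* target)
    {
        if (builtin_)
        {
            secondary_ = target;  // built-in listener stays, new one is called after it
        }
        else
        {
            primary_ = target;    // user reader: the old listener is switched out
        }
        return true;
    }

    void notify(std::vector<std::string>& log) const
    {
        if (primary_ != nullptr)
        {
            primary_->on_data(log);
        }
        if (secondary_ != nullptr)
        {
            secondary_->on_data(log);
        }
    }

private:

    Listener* primary_;
    Listener* secondary_ = nullptr;
    bool builtin_;
};
```

In this model, notifying a built-in reader after `set_listener` calls the built-in listener first and the new one second, while a user reader only calls the new one.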

RTPS_DllAPI bool reserveCache (CacheChange_t **change, uint32_t dataCdrSerializedSize)

Reserve a CacheChange_t.

Parameters:
  • change – Pointer to pointer to the Cache.

  • dataCdrSerializedSize – Size of the Cache.

Returns:

True if correctly reserved.

RTPS_DllAPI void releaseCache (CacheChange_t *change)

Release a CacheChange_t.
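
reserveCache hands out a change through a pointer-to-pointer argument and reports success as a bool, and releaseCache gives the change back. The sketch below mimics that calling convention with a bounded pool. `PooledChange` and `PoolSketch` are hypothetical stand-ins, and the allocation strategy shown is an assumption of the sketch, not the Fast DDS pool implementation.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical stand-in for CacheChange_t.
struct PooledChange
{
    std::vector<std::uint8_t> payload;
};

// Hypothetical bounded pool following the reserveCache / releaseCache calling convention.
class PoolSketch
{
public:

    explicit PoolSketch(std::size_t max_changes)
        : free_slots_(max_changes)
    {
    }

    // Mirrors reserveCache: on success, *change points to a change with room for size bytes.
    bool reserve(PooledChange** change, std::uint32_t size)
    {
        if (change == nullptr || free_slots_ == 0)
        {
            return false;  // *change is left untouched on failure
        }
        *change = new PooledChange{std::vector<std::uint8_t>(size)};
        --free_slots_;
        return true;
    }

    // Mirrors releaseCache: returns the change to the pool.
    void release(PooledChange* change)
    {
        if (change == nullptr)
        {
            return;
        }
        delete change;
        ++free_slots_;
    }

    std::size_t free_slots() const
    {
        return free_slots_;
    }

private:

    std::size_t free_slots_;
};
```

A caller initialises its pointer to nullptr, checks the boolean result before touching the change, and releases every change it reserved.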

virtual RTPS_DllAPI bool nextUnreadCache (CacheChange_t **change, WriterProxy **wp)=0

Read the next unread CacheChange_t from the history

Parameters:
  • change – Pointer to pointer of CacheChange_t

  • wp – Pointer to pointer to the WriterProxy

Returns:

True if read.

virtual RTPS_DllAPI bool nextUntakenCache (CacheChange_t **change, WriterProxy **wp)=0

Get the next CacheChange_t from the history to take.

Parameters:
  • change – Pointer to pointer of CacheChange_t.

  • wp – Pointer to pointer to the WriterProxy.

Returns:

True if read.

RTPS_DllAPI bool wait_for_unread_cache (const eprosima::fastrtps::Duration_t &timeout)

RTPS_DllAPI uint64_t get_unread_count () const

RTPS_DllAPI uint64_t get_unread_count (bool mark_as_read)

inline RTPS_DllAPI bool expectsInlineQos ()

Returns:

True if the reader expects inline QoS.

inline RTPS_DllAPI ReaderHistory * getHistory ()

Returns a pointer to the associated History.

inline RTPS_DllAPI eprosima::fastdds::rtps::IReaderDataFilter * get_content_filter () const

Returns:

The content filter associated to this reader.

inline RTPS_DllAPI void set_content_filter (eprosima::fastdds::rtps::IReaderDataFilter *filter)

Set the content filter associated to this reader.

Parameters:

filter – Pointer to the content filter to associate to this reader.

virtual bool isInCleanState() = 0

Returns whether there is a clean state with all Writers. This occurs when the Reader has received all samples sent by the Writers. In other words, its WriterProxies are up to date.

Returns:

True if there is a clean state with all Writers.
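
The clean-state condition can be read as "no matched writer has announced a sample that this reader has not received". The sketch below expresses that reading under simplifying assumptions: `WriterProgress` is a hypothetical summary of a WriterProxy, and reception is assumed to be contiguous up to `received_up_to`.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <vector>

// Hypothetical per-writer summary (not the real WriterProxy interface).
struct WriterProgress
{
    std::uint64_t last_announced;   // highest sequence number the writer has sent
    std::uint64_t received_up_to;   // highest sequence number received without holes
};

// True when every writer's samples have all been received.
inline bool is_in_clean_state(const std::vector<WriterProgress>& writers)
{
    return std::all_of(writers.begin(), writers.end(),
                   [](const WriterProgress& w)
                   {
                       return w.received_up_to >= w.last_announced;
                   });
}
```

Note that a reader with no matched writers is trivially clean in this model.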

inline void enableMessagesFromUnkownWriters(bool enable)

inline void setTrustedWriter(const EntityId_t &writer)

virtual void assert_writer_liveliness(const GUID_t &writer) = 0

Assert the liveliness of a matched writer.

Parameters:

writer – GUID of the writer to assert.

virtual bool begin_sample_access_nts(CacheChange_t *change, WriterProxy *&wp, bool &is_future_change) = 0

Called just before a change is going to be deserialized.

Parameters:
  • change – [in] Pointer to the change being accessed.

  • wp – [out] Writer proxy the change belongs to.

  • is_future_change – [out] Whether the change is in the future (i.e. there are earlier unreceived changes from the same writer).

Returns:

Whether the change is still valid or not.

virtual void end_sample_access_nts(CacheChange_t *change, WriterProxy *&wp, bool mark_as_read) = 0

Called after the change has been deserialized.

Parameters:
  • change – [in] Pointer to the change being accessed.

  • wp – [in] Writer proxy the change belongs to.

  • mark_as_read – [in] Whether the change should be marked as read or not.

virtual void change_read_by_user(CacheChange_t *change, WriterProxy *writer, bool mark_as_read = true) = 0

Called when the user has retrieved a change from the history.

Parameters:
  • change – Pointer to the change to ACK

  • writer – Writer proxy of the change.

  • mark_as_read – Whether the change should be marked as read or not

RTPS_DllAPI bool is_sample_valid (const void *data, const GUID_t &writer, const SequenceNumber_t &sn) const

Checks whether the sample is still valid or is corrupted.

Parameters:
  • data – Pointer to the sample data to check. If it does not belong to the payload pool passed to the reader on construction, it yields undefined behavior.

  • writer – GUID of the writer that sent data.

  • sn – Sequence number related to data.

Returns:

true if the sample is valid

inline const std::unique_ptr<IDataSharingListener> &datasharing_listener() const

Public Members

LivelinessChangedStatus liveliness_changed_status_

The liveliness changed status struct as defined in the DDS specification.

Protected Functions

RTPSReader(RTPSParticipantImpl *pimpl, const GUID_t &guid, const ReaderAttributes &att, ReaderHistory *hist, ReaderListener *listen = nullptr)

RTPSReader(RTPSParticipantImpl *pimpl, const GUID_t &guid, const ReaderAttributes &att, const std::shared_ptr<IPayloadPool> &payload_pool, ReaderHistory *hist, ReaderListener *listen = nullptr)

RTPSReader(RTPSParticipantImpl *pimpl, const GUID_t &guid, const ReaderAttributes &att, const std::shared_ptr<IPayloadPool> &payload_pool, const std::shared_ptr<IChangePool> &change_pool, ReaderHistory *hist, ReaderListener *listen = nullptr)

virtual ~RTPSReader()

virtual bool may_remove_history_record(bool removed_by_lease)

void add_persistence_guid(const GUID_t &guid, const GUID_t &persistence_guid)

Add a remote writer to the persistence_guid map.

Parameters:
  • guid – GUID of the remote writer

  • persistence_guid – Persistence GUID of the remote writer

void remove_persistence_guid(const GUID_t &guid, const GUID_t &persistence_guid, bool removed_by_lease)

Remove a remote writer from the persistence_guid map.

Parameters:
  • guid – GUID of the remote writer

  • persistence_guid – Persistence GUID of the remote writer

  • removed_by_lease – Whether the GUIDs are being removed due to a participant drop.

SequenceNumber_t get_last_notified(const GUID_t &guid)

Get the last notified sequence for a RTPS guid.

Remark

Takes persistence_guid into consideration

Parameters:

guid – The RTPS guid to query

Returns:

Last notified sequence number for input guid

SequenceNumber_t update_last_notified(const GUID_t &guid, const SequenceNumber_t &seq)

Update the last notified sequence for a RTPS guid.

Remark

Takes persistence_guid into consideration

Parameters:
  • guid – The RTPS guid of the writer

  • seq – Max sequence number available on writer

Returns:

Previous value of last notified sequence number for input guid

virtual void set_last_notified(const GUID_t &persistence_guid, const SequenceNumber_t &seq)

Set the last notified sequence for a persistence guid.

Remark

Persistent readers will write to DB

Parameters:
  • persistence_guid – The persistence guid to update

  • seq – Sequence number to set for input guid
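
The remarks above say that the last-notified lookups take the persistence GUID into consideration. One way to picture this is a two-step lookup: map the writer GUID to its persistence GUID, then read or update the sequence number stored under that key. The sketch below follows that idea. `GuidKey`, `SeqNum` and `NotifiedStateSketch` are hypothetical, strings replace `GUID_t` for brevity, and the "only move forward" rule in the update is an assumption of the sketch.

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <string>

using GuidKey = std::string;   // hypothetical stand-in for GUID_t
using SeqNum = std::uint64_t;  // hypothetical stand-in for SequenceNumber_t

class NotifiedStateSketch
{
public:

    // Associates a remote writer GUID with its persistence GUID.
    void add_persistence_guid(const GuidKey& guid, const GuidKey& persistence_guid)
    {
        persistence_guid_map_[guid] = persistence_guid;
    }

    // Last notified sequence number for guid, or 0 if nothing was notified yet.
    SeqNum get_last_notified(const GuidKey& guid) const
    {
        const auto it = last_notified_.find(resolve(guid));
        return it == last_notified_.end() ? 0 : it->second;
    }

    // Returns the previous value. The record only moves forward (sketch assumption).
    SeqNum update_last_notified(const GuidKey& guid, SeqNum seq)
    {
        SeqNum& record = last_notified_[resolve(guid)];
        const SeqNum previous = record;
        if (seq > previous)
        {
            record = seq;
        }
        return previous;
    }

private:

    // Falls back to the writer GUID itself when no persistence GUID is known.
    GuidKey resolve(const GuidKey& guid) const
    {
        const auto it = persistence_guid_map_.find(guid);
        return it == persistence_guid_map_.end() ? guid : it->second;
    }

    std::map<GuidKey, GuidKey> persistence_guid_map_;
    std::map<GuidKey, SeqNum> last_notified_;
};
```

Two writers that share a persistence GUID therefore share one last-notified record in this model, which is the point of keying by persistence GUID.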

History::const_iterator findCacheInFragmentedProcess(const SequenceNumber_t &sequence_number, const GUID_t &writer_guid, CacheChange_t **change, History::const_iterator hint) const

Searches for a CacheChange_t, given its SequenceNumber_t and writer GUID_t, that is waiting to be completed because it is fragmented.

Parameters:
  • sequence_number – SequenceNumber_t of the searched CacheChange_t.

  • writer_guid – writer GUID_t of the searched CacheChange_t.

  • change – If a CacheChange_t is found, this argument is filled with its pointer. Otherwise it is set to nullptr.

  • hint – Iterator from which the search will start. Used to speed up the search.

Returns:

Iterator pointing to the position where the CacheChange_t was found. It can be used as the hint for the next search.
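
The hint-based search can be sketched with standard iterators: start at the hint instead of the beginning, report the match through the out-parameter, and return the position so the caller can pass it as the next hint. `FragmentedChange`, `HistorySketch` and `find_fragmented` are hypothetical names, and the free-function form is a simplification of the member function documented above.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <vector>

// Hypothetical stand-in for CacheChange_t.
struct FragmentedChange
{
    std::uint64_t sequence_number;
    int writer;      // stands in for the writer GUID_t
    bool complete;   // false while fragments are still missing
};

using HistorySketch = std::vector<FragmentedChange*>;

// Searches [hint, end) for an incomplete change with the given sequence number and writer.
inline HistorySketch::const_iterator find_fragmented(
        const HistorySketch& history,
        std::uint64_t sequence_number,
        int writer,
        FragmentedChange** change,
        HistorySketch::const_iterator hint)
{
    assert(change != nullptr);
    *change = nullptr;

    const auto it = std::find_if(hint, history.cend(),
                    [&](const FragmentedChange* c)
                    {
                        return c->sequence_number == sequence_number &&
                               c->writer == writer && !c->complete;
                    });

    if (it != history.cend())
    {
        *change = *it;
    }
    return it;
}
```

Because the search starts at the hint, entries before it are never examined. A caller that cannot guarantee the target lies at or after the hint should pass the beginning of the history instead.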

void create_datasharing_listener(ResourceLimitedContainerConfig limits)

Creates the listener for the datasharing notifications

Parameters:

limits – Resource limits for the number of matched datasharing writers

bool is_datasharing_compatible_with(const WriterProxyData &wdata)

Protected Attributes

ReaderHistory *mp_history

ReaderHistory.

ReaderListener *mp_listener

Listener.

bool m_acceptMessagesToUnknownReaders

Accept messages to unknown readers (default=true).

bool m_acceptMessagesFromUnkownWriters

Accept messages from unknown writers (best-effort: true, reliable: false).

EntityId_t m_trustedWriterEntityId

Trusted writer (for Builtin)

bool m_expectsInlineQos

Expects Inline Qos.

ReaderHistoryState *history_state_

ReaderHistoryState.

uint64_t total_unread_ = 0

TimedConditionVariable new_notification_cv_

LivelinessQosPolicyKind liveliness_kind_

The liveliness kind of this reader.

Duration_t liveliness_lease_duration_

The liveliness lease duration of this reader.

bool is_datasharing_compatible_ = false

Whether the reader is datasharing compatible or not.

std::unique_ptr<IDataSharingListener> datasharing_listener_

The listener for the datasharing notifications.

eprosima::fastdds::rtps::IReaderDataFilter *data_filter_ = nullptr