Program Listing for File RTPSReader.h
↰ Return to documentation for file (/tmp/ws/src/fastrtps/include/fastdds/rtps/reader/RTPSReader.h
)
// Copyright 2016 Proyectos y Sistemas de Mantenimiento SL (eProsima).
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef _FASTDDS_RTPS_READER_RTPSREADER_H_
#define _FASTDDS_RTPS_READER_RTPSREADER_H_
#include <functional>
#include <fastdds/rtps/Endpoint.h>
#include <fastdds/rtps/attributes/ReaderAttributes.h>
#include <fastdds/rtps/builtin/data/WriterProxyData.h>
#include <fastdds/rtps/common/SequenceNumber.h>
#include <fastdds/rtps/common/Time_t.h>
#include <fastdds/rtps/history/ReaderHistory.h>
#include <fastdds/rtps/interfaces/IReaderDataFilter.hpp>
#include <fastrtps/qos/LivelinessChangedStatus.h>
#include <fastrtps/utils/TimedConditionVariable.hpp>
#include <fastdds/statistics/rtps/StatisticsCommon.hpp>
namespace eprosima {
namespace fastrtps {
namespace rtps {
// Forward declarations
class LivelinessManager;
class ReaderListener;
class WriterProxy;
struct CacheChange_t;
struct ReaderHistoryState;
class WriterProxyData;
class IDataSharingListener;
class RTPSReader
: public Endpoint
, public fastdds::statistics::StatisticsReaderImpl
{
friend class ReaderHistory;
friend class RTPSParticipantImpl;
friend class MessageReceiver;
friend class EDP;
friend class WLP;
protected:
RTPSReader(
RTPSParticipantImpl* pimpl,
const GUID_t& guid,
const ReaderAttributes& att,
ReaderHistory* hist,
ReaderListener* listen = nullptr);
RTPSReader(
RTPSParticipantImpl* pimpl,
const GUID_t& guid,
const ReaderAttributes& att,
const std::shared_ptr<IPayloadPool>& payload_pool,
ReaderHistory* hist,
ReaderListener* listen = nullptr);
RTPSReader(
RTPSParticipantImpl* pimpl,
const GUID_t& guid,
const ReaderAttributes& att,
const std::shared_ptr<IPayloadPool>& payload_pool,
const std::shared_ptr<IChangePool>& change_pool,
ReaderHistory* hist,
ReaderListener* listen = nullptr);
virtual ~RTPSReader();
public:
RTPS_DllAPI virtual bool matched_writer_add(
const WriterProxyData& wdata) = 0;
RTPS_DllAPI virtual bool matched_writer_remove(
const GUID_t& writer_guid,
bool removed_by_lease = false) = 0;
RTPS_DllAPI virtual bool matched_writer_is_matched(
const GUID_t& writer_guid) = 0;
RTPS_DllAPI virtual bool processDataMsg(
CacheChange_t* change) = 0;
RTPS_DllAPI virtual bool processDataFragMsg(
CacheChange_t* change,
uint32_t sampleSize,
uint32_t fragmentStartingNum,
uint16_t fragmentsInSubmessage) = 0;
RTPS_DllAPI virtual bool processHeartbeatMsg(
const GUID_t& writerGUID,
uint32_t hbCount,
const SequenceNumber_t& firstSN,
const SequenceNumber_t& lastSN,
bool finalFlag,
bool livelinessFlag) = 0;
RTPS_DllAPI virtual bool processGapMsg(
const GUID_t& writerGUID,
const SequenceNumber_t& gapStart,
const SequenceNumberSet_t& gapList) = 0;
RTPS_DllAPI virtual bool change_removed_by_history(
CacheChange_t* change,
WriterProxy* prox = nullptr) = 0;
RTPS_DllAPI ReaderListener* getListener() const;
RTPS_DllAPI bool setListener(
ReaderListener* target);
RTPS_DllAPI bool reserveCache(
CacheChange_t** change,
uint32_t dataCdrSerializedSize);
RTPS_DllAPI void releaseCache(
CacheChange_t* change);
RTPS_DllAPI virtual bool nextUnreadCache(
CacheChange_t** change,
WriterProxy** wp) = 0;
RTPS_DllAPI virtual bool nextUntakenCache(
CacheChange_t** change,
WriterProxy** wp) = 0;
RTPS_DllAPI bool wait_for_unread_cache(
const eprosima::fastrtps::Duration_t& timeout);
RTPS_DllAPI uint64_t get_unread_count() const;
RTPS_DllAPI uint64_t get_unread_count(
bool mark_as_read);
RTPS_DllAPI inline bool expectsInlineQos()
{
return m_expectsInlineQos;
}
RTPS_DllAPI inline ReaderHistory* getHistory()
{
return mp_history;
}
RTPS_DllAPI eprosima::fastdds::rtps::IReaderDataFilter* get_content_filter() const
{
std::lock_guard<RecursiveTimedMutex> guard(mp_mutex);
return data_filter_;
}
RTPS_DllAPI void set_content_filter(
eprosima::fastdds::rtps::IReaderDataFilter* filter)
{
std::lock_guard<RecursiveTimedMutex> guard(mp_mutex);
data_filter_ = filter;
}
virtual bool isInCleanState() = 0;
LivelinessChangedStatus liveliness_changed_status_;
inline void enableMessagesFromUnkownWriters(
bool enable)
{
m_acceptMessagesFromUnkownWriters = enable;
}
void setTrustedWriter(
const EntityId_t& writer)
{
m_acceptMessagesFromUnkownWriters = false;
m_trustedWriterEntityId = writer;
}
virtual void assert_writer_liveliness(
const GUID_t& writer) = 0;
virtual bool begin_sample_access_nts(
CacheChange_t* change,
WriterProxy*& wp,
bool& is_future_change) = 0;
virtual void end_sample_access_nts(
CacheChange_t* change,
WriterProxy*& wp,
bool mark_as_read) = 0;
virtual void change_read_by_user(
CacheChange_t* change,
WriterProxy* writer,
bool mark_as_read = true) = 0;
RTPS_DllAPI bool is_sample_valid(
const void* data,
const GUID_t& writer,
const SequenceNumber_t& sn) const;
const std::unique_ptr<IDataSharingListener>& datasharing_listener() const
{
return datasharing_listener_;
}
#ifdef FASTDDS_STATISTICS
RTPS_DllAPI bool add_statistics_listener(
std::shared_ptr<fastdds::statistics::IListener> listener);
RTPS_DllAPI bool remove_statistics_listener(
std::shared_ptr<fastdds::statistics::IListener> listener);
RTPS_DllAPI void set_enabled_statistics_writers_mask(
uint32_t enabled_writers);
#endif // FASTDDS_STATISTICS
protected:
virtual bool may_remove_history_record(
bool removed_by_lease);
void add_persistence_guid(
const GUID_t& guid,
const GUID_t& persistence_guid);
void remove_persistence_guid(
const GUID_t& guid,
const GUID_t& persistence_guid,
bool removed_by_lease);
SequenceNumber_t get_last_notified(
const GUID_t& guid);
SequenceNumber_t update_last_notified(
const GUID_t& guid,
const SequenceNumber_t& seq);
virtual void set_last_notified(
const GUID_t& persistence_guid,
const SequenceNumber_t& seq);
History::const_iterator findCacheInFragmentedProcess(
const SequenceNumber_t& sequence_number,
const GUID_t& writer_guid,
CacheChange_t** change,
History::const_iterator hint) const;
void create_datasharing_listener(
ResourceLimitedContainerConfig limits);
bool is_datasharing_compatible_with(
const WriterProxyData& wdata);
ReaderHistory* mp_history;
ReaderListener* mp_listener;
bool m_acceptMessagesToUnknownReaders;
bool m_acceptMessagesFromUnkownWriters;
EntityId_t m_trustedWriterEntityId;
bool m_expectsInlineQos;
ReaderHistoryState* history_state_;
uint64_t total_unread_ = 0;
TimedConditionVariable new_notification_cv_;
LivelinessQosPolicyKind liveliness_kind_;
Duration_t liveliness_lease_duration_;
bool is_datasharing_compatible_ = false;
std::unique_ptr<IDataSharingListener> datasharing_listener_;
eprosima::fastdds::rtps::IReaderDataFilter* data_filter_ = nullptr;
private:
RTPSReader& operator =(
const RTPSReader&) = delete;
void init(
const std::shared_ptr<IPayloadPool>& payload_pool,
const std::shared_ptr<IChangePool>& change_pool,
const ReaderAttributes& att);
};
} /* namespace rtps */
} /* namespace fastrtps */
} /* namespace eprosima */
#endif /* _FASTDDS_RTPS_READER_RTPSREADER_H_ */