Program Listing for File RTPSReader.h

Return to documentation for file (/tmp/ws/src/fastrtps/include/fastdds/rtps/reader/RTPSReader.h)

// Copyright 2016 Proyectos y Sistemas de Mantenimiento SL (eProsima).
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#ifndef _FASTDDS_RTPS_READER_RTPSREADER_H_
#define _FASTDDS_RTPS_READER_RTPSREADER_H_

#include <functional>

#include <fastdds/rtps/Endpoint.h>
#include <fastdds/rtps/attributes/ReaderAttributes.h>
#include <fastdds/rtps/builtin/data/WriterProxyData.h>
#include <fastdds/rtps/common/SequenceNumber.h>
#include <fastdds/rtps/common/Time_t.h>
#include <fastdds/rtps/history/ReaderHistory.h>
#include <fastdds/rtps/interfaces/IReaderDataFilter.hpp>
#include <fastrtps/qos/LivelinessChangedStatus.h>
#include <fastrtps/utils/TimedConditionVariable.hpp>

#include <fastdds/statistics/rtps/StatisticsCommon.hpp>

namespace eprosima {
namespace fastrtps {
namespace rtps {

// Forward declarations
// These introduce the names as incomplete types, which is sufficient wherever
// this header only names them (pointers, references, smart-pointer members).
// NOTE(review): WriterProxyData is also made available through the
// WriterProxyData.h include above, so its forward declaration looks
// redundant - confirm before removing.
class LivelinessManager;
class ReaderListener;
class WriterProxy;
struct CacheChange_t;
struct ReaderHistoryState;
class WriterProxyData;
class IDataSharingListener;

/**
 * Abstract base class for RTPS readers.
 *
 * Derives from Endpoint and from fastdds::statistics::StatisticsReaderImpl and
 * declares the interface that concrete readers implement: writer matching,
 * processing of DATA / DATA_FRAG / HEARTBEAT / GAP messages, and user access to
 * the changes kept in the associated ReaderHistory.
 *
 * Constructors and destructor are protected, so objects can only be created and
 * destroyed from derived classes or from the friend classes listed below.
 */
class RTPSReader
    : public Endpoint
    , public fastdds::statistics::StatisticsReaderImpl
{
    // These classes are granted access to the protected and private members.
    friend class ReaderHistory;
    friend class RTPSParticipantImpl;
    friend class MessageReceiver;
    friend class EDP;
    friend class WLP;

protected:

    /**
     * Constructor overload that takes no explicit pools.
     * @param pimpl Pointer to the participant implementation (presumably the participant that owns this reader).
     * @param guid GUID for the reader.
     * @param att Reader attributes.
     * @param hist Pointer to the reader history.
     * @param listen Optional reader listener; defaults to nullptr.
     */
    RTPSReader(
            RTPSParticipantImpl* pimpl,
            const GUID_t& guid,
            const ReaderAttributes& att,
            ReaderHistory* hist,
            ReaderListener* listen = nullptr);

    /**
     * Constructor overload that additionally receives a payload pool.
     * @param pimpl Pointer to the participant implementation (presumably the participant that owns this reader).
     * @param guid GUID for the reader.
     * @param att Reader attributes.
     * @param payload_pool Shared pointer to the payload pool to use.
     * @param hist Pointer to the reader history.
     * @param listen Optional reader listener; defaults to nullptr.
     */
    RTPSReader(
            RTPSParticipantImpl* pimpl,
            const GUID_t& guid,
            const ReaderAttributes& att,
            const std::shared_ptr<IPayloadPool>& payload_pool,
            ReaderHistory* hist,
            ReaderListener* listen = nullptr);

    /**
     * Constructor overload that receives both a payload pool and a change pool.
     * @param pimpl Pointer to the participant implementation (presumably the participant that owns this reader).
     * @param guid GUID for the reader.
     * @param att Reader attributes.
     * @param payload_pool Shared pointer to the payload pool to use.
     * @param change_pool Shared pointer to the change pool to use.
     * @param hist Pointer to the reader history.
     * @param listen Optional reader listener; defaults to nullptr.
     */
    RTPSReader(
            RTPSParticipantImpl* pimpl,
            const GUID_t& guid,
            const ReaderAttributes& att,
            const std::shared_ptr<IPayloadPool>& payload_pool,
            const std::shared_ptr<IChangePool>& change_pool,
            ReaderHistory* hist,
            ReaderListener* listen = nullptr);

    // Virtual and protected: destruction through an RTPSReader pointer is only
    // available to derived classes and friends.
    virtual ~RTPSReader();

public:

    /**
     * Adds a matched writer described by its proxy data.
     * Pure virtual: the matching logic lives in the concrete reader.
     * @param wdata Proxy data describing the writer.
     * @return Result defined by the implementation (presumably true when the writer was added).
     */
    RTPS_DllAPI virtual bool matched_writer_add(
            const WriterProxyData& wdata) = 0;

    /**
     * Removes a matched writer identified by its GUID.
     * @param writer_guid GUID of the writer.
     * @param removed_by_lease Defaults to false; presumably signals that the removal is caused by a lease expiration.
     * @return Result defined by the implementation (presumably true when the writer was removed).
     */
    RTPS_DllAPI virtual bool matched_writer_remove(
            const GUID_t& writer_guid,
            bool removed_by_lease = false) = 0;

    /**
     * Queries whether a writer is currently matched with this reader.
     * @param writer_guid GUID of the writer.
     * @return Result defined by the implementation (presumably true when the writer is matched).
     */
    RTPS_DllAPI virtual bool matched_writer_is_matched(
            const GUID_t& writer_guid) = 0;

    /**
     * Processes a DATA message.
     * @param change Pointer to the change carried by the message.
     * @return Result defined by the implementation.
     */
    RTPS_DllAPI virtual bool processDataMsg(
            CacheChange_t* change) = 0;

    /**
     * Processes a DATA_FRAG message.
     * @param change Pointer to the change carrying the fragment data.
     * @param sampleSize Size of the sample (presumably of the complete, reassembled sample - confirm).
     * @param fragmentStartingNum Number of the first fragment in the submessage.
     * @param fragmentsInSubmessage Number of fragments contained in the submessage.
     * @return Result defined by the implementation.
     */
    RTPS_DllAPI virtual bool processDataFragMsg(
            CacheChange_t* change,
            uint32_t sampleSize,
            uint32_t fragmentStartingNum,
            uint16_t fragmentsInSubmessage) = 0;

    /**
     * Processes a HEARTBEAT message.
     * @param writerGUID GUID of the writer the message refers to.
     * @param hbCount Heartbeat count.
     * @param firstSN First sequence number announced.
     * @param lastSN Last sequence number announced.
     * @param finalFlag Value of the final flag.
     * @param livelinessFlag Value of the liveliness flag.
     * @return Result defined by the implementation.
     */
    RTPS_DllAPI virtual bool processHeartbeatMsg(
            const GUID_t& writerGUID,
            uint32_t hbCount,
            const SequenceNumber_t& firstSN,
            const SequenceNumber_t& lastSN,
            bool finalFlag,
            bool livelinessFlag) = 0;

    /**
     * Processes a GAP message.
     * @param writerGUID GUID of the writer the message refers to.
     * @param gapStart Sequence number at which the gap starts.
     * @param gapList Set of sequence numbers listed in the gap.
     * @return Result defined by the implementation.
     */
    RTPS_DllAPI virtual bool processGapMsg(
            const GUID_t& writerGUID,
            const SequenceNumber_t& gapStart,
            const SequenceNumberSet_t& gapList) = 0;

    /**
     * Notification hook for a change that is removed by the history
     * (ReaderHistory is a friend of this class).
     * @param change Pointer to the removed change.
     * @param prox Optional pointer to a writer proxy; defaults to nullptr.
     * @return Result defined by the implementation.
     */
    RTPS_DllAPI virtual bool change_removed_by_history(
            CacheChange_t* change,
            WriterProxy* prox = nullptr) = 0;

    /**
     * @return Pointer to the listener associated with this reader
     *         (presumably mp_listener - defined out of line).
     */
    RTPS_DllAPI ReaderListener* getListener() const;

    /**
     * Changes the listener associated with this reader.
     * @param target Pointer to the new listener.
     * @return Result defined by the out-of-line implementation.
     */
    RTPS_DllAPI bool setListener(
            ReaderListener* target);

    /**
     * Reserves a CacheChange_t.
     * @param change Output: receives the pointer to the reserved change.
     * @param dataCdrSerializedSize Size of the serialized data the change must hold.
     * @return Result defined by the out-of-line implementation (presumably true on success).
     */
    RTPS_DllAPI bool reserveCache(
            CacheChange_t** change,
            uint32_t dataCdrSerializedSize);

    /**
     * Releases a CacheChange_t (counterpart of reserveCache).
     * @param change Pointer to the change to release.
     */
    RTPS_DllAPI void releaseCache(
            CacheChange_t* change);

    /**
     * Retrieves the next unread change.
     * @param change Output: receives the pointer to the change.
     * @param wp Output: receives the pointer to the related writer proxy.
     * @return Result defined by the implementation (presumably true when a change was found).
     */
    RTPS_DllAPI virtual bool nextUnreadCache(
            CacheChange_t** change,
            WriterProxy** wp) = 0;

    /**
     * Retrieves the next untaken change.
     * @param change Output: receives the pointer to the change.
     * @param wp Output: receives the pointer to the related writer proxy.
     * @return Result defined by the implementation (presumably true when a change was found).
     */
    RTPS_DllAPI virtual bool nextUntakenCache(
            CacheChange_t** change,
            WriterProxy** wp) = 0;

    /**
     * Waits for an unread change, bounded by a timeout.
     * @param timeout Maximum time to wait.
     * @return Result defined by the out-of-line implementation
     *         (presumably true when an unread change is available - confirm).
     */
    RTPS_DllAPI bool wait_for_unread_cache(
            const eprosima::fastrtps::Duration_t& timeout);

    /**
     * @return Number of unread changes (presumably total_unread_ - defined out of line).
     */
    RTPS_DllAPI uint64_t get_unread_count() const;

    /**
     * Non-const overload of get_unread_count().
     * @param mark_as_read Presumably requests that the counted changes are also marked as read - confirm.
     * @return Number of unread changes.
     */
    RTPS_DllAPI uint64_t get_unread_count(
            bool mark_as_read);

    /**
     * @return Current value of m_expectsInlineQos.
     */
    RTPS_DllAPI inline bool expectsInlineQos()
    {
        return m_expectsInlineQos;
    }

    /**
     * @return The history pointer stored in mp_history.
     */
    RTPS_DllAPI inline ReaderHistory* getHistory()
    {
        return mp_history;
    }

    /**
     * Reads the content filter pointer while holding mp_mutex.
     * mp_mutex is not declared in this class (presumably inherited from Endpoint).
     * @return The pointer currently stored in data_filter_ (nullptr by default).
     */
    RTPS_DllAPI eprosima::fastdds::rtps::IReaderDataFilter* get_content_filter() const
    {
        std::lock_guard<RecursiveTimedMutex> guard(mp_mutex);
        return data_filter_;
    }

    /**
     * Stores a new content filter pointer while holding mp_mutex.
     * Only the raw pointer is stored; nothing in this header takes ownership of it.
     * @param filter Pointer to the filter to use; nullptr is stored as-is.
     */
    RTPS_DllAPI void set_content_filter(
            eprosima::fastdds::rtps::IReaderDataFilter* filter)
    {
        std::lock_guard<RecursiveTimedMutex> guard(mp_mutex);
        data_filter_ = filter;
    }

    /**
     * @return Result defined by the implementation; what "clean state" means
     *         is not visible in this header.
     */
    virtual bool isInCleanState() = 0;

    // Liveliness changed status of this reader. Public data member: accessed
    // directly, with no accessor or locking declared in this header.
    LivelinessChangedStatus liveliness_changed_status_;

    /**
     * Sets m_acceptMessagesFromUnkownWriters. No lock is taken here.
     * @param enable New value of the flag.
     */
    inline void enableMessagesFromUnkownWriters(
            bool enable)
    {
        m_acceptMessagesFromUnkownWriters = enable;
    }

    /**
     * Stores the entity id of the trusted writer and, as a side effect,
     * sets m_acceptMessagesFromUnkownWriters to false. No lock is taken here.
     * @param writer Entity id of the trusted writer.
     */
    void setTrustedWriter(
            const EntityId_t& writer)
    {
        m_acceptMessagesFromUnkownWriters = false;
        m_trustedWriterEntityId = writer;
    }

    /**
     * Asserts the liveliness of a writer.
     * @param writer GUID of the writer.
     */
    virtual void assert_writer_liveliness(
            const GUID_t& writer) = 0;

    /**
     * Called to begin accessing a sample.
     * NOTE(review): the "_nts" suffix presumably means "not thread safe", i.e.
     * the caller is expected to hold the reader lock - confirm.
     * @param change Pointer to the change to access.
     * @param wp In/out reference to a writer proxy pointer.
     * @param is_future_change Output flag set by the implementation.
     * @return Result defined by the implementation.
     */
    virtual bool begin_sample_access_nts(
            CacheChange_t* change,
            WriterProxy*& wp,
            bool& is_future_change) = 0;

    /**
     * Called to end the access started with begin_sample_access_nts().
     * @param change Pointer to the change that was accessed.
     * @param wp In/out reference to a writer proxy pointer.
     * @param mark_as_read Presumably whether the change should be marked as read.
     */
    virtual void end_sample_access_nts(
            CacheChange_t* change,
            WriterProxy*& wp,
            bool mark_as_read) = 0;

    /**
     * Notifies the reader that the user has read a change.
     * @param change Pointer to the change.
     * @param writer Pointer to the related writer proxy.
     * @param mark_as_read Defaults to true; presumably whether the change should be marked as read.
     */
    virtual void change_read_by_user(
            CacheChange_t* change,
            WriterProxy* writer,
            bool mark_as_read = true) = 0;

    /**
     * Checks the validity of a sample.
     * @param data Pointer to the sample data.
     * @param writer GUID of the writer of the sample.
     * @param sn Sequence number of the sample.
     * @return Result defined by the out-of-line implementation (presumably true when the sample is valid).
     */
    RTPS_DllAPI bool is_sample_valid(
            const void* data,
            const GUID_t& writer,
            const SequenceNumber_t& sn) const;

    /**
     * @return Const reference to the owning pointer datasharing_listener_;
     *         the pointer may be empty.
     */
    const std::unique_ptr<IDataSharingListener>& datasharing_listener() const
    {
        return datasharing_listener_;
    }

// The following statistics API is only declared when FASTDDS_STATISTICS is defined.
#ifdef FASTDDS_STATISTICS

    /**
     * Adds a statistics listener.
     * @param listener Shared pointer to the listener.
     * @return Result defined by the out-of-line implementation.
     */
    RTPS_DllAPI bool add_statistics_listener(
            std::shared_ptr<fastdds::statistics::IListener> listener);

    /**
     * Removes a statistics listener.
     * @param listener Shared pointer to the listener.
     * @return Result defined by the out-of-line implementation.
     */
    RTPS_DllAPI bool remove_statistics_listener(
            std::shared_ptr<fastdds::statistics::IListener> listener);

    /**
     * Sets the mask of enabled statistics writers.
     * @param enabled_writers Mask value; the meaning of each bit is not visible in this header.
     */
    RTPS_DllAPI void set_enabled_statistics_writers_mask(
            uint32_t enabled_writers);

#endif // FASTDDS_STATISTICS

protected:

    /**
     * Virtual (not pure) hook that derived readers may override.
     * @param removed_by_lease Presumably whether the removal is caused by a lease expiration.
     * @return Presumably whether the history record of a removed writer may be dropped - confirm.
     */
    virtual bool may_remove_history_record(
            bool removed_by_lease);

    /**
     * Registers the persistence GUID related to a writer GUID.
     * @param guid GUID of the writer.
     * @param persistence_guid Persistence GUID of the writer.
     */
    void add_persistence_guid(
            const GUID_t& guid,
            const GUID_t& persistence_guid);

    /**
     * Counterpart of add_persistence_guid().
     * @param guid GUID of the writer.
     * @param persistence_guid Persistence GUID of the writer.
     * @param removed_by_lease Presumably whether the removal is caused by a lease expiration.
     */
    void remove_persistence_guid(
            const GUID_t& guid,
            const GUID_t& persistence_guid,
            bool removed_by_lease);

    /**
     * @param guid GUID of the writer.
     * @return Last notified sequence number for that writer.
     */
    SequenceNumber_t get_last_notified(
            const GUID_t& guid);

    /**
     * Updates the last notified sequence number of a writer.
     * @param guid GUID of the writer.
     * @param seq New sequence number.
     * @return A sequence number defined by the out-of-line implementation
     *         (presumably the previous value - confirm).
     */
    SequenceNumber_t update_last_notified(
            const GUID_t& guid,
            const SequenceNumber_t& seq);

    /**
     * Virtual hook to set the last notified sequence number.
     * Unlike the two functions above, it is keyed by the persistence GUID.
     * @param persistence_guid Persistence GUID of the writer.
     * @param seq Sequence number to store.
     */
    virtual void set_last_notified(
            const GUID_t& persistence_guid,
            const SequenceNumber_t& seq);

    /**
     * Looks up a change that is being received in fragments.
     * @param sequence_number Sequence number of the change.
     * @param writer_guid GUID of the writer of the change.
     * @param change Output: receives the pointer to the change.
     * @param hint Iterator presumably used as the starting point of the search.
     * @return Iterator into the history, as defined by the out-of-line implementation.
     */
    History::const_iterator findCacheInFragmentedProcess(
            const SequenceNumber_t& sequence_number,
            const GUID_t& writer_guid,
            CacheChange_t** change,
            History::const_iterator hint) const;

    /**
     * Creates the data-sharing listener (presumably stored in datasharing_listener_).
     * @param limits Container limits configuration, passed by value.
     */
    void create_datasharing_listener(
            ResourceLimitedContainerConfig limits);

    /**
     * @param wdata Proxy data describing a writer.
     * @return Presumably whether data sharing can be used with that writer.
     */
    bool is_datasharing_compatible_with(
            const WriterProxyData& wdata);

    // History pointer, returned by getHistory().
    ReaderHistory* mp_history;
    // Listener pointer (see getListener() / setListener()).
    ReaderListener* mp_listener;
    // Not referenced by any inline code in this header.
    bool m_acceptMessagesToUnknownReaders;
    // Written by enableMessagesFromUnkownWriters(); forced to false by setTrustedWriter().
    bool m_acceptMessagesFromUnkownWriters;
    // Entity id stored by setTrustedWriter().
    EntityId_t m_trustedWriterEntityId;
    // Value returned by expectsInlineQos().
    bool m_expectsInlineQos;

    // Raw pointer to the history state; allocation and release are not visible in this header.
    ReaderHistoryState* history_state_;

    // Counter of unread changes, starts at 0.
    uint64_t total_unread_ = 0;

    // Condition variable (presumably signalled on new notifications and used by wait_for_unread_cache()).
    TimedConditionVariable new_notification_cv_;

    // Liveliness kind and lease duration (presumably taken from the reader attributes at construction).
    LivelinessQosPolicyKind liveliness_kind_;
    Duration_t liveliness_lease_duration_;

    // Data-sharing state: compatibility flag (false by default) and the owned listener.
    bool is_datasharing_compatible_ = false;
    std::unique_ptr<IDataSharingListener> datasharing_listener_;

    // Content filter pointer, nullptr by default; accessed under mp_mutex by the
    // get_content_filter() / set_content_filter() accessors.
    eprosima::fastdds::rtps::IReaderDataFilter* data_filter_ = nullptr;

private:

    // Copy assignment is disabled.
    RTPSReader& operator =(
            const RTPSReader&) = delete;

    /**
     * Initialisation helper (presumably shared by the constructor overloads).
     * @param payload_pool Shared pointer to the payload pool.
     * @param change_pool Shared pointer to the change pool.
     * @param att Reader attributes.
     */
    void init(
            const std::shared_ptr<IPayloadPool>& payload_pool,
            const std::shared_ptr<IChangePool>& change_pool,
            const ReaderAttributes& att);

};

} /* namespace rtps */
} /* namespace fastrtps */
} /* namespace eprosima */

#endif /* _FASTDDS_RTPS_READER_RTPSREADER_H_ */