Program Listing for File StatefulReader.h
↰ Return to documentation for file (/tmp/ws/src/fastrtps/include/fastdds/rtps/reader/StatefulReader.h)
// Copyright 2016 Proyectos y Sistemas de Mantenimiento SL (eProsima).
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef _FASTDDS_RTPS_READER_STATEFULREADER_H_
#define _FASTDDS_RTPS_READER_STATEFULREADER_H_
#ifndef DOXYGEN_SHOULD_SKIP_THIS_PUBLIC
#include <fastdds/rtps/reader/RTPSReader.h>
#include <fastrtps/utils/collections/ResourceLimitedVector.hpp>
#include <fastdds/rtps/common/CDRMessage_t.h>
#include <fastdds/rtps/messages/RTPSMessageGroup.h>
#include <mutex>
namespace eprosima {
namespace fastrtps {
namespace rtps {
// Forward declarations: within this header both types are referred to only
// through pointers and references, so their full definitions are not needed here.
class WriterProxy;
class RTPSMessageSenderInterface;
/**
 * RTPS reader specialisation that derives from RTPSReader and keeps a
 * collection of WriterProxy pointers (matched_writers_) for the writers it is
 * matched with.
 *
 * Only declarations are visible in this header; behavioural notes below that
 * cannot be read off a signature are marked as assumptions to confirm against
 * the implementation file.
 */
class StatefulReader : public RTPSReader
{
public:

    // Friendship lets RTPSParticipantImpl reach the protected constructors
    // (presumably the participant acts as the factory for readers — confirm).
    friend class RTPSParticipantImpl;

    virtual ~StatefulReader();

protected:

    /**
     * Construct a reader from its owning participant, GUID and attributes.
     *
     * @param pimpl  Participant implementation that creates this reader.
     * @param guid   GUID assigned to this reader.
     * @param att    Reader attributes.
     * @param hist   History associated with this reader.
     * @param listen Optional listener; defaults to nullptr.
     */
    StatefulReader(RTPSParticipantImpl* pimpl, const GUID_t& guid, const ReaderAttributes& att,
            ReaderHistory* hist, ReaderListener* listen = nullptr);

    /**
     * Same as the basic constructor, additionally receiving a payload pool.
     *
     * @param payload_pool Shared payload pool handed to the reader.
     */
    StatefulReader(RTPSParticipantImpl* pimpl, const GUID_t& guid, const ReaderAttributes& att,
            const std::shared_ptr<IPayloadPool>& payload_pool, ReaderHistory* hist,
            ReaderListener* listen = nullptr);

    /**
     * Same as the payload-pool constructor, additionally receiving a change pool.
     *
     * @param payload_pool Shared payload pool handed to the reader.
     * @param change_pool  Shared change pool handed to the reader.
     */
    StatefulReader(RTPSParticipantImpl* pimpl, const GUID_t& guid, const ReaderAttributes& att,
            const std::shared_ptr<IPayloadPool>& payload_pool,
            const std::shared_ptr<IChangePool>& change_pool, ReaderHistory* hist,
            ReaderListener* listen = nullptr);

public:

    /**
     * Register a matched writer described by @p wdata (overrides the base class).
     *
     * @return presumably true when the writer was added — confirm in the implementation.
     */
    bool matched_writer_add(const WriterProxyData& wdata) override;

    /**
     * Unregister the matched writer identified by @p writer_guid (overrides the base class).
     *
     * @param writer_guid      GUID of the writer to remove.
     * @param removed_by_lease Defaults to false; presumably flags removal caused by a
     *                         lease expiry — confirm.
     */
    bool matched_writer_remove(const GUID_t& writer_guid, bool removed_by_lease = false) override;

    /**
     * Query whether the writer identified by @p writer_guid is matched
     * (overrides the base class).
     */
    bool matched_writer_is_matched(const GUID_t& writer_guid) override;

    /**
     * Look up the proxy of a matched writer.
     *
     * @param writerGUID GUID of the writer to search for.
     * @param WP         Out-parameter; presumably receives the proxy pointer when found.
     */
    bool matched_writer_lookup(const GUID_t& writerGUID, WriterProxy** WP);

    /**
     * Handle a received DATA message carried by @p change (overrides the base class).
     */
    bool processDataMsg(CacheChange_t* change) override;

    /**
     * Handle a received DATA_FRAG message (overrides the base class).
     *
     * @param change                Change carrying the fragment(s).
     * @param sampleSize            Presumably the size of the complete sample — confirm.
     * @param fragmentStartingNum   Presumably the number of the first fragment in the message.
     * @param fragmentsInSubmessage Presumably how many fragments the submessage carries.
     */
    bool processDataFragMsg(CacheChange_t* change, uint32_t sampleSize, uint32_t fragmentStartingNum,
            uint16_t fragmentsInSubmessage) override;

    /**
     * Handle a received HEARTBEAT message (overrides the base class).
     *
     * @param writerGUID     GUID of the writer that sent the heartbeat.
     * @param hbCount        Heartbeat count.
     * @param firstSN        First sequence number announced.
     * @param lastSN         Last sequence number announced.
     * @param finalFlag      Final flag of the heartbeat.
     * @param livelinessFlag Liveliness flag of the heartbeat.
     */
    bool processHeartbeatMsg(const GUID_t& writerGUID, uint32_t hbCount, const SequenceNumber_t& firstSN,
            const SequenceNumber_t& lastSN, bool finalFlag, bool livelinessFlag) override;

    /**
     * Handle a received GAP message (overrides the base class).
     *
     * @param writerGUID GUID of the writer that sent the gap.
     * @param gapStart   Sequence number at which the gap starts.
     * @param gapList    Set of sequence numbers included in the gap.
     */
    bool processGapMsg(const GUID_t& writerGUID, const SequenceNumber_t& gapStart,
            const SequenceNumberSet_t& gapList) override;

    /**
     * Notification hook for a change removed by the history (overrides the base class).
     *
     * @param change Change being removed.
     * @param prox   Optional writer proxy related to the change; defaults to nullptr.
     */
    bool change_removed_by_history(CacheChange_t* change, WriterProxy* prox = nullptr) override;

    /**
     * Hook invoked for a received change.
     *
     * @param a_change                      The received change.
     * @param prox                          Writer proxy associated with the change.
     * @param unknown_missing_changes_up_to Count whose exact meaning is defined by the
     *                                      implementation — TODO confirm.
     */
    bool change_received(CacheChange_t* a_change, WriterProxy* prox, size_t unknown_missing_changes_up_to);

    /**
     * @return the mp_RTPSParticipant pointer (not declared in this class;
     *         presumably inherited from a base class).
     */
    RTPSParticipantImpl* getRTPSParticipant() const
    {
        return mp_RTPSParticipant;
    }

    /**
     * @return a reference to a ResourceEvent; where it comes from is decided
     *         by the out-of-line definition.
     */
    ResourceEvent& getEventResource() const;

    /**
     * Obtain the next unread change (overrides the base class).
     *
     * @param change Out-parameter for the change pointer.
     * @param wpout  Optional out-parameter for the related writer proxy; defaults to nullptr.
     */
    bool nextUnreadCache(CacheChange_t** change, WriterProxy** wpout = nullptr) override;

    /**
     * Obtain the next untaken change (overrides the base class).
     *
     * @param change Out-parameter for the change pointer.
     * @param wpout  Optional out-parameter for the related writer proxy; defaults to nullptr.
     */
    bool nextUntakenCache(CacheChange_t** change, WriterProxy** wpout = nullptr) override;

    /**
     * Update the reader timing configuration from @p times.
     */
    bool updateTimes(const ReaderTimes& times);

    /**
     * @return a mutable reference to the times_ member.
     */
    ReaderTimes& getTimes()
    {
        return times_;
    }

    /**
     * @return the current size of matched_writers_.
     */
    size_t getMatchedWritersSize() const
    {
        return matched_writers_.size();
    }

    /**
     * Report whether the reader is in a "clean" state (overrides the base class);
     * the precise criterion lives in the implementation.
     */
    bool isInCleanState() override;

    /**
     * Send an ACKNACK for an explicit set of sequence numbers.
     *
     * @param writer   Proxy of the writer the ACKNACK refers to.
     * @param sns      Sequence number set to include.
     * @param sender   Interface used to send the message.
     * @param is_final Value for the final flag.
     */
    void send_acknack(const WriterProxy* writer, const SequenceNumberSet_t& sns,
            RTPSMessageSenderInterface* sender, bool is_final);

    /**
     * Send an ACKNACK without an explicit sequence number set.
     *
     * @param writer              Proxy of the writer the ACKNACK refers to.
     * @param sender              Interface used to send the message.
     * @param heartbeat_was_final Presumably whether the triggering heartbeat had its
     *                            final flag set — confirm.
     */
    void send_acknack(const WriterProxy* writer, RTPSMessageSenderInterface* sender,
            bool heartbeat_was_final);

    /**
     * Send @p message to a range of locators.
     *
     * NOTE(review): the `_nts` suffix presumably means "not thread safe" (caller
     * must hold the relevant lock) — confirm against project conventions.
     *
     * @param message                 Message to send.
     * @param locators_begin          Start of the destination locator range.
     * @param locators_end            End of the destination locator range.
     * @param max_blocking_time_point Time point bounding how long the call may block.
     */
    bool send_sync_nts(CDRMessage_t* message, const Locators& locators_begin, const Locators& locators_end,
            std::chrono::steady_clock::time_point& max_blocking_time_point);

    /**
     * Assert liveliness of the given writer (overrides the base class).
     */
    void assert_writer_liveliness(const GUID_t& writer) override;

    /**
     * Begin access to a sample (overrides the base class; see the `_nts` note
     * on send_sync_nts).
     *
     * @param change           Change to be accessed.
     * @param wp               In/out reference to the related writer proxy pointer.
     * @param is_future_change Out flag; presumably tells whether the change is not yet
     *                         available to the user — confirm.
     */
    bool begin_sample_access_nts(CacheChange_t* change, WriterProxy*& wp, bool& is_future_change) override;

    /**
     * End access to a sample started with begin_sample_access_nts (overrides the base class).
     *
     * @param change       Change that was accessed.
     * @param wp           In/out reference to the related writer proxy pointer.
     * @param mark_as_read Whether the change should be marked as read.
     */
    void end_sample_access_nts(CacheChange_t* change, WriterProxy*& wp, bool mark_as_read) override;

    /**
     * Notification that the user has read a change (overrides the base class).
     *
     * @param change       Change that was read.
     * @param writer       Proxy of the writer that produced the change.
     * @param mark_as_read Whether to mark the change as read; defaults to true.
     */
    void change_read_by_user(CacheChange_t* change, WriterProxy* writer, bool mark_as_read = true) override;

private:

    // Common initialisation helper taking the participant and the reader
    // attributes (presumably shared by the three constructors — confirm).
    void init(RTPSParticipantImpl* pimpl, const ReaderAttributes& att);

    // Decide whether messages from entityGUID are accepted; wp is an
    // out-parameter for the matching proxy.
    bool acceptMsgFrom(const GUID_t& entityGUID, WriterProxy** wp) const;

    // Search for the proxy of writerGUID; wp is an out-parameter for the result.
    bool findWriterProxy(const GUID_t& writerGUID, WriterProxy** wp) const;

    // Notification helper operating on the given writer proxy.
    void NotifyChanges(WriterProxy* wp);

    // Remove the changes that came from writerGUID; is_payload_pool_lost
    // defaults to false.
    void remove_changes_from(const GUID_t& writerGUID, bool is_payload_pool_lost = false);

    // Counter related to ACKNACK messages (presumably number sent — confirm).
    uint32_t acknack_count_;
    // Counter related to NACKFRAG messages (presumably number sent — confirm).
    uint32_t nackfrag_count_;
    // Timing configuration exposed through getTimes().
    ReaderTimes times_;
    // Proxies of the matched writers; its size is reported by getMatchedWritersSize().
    ResourceLimitedVector<WriterProxy*> matched_writers_;
    // Second proxy container (presumably inactive proxies kept for reuse — confirm).
    ResourceLimitedVector<WriterProxy*> matched_writers_pool_;
    // Container configuration (presumably applied to per-proxy change collections — confirm).
    ResourceLimitedContainerConfig proxy_changes_config_;
    // Flag presumably mirroring a "disable positive ACKs" setting — confirm.
    bool disable_positive_acks_;
    // Flag presumably tracking whether the reader is still operational — confirm.
    bool is_alive_;
};
} /* namespace rtps */
} /* namespace fastrtps */
} /* namespace eprosima */
#endif // ifndef DOXYGEN_SHOULD_SKIP_THIS_PUBLIC
#endif // _FASTDDS_RTPS_READER_STATEFULREADER_H_