Program Listing for File StatefulWriter.h
↰ Return to documentation for file (/tmp/ws/src/fastrtps/include/fastdds/rtps/writer/StatefulWriter.h
)
// Copyright 2016 Proyectos y Sistemas de Mantenimiento SL (eProsima).
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef _FASTDDS_RTPS_STATEFULWRITER_H_
#define _FASTDDS_RTPS_STATEFULWRITER_H_
#ifndef DOXYGEN_SHOULD_SKIP_THIS_PUBLIC
#include <fastdds/rtps/writer/RTPSWriter.h>
#include <fastdds/rtps/interfaces/IReaderDataFilter.hpp>
#include <fastdds/rtps/history/IChangePool.h>
#include <fastdds/rtps/history/IPayloadPool.h>
#include <fastrtps/utils/collections/ResourceLimitedVector.hpp>
#include <condition_variable>
#include <mutex>
namespace eprosima {
namespace fastrtps {
namespace rtps {
class ReaderProxy;
class TimedEvent;
class StatefulWriter : public RTPSWriter
{
friend class RTPSParticipantImpl;
friend class ReaderProxy;
public:
virtual ~StatefulWriter();
protected:
StatefulWriter(
RTPSParticipantImpl* impl,
const GUID_t& guid,
const WriterAttributes& att,
fastdds::rtps::FlowController* flow_controller,
WriterHistory* hist,
WriterListener* listen = nullptr);
StatefulWriter(
RTPSParticipantImpl* impl,
const GUID_t& guid,
const WriterAttributes& att,
const std::shared_ptr<IPayloadPool>& payload_pool,
fastdds::rtps::FlowController* flow_controller,
WriterHistory* hist,
WriterListener* listen = nullptr);
StatefulWriter(
RTPSParticipantImpl* impl,
const GUID_t& guid,
const WriterAttributes& att,
const std::shared_ptr<IPayloadPool>& payload_pool,
const std::shared_ptr<IChangePool>& change_pool,
fastdds::rtps::FlowController* flow_controller,
WriterHistory* hist,
WriterListener* listen = nullptr);
void rebuild_status_after_load();
virtual void print_inconsistent_acknack(
const GUID_t& writer_guid,
const GUID_t& reader_guid,
const SequenceNumber_t& min_requested_sequence_number,
const SequenceNumber_t& max_requested_sequence_number,
const SequenceNumber_t& next_sequence_number);
private:
void init(
RTPSParticipantImpl* pimpl,
const WriterAttributes& att);
TimedEvent* periodic_hb_event_;
TimedEvent* nack_response_event_;
TimedEvent* ack_event_;
Count_t m_heartbeatCount;
WriterTimes m_times;
ResourceLimitedVector<ReaderProxy*> matched_remote_readers_;
ResourceLimitedVector<ReaderProxy*> matched_readers_pool_;
using ReaderProxyIterator = ResourceLimitedVector<ReaderProxy*>::iterator;
using ReaderProxyConstIterator = ResourceLimitedVector<ReaderProxy*>::const_iterator;
SequenceNumber_t next_all_acked_notify_sequence_;
SequenceNumber_t min_readers_low_mark_;
// TODO Join this mutex when main mutex would not be recursive.
std::mutex all_acked_mutex_;
std::condition_variable all_acked_cond_;
// TODO Also remove when main mutex not recursive.
bool all_acked_;
std::condition_variable_any may_remove_change_cond_;
unsigned int may_remove_change_;
public:
void unsent_change_added_to_history(
CacheChange_t* p,
const std::chrono::time_point<std::chrono::steady_clock>& max_blocking_time) override;
bool change_removed_by_history(
CacheChange_t* a_change) override;
bool intraprocess_delivery(
CacheChange_t* change,
ReaderProxy* reader_proxy);
bool intraprocess_gap(
ReaderProxy* reader_proxy,
const SequenceNumber_t& seq_num)
{
SequenceNumber_t last_seq = seq_num + 1;
return intraprocess_gap(reader_proxy, seq_num, last_seq);
}
bool intraprocess_gap(
ReaderProxy* reader_proxy,
const SequenceNumber_t& first_seq,
const SequenceNumber_t& last_seq);
bool intraprocess_heartbeat(
ReaderProxy* reader_proxy,
bool liveliness = false);
inline void incrementHBCount()
{
on_heartbeat(++m_heartbeatCount);
}
bool matched_reader_add(
const ReaderProxyData& data) override;
bool matched_reader_remove(
const GUID_t& reader_guid) override;
bool matched_reader_is_matched(
const GUID_t& reader_guid) override;
bool is_acked_by_all(
const CacheChange_t* a_change) const override;
template <typename Function>
Function for_each_reader_proxy(
Function f) const
{
// we cannot directly pass iterators neither const_iterators to matched_readers_ because then the functor would
// be able to modify ReaderProxy elements
for ( const ReaderProxy* rp : matched_local_readers_ )
{
f(rp);
}
for ( const ReaderProxy* rp : matched_datasharing_readers_ )
{
f(rp);
}
for ( const ReaderProxy* rp : matched_remote_readers_ )
{
f(rp);
}
return f;
}
bool wait_for_all_acked(
const Duration_t& max_wait) override;
bool all_readers_updated();
bool try_remove_change(
const std::chrono::steady_clock::time_point& max_blocking_time_point,
std::unique_lock<RecursiveTimedMutex>& lock) override;
bool wait_for_acknowledgement(
const SequenceNumber_t& seq,
const std::chrono::steady_clock::time_point& max_blocking_time_point,
std::unique_lock<RecursiveTimedMutex>& lock) override;
void updateAttributes(
const WriterAttributes& att) override;
bool matched_reader_lookup(
GUID_t& readerGuid,
ReaderProxy** RP);
inline Count_t getHeartbeatCount() const
{
return this->m_heartbeatCount;
}
inline RTPSParticipantImpl* getRTPSParticipant() const
{
return mp_RTPSParticipant;
}
inline size_t getMatchedReadersSize() const
{
std::lock_guard<RecursiveTimedMutex> guard(mp_mutex);
return matched_remote_readers_.size()
+ matched_local_readers_.size()
+ matched_datasharing_readers_.size();
}
inline bool get_disable_positive_acks() const
{
return disable_positive_acks_;
}
void updateTimes(
const WriterTimes& times);
SequenceNumber_t next_sequence_number() const;
bool send_periodic_heartbeat(
bool final = false,
bool liveliness = false);
void send_heartbeat_to_nts(
ReaderProxy& remoteReaderProxy,
bool liveliness = false,
bool force = false);
void perform_nack_response();
void perform_nack_supression(
const GUID_t& reader_guid);
bool process_acknack(
const GUID_t& writer_guid,
const GUID_t& reader_guid,
uint32_t ack_count,
const SequenceNumberSet_t& sn_set,
bool final_flag,
bool& result) override;
virtual bool process_nack_frag(
const GUID_t& writer_guid,
const GUID_t& reader_guid,
uint32_t ack_count,
const SequenceNumber_t& seq_num,
const FragmentNumberSet_t fragments_state,
bool& result) override;
void reader_data_filter(
fastdds::rtps::IReaderDataFilter* filter) final;
const fastdds::rtps::IReaderDataFilter* reader_data_filter() const final;
DeliveryRetCode deliver_sample_nts(
CacheChange_t* cache_change,
RTPSMessageGroup& group,
LocatorSelectorSender& locator_selector,
const std::chrono::time_point<std::chrono::steady_clock>& max_blocking_time) override;
LocatorSelectorSender& get_general_locator_selector() override
{
return locator_selector_general_;
}
LocatorSelectorSender& get_async_locator_selector() override
{
return locator_selector_async_;
}
private:
bool is_acked_by_all(
const SequenceNumber_t seq) const;
void update_reader_info(
LocatorSelectorSender& locator_selector,
bool create_sender_resources);
void select_all_readers_nts(
RTPSMessageGroup& group,
LocatorSelectorSender& locator_selector);
void send_heartbeat_piggyback_nts_(
RTPSMessageGroup& message_group,
LocatorSelectorSender& locator_selector,
uint32_t& last_bytes_processed);
void send_heartbeat_nts_(
size_t number_of_readers,
RTPSMessageGroup& message_group,
bool final,
bool liveliness = false);
void check_acked_status();
bool ack_timer_expired();
void send_heartbeat_to_all_readers();
void deliver_sample_to_intraprocesses(
CacheChange_t* change);
void deliver_sample_to_datasharing(
CacheChange_t* change);
DeliveryRetCode deliver_sample_to_network(
CacheChange_t* change,
RTPSMessageGroup& group,
LocatorSelectorSender& locator_selector,
const std::chrono::time_point<std::chrono::steady_clock>& max_blocking_time);
void prepare_datasharing_delivery(
CacheChange_t* change);
void add_gaps_for_holes_in_history_(
RTPSMessageGroup& group);
bool disable_heartbeat_piggyback_;
bool disable_positive_acks_;
std::chrono::duration<double, std::ratio<1, 1000000>> keep_duration_us_;
SequenceNumber_t last_sequence_number_;
SequenceNumber_t biggest_removed_sequence_number_;
const uint32_t sendBufferSize_;
int32_t currentUsageSendBufferSize_;
bool there_are_remote_readers_ = false;
bool there_are_local_readers_ = false;
StatefulWriter& operator =(
const StatefulWriter&) = delete;
fastdds::rtps::IReaderDataFilter* reader_data_filter_ = nullptr;
ResourceLimitedVector<ReaderProxy*> matched_local_readers_;
ResourceLimitedVector<ReaderProxy*> matched_datasharing_readers_;
bool there_are_datasharing_readers_ = false;
LocatorSelectorSender locator_selector_general_;
LocatorSelectorSender locator_selector_async_;
};
} /* namespace rtps */
} /* namespace fastrtps */
} /* namespace eprosima */
#endif // ifndef DOXYGEN_SHOULD_SKIP_THIS_PUBLIC
#endif /* _FASTDDS_RTPS_STATEFULWRITER_H_ */