.. _program_listing_file__tmp_ws_src_fastrtps_include_fastdds_rtps_writer_StatefulWriter.h: Program Listing for File StatefulWriter.h ========================================= |exhale_lsh| :ref:`Return to documentation for file ` (``/tmp/ws/src/fastrtps/include/fastdds/rtps/writer/StatefulWriter.h``) .. |exhale_lsh| unicode:: U+021B0 .. UPWARDS ARROW WITH TIP LEFTWARDS .. code-block:: cpp // Copyright 2016 Proyectos y Sistemas de Mantenimiento SL (eProsima). // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. #ifndef _FASTDDS_RTPS_STATEFULWRITER_H_ #define _FASTDDS_RTPS_STATEFULWRITER_H_ #ifndef DOXYGEN_SHOULD_SKIP_THIS_PUBLIC #include #include #include #include #include #include #include namespace eprosima { namespace fastrtps { namespace rtps { class ReaderProxy; class TimedEvent; class StatefulWriter : public RTPSWriter { friend class RTPSParticipantImpl; friend class ReaderProxy; public: virtual ~StatefulWriter(); protected: StatefulWriter( RTPSParticipantImpl* impl, const GUID_t& guid, const WriterAttributes& att, fastdds::rtps::FlowController* flow_controller, WriterHistory* hist, WriterListener* listen = nullptr); StatefulWriter( RTPSParticipantImpl* impl, const GUID_t& guid, const WriterAttributes& att, const std::shared_ptr& payload_pool, fastdds::rtps::FlowController* flow_controller, WriterHistory* hist, WriterListener* listen = nullptr); StatefulWriter( RTPSParticipantImpl* impl, const GUID_t& guid, const WriterAttributes& att, const std::shared_ptr& payload_pool, const std::shared_ptr& change_pool, fastdds::rtps::FlowController* flow_controller, WriterHistory* hist, WriterListener* listen = nullptr); void rebuild_status_after_load(); virtual void print_inconsistent_acknack( const GUID_t& writer_guid, const GUID_t& reader_guid, const SequenceNumber_t& min_requested_sequence_number, const SequenceNumber_t& max_requested_sequence_number, const SequenceNumber_t& next_sequence_number); private: void init( RTPSParticipantImpl* pimpl, const WriterAttributes& att); TimedEvent* periodic_hb_event_; TimedEvent* nack_response_event_; TimedEvent* ack_event_; Count_t m_heartbeatCount; WriterTimes m_times; ResourceLimitedVector matched_remote_readers_; ResourceLimitedVector matched_readers_pool_; using ReaderProxyIterator = ResourceLimitedVector::iterator; using ReaderProxyConstIterator = ResourceLimitedVector::const_iterator; SequenceNumber_t next_all_acked_notify_sequence_; SequenceNumber_t min_readers_low_mark_; // TODO Join this mutex when main mutex would not be recursive. std::mutex all_acked_mutex_; std::condition_variable all_acked_cond_; // TODO Also remove when main mutex not recursive. bool all_acked_; std::condition_variable_any may_remove_change_cond_; unsigned int may_remove_change_; public: void unsent_change_added_to_history( CacheChange_t* p, const std::chrono::time_point& max_blocking_time) override; bool change_removed_by_history( CacheChange_t* a_change) override; bool intraprocess_delivery( CacheChange_t* change, ReaderProxy* reader_proxy); bool intraprocess_gap( ReaderProxy* reader_proxy, const SequenceNumber_t& seq_num) { SequenceNumber_t last_seq = seq_num + 1; return intraprocess_gap(reader_proxy, seq_num, last_seq); } bool intraprocess_gap( ReaderProxy* reader_proxy, const SequenceNumber_t& first_seq, const SequenceNumber_t& last_seq); bool intraprocess_heartbeat( ReaderProxy* reader_proxy, bool liveliness = false); inline void incrementHBCount() { on_heartbeat(++m_heartbeatCount); } bool matched_reader_add( const ReaderProxyData& data) override; bool matched_reader_remove( const GUID_t& reader_guid) override; bool matched_reader_is_matched( const GUID_t& reader_guid) override; bool has_been_fully_delivered( const SequenceNumber_t& seq_num) const override; bool is_acked_by_all( const CacheChange_t* a_change) const override; template Function for_each_reader_proxy( Function f) const { // we cannot directly pass iterators neither const_iterators to matched_readers_ because then the functor would // be able to modify ReaderProxy elements for ( const ReaderProxy* rp : matched_local_readers_ ) { f(rp); } for ( const ReaderProxy* rp : matched_datasharing_readers_ ) { f(rp); } for ( const ReaderProxy* rp : matched_remote_readers_ ) { f(rp); } return f; } bool wait_for_all_acked( const Duration_t& max_wait) override; bool all_readers_updated(); bool try_remove_change( const std::chrono::steady_clock::time_point& max_blocking_time_point, std::unique_lock& lock) override; bool wait_for_acknowledgement( const SequenceNumber_t& seq, const std::chrono::steady_clock::time_point& max_blocking_time_point, std::unique_lock& lock) override; void updateAttributes( const WriterAttributes& att) override; bool matched_reader_lookup( GUID_t& readerGuid, ReaderProxy** RP); inline Count_t getHeartbeatCount() const { return this->m_heartbeatCount; } inline RTPSParticipantImpl* getRTPSParticipant() const { return mp_RTPSParticipant; } inline size_t getMatchedReadersSize() const { std::lock_guard guard(mp_mutex); return matched_remote_readers_.size() + matched_local_readers_.size() + matched_datasharing_readers_.size(); } bool get_disable_positive_acks() const override { return disable_positive_acks_; } void updateTimes( const WriterTimes& times); SequenceNumber_t next_sequence_number() const; bool send_periodic_heartbeat( bool final = false, bool liveliness = false); void send_heartbeat_to_nts( ReaderProxy& remoteReaderProxy, bool liveliness = false, bool force = false); void perform_nack_response(); void perform_nack_supression( const GUID_t& reader_guid); bool process_acknack( const GUID_t& writer_guid, const GUID_t& reader_guid, uint32_t ack_count, const SequenceNumberSet_t& sn_set, bool final_flag, bool& result) override; virtual bool process_nack_frag( const GUID_t& writer_guid, const GUID_t& reader_guid, uint32_t ack_count, const SequenceNumber_t& seq_num, const FragmentNumberSet_t fragments_state, bool& result) override; void reader_data_filter( fastdds::rtps::IReaderDataFilter* filter) final; const fastdds::rtps::IReaderDataFilter* reader_data_filter() const final; DeliveryRetCode deliver_sample_nts( CacheChange_t* cache_change, RTPSMessageGroup& group, LocatorSelectorSender& locator_selector, const std::chrono::time_point& max_blocking_time) override; LocatorSelectorSender& get_general_locator_selector() override { return locator_selector_general_; } LocatorSelectorSender& get_async_locator_selector() override { return locator_selector_async_; } private: bool is_acked_by_all( const SequenceNumber_t seq) const; void update_reader_info( LocatorSelectorSender& locator_selector, bool create_sender_resources); void select_all_readers_nts( RTPSMessageGroup& group, LocatorSelectorSender& locator_selector); void send_heartbeat_piggyback_nts_( RTPSMessageGroup& message_group, LocatorSelectorSender& locator_selector, uint32_t& last_bytes_processed); void send_heartbeat_nts_( size_t number_of_readers, RTPSMessageGroup& message_group, bool final, bool liveliness = false); void check_acked_status(); bool ack_timer_expired(); void send_heartbeat_to_all_readers(); void deliver_sample_to_intraprocesses( CacheChange_t* change); void deliver_sample_to_datasharing( CacheChange_t* change); DeliveryRetCode deliver_sample_to_network( CacheChange_t* change, RTPSMessageGroup& group, LocatorSelectorSender& locator_selector, const std::chrono::time_point& max_blocking_time); void prepare_datasharing_delivery( CacheChange_t* change); void add_gaps_for_holes_in_history_( RTPSMessageGroup& group); bool disable_heartbeat_piggyback_; bool disable_positive_acks_; std::chrono::duration> keep_duration_us_; SequenceNumber_t last_sequence_number_; SequenceNumber_t biggest_removed_sequence_number_; const uint32_t sendBufferSize_; int32_t currentUsageSendBufferSize_; bool there_are_remote_readers_ = false; bool there_are_local_readers_ = false; StatefulWriter& operator =( const StatefulWriter&) = delete; fastdds::rtps::IReaderDataFilter* reader_data_filter_ = nullptr; ResourceLimitedVector matched_local_readers_; ResourceLimitedVector matched_datasharing_readers_; bool there_are_datasharing_readers_ = false; LocatorSelectorSender locator_selector_general_; LocatorSelectorSender locator_selector_async_; }; } /* namespace rtps */ } /* namespace fastrtps */ } /* namespace eprosima */ #endif // ifndef DOXYGEN_SHOULD_SKIP_THIS_PUBLIC #endif /* _FASTDDS_RTPS_STATEFULWRITER_H_ */