.. _program_listing_file__tmp_ws_src_fastrtps_include_fastdds_rtps_writer_RTPSWriter.h:

Program Listing for File RTPSWriter.h
=====================================

|exhale_lsh| :ref:`Return to documentation for file <file__tmp_ws_src_fastrtps_include_fastdds_rtps_writer_RTPSWriter.h>` (``/tmp/ws/src/fastrtps/include/fastdds/rtps/writer/RTPSWriter.h``)

.. |exhale_lsh| unicode:: U+021B0 .. UPWARDS ARROW WITH TIP LEFTWARDS

.. NOTE(review): every angle-bracketed span on this page had been stripped and
   was reconstructed. Inferred from the listing itself: the :ref: target (page
   label without the ``program_listing_`` prefix), ``template<typename T>``,
   ``template<typename Functor>``, ``static_cast<void>``,
   ``std::function<uint32_t()>``, ``std::unique_lock<RecursiveTimedMutex>`` and
   ``std::chrono::time_point<std::chrono::steady_clock>``. NOT derivable from
   the listing and to be confirmed against the real header: all ``#include``
   targets, ``IPayloadPool``, ``IChangePool`` and
   ``fastdds::statistics::IListener``.

.. code-block:: cpp

   // Copyright 2016 Proyectos y Sistemas de Mantenimiento SL (eProsima).
   //
   // Licensed under the Apache License, Version 2.0 (the "License");
   // you may not use this file except in compliance with the License.
   // You may obtain a copy of the License at
   //
   //     http://www.apache.org/licenses/LICENSE-2.0
   //
   // Unless required by applicable law or agreed to in writing, software
   // distributed under the License is distributed on an "AS IS" BASIS,
   // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   // See the License for the specific language governing permissions and
   // limitations under the License.

   #ifndef _FASTDDS_RTPS_RTPSWRITER_H_
   #define _FASTDDS_RTPS_RTPSWRITER_H_

   #include <chrono>
   #include <functional>
   #include <memory>
   #include <mutex>
   #include <vector>

   #include <fastdds/rtps/Endpoint.h>
   #include <fastdds/rtps/attributes/HistoryAttributes.h>
   #include <fastdds/rtps/attributes/WriterAttributes.h>
   #include <fastdds/rtps/builtin/data/ReaderProxyData.h>
   #include <fastdds/rtps/interfaces/IReaderDataFilter.hpp>
   #include <fastdds/rtps/messages/RTPSMessageGroup.h>
   #include "DeliveryRetCode.hpp"
   #include "LocatorSelectorSender.hpp"
   #include <fastrtps/qos/LivelinessLostStatus.h>

   #include <fastdds/statistics/rtps/StatisticsCommon.hpp>

   namespace eprosima {

   namespace fastdds {
   namespace rtps {

   class FlowController;

   } // namespace rtps

   namespace dds {

   class DataWriterImpl;

   } // namespace dds
   } // namespace fastdds

   namespace fastrtps {
   namespace rtps {

   class WriterListener;
   class WriterHistory;
   class DataSharingNotifier;
   struct CacheChange_t;

   /**
    * Abstract base class for RTPS writers.
    *
    * Derives from Endpoint and from fastdds::statistics::StatisticsWriterImpl.
    * Construction and destruction are protected, and several members are pure
    * virtual, so only derived classes can be instantiated.
    */
   class RTPSWriter
       : public Endpoint
       , public fastdds::statistics::StatisticsWriterImpl
   {
       friend class WriterHistory;
       friend class RTPSParticipantImpl;
       friend class RTPSMessageGroup;
       friend class fastdds::dds::DataWriterImpl;

   protected:

       //! Constructor overload without explicit payload or change pools.
       RTPSWriter(
               RTPSParticipantImpl* impl,
               const GUID_t& guid,
               const WriterAttributes& att,
               fastdds::rtps::FlowController* flow_controller,
               WriterHistory* hist,
               WriterListener* listen = nullptr);

       //! Constructor overload that additionally receives a payload pool.
       RTPSWriter(
               RTPSParticipantImpl* impl,
               const GUID_t& guid,
               const WriterAttributes& att,
               const std::shared_ptr<IPayloadPool>& payload_pool,
               fastdds::rtps::FlowController* flow_controller,
               WriterHistory* hist,
               WriterListener* listen = nullptr);

       //! Constructor overload that receives both a payload pool and a change pool.
       RTPSWriter(
               RTPSParticipantImpl* impl,
               const GUID_t& guid,
               const WriterAttributes& att,
               const std::shared_ptr<IPayloadPool>& payload_pool,
               const std::shared_ptr<IChangePool>& change_pool,
               fastdds::rtps::FlowController* flow_controller,
               WriterHistory* hist,
               WriterListener* listen = nullptr);

       virtual ~RTPSWriter();

   public:

       /**
        * Convenience overload of new_change().
        *
        * Forwards to the overload taking a size callback, passing a callable that
        * returns T::getCdrSerializedSize(data) converted to uint32_t.
        * Note that @c data is captured by copy in that callable.
        */
       template<typename T>
       CacheChange_t* new_change(
               T& data,
               ChangeKind_t changeKind,
               InstanceHandle_t handle = c_InstanceHandle_Unknown)
       {
           return new_change([data]() -> uint32_t
                          {
                              return (uint32_t)T::getCdrSerializedSize(data);
                          }, changeKind, handle);
       }

       //! Overload of new_change() taking a callable that yields the serialized data size.
       RTPS_DllAPI CacheChange_t* new_change(
               const std::function<uint32_t()>& dataCdrSerializedSize,
               ChangeKind_t changeKind,
               InstanceHandle_t handle = c_InstanceHandle_Unknown);

       //! Overload of new_change() taking only the change kind and an optional handle.
       RTPS_DllAPI CacheChange_t* new_change(
               ChangeKind_t changeKind,
               InstanceHandle_t handle = c_InstanceHandle_Unknown);

       RTPS_DllAPI bool release_change(
               CacheChange_t* change);

       // Pure virtual reader-matching interface; implemented by derived writers.
       RTPS_DllAPI virtual bool matched_reader_add(
               const ReaderProxyData& data) = 0;

       RTPS_DllAPI virtual bool matched_reader_remove(
               const GUID_t& reader_guid) = 0;

       RTPS_DllAPI virtual bool matched_reader_is_matched(
               const GUID_t& reader_guid) = 0;

       // Pure virtual setter / getter pair for the reader data filter.
       RTPS_DllAPI virtual void reader_data_filter(
               fastdds::rtps::IReaderDataFilter* filter) = 0;

       RTPS_DllAPI virtual const fastdds::rtps::IReaderDataFilter* reader_data_filter() const = 0;

       //! Default implementation ignores @c seq_num and returns false.
       RTPS_DllAPI virtual bool has_been_fully_delivered(
               const SequenceNumber_t& seq_num) const
       {
           static_cast<void>(seq_num);
           return false;
       }

       //! Default implementation returns false for any change.
       RTPS_DllAPI virtual bool is_acked_by_all(
               const CacheChange_t* /*a_change*/) const
       {
           return false;
       }

       //! Default implementation returns true immediately; the wait argument is unused.
       RTPS_DllAPI virtual bool wait_for_all_acked(
               const Duration_t& /*max_wait*/)
       {
           return true;
       }

       RTPS_DllAPI virtual void updateAttributes(
               const WriterAttributes& att) = 0;

       RTPS_DllAPI SequenceNumber_t get_seq_num_min();

       RTPS_DllAPI SequenceNumber_t get_seq_num_max();

       RTPS_DllAPI uint32_t getTypeMaxSerialized();

       uint32_t getMaxDataSize();

       uint32_t calculateMaxDataSize(
               uint32_t length);

       //! Returns the stored listener pointer (mp_listener).
       RTPS_DllAPI inline WriterListener* getListener()
       {
           return mp_listener;
       }

       //! Stores @c listener in mp_listener; always returns true.
       RTPS_DllAPI inline bool set_listener(
               WriterListener* listener)
       {
           mp_listener = listener;
           return true;
       }

       //! Returns the value of is_async_.
       RTPS_DllAPI inline bool isAsync() const
       {
           return is_async_;
       }

       RTPS_DllAPI bool remove_older_changes(
               unsigned int max = 0);

       //! Default implementation returns false.
       RTPS_DllAPI virtual bool get_disable_positive_acks() const
       {
           return false;
       }

       virtual bool try_remove_change(
               const std::chrono::steady_clock::time_point& max_blocking_time_point,
               std::unique_lock<RecursiveTimedMutex>& lock) = 0;

       virtual bool wait_for_acknowledgement(
               const SequenceNumber_t& seq,
               const std::chrono::steady_clock::time_point& max_blocking_time_point,
               std::unique_lock<RecursiveTimedMutex>& lock) = 0;

   #ifdef FASTDDS_STATISTICS

       // Statistics API; only declared when FASTDDS_STATISTICS is defined.
       RTPS_DllAPI bool add_statistics_listener(
               std::shared_ptr<fastdds::statistics::IListener> listener);

       RTPS_DllAPI bool remove_statistics_listener(
               std::shared_ptr<fastdds::statistics::IListener> listener);

       RTPS_DllAPI void set_enabled_statistics_writers_mask(
               uint32_t enabled_writers);

   #endif // FASTDDS_STATISTICS

       //! Returns mp_RTPSParticipant (a member not declared in this class).
       inline RTPSParticipantImpl* getRTPSParticipant() const
       {
           return mp_RTPSParticipant;
       }

       //! Sets the m_separateSendingEnabled flag.
       void set_separate_sending (
               bool enable)
       {
           m_separateSendingEnabled = enable;
       }

       //! Returns the m_separateSendingEnabled flag.
       bool get_separate_sending () const
       {
           return m_separateSendingEnabled;
       }

       /**
        * Default ACKNACK handler.
        *
        * Ignores @c reader_guid, @c ack_count, @c sn_set and @c final_flag, sets
        * @c result to false, and returns whether @c writer_guid equals m_guid
        * (a member not declared in this class).
        */
       virtual bool process_acknack(
               const GUID_t& writer_guid,
               const GUID_t& reader_guid,
               uint32_t ack_count,
               const SequenceNumberSet_t& sn_set,
               bool final_flag,
               bool& result)
       {
           (void)reader_guid; (void)ack_count; (void)sn_set; (void)final_flag;

           result = false;
           return writer_guid == m_guid;
       }

       /**
        * Default NACK_FRAG handler.
        *
        * Ignores @c reader_guid, @c ack_count, @c seq_num and @c fragments_state,
        * sets @c result to false, and returns whether @c writer_guid equals m_guid
        * (a member not declared in this class).
        */
       virtual bool process_nack_frag(
               const GUID_t& writer_guid,
               const GUID_t& reader_guid,
               uint32_t ack_count,
               const SequenceNumber_t& seq_num,
               const FragmentNumberSet_t fragments_state,
               bool& result)
       {
           (void)reader_guid; (void)ack_count; (void)seq_num; (void)fragments_state;

           result = false;
           return writer_guid == m_guid;
       }

       const LivelinessQosPolicyKind& get_liveliness_kind() const;

       const Duration_t& get_liveliness_lease_duration() const;

       const Duration_t& get_liveliness_announcement_period() const;

       //! Public data member holding a LivelinessLostStatus value.
       LivelinessLostStatus liveliness_lost_status_;

       bool is_datasharing_compatible() const;

       virtual DeliveryRetCode deliver_sample_nts(
               CacheChange_t* cache_change,
               RTPSMessageGroup& group,
               LocatorSelectorSender& locator_selector,
               const std::chrono::time_point<std::chrono::steady_clock>& max_blocking_time) = 0;

       virtual LocatorSelectorSender& get_general_locator_selector() = 0;

       virtual LocatorSelectorSender& get_async_locator_selector() = 0;

       virtual bool send_nts(
               CDRMessage_t* message,
               const LocatorSelectorSender& locator_selector,
               std::chrono::steady_clock::time_point& max_blocking_time_point) const;

   protected:

       // Data members, with their in-class default values where one is given.
       bool m_pushMode = true;

       fastdds::rtps::FlowController* flow_controller_;

       WriterHistory* mp_history = nullptr;
       WriterListener* mp_listener = nullptr;
       bool is_async_ = false;
       bool m_separateSendingEnabled = false;

       LivelinessQosPolicyKind liveliness_kind_;
       Duration_t liveliness_lease_duration_;
       Duration_t liveliness_announcement_period_;

       void add_guid(
               LocatorSelectorSender& locator_selector,
               const GUID_t& remote_guid);

       void compute_selected_guids(
               LocatorSelectorSender& locator_selector);

       void update_cached_info_nts(
               LocatorSelectorSender& locator_selector);

       virtual void unsent_change_added_to_history(
               CacheChange_t* change,
               const std::chrono::time_point<std::chrono::steady_clock>& max_blocking_time) = 0;

       virtual bool change_removed_by_history(
               CacheChange_t* a_change) = 0;

       bool is_datasharing_compatible_with(
               const ReaderProxyData& rdata) const;

       bool is_pool_initialized() const;

       /**
        * Adds a change to a message group, either whole or fragment by fragment.
        *
        * If the change reports a non-zero fragment count, fragments 1..count are
        * added in order with add_data_frag(); the loop stops at the first failure.
        * Otherwise the change is added with add_data().
        * After every successful addition @c sent_fun is invoked with the change and
        * the fragment number (0 for a non-fragmented change). Failures are logged.
        *
        * @return true if every addition succeeded, false otherwise.
        */
       template<typename Functor>
       bool send_data_or_fragments(
               RTPSMessageGroup& group,
               CacheChange_t* change,
               bool inline_qos,
               Functor sent_fun)
       {
           bool sent_ok = true;

           uint32_t n_fragments = change->getFragmentCount();
           if (n_fragments > 0)
           {
               for (FragmentNumber_t frag = 1; frag <= n_fragments; frag++)
               {
                   sent_ok &= group.add_data_frag(*change, frag, inline_qos);
                   if (sent_ok)
                   {
                       sent_fun(change, frag);
                   }
                   else
                   {
                       EPROSIMA_LOG_ERROR(RTPS_WRITER, "Error sending fragment ("
                               << change->sequenceNumber << ", " << frag << ")");
                       break;
                   }
               }
           }
           else
           {
               sent_ok = group.add_data(*change, inline_qos);
               if (sent_ok)
               {
                   sent_fun(change, 0);
               }
               else
               {
                   EPROSIMA_LOG_ERROR(RTPS_WRITER, "Error sending change " << change->sequenceNumber);
               }
           }

           return sent_ok;
       }

       void add_statistics_sent_submessage(
               CacheChange_t* change,
               size_t num_locators);

       void deinit();

   private:

       //! Returns mp_mutex (a member not declared in this class).
       RecursiveTimedMutex& get_mutex()
       {
           return mp_mutex;
       }

       //! Copy assignment is disabled.
       RTPSWriter& operator =(
               const RTPSWriter&) = delete;

       void init(
               const std::shared_ptr<IPayloadPool>& payload_pool,
               const std::shared_ptr<IChangePool>& change_pool,
               const WriterAttributes& att);

       //! Two RTPSWriter pointers, both initialised to nullptr.
       RTPSWriter* next_[2] = { nullptr, nullptr };
   };

   } /* namespace rtps */
   } /* namespace fastrtps */
   } /* namespace eprosima */

   #endif /* _FASTDDS_RTPS_RTPSWRITER_H_ */