Program Listing for File RTPSWriter.h

Return to documentation for file (/tmp/ws/src/fastrtps/include/fastdds/rtps/writer/RTPSWriter.h)

// Copyright 2016 Proyectos y Sistemas de Mantenimiento SL (eProsima).
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
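// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and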
// limitations under the License.


#include <chrono>
#include <functional>
#include <memory>
#include <mutex>
#include <vector>

#include <fastdds/rtps/Endpoint.h>
#include <fastdds/rtps/attributes/HistoryAttributes.h>
#include <fastdds/rtps/attributes/WriterAttributes.h>
#include <fastdds/rtps/builtin/data/ReaderProxyData.h>
#include <fastdds/rtps/interfaces/IReaderDataFilter.hpp>
#include <fastdds/rtps/messages/RTPSMessageGroup.h>
#include "DeliveryRetCode.hpp"
#include "LocatorSelectorSender.hpp"
#include <fastrtps/qos/LivelinessLostStatus.h>

#include <fastdds/statistics/rtps/StatisticsCommon.hpp>

namespace eprosima {

namespace fastdds {
namespace rtps {

// Forward declaration: RTPSWriter only handles FlowController through pointers
// (constructor argument and the flow_controller_ member).
class FlowController;

} // namespace rtps

namespace dds {

// Forward declaration: DataWriterImpl is named in a friend declaration of RTPSWriter.
class DataWriterImpl;

} // namespace dds
} // namespace fastdds

namespace fastrtps {
namespace rtps {

// Forward declarations so that the declarations below can name these types
// without this header spelling out their definitions.
class WriterListener;
class WriterHistory;
class DataSharingNotifier;
struct CacheChange_t;

class RTPSWriter
    : public Endpoint
    , public fastdds::statistics::StatisticsWriterImpl
    friend class WriterHistory;
    friend class RTPSParticipantImpl;
    friend class RTPSMessageGroup;
    friend class fastdds::dds::DataWriterImpl;


            RTPSParticipantImpl* impl,
            const GUID_t& guid,
            const WriterAttributes& att,
            fastdds::rtps::FlowController* flow_controller,
            WriterHistory* hist,
            WriterListener* listen = nullptr);

            RTPSParticipantImpl* impl,
            const GUID_t& guid,
            const WriterAttributes& att,
            const std::shared_ptr<IPayloadPool>& payload_pool,
            fastdds::rtps::FlowController* flow_controller,
            WriterHistory* hist,
            WriterListener* listen = nullptr);

            RTPSParticipantImpl* impl,
            const GUID_t& guid,
            const WriterAttributes& att,
            const std::shared_ptr<IPayloadPool>& payload_pool,
            const std::shared_ptr<IChangePool>& change_pool,
            fastdds::rtps::FlowController* flow_controller,
            WriterHistory* hist,
            WriterListener* listen = nullptr);

    virtual ~RTPSWriter();


    template<typename T>
    CacheChange_t* new_change(
            T& data,
            ChangeKind_t changeKind,
            InstanceHandle_t handle = c_InstanceHandle_Unknown)
        return new_change([data]() -> uint32_t
                           return (uint32_t)T::getCdrSerializedSize(data);
                       }, changeKind, handle);

    RTPS_DllAPI CacheChange_t* new_change(
            const std::function<uint32_t()>& dataCdrSerializedSize,
            ChangeKind_t changeKind,
            InstanceHandle_t handle = c_InstanceHandle_Unknown);

    RTPS_DllAPI CacheChange_t* new_change(
            ChangeKind_t changeKind,
            InstanceHandle_t handle = c_InstanceHandle_Unknown);

    RTPS_DllAPI bool release_change(
            CacheChange_t* change);

    RTPS_DllAPI virtual bool matched_reader_add(
            const ReaderProxyData& data) = 0;

    RTPS_DllAPI virtual bool matched_reader_remove(
            const GUID_t& reader_guid) = 0;

    RTPS_DllAPI virtual bool matched_reader_is_matched(
            const GUID_t& reader_guid) = 0;

    RTPS_DllAPI virtual void reader_data_filter(
            fastdds::rtps::IReaderDataFilter* filter) = 0;

    RTPS_DllAPI virtual const fastdds::rtps::IReaderDataFilter* reader_data_filter() const = 0;

    RTPS_DllAPI virtual bool is_acked_by_all(
            const CacheChange_t* /*a_change*/) const
        return false;

    RTPS_DllAPI virtual bool wait_for_all_acked(
            const Duration_t& /*max_wait*/)
        return true;

    RTPS_DllAPI virtual void updateAttributes(
            const WriterAttributes& att) = 0;

    RTPS_DllAPI SequenceNumber_t get_seq_num_min();

    RTPS_DllAPI SequenceNumber_t get_seq_num_max();

    RTPS_DllAPI uint32_t getTypeMaxSerialized();

    uint32_t getMaxDataSize();

    uint32_t calculateMaxDataSize(
            uint32_t length);

    RTPS_DllAPI inline WriterListener* getListener()
        return mp_listener;

    RTPS_DllAPI inline bool set_listener(
            WriterListener* listener)
        mp_listener = listener;
        return true;

    RTPS_DllAPI inline bool isAsync() const
        return is_async_;

    RTPS_DllAPI bool remove_older_changes(
            unsigned int max = 0);

    virtual bool try_remove_change(
            const std::chrono::steady_clock::time_point& max_blocking_time_point,
            std::unique_lock<RecursiveTimedMutex>& lock) = 0;

    virtual bool wait_for_acknowledgement(
            const SequenceNumber_t& seq,
            const std::chrono::steady_clock::time_point& max_blocking_time_point,
            std::unique_lock<RecursiveTimedMutex>& lock) = 0;


    RTPS_DllAPI bool add_statistics_listener(
            std::shared_ptr<fastdds::statistics::IListener> listener);

    RTPS_DllAPI bool remove_statistics_listener(
            std::shared_ptr<fastdds::statistics::IListener> listener);

    RTPS_DllAPI void set_enabled_statistics_writers_mask(
            uint32_t enabled_writers);


    inline RTPSParticipantImpl* getRTPSParticipant() const
        return mp_RTPSParticipant;

    void set_separate_sending (
            bool enable)
        m_separateSendingEnabled = enable;

    bool get_separate_sending () const
        return m_separateSendingEnabled;

    virtual bool process_acknack(
            const GUID_t& writer_guid,
            const GUID_t& reader_guid,
            uint32_t ack_count,
            const SequenceNumberSet_t& sn_set,
            bool final_flag,
            bool& result)
        (void)reader_guid; (void)ack_count; (void)sn_set; (void)final_flag;

        result = false;
        return writer_guid == m_guid;

    virtual bool process_nack_frag(
            const GUID_t& writer_guid,
            const GUID_t& reader_guid,
            uint32_t ack_count,
            const SequenceNumber_t& seq_num,
            const FragmentNumberSet_t fragments_state,
            bool& result)
        (void)reader_guid; (void)ack_count; (void)seq_num; (void)fragments_state;

        result = false;
        return writer_guid == m_guid;

    const LivelinessQosPolicyKind& get_liveliness_kind() const;

    const Duration_t& get_liveliness_lease_duration() const;

    const Duration_t& get_liveliness_announcement_period() const;

    LivelinessLostStatus liveliness_lost_status_;

    bool is_datasharing_compatible() const;

    virtual DeliveryRetCode deliver_sample_nts(
            CacheChange_t* cache_change,
            RTPSMessageGroup& group,
            LocatorSelectorSender& locator_selector,
            const std::chrono::time_point<std::chrono::steady_clock>& max_blocking_time) = 0;

    virtual LocatorSelectorSender& get_general_locator_selector() = 0;

    virtual LocatorSelectorSender& get_async_locator_selector() = 0;

    virtual bool send_nts(
            CDRMessage_t* message,
            const LocatorSelectorSender& locator_selector,
            std::chrono::steady_clock::time_point& max_blocking_time_point) const;


    bool m_pushMode = true;

    fastdds::rtps::FlowController* flow_controller_;

    WriterHistory* mp_history = nullptr;
    WriterListener* mp_listener = nullptr;
    bool is_async_ = false;
    bool m_separateSendingEnabled = false;

    LivelinessQosPolicyKind liveliness_kind_;
    Duration_t liveliness_lease_duration_;
    Duration_t liveliness_announcement_period_;

    void add_guid(
            LocatorSelectorSender& locator_selector,
            const GUID_t& remote_guid);

    void compute_selected_guids(
            LocatorSelectorSender& locator_selector);

    void update_cached_info_nts(
            LocatorSelectorSender& locator_selector);

    virtual void unsent_change_added_to_history(
            CacheChange_t* change,
            const std::chrono::time_point<std::chrono::steady_clock>& max_blocking_time) = 0;

    virtual bool change_removed_by_history(
            CacheChange_t* a_change) = 0;

    bool is_datasharing_compatible_with(
            const ReaderProxyData& rdata) const;

    bool is_pool_initialized() const;

    template<typename Functor>
    bool send_data_or_fragments(
            RTPSMessageGroup& group,
            CacheChange_t* change,
            bool inline_qos,
            Functor sent_fun)
        bool sent_ok = true;

        uint32_t n_fragments = change->getFragmentCount();
        if (n_fragments > 0)
            for (FragmentNumber_t frag = 1; frag <= n_fragments; frag++)
                sent_ok &= group.add_data_frag(*change, frag, inline_qos);
                if (sent_ok)
                    sent_fun(change, frag);
                            "Error sending fragment (" << change->sequenceNumber << ", " << frag << ")");
            sent_ok = group.add_data(*change, inline_qos);
            if (sent_ok)
                sent_fun(change, 0);
                EPROSIMA_LOG_ERROR(RTPS_WRITER, "Error sending change " << change->sequenceNumber);

        return sent_ok;

    void add_statistics_sent_submessage(
            CacheChange_t* change,
            size_t num_locators);

    void deinit();


    RecursiveTimedMutex& get_mutex()
        return mp_mutex;

    RTPSWriter& operator =(
            const RTPSWriter&) = delete;

    void init(
            const std::shared_ptr<IPayloadPool>& payload_pool,
            const std::shared_ptr<IChangePool>& change_pool,
            const WriterAttributes& att);

    RTPSWriter* next_[2] = { nullptr, nullptr };

} /* namespace rtps */
} /* namespace fastrtps */
} /* namespace eprosima */