Program Listing for File RTPSWriter.h

Return to documentation for file (/tmp/ws/src/fastrtps/include/fastdds/rtps/writer/RTPSWriter.h)

// Copyright 2016 Proyectos y Sistemas de Mantenimiento SL (eProsima).
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#ifndef _FASTDDS_RTPS_RTPSWRITER_H_
#define _FASTDDS_RTPS_RTPSWRITER_H_

#include <chrono>
#include <functional>
#include <memory>
#include <mutex>
#include <vector>

#include <fastdds/rtps/Endpoint.h>
#include <fastdds/rtps/attributes/HistoryAttributes.h>
#include <fastdds/rtps/attributes/WriterAttributes.h>
#include <fastdds/rtps/builtin/data/ReaderProxyData.h>
#include <fastdds/rtps/interfaces/IReaderDataFilter.hpp>
#include <fastdds/rtps/messages/RTPSMessageGroup.h>
#include "DeliveryRetCode.hpp"
#include "LocatorSelectorSender.hpp"
#include <fastrtps/qos/LivelinessLostStatus.h>

#include <fastdds/statistics/rtps/StatisticsCommon.hpp>

namespace eprosima {

// Forward declarations of eprosima::fastdds types that this header only names
// (pointer parameter/member and friend declaration), so their headers are not needed here.
namespace fastdds {
namespace rtps {

// Pointee type of the flow controller handed to the RTPSWriter constructors and kept in
// RTPSWriter::flow_controller_.
class FlowController;

} // namespace rtps

namespace dds {

// Declared as a friend of RTPSWriter.
class DataWriterImpl;

} // namespace dds
} // namespace fastdds

namespace fastrtps {
namespace rtps {

// Forward declarations of types named by the RTPSWriter declaration below.
class WriterListener;       // pointee type of RTPSWriter::mp_listener
class WriterHistory;        // pointee type of RTPSWriter::mp_history; also a friend of RTPSWriter
class DataSharingNotifier;  // NOTE(review): not referenced anywhere else in this listing
struct CacheChange_t;       // the change objects created, released and sent by the writer API

/**
 * Abstract base class for RTPS writers.
 *
 * Derives from Endpoint and from the statistics mix-in StatisticsWriterImpl. Several members are
 * pure virtual, so only derived classes can be instantiated. Constructors and the destructor are
 * protected: objects are built and destroyed by derived classes or by the friend classes listed
 * below.
 */
class RTPSWriter
    : public Endpoint
    , public fastdds::statistics::StatisticsWriterImpl
{
    friend class WriterHistory;
    friend class RTPSParticipantImpl;
    friend class RTPSMessageGroup;
    friend class fastdds::dds::DataWriterImpl;

protected:

    /**
     * Constructor without explicit pools.
     *
     * @param impl            Participant implementation owning this writer.
     * @param guid            GUID of the writer.
     * @param att             Writer attributes.
     * @param flow_controller Flow controller associated with the writer.
     * @param hist            History associated with the writer.
     * @param listen          Optional listener; defaults to nullptr.
     */
    RTPSWriter(
            RTPSParticipantImpl* impl,
            const GUID_t& guid,
            const WriterAttributes& att,
            fastdds::rtps::FlowController* flow_controller,
            WriterHistory* hist,
            WriterListener* listen = nullptr);

    /**
     * Same as the first constructor, additionally receiving the payload pool to use.
     *
     * @param payload_pool Shared pointer to the payload pool.
     */
    RTPSWriter(
            RTPSParticipantImpl* impl,
            const GUID_t& guid,
            const WriterAttributes& att,
            const std::shared_ptr<IPayloadPool>& payload_pool,
            fastdds::rtps::FlowController* flow_controller,
            WriterHistory* hist,
            WriterListener* listen = nullptr);

    /**
     * Same as the first constructor, additionally receiving both the payload pool and the
     * change pool to use.
     *
     * @param payload_pool Shared pointer to the payload pool.
     * @param change_pool  Shared pointer to the change pool.
     */
    RTPSWriter(
            RTPSParticipantImpl* impl,
            const GUID_t& guid,
            const WriterAttributes& att,
            const std::shared_ptr<IPayloadPool>& payload_pool,
            const std::shared_ptr<IChangePool>& change_pool,
            fastdds::rtps::FlowController* flow_controller,
            WriterHistory* hist,
            WriterListener* listen = nullptr);

    //! Virtual destructor (protected: deletion goes through derived classes or friends).
    virtual ~RTPSWriter();

public:

    /**
     * Convenience overload of new_change() for a typed sample.
     *
     * Wraps `T::getCdrSerializedSize(data)` in a callable and forwards to the overload taking a
     * `std::function<uint32_t()>`.
     *
     * @tparam T         Sample type; must provide a static `getCdrSerializedSize` callable with the sample.
     * @param data       Sample whose serialized size is queried.
     * @param changeKind Kind of the change to create.
     * @param handle     Instance handle; defaults to c_InstanceHandle_Unknown.
     * @return Whatever the std::function overload returns.
     */
    template<typename T>
    CacheChange_t* new_change(
            T& data,
            ChangeKind_t changeKind,
            InstanceHandle_t handle = c_InstanceHandle_Unknown)
    {
        // Capture the sample by reference instead of copying it: a by-value capture duplicates
        // the whole sample (and demands a copyable T) only to ask for its size. Binding through
        // a const reference keeps the sample const inside the callable, as the copy was.
        // NOTE(review): relies on the std::function overload invoking the callable during the
        // call and not keeping a copy of it afterwards - confirm against its definition.
        const T& sample = data;
        return new_change([&sample]() -> uint32_t
                       {
                           return static_cast<uint32_t>(T::getCdrSerializedSize(sample));
                       }, changeKind, handle);
    }

    /**
     * Overload of new_change() receiving a callable that yields the serialized size.
     * Defined out of line.
     *
     * @param dataCdrSerializedSize Callable returning the serialized size as uint32_t.
     * @param changeKind            Kind of the change to create.
     * @param handle                Instance handle; defaults to c_InstanceHandle_Unknown.
     * @return Pointer to a CacheChange_t (NOTE(review): presumably nullptr on failure - confirm).
     */
    RTPS_DllAPI CacheChange_t* new_change(
            const std::function<uint32_t()>& dataCdrSerializedSize,
            ChangeKind_t changeKind,
            InstanceHandle_t handle = c_InstanceHandle_Unknown);

    /**
     * Overload of new_change() that takes no size information. Defined out of line.
     *
     * @param changeKind Kind of the change to create.
     * @param handle     Instance handle; defaults to c_InstanceHandle_Unknown.
     */
    RTPS_DllAPI CacheChange_t* new_change(
            ChangeKind_t changeKind,
            InstanceHandle_t handle = c_InstanceHandle_Unknown);

    /**
     * Releases a change. Defined out of line.
     * NOTE(review): presumably the counterpart of new_change() - confirm.
     *
     * @param change Change to release.
     * @return Boolean result reported by the implementation.
     */
    RTPS_DllAPI bool release_change(
            CacheChange_t* change);

    /**
     * Pure virtual. NOTE(review): by its name, registers a matched remote reader - semantics are
     * defined by the derived writer.
     *
     * @param data Proxy data describing the reader.
     */
    RTPS_DllAPI virtual bool matched_reader_add(
            const ReaderProxyData& data) = 0;

    /**
     * Pure virtual. NOTE(review): by its name, removes a previously matched reader.
     *
     * @param reader_guid GUID of the reader.
     */
    RTPS_DllAPI virtual bool matched_reader_remove(
            const GUID_t& reader_guid) = 0;

    /**
     * Pure virtual. NOTE(review): by its name, tells whether the given reader is matched.
     *
     * @param reader_guid GUID of the reader.
     */
    RTPS_DllAPI virtual bool matched_reader_is_matched(
            const GUID_t& reader_guid) = 0;

    /**
     * Pure virtual setter counterpart of reader_data_filter() const.
     *
     * @param filter Pointer to the reader data filter.
     */
    RTPS_DllAPI virtual void reader_data_filter(
            fastdds::rtps::IReaderDataFilter* filter) = 0;

    //! Pure virtual getter counterpart of reader_data_filter(IReaderDataFilter*).
    RTPS_DllAPI virtual const fastdds::rtps::IReaderDataFilter* reader_data_filter() const = 0;

    /**
     * Default implementation: ignores its argument and always returns false.
     * Derived writers may override it.
     */
    RTPS_DllAPI virtual bool is_acked_by_all(
            const CacheChange_t* /*a_change*/) const
    {
        return false;
    }

    /**
     * Default implementation: ignores its argument and always returns true.
     * Derived writers may override it.
     */
    RTPS_DllAPI virtual bool wait_for_all_acked(
            const Duration_t& /*max_wait*/)
    {
        return true;
    }

    /**
     * Pure virtual.
     *
     * @param att Writer attributes to apply.
     */
    RTPS_DllAPI virtual void updateAttributes(
            const WriterAttributes& att) = 0;

    //! Defined out of line. NOTE(review): presumably the lowest sequence number in the history.
    RTPS_DllAPI SequenceNumber_t get_seq_num_min();

    //! Defined out of line. NOTE(review): presumably the highest sequence number in the history.
    RTPS_DllAPI SequenceNumber_t get_seq_num_max();

    //! Defined out of line. NOTE(review): presumably the maximum serialized size of the type.
    RTPS_DllAPI uint32_t getTypeMaxSerialized();

    //! Defined out of line; returns a size as uint32_t.
    uint32_t getMaxDataSize();

    /**
     * Defined out of line.
     *
     * @param length Input length used for the computation.
     */
    uint32_t calculateMaxDataSize(
            uint32_t length);

    //! @return The listener currently stored in mp_listener (may be nullptr).
    RTPS_DllAPI inline WriterListener* getListener()
    {
        return mp_listener;
    }

    /**
     * Stores the given listener in mp_listener.
     *
     * @param listener New listener (nullptr is stored as-is).
     * @return Always true.
     */
    RTPS_DllAPI inline bool set_listener(
            WriterListener* listener)
    {
        mp_listener = listener;
        return true;
    }

    //! @return The value of is_async_.
    RTPS_DllAPI inline bool isAsync() const
    {
        return is_async_;
    }

    /**
     * Defined out of line.
     *
     * @param max Upper bound passed to the implementation; defaults to 0
     *            (NOTE(review): 0 presumably means "no limit" - confirm).
     */
    RTPS_DllAPI bool remove_older_changes(
            unsigned int max = 0);

    /**
     * Pure virtual.
     *
     * @param max_blocking_time_point Steady-clock deadline.
     * @param lock                    Caller's unique_lock on a RecursiveTimedMutex.
     */
    virtual bool try_remove_change(
            const std::chrono::steady_clock::time_point& max_blocking_time_point,
            std::unique_lock<RecursiveTimedMutex>& lock) = 0;

    /**
     * Pure virtual.
     *
     * @param seq                     Sequence number to wait for.
     * @param max_blocking_time_point Steady-clock deadline.
     * @param lock                    Caller's unique_lock on a RecursiveTimedMutex.
     */
    virtual bool wait_for_acknowledgement(
            const SequenceNumber_t& seq,
            const std::chrono::steady_clock::time_point& max_blocking_time_point,
            std::unique_lock<RecursiveTimedMutex>& lock) = 0;

    // The statistics API below only exists when FASTDDS_STATISTICS is defined.
#ifdef FASTDDS_STATISTICS

    /**
     * Defined out of line.
     *
     * @param listener Statistics listener to add.
     */
    RTPS_DllAPI bool add_statistics_listener(
            std::shared_ptr<fastdds::statistics::IListener> listener);

    /**
     * Defined out of line.
     *
     * @param listener Statistics listener to remove.
     */
    RTPS_DllAPI bool remove_statistics_listener(
            std::shared_ptr<fastdds::statistics::IListener> listener);

    /**
     * Defined out of line.
     *
     * @param enabled_writers Mask value, passed as uint32_t.
     */
    RTPS_DllAPI void set_enabled_statistics_writers_mask(
            uint32_t enabled_writers);

#endif // FASTDDS_STATISTICS

    //! @return mp_RTPSParticipant (a member not declared in this class, i.e. inherited).
    inline RTPSParticipantImpl* getRTPSParticipant() const
    {
        return mp_RTPSParticipant;
    }

    /**
     * Sets m_separateSendingEnabled.
     *
     * @param enable New value of the flag.
     */
    void set_separate_sending (
            bool enable)
    {
        m_separateSendingEnabled = enable;
    }

    //! @return The value of m_separateSendingEnabled.
    bool get_separate_sending () const
    {
        return m_separateSendingEnabled;
    }

    /**
     * Default handling of an ACKNACK: nothing is processed.
     *
     * All parameters except writer_guid are ignored.
     *
     * @param writer_guid GUID the message is addressed to.
     * @param[out] result Always set to false by this implementation.
     * @return true when writer_guid equals this writer's m_guid, false otherwise.
     */
    virtual bool process_acknack(
            const GUID_t& writer_guid,
            const GUID_t& reader_guid,
            uint32_t ack_count,
            const SequenceNumberSet_t& sn_set,
            bool final_flag,
            bool& result)
    {
        // Silence unused-parameter warnings; the names are kept for overriders.
        (void)reader_guid; (void)ack_count; (void)sn_set; (void)final_flag;

        result = false;
        return writer_guid == m_guid;
    }

    /**
     * Default handling of a NACK_FRAG: nothing is processed.
     *
     * All parameters except writer_guid are ignored.
     *
     * @param writer_guid GUID the message is addressed to.
     * @param[out] result Always set to false by this implementation.
     * @return true when writer_guid equals this writer's m_guid, false otherwise.
     */
    virtual bool process_nack_frag(
            const GUID_t& writer_guid,
            const GUID_t& reader_guid,
            uint32_t ack_count,
            const SequenceNumber_t& seq_num,
            const FragmentNumberSet_t fragments_state,
            bool& result)
    {
        // Silence unused-parameter warnings; the names are kept for overriders.
        (void)reader_guid; (void)ack_count; (void)seq_num; (void)fragments_state;

        result = false;
        return writer_guid == m_guid;
    }

    //! Defined out of line. NOTE(review): presumably returns liveliness_kind_.
    const LivelinessQosPolicyKind& get_liveliness_kind() const;

    //! Defined out of line. NOTE(review): presumably returns liveliness_lease_duration_.
    const Duration_t& get_liveliness_lease_duration() const;

    //! Defined out of line. NOTE(review): presumably returns liveliness_announcement_period_.
    const Duration_t& get_liveliness_announcement_period() const;

    //! Liveliness lost status; publicly accessible data member.
    LivelinessLostStatus liveliness_lost_status_;

    //! Defined out of line.
    bool is_datasharing_compatible() const;

    /**
     * Pure virtual.
     * NOTE(review): the "_nts" suffix presumably stands for "not thread safe" - confirm.
     *
     * @param cache_change      Change to deliver.
     * @param group             Message group used for delivery.
     * @param locator_selector  Locator selector used for delivery.
     * @param max_blocking_time Steady-clock deadline.
     */
    virtual DeliveryRetCode deliver_sample_nts(
            CacheChange_t* cache_change,
            RTPSMessageGroup& group,
            LocatorSelectorSender& locator_selector,
            const std::chrono::time_point<std::chrono::steady_clock>& max_blocking_time) = 0;

    //! Pure virtual accessor returning a LocatorSelectorSender by reference.
    virtual LocatorSelectorSender& get_general_locator_selector() = 0;

    //! Pure virtual accessor returning a LocatorSelectorSender by reference.
    virtual LocatorSelectorSender& get_async_locator_selector() = 0;

    /**
     * Virtual, defined out of line.
     *
     * @param message                 Message to send.
     * @param locator_selector        Locator selector to use.
     * @param max_blocking_time_point Steady-clock deadline (non-const reference).
     */
    virtual bool send_nts(
            CDRMessage_t* message,
            const LocatorSelectorSender& locator_selector,
            std::chrono::steady_clock::time_point& max_blocking_time_point) const;

protected:

    //! Push-mode flag; true by default.
    bool m_pushMode = true;

    //! Flow controller pointer (no default member initializer here).
    fastdds::rtps::FlowController* flow_controller_;

    //! Writer history; nullptr by default.
    WriterHistory* mp_history = nullptr;
    //! Listener returned by getListener() and replaced by set_listener(); nullptr by default.
    WriterListener* mp_listener = nullptr;
    //! Value returned by isAsync(); false by default.
    bool is_async_ = false;
    //! Flag behind set_separate_sending() / get_separate_sending(); false by default.
    bool m_separateSendingEnabled = false;

    //! Liveliness kind.
    LivelinessQosPolicyKind liveliness_kind_;
    //! Liveliness lease duration.
    Duration_t liveliness_lease_duration_;
    //! Liveliness announcement period.
    Duration_t liveliness_announcement_period_;

    /**
     * Defined out of line.
     *
     * @param locator_selector Locator selector to operate on.
     * @param remote_guid      GUID to add.
     */
    void add_guid(
            LocatorSelectorSender& locator_selector,
            const GUID_t& remote_guid);

    /**
     * Defined out of line.
     *
     * @param locator_selector Locator selector to operate on.
     */
    void compute_selected_guids(
            LocatorSelectorSender& locator_selector);

    /**
     * Defined out of line.
     *
     * @param locator_selector Locator selector to operate on.
     */
    void update_cached_info_nts(
            LocatorSelectorSender& locator_selector);

    /**
     * Pure virtual.
     * NOTE(review): presumably invoked by the (friend) WriterHistory when a change is added.
     *
     * @param change            The change concerned.
     * @param max_blocking_time Steady-clock deadline.
     */
    virtual void unsent_change_added_to_history(
            CacheChange_t* change,
            const std::chrono::time_point<std::chrono::steady_clock>& max_blocking_time) = 0;

    /**
     * Pure virtual.
     * NOTE(review): presumably invoked by the (friend) WriterHistory when a change is removed.
     *
     * @param a_change The change concerned.
     */
    virtual bool change_removed_by_history(
            CacheChange_t* a_change) = 0;

    /**
     * Defined out of line.
     *
     * @param rdata Proxy data of the reader to check against.
     */
    bool is_datasharing_compatible_with(
            const ReaderProxyData& rdata) const;

    //! Defined out of line.
    bool is_pool_initialized() const;

    /**
     * Adds a change to a message group, either whole or fragment by fragment.
     *
     * When `change->getFragmentCount()` is greater than zero, fragments 1..count are added in
     * order with `group.add_data_frag`; after each successful addition `sent_fun(change, frag)`
     * is called, and the first failure is logged and stops the loop. Otherwise the whole change
     * is added with `group.add_data`, and on success `sent_fun(change, 0)` is called.
     *
     * @tparam Functor   Callable invoked as `sent_fun(change, fragment_number)`.
     * @param group      Message group receiving the submessages.
     * @param change     Change to send.
     * @param inline_qos Forwarded to `add_data` / `add_data_frag`.
     * @param sent_fun   Callback run after each successful addition.
     * @return true if every addition succeeded, false otherwise.
     */
    template<typename Functor>
    bool send_data_or_fragments(
            RTPSMessageGroup& group,
            CacheChange_t* change,
            bool inline_qos,
            Functor sent_fun)
    {
        bool sent_ok = true;

        uint32_t n_fragments = change->getFragmentCount();
        if (n_fragments > 0)
        {
            // Fragment numbers are 1-based.
            for (FragmentNumber_t frag = 1; frag <= n_fragments; frag++)
            {
                sent_ok &= group.add_data_frag(*change, frag, inline_qos);
                if (sent_ok)
                {
                    sent_fun(change, frag);
                }
                else
                {
                    EPROSIMA_LOG_ERROR(RTPS_WRITER,
                            "Error sending fragment (" << change->sequenceNumber << ", " << frag << ")");
                    break;
                }
            }
        }
        else
        {
            sent_ok = group.add_data(*change, inline_qos);
            if (sent_ok)
            {
                // Fragment number 0 is passed for an unfragmented change.
                sent_fun(change, 0);
            }
            else
            {
                EPROSIMA_LOG_ERROR(RTPS_WRITER, "Error sending change " << change->sequenceNumber);
            }
        }

        return sent_ok;
    }

    /**
     * Defined out of line.
     *
     * @param change       The change concerned.
     * @param num_locators Number of locators, as size_t.
     */
    void add_statistics_sent_submessage(
            CacheChange_t* change,
            size_t num_locators);

    //! Defined out of line. NOTE(review): presumably the teardown counterpart of init().
    void deinit();

private:

    //! @return mp_mutex (a member not declared in this class, i.e. inherited).
    RecursiveTimedMutex& get_mutex()
    {
        return mp_mutex;
    }

    //! Copy assignment is disabled.
    RTPSWriter& operator =(
            const RTPSWriter&) = delete;

    /**
     * Defined out of line.
     *
     * @param payload_pool Shared pointer to the payload pool.
     * @param change_pool  Shared pointer to the change pool.
     * @param att          Writer attributes.
     */
    void init(
            const std::shared_ptr<IPayloadPool>& payload_pool,
            const std::shared_ptr<IChangePool>& change_pool,
            const WriterAttributes& att);


    //! Two writer pointers, both nullptr initially. NOTE(review): not used in this header;
    //! presumably intrusive links maintained by a friend class - confirm.
    RTPSWriter* next_[2] = { nullptr, nullptr };
};

} /* namespace rtps */
} /* namespace fastrtps */
} /* namespace eprosima */

#endif /* _FASTDDS_RTPS_RTPSWRITER_H_ */