Program Listing for File StatelessWriter.h

Return to documentation for file (/tmp/ws/src/fastrtps/include/fastdds/rtps/writer/StatelessWriter.h)

// Copyright 2016 Proyectos y Sistemas de Mantenimiento SL (eProsima).
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#ifndef _FASTDDS_RTPS_STATELESSWRITER_H_
#define _FASTDDS_RTPS_STATELESSWRITER_H_

#ifndef DOXYGEN_SHOULD_SKIP_THIS_PUBLIC

#include <fastdds/rtps/common/Time_t.h>
#include <fastdds/rtps/history/IChangePool.h>
#include <fastdds/rtps/history/IPayloadPool.h>
#include <fastdds/rtps/interfaces/IReaderDataFilter.hpp>
#include <fastdds/rtps/writer/ChangeForReader.h>
#include <fastdds/rtps/writer/ReaderLocator.h>
#include <fastdds/rtps/writer/RTPSWriter.h>
#include <fastrtps/utils/collections/ResourceLimitedVector.hpp>

#include <condition_variable>
#include <list>
#include <memory>
#include <mutex>

namespace eprosima {
namespace fastrtps {
namespace rtps {


class StatelessWriter : public RTPSWriter
{
    friend class RTPSParticipantImpl;

protected:

    StatelessWriter(
            RTPSParticipantImpl* participant,
            const GUID_t& guid,
            const WriterAttributes& attributes,
            fastdds::rtps::FlowController* flow_controller,
            WriterHistory* history,
            WriterListener* listener = nullptr);

    StatelessWriter(
            RTPSParticipantImpl* impl,
            const GUID_t& guid,
            const WriterAttributes& att,
            const std::shared_ptr<IPayloadPool>& payload_pool,
            fastdds::rtps::FlowController* flow_controller,
            WriterHistory* hist,
            WriterListener* listen = nullptr);

    StatelessWriter(
            RTPSParticipantImpl* impl,
            const GUID_t& guid,
            const WriterAttributes& att,
            const std::shared_ptr<IPayloadPool>& payload_pool,
            const std::shared_ptr<IChangePool>& change_pool,
            fastdds::rtps::FlowController* flow_controller,
            WriterHistory* hist,
            WriterListener* listen = nullptr);

public:

    virtual ~StatelessWriter();

    void unsent_change_added_to_history(
            CacheChange_t* change,
            const std::chrono::time_point<std::chrono::steady_clock>& max_blocking_time) override;

    bool change_removed_by_history(
            CacheChange_t* change) override;

    bool matched_reader_add(
            const ReaderProxyData& data) override;

    bool matched_reader_remove(
            const GUID_t& reader_guid) override;

    bool matched_reader_is_matched(
            const GUID_t& reader_guid) override;

    void reader_data_filter(
            fastdds::rtps::IReaderDataFilter* filter) final
    {
        reader_data_filter_ = filter;
    }

    const fastdds::rtps::IReaderDataFilter* reader_data_filter() const final
    {
        return reader_data_filter_;
    }

    void updateAttributes(
            const WriterAttributes& att) override
    {
        (void)att;
        //FOR NOW THERE IS NOTHING TO UPDATE.
    }

    bool set_fixed_locators(
            const LocatorList_t& locator_list);

    void unsent_changes_reset();

    bool is_acked_by_all(
            const CacheChange_t* change) const override;

    bool try_remove_change(
            const std::chrono::steady_clock::time_point&,
            std::unique_lock<RecursiveTimedMutex>&) override;

    bool wait_for_acknowledgement(
            const SequenceNumber_t& seq,
            const std::chrono::steady_clock::time_point& max_blocking_time_point,
            std::unique_lock<RecursiveTimedMutex>& lock) override;

    bool send_nts(
            CDRMessage_t* message,
            const LocatorSelectorSender& locator_selector,
            std::chrono::steady_clock::time_point& max_blocking_time_point) const override;

    inline size_t getMatchedReadersSize() const
    {
        std::lock_guard<RecursiveTimedMutex> guard(mp_mutex);
        return matched_remote_readers_.size()
               + matched_local_readers_.size()
               + matched_datasharing_readers_.size();
    }

    DeliveryRetCode deliver_sample_nts(
            CacheChange_t* cache_change,
            RTPSMessageGroup& group,
            LocatorSelectorSender& locator_selector,
            const std::chrono::time_point<std::chrono::steady_clock>& max_blocking_time) override;

    LocatorSelectorSender& get_general_locator_selector() override
    {
        return locator_selector_;
    }

    LocatorSelectorSender& get_async_locator_selector() override
    {
        return locator_selector_;
    }

private:

    void init(
            RTPSParticipantImpl* participant,
            const WriterAttributes& attributes);

    void get_builtin_guid();

    bool has_builtin_guid();

    void update_reader_info(
            bool create_sender_resources);

    bool datasharing_delivery(
            CacheChange_t* change);

    bool intraprocess_delivery(
            CacheChange_t* change,
            ReaderLocator& reader_locator);

    bool is_inline_qos_expected_ = false;
    LocatorList_t fixed_locators_;
    ResourceLimitedVector<std::unique_ptr<ReaderLocator>> matched_remote_readers_;

    std::condition_variable_any unsent_changes_cond_;

    uint64_t current_sequence_number_sent_ = 0;

    FragmentNumber_t current_fragment_sent_ = 0;

    uint64_t last_sequence_number_sent_ = 0;

    ResourceLimitedVector<std::unique_ptr<ReaderLocator>> matched_local_readers_;
    ResourceLimitedVector<std::unique_ptr<ReaderLocator>> matched_datasharing_readers_;
    ResourceLimitedVector<std::unique_ptr<ReaderLocator>> matched_readers_pool_;

    LocatorSelectorSender locator_selector_;

    fastdds::rtps::IReaderDataFilter* reader_data_filter_ = nullptr;
};

} /* namespace rtps */
} /* namespace fastrtps */
} /* namespace eprosima */

#endif // ifndef DOXYGEN_SHOULD_SKIP_THIS_PUBLIC
#endif /* _FASTDDS_RTPS_STATELESSWRITER_H_ */