Program Listing for File StatelessWriter.h
↰ Return to documentation for file (/tmp/ws/src/fastrtps/include/fastdds/rtps/writer/StatelessWriter.h)
// Copyright 2016 Proyectos y Sistemas de Mantenimiento SL (eProsima).
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef _FASTDDS_RTPS_STATELESSWRITER_H_
#define _FASTDDS_RTPS_STATELESSWRITER_H_
#ifndef DOXYGEN_SHOULD_SKIP_THIS_PUBLIC
#include <fastdds/rtps/common/Time_t.h>
#include <fastdds/rtps/history/IChangePool.h>
#include <fastdds/rtps/history/IPayloadPool.h>
#include <fastdds/rtps/interfaces/IReaderDataFilter.hpp>
#include <fastdds/rtps/writer/ChangeForReader.h>
#include <fastdds/rtps/writer/ReaderLocator.h>
#include <fastdds/rtps/writer/RTPSWriter.h>
#include <fastrtps/utils/collections/ResourceLimitedVector.hpp>
#include <condition_variable>
#include <list>
#include <memory>
#include <mutex>
namespace eprosima {
namespace fastrtps {
namespace rtps {
// RTPS writer specialization that publicly derives from RTPSWriter.
// Only the interface is visible in this header; all non-inline members are
// defined in the corresponding implementation file.
// NOTE(review): the name suggests this is the "stateless" writer of the RTPS
// specification (no per-reader acknowledgement state) - confirm against the
// implementation before relying on that.
class StatelessWriter : public RTPSWriter
{
// RTPSParticipantImpl is a friend, which gives it access to the protected
// constructors (and private members) declared below.
friend class RTPSParticipantImpl;
protected:
// The constructors are protected: instances can only be created by derived
// classes or by the friend class RTPSParticipantImpl.
//
// Overload taking neither a payload pool nor a change pool.
// `listener` is optional and defaults to nullptr.
// NOTE(review): presumably the pools are then created internally or by the
// base class - confirm in the implementation.
StatelessWriter(
RTPSParticipantImpl* participant,
const GUID_t& guid,
const WriterAttributes& attributes,
fastdds::rtps::FlowController* flow_controller,
WriterHistory* history,
WriterListener* listener = nullptr);
// Overload that additionally receives a shared payload pool.
// `listen` is optional and defaults to nullptr.
StatelessWriter(
RTPSParticipantImpl* impl,
const GUID_t& guid,
const WriterAttributes& att,
const std::shared_ptr<IPayloadPool>& payload_pool,
fastdds::rtps::FlowController* flow_controller,
WriterHistory* hist,
WriterListener* listen = nullptr);
// Overload that receives both a shared payload pool and a shared change pool.
// `listen` is optional and defaults to nullptr.
StatelessWriter(
RTPSParticipantImpl* impl,
const GUID_t& guid,
const WriterAttributes& att,
const std::shared_ptr<IPayloadPool>& payload_pool,
const std::shared_ptr<IChangePool>& change_pool,
fastdds::rtps::FlowController* flow_controller,
WriterHistory* hist,
WriterListener* listen = nullptr);
public:
// Virtual destructor; defined out of line.
virtual ~StatelessWriter();
// Override of the RTPSWriter hook.
// NOTE(review): by its name, invoked when a not-yet-sent change is added to
// the history; `max_blocking_time` is a steady_clock time point, presumably
// an absolute deadline for the operation - confirm.
void unsent_change_added_to_history(
CacheChange_t* change,
const std::chrono::time_point<std::chrono::steady_clock>& max_blocking_time) override;
// Override of the RTPSWriter hook.
// NOTE(review): by its name, invoked when `change` is removed from the
// history; the meaning of the bool result is not visible here.
bool change_removed_by_history(
CacheChange_t* change) override;
// Override of RTPSWriter. Takes the proxy data describing a reader.
// NOTE(review): presumably registers that reader as matched and returns
// whether it succeeded - confirm.
bool matched_reader_add(
const ReaderProxyData& data) override;
// Override of RTPSWriter. Takes the GUID of a reader.
// NOTE(review): presumably unmatches that reader and returns whether it was
// found - confirm.
bool matched_reader_remove(
const GUID_t& reader_guid) override;
// Override of RTPSWriter. Takes the GUID of a reader.
// NOTE(review): presumably reports whether that reader is currently matched.
bool matched_reader_is_matched(
const GUID_t& reader_guid) override;
// Setter (final override): stores `filter` in reader_data_filter_.
// Only the raw pointer is stored; nothing here takes ownership of it or
// checks it for null.
void reader_data_filter(
fastdds::rtps::IReaderDataFilter* filter) final
{
reader_data_filter_ = filter;
}
// Getter (final override): returns the stored filter pointer, which is
// nullptr until the setter above has been called with a non-null value.
const fastdds::rtps::IReaderDataFilter* reader_data_filter() const final
{
return reader_data_filter_;
}
// Override of RTPSWriter that is intentionally a no-op: the argument is
// ignored and no state is modified.
void updateAttributes(
const WriterAttributes& att) override
{
// Cast to void only to silence the unused-parameter warning.
(void)att;
//FOR NOW THERE IS NOTHING TO UPDATE.
}
// Not an override (declared for the first time in this class).
// NOTE(review): presumably fills fixed_locators_ from `locator_list`; the
// meaning of the bool result is not visible here.
bool set_fixed_locators(
const LocatorList_t& locator_list);
// Not an override; defined out of line.
// NOTE(review): by its name, resets the "unsent changes" bookkeeping - confirm.
void unsent_changes_reset();
// Override of RTPSWriter; const, so it does not modify observable state.
// NOTE(review): semantics of "acked by all" for this writer are defined in
// the implementation file.
bool is_acked_by_all(
const CacheChange_t* change) const override;
// Override of RTPSWriter. Parameters are unnamed in this declaration: a
// steady_clock time point and a unique_lock over a RecursiveTimedMutex.
// NOTE(review): presumably the time point is a deadline and the lock is the
// writer lock already held by the caller - confirm.
bool try_remove_change(
const std::chrono::steady_clock::time_point&,
std::unique_lock<RecursiveTimedMutex>&) override;
// Override of RTPSWriter. Receives a sequence number, a steady_clock time
// point and a unique_lock over a RecursiveTimedMutex.
// NOTE(review): presumably waits for `seq` to be acknowledged until the
// given time point - confirm.
bool wait_for_acknowledgement(
const SequenceNumber_t& seq,
const std::chrono::steady_clock::time_point& max_blocking_time_point,
std::unique_lock<RecursiveTimedMutex>& lock) override;
// Const override of RTPSWriter. `max_blocking_time_point` is passed by
// non-const reference, so the callee is able to modify it.
// NOTE(review): the `_nts` suffix presumably means "not thread safe", i.e.
// the caller must already hold the writer lock - confirm.
bool send_nts(
CDRMessage_t* message,
const LocatorSelectorSender& locator_selector,
std::chrono::steady_clock::time_point& max_blocking_time_point) const override;
// Returns the combined number of entries in matched_remote_readers_,
// matched_local_readers_ and matched_datasharing_readers_.
// matched_readers_pool_ is not part of the sum.
// mp_mutex is held for the duration of the call; it is not declared in this
// class (presumably inherited through RTPSWriter).
inline size_t getMatchedReadersSize() const
{
std::lock_guard<RecursiveTimedMutex> guard(mp_mutex);
return matched_remote_readers_.size()
+ matched_local_readers_.size()
+ matched_datasharing_readers_.size();
}
// Override of RTPSWriter returning a DeliveryRetCode.
// NOTE(review): presumably delivers `cache_change` using `group` and
// `locator_selector` before `max_blocking_time`; `_nts` presumably means the
// caller must hold the writer lock - confirm.
DeliveryRetCode deliver_sample_nts(
CacheChange_t* cache_change,
RTPSMessageGroup& group,
LocatorSelectorSender& locator_selector,
const std::chrono::time_point<std::chrono::steady_clock>& max_blocking_time) override;
// Returns a reference to the single locator_selector_ member.
LocatorSelectorSender& get_general_locator_selector() override
{
return locator_selector_;
}
// Returns the same locator_selector_ member as
// get_general_locator_selector(): this class keeps one selector for both.
LocatorSelectorSender& get_async_locator_selector() override
{
return locator_selector_;
}
private:
// Private helper taking the same participant and attributes types as the
// constructors.
// NOTE(review): presumably the common initialization shared by the three
// constructors - confirm.
void init(
RTPSParticipantImpl* participant,
const WriterAttributes& attributes);
// Private helper; defined out of line. Purpose not visible in this header.
void get_builtin_guid();
// Private helper; defined out of line. Non-const as declared.
bool has_builtin_guid();
// Private helper; defined out of line.
// NOTE(review): `create_sender_resources` presumably controls whether sender
// resources are created while reader information is refreshed - confirm.
void update_reader_info(
bool create_sender_resources);
// Private helper; defined out of line.
// NOTE(review): presumably delivers `change` to data-sharing readers.
bool datasharing_delivery(
CacheChange_t* change);
// Private helper; defined out of line.
// NOTE(review): presumably delivers `change` to the in-process reader
// represented by `reader_locator`.
bool intraprocess_delivery(
CacheChange_t* change,
ReaderLocator& reader_locator);
// Defaults to false. NOTE(review): presumably set when a matched reader
// expects inline QoS - confirm.
bool is_inline_qos_expected_ = false;
// Locator list. NOTE(review): presumably populated by set_fixed_locators().
LocatorList_t fixed_locators_;
// Owning pointers to ReaderLocator objects; counted by getMatchedReadersSize().
ResourceLimitedVector<std::unique_ptr<ReaderLocator>> matched_remote_readers_;
// Condition variable usable with any lock type.
// NOTE(review): presumably signalled in relation to unsent changes - confirm.
std::condition_variable_any unsent_changes_cond_;
// Sending progress counters, all initialized to 0.
// NOTE(review): exact update rules live in the implementation file.
uint64_t current_sequence_number_sent_ = 0;
FragmentNumber_t current_fragment_sent_ = 0;
uint64_t last_sequence_number_sent_ = 0;
// Owning pointers to ReaderLocator objects; counted by getMatchedReadersSize().
ResourceLimitedVector<std::unique_ptr<ReaderLocator>> matched_local_readers_;
// Owning pointers to ReaderLocator objects; counted by getMatchedReadersSize().
ResourceLimitedVector<std::unique_ptr<ReaderLocator>> matched_datasharing_readers_;
// Owning pointers to ReaderLocator objects that are NOT counted by
// getMatchedReadersSize().
// NOTE(review): presumably spare/unmatched locators kept for reuse - confirm.
ResourceLimitedVector<std::unique_ptr<ReaderLocator>> matched_readers_pool_;
// Single selector returned by both get_general_locator_selector() and
// get_async_locator_selector().
LocatorSelectorSender locator_selector_;
// Filter pointer read and written by the reader_data_filter() accessors;
// nullptr by default.
fastdds::rtps::IReaderDataFilter* reader_data_filter_ = nullptr;
};
} /* namespace rtps */
} /* namespace fastrtps */
} /* namespace eprosima */
#endif // ifndef DOXYGEN_SHOULD_SKIP_THIS_PUBLIC
#endif /* _FASTDDS_RTPS_STATELESSWRITER_H_ */