Program Listing for File MessageReceiver.h

Return to documentation for file (/tmp/ws/src/fastrtps/include/fastdds/rtps/messages/MessageReceiver.h)

// Copyright 2016 Proyectos y Sistemas de Mantenimiento SL (eProsima).
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#ifndef _FASTDDS_RTPS_MESSAGERECEIVER_H_
#define _FASTDDS_RTPS_MESSAGERECEIVER_H_
#ifndef DOXYGEN_SHOULD_SKIP_THIS_PUBLIC

#include <functional>
#include <unordered_map>

#include <fastdds/rtps/common/all_common.h>
#include <fastrtps/utils/shared_mutex.hpp>

namespace eprosima {
namespace fastrtps {
namespace rtps {

class RTPSParticipantImpl;
class Endpoint;
class RTPSWriter;
class RTPSReader;
struct SubmessageHeader_t;

class MessageReceiver
{
public:

    MessageReceiver(
            RTPSParticipantImpl* participant,
            uint32_t rec_buffer_size);

    virtual ~MessageReceiver();

    void processCDRMsg(
            const Locator_t& source_locator,
            const Locator_t& reception_locator,
            CDRMessage_t* msg);

    // Functions to associate/remove associatedendpoints
    void associateEndpoint(
            Endpoint* to_add);
    void removeEndpoint(
            Endpoint* to_remove);

private:

    mutable eprosima::shared_mutex mtx_;
    std::vector<RTPSWriter*> associated_writers_;
    std::unordered_map<EntityId_t, std::vector<RTPSReader*>> associated_readers_;

    RTPSParticipantImpl* participant_;
    ProtocolVersion_t source_version_;
    VendorId_t source_vendor_id_;
    GuidPrefix_t source_guid_prefix_;
    GuidPrefix_t dest_guid_prefix_;
    bool have_timestamp_;
    Time_t timestamp_;

#if HAVE_SECURITY
    CDRMessage_t crypto_msg_;
    CDRMessage_t crypto_submsg_;
    SerializedPayload_t crypto_payload_;
#endif // if HAVE_SECURITY

    std::function<void(
                const EntityId_t&,
                CacheChange_t&)> process_data_message_function_;
    std::function<void(
                const EntityId_t&,
                CacheChange_t&,
                uint32_t,
                uint32_t,
                uint16_t)> process_data_fragment_message_function_;

    void reset();

    bool checkRTPSHeader(
            CDRMessage_t* msg);
    bool readSubmessageHeader(
            CDRMessage_t* msg,
            SubmessageHeader_t* smh) const;

    bool willAReaderAcceptMsgDirectedTo(
            const EntityId_t& readerID,
            RTPSReader*& first_reader) const;

    template<typename Functor>
    void findAllReaders(
            const EntityId_t& readerID,
            const Functor& callback) const;


    bool proc_Submsg_Data(
            CDRMessage_t* msg,
            SubmessageHeader_t* smh) const;
    bool proc_Submsg_DataFrag(
            CDRMessage_t* msg,
            SubmessageHeader_t* smh) const;
    bool proc_Submsg_Heartbeat(
            CDRMessage_t* msg,
            SubmessageHeader_t* smh) const;
    bool proc_Submsg_Acknack(
            CDRMessage_t* msg,
            SubmessageHeader_t* smh) const;
    bool proc_Submsg_Gap(
            CDRMessage_t* msg,
            SubmessageHeader_t* smh) const;
    bool proc_Submsg_InfoTS(
            CDRMessage_t* msg,
            SubmessageHeader_t* smh);
    bool proc_Submsg_InfoDST(
            CDRMessage_t* msg,
            SubmessageHeader_t* smh);
    bool proc_Submsg_InfoSRC(
            CDRMessage_t* msg,
            SubmessageHeader_t* smh);
    bool proc_Submsg_NackFrag(
            CDRMessage_t* msg,
            SubmessageHeader_t* smh) const;
    bool proc_Submsg_HeartbeatFrag(
            CDRMessage_t* msg,
            SubmessageHeader_t* smh) const;


 #if HAVE_SECURITY
    void process_data_message_with_security(
            const EntityId_t& reader_id,
            CacheChange_t& change);
#endif // HAVE_SECURITY

    void process_data_message_without_security(
            const EntityId_t& reader_id,
            CacheChange_t& change);

 #if HAVE_SECURITY
    void process_data_fragment_message_with_security(
            const EntityId_t& reader_id,
            CacheChange_t& change,
            uint32_t sample_size,
            uint32_t fragment_starting_num,
            uint16_t fragments_in_submessage);
#endif // HAVE_SECURITY

    void process_data_fragment_message_without_security(
            const EntityId_t& reader_id,
            CacheChange_t& change,
            uint32_t sample_size,
            uint32_t fragment_starting_num,
            uint16_t fragments_in_submessage);

    void notify_network_statistics(
            const Locator_t& source_locator,
            const Locator_t& reception_locator,
            CDRMessage_t* msg) const;

};

} /* namespace rtps */
} /* namespace fastrtps */
} /* namespace eprosima */

#endif // ifndef DOXYGEN_SHOULD_SKIP_THIS_PUBLIC
#endif /* _FASTDDS_RTPS_MESSAGERECEIVER_H_ */