subscriber.cpp
Go to the documentation of this file.
00001 
00023 #include <iostream>
00024 #include <string>
00025 #include "ccpp_dds_dcps.h"
00026 #include "opensplice_dds_broker/check_status.h"
00027 #include "opensplice_dds_broker/ccpp_GSDFPacket.h"
00028 #include "opensplice_dds_broker/example_main.h"
00029 #include "opensplice_dds_broker/GSDFPacket_listener.h"
00030 
00031 #include "opensplice_dds_broker/subscriber.h"
00032 
00033 using namespace DDS;
00034 
00035 namespace opensplice_dds_broker{
00036     
00037     Subscriber::Subscriber(const std::string& topic_name)
00038     {
00039         domain = 0;
00040         topic_name_ = topic_name.data();
00041         GSDFPacketTypeName = NULL;
00042     
00043         //Create a DomainParticipantFactory and a DomainParticipant (using Default QoS settings)
00044         dpf = DomainParticipantFactory::get_instance();
00045         checkHandle(dpf.in(), "DDS::DomainParticipantFactory::get_instance");
00046         participant = dpf->create_participant (
00047             domain,
00048             PARTICIPANT_QOS_DEFAULT,
00049             NULL,
00050             STATUS_MASK_NONE);
00051         checkHandle(participant, "DDS::DomainParticipantFactory::create_participant");
00052         
00053         //Register the required datatype for GSDFPacket
00054         GSDFPacketTS = new GSDFPacketTypeSupport();
00055         checkHandle(GSDFPacketTS.in(), "new GSDFPacketTypeSupport");
00056         GSDFPacketTypeName = GSDFPacketTS->get_type_name();
00057         status = GSDFPacketTS->register_type(
00058             participant.in(),
00059             GSDFPacketTypeName);
00060         checkStatus(status, "NetworkPartitionsData::GSDFPacketTypeSupport::register_type");
00061 
00062         //Set the ReliabilityQosPolicy to BEST_EFFORT_RELIABILITY
00063         status = participant->get_default_topic_qos(topic_qos);
00064         checkStatus(status, "DDS::DomainParticipant::get_default_topic_qos");
00065         
00066         topic_qos.reliability.kind = BEST_EFFORT_RELIABILITY_QOS;
00067         //topic_qos.reliability.kind = RELIABLE_RELIABILITY_QOS;
00068         topic_qos.durability_service.history_kind = KEEP_LAST_HISTORY_QOS;
00069         //topic_qos.durability_service.history_depth = 4000;
00070 
00071         //Make the tailored QoS the new default
00072         status = participant->set_default_topic_qos(topic_qos);
00073         checkStatus(status, "DDS::DomainParticipant::set_default_topic_qos");
00074 
00075         //Use the changed policy when defining the GSDFPacket topic
00076         GSDFPacketTopic = participant->create_topic(
00077             topic_name_,
00078             GSDFPacketTypeName,
00079             topic_qos,
00080             NULL,
00081             STATUS_MASK_NONE);
00082         checkHandle(GSDFPacketTopic.in(), "DDS::DomainParticipant::create_topic (GSDFPacket)");
00083 
00084         //Adapt the default SubscriberQos to read from the "micros_swarm_framework_partion" Partition
00085         status = participant->get_default_subscriber_qos (sub_qos);
00086         checkStatus(status, "DDS::DomainParticipant::get_default_subscriber_qos");
00087         sub_qos.partition.name.length(1);
00088         std::string partition_name="micros_swarm_framework_partion";
00089         sub_qos.partition.name[0] = partition_name.data();
00090 
00091         //Create a Subscriber for the MessageBoard application
00092         subscriber_ = participant->create_subscriber(sub_qos, NULL, STATUS_MASK_NONE);
00093         checkHandle(subscriber_.in(), "DDS::DomainParticipant::create_subscriber");
00094         
00095         status = subscriber_->get_default_datareader_qos(dr_qos);
00096         dr_qos.history.kind = KEEP_ALL_HISTORY_QOS;
00097         //dr_qos.history.kind = KEEP_LAST_HISTORY_QOS;
00098         dr_qos.destination_order.kind = BY_SOURCE_TIMESTAMP_DESTINATIONORDER_QOS;
00099         //dr_qos.durability.kind = TRANSIENT_LOCAL_DURABILITY_QOS;
00100         dr_qos.durability.kind = VOLATILE_DURABILITY_QOS;
00101 
00102         //Create a DataReader for the NamedMessage Topic (using the appropriate QoS)
00103         parentReader = subscriber_->create_datareader(
00104             GSDFPacketTopic.in(),
00105             dr_qos,
00106             NULL,
00107             STATUS_MASK_NONE);
00108         checkHandle(parentReader, "DDS::Subscriber::create_datareader");
00109 
00110         //Narrow the abstract parent into its typed representative
00111         GSDFPacketDR = GSDFPacketDataReader::_narrow(parentReader);
00112         checkHandle(GSDFPacketDR.in(), "NetworkPartitionsData::GSDFPacketDataReader::_narrow");
00113     }
00114     
00115     void Subscriber::subscribe(void (*callBack)(const GSDFPacket& packet))
00116     {
00117         GSDFPacketListener *myListener = new GSDFPacketListener();
00118         myListener->callBack_ = callBack;  //set callBack function
00119         //myListener->callBack_ = boost::bind(callBack, _1);  //set callBack function
00120         //myListener->GSDFPacketDR_ = GSDFPacketDataReader::_narrow(GSDFPacketDR.in());
00121         //checkHandle(myListener->GSDFPacketDR_.in(), "GSDFPacketDataReader::_narrow");
00122 
00123         //DDS::StatusMask mask = DDS::DATA_AVAILABLE_STATUS | DDS::REQUESTED_DEADLINE_MISSED_STATUS;
00124         DDS::StatusMask mask = DDS::DATA_AVAILABLE_STATUS;
00125         //myListener->GSDFPacketDR_->set_listener(myListener, mask);
00126         GSDFPacketDR->set_listener(myListener, mask);
00127     }
00128     
00129     void Subscriber::subscribe(boost::function<void(const GSDFPacket&)> callBack)
00130     {
00131         GSDFPacketListener *myListener = new GSDFPacketListener();
00132         myListener->callBack_ = callBack;  //set callBack function
00133         //myListener->GSDFPacketDR_ = GSDFPacketDataReader::_narrow(GSDFPacketDR.in());
00134         //checkHandle(myListener->GSDFPacketDR_.in(), "GSDFPacketDataReader::_narrow");
00135 
00136         //DDS::StatusMask mask = DDS::DATA_AVAILABLE_STATUS | DDS::REQUESTED_DEADLINE_MISSED_STATUS;
00137         DDS::StatusMask mask = DDS::DATA_AVAILABLE_STATUS;
00138         //myListener->GSDFPacketDR_->set_listener(myListener, mask);
00139         GSDFPacketDR->set_listener(myListener, mask);
00140     }
00141     
00142     Subscriber::~Subscriber()
00143     {
00144         //Remove the DataReade
00145         status = subscriber_->delete_datareader(GSDFPacketDR.in());
00146         checkStatus(status, "DDS::Subscriber::delete_datareader");
00147 
00148         //Remove the Subscriber
00149         status = participant->delete_subscriber(subscriber_.in());
00150         checkStatus(status, "DDS::DomainParticipant::delete_subscriber");
00151 
00152         //Remove the Topic
00153         status = participant->delete_topic(GSDFPacketTopic.in());
00154         checkStatus(status, "DDS::DomainParticipant::delete_topic (GSDFPacketTopic)");
00155 
00156         //De-allocate the type-names
00157         string_free(GSDFPacketTypeName);
00158 
00159         //Remove the DomainParticipant
00160         status = dpf->delete_participant(participant.in());
00161         checkStatus(status, "DDS::DomainParticipantFactory::delete_participant");
00162         
00163         //cout << "Completed subscriber" << endl;
00164     }
00165 };
00166 


opensplice_dds_broker
Author(s):
autogenerated on Thu Jun 6 2019 18:52:31