Go to the documentation of this file.00001
00023 #include <iostream>
00024 #include <string>
00025 #include "ccpp_dds_dcps.h"
00026 #include "opensplice_dds_comm/check_status.h"
00027 #include "opensplice_dds_comm/ccpp_GSDFPacket.h"
00028 #include "opensplice_dds_comm/example_main.h"
00029 #include "opensplice_dds_comm/GSDFPacket_listener.h"
00030
00031 #include "opensplice_dds_comm/subscriber.h"
00032
00033 using namespace DDS;
00034
00035 namespace opensplice_dds_comm{
00036
00037 Subscriber::Subscriber(const std::string& topic_name)
00038 {
00039 domain = 0;
00040 topic_name_ = topic_name.data();
00041 GSDFPacketTypeName = NULL;
00042
00043
00044 dpf = DomainParticipantFactory::get_instance();
00045 checkHandle(dpf.in(), "DDS::DomainParticipantFactory::get_instance");
00046 participant = dpf->create_participant (
00047 domain,
00048 PARTICIPANT_QOS_DEFAULT,
00049 NULL,
00050 STATUS_MASK_NONE);
00051 checkHandle(participant, "DDS::DomainParticipantFactory::create_participant");
00052
00053
00054 GSDFPacketTS = new GSDFPacketTypeSupport();
00055 checkHandle(GSDFPacketTS.in(), "new GSDFPacketTypeSupport");
00056 GSDFPacketTypeName = GSDFPacketTS->get_type_name();
00057 status = GSDFPacketTS->register_type(
00058 participant.in(),
00059 GSDFPacketTypeName);
00060 checkStatus(status, "NetworkPartitionsData::GSDFPacketTypeSupport::register_type");
00061
00062
00063 status = participant->get_default_topic_qos(topic_qos);
00064 checkStatus(status, "DDS::DomainParticipant::get_default_topic_qos");
00065
00066 topic_qos.reliability.kind = BEST_EFFORT_RELIABILITY_QOS;
00067
00068 topic_qos.durability_service.history_kind = KEEP_LAST_HISTORY_QOS;
00069
00070
00071
00072 status = participant->set_default_topic_qos(topic_qos);
00073 checkStatus(status, "DDS::DomainParticipant::set_default_topic_qos");
00074
00075
00076 GSDFPacketTopic = participant->create_topic(
00077 topic_name_,
00078 GSDFPacketTypeName,
00079 topic_qos,
00080 NULL,
00081 STATUS_MASK_NONE);
00082 checkHandle(GSDFPacketTopic.in(), "DDS::DomainParticipant::create_topic (GSDFPacket)");
00083
00084
00085 status = participant->get_default_subscriber_qos (sub_qos);
00086 checkStatus(status, "DDS::DomainParticipant::get_default_subscriber_qos");
00087 sub_qos.partition.name.length(1);
00088 std::string partition_name="micros_swarm_framework_partion";
00089 sub_qos.partition.name[0] = partition_name.data();
00090
00091
00092 subscriber_ = participant->create_subscriber(sub_qos, NULL, STATUS_MASK_NONE);
00093 checkHandle(subscriber_.in(), "DDS::DomainParticipant::create_subscriber");
00094
00095 status = subscriber_->get_default_datareader_qos(dr_qos);
00096 dr_qos.history.kind = KEEP_ALL_HISTORY_QOS;
00097
00098 dr_qos.destination_order.kind = BY_SOURCE_TIMESTAMP_DESTINATIONORDER_QOS;
00099
00100 dr_qos.durability.kind = VOLATILE_DURABILITY_QOS;
00101
00102
00103 parentReader = subscriber_->create_datareader(
00104 GSDFPacketTopic.in(),
00105 dr_qos,
00106 NULL,
00107 STATUS_MASK_NONE);
00108 checkHandle(parentReader, "DDS::Subscriber::create_datareader");
00109
00110
00111 GSDFPacketDR = GSDFPacketDataReader::_narrow(parentReader);
00112 checkHandle(GSDFPacketDR.in(), "NetworkPartitionsData::GSDFPacketDataReader::_narrow");
00113 }
00114
00115 void Subscriber::subscribe(void (*callBack)(const GSDFPacket& packet))
00116 {
00117 GSDFPacketListener *myListener = new GSDFPacketListener();
00118 myListener->callBack_ = callBack;
00119
00120
00121
00122
00123
00124 DDS::StatusMask mask = DDS::DATA_AVAILABLE_STATUS;
00125
00126 GSDFPacketDR->set_listener(myListener, mask);
00127 }
00128
00129 void Subscriber::subscribe(boost::function<void(const GSDFPacket&)> callBack)
00130 {
00131 GSDFPacketListener *myListener = new GSDFPacketListener();
00132 myListener->callBack_ = callBack;
00133
00134
00135
00136
00137 DDS::StatusMask mask = DDS::DATA_AVAILABLE_STATUS;
00138
00139 GSDFPacketDR->set_listener(myListener, mask);
00140 }
00141
00142 Subscriber::~Subscriber()
00143 {
00144
00145 status = subscriber_->delete_datareader(GSDFPacketDR.in());
00146 checkStatus(status, "DDS::Subscriber::delete_datareader");
00147
00148
00149 status = participant->delete_subscriber(subscriber_.in());
00150 checkStatus(status, "DDS::DomainParticipant::delete_subscriber");
00151
00152
00153 status = participant->delete_topic(GSDFPacketTopic.in());
00154 checkStatus(status, "DDS::DomainParticipant::delete_topic (GSDFPacketTopic)");
00155
00156
00157 string_free(GSDFPacketTypeName);
00158
00159
00160 status = dpf->delete_participant(participant.in());
00161 checkStatus(status, "DDS::DomainParticipantFactory::delete_participant");
00162
00163
00164 }
00165 };
00166