Go to the documentation of this file.00001
00006
00007
00008
00009
00010 #include <ecl/formatters.hpp>
00011 #include <iostream>
00012 #include <mm_messages.hpp>
00013 #include <nanomsg/nn.h>
00014 #include <nanomsg/pubsub.h>
00015 #include "../../include/mm_mux_demux/demux.hpp"
00016
00017
00018
00019
00020
00021 namespace mm_mux_demux {
00022 namespace impl {
00023
00024
00025
00026
00027
00028 MessageDemux::MessageDemux(const std::string& name,
00029 const std::string& url,
00030 const mm_messages::Verbosity::Level& verbosity,
00031 const bool bind
00032 ) :
00033 name(name),
00034 url(url),
00035 socket(0),
00036 verbosity(verbosity),
00037 shutdown_requested(false),
00038 thread()
00039 {
00040 socket = nn_socket (AF_SP, NN_SUB);
00041 if ( socket < 0 ) {
00042
00043 std::cout << "Demux socket error: " << nn_strerror(errno) << " [" << nn_errno() << "][" << name << "][" << url << "]" << std::endl;
00044 }
00045 nn_setsockopt (socket, NN_SUB, NN_SUB_SUBSCRIBE, "", 0);
00046 nn_setsockopt (socket, NN_SOL_SOCKET, NN_SOCKET_NAME, name.c_str(), name.size());
00047 int timeout = 100;
00048 nn_setsockopt (socket, NN_SOL_SOCKET, NN_RCVTIMEO, &timeout, sizeof(timeout));
00049 if ( bind ) {
00050 endpoint_id = nn_bind(socket, url.c_str());
00051 } else {
00052 endpoint_id = nn_connect(socket, url.c_str());
00053 }
00054 if (endpoint_id < 0) {
00055
00056 std::cout << "Demux connect error: " << nn_strerror(errno) << " [" << nn_errno() << "][" << name << "][" << url << "]" << std::endl;
00057 }
00058 if (verbosity > mm_messages::Verbosity::QUIET) {
00059
00060 std::cout << "[" << ecl::TimeStamp() << "] MessageDemux : [" << name << "][" << url << "][" << socket << "][" << endpoint_id << "]";
00061 if ( bind ) {
00062 std::cout << "[bind]" << std::endl;
00063 } else {
00064 std::cout << "[connect]" << std::endl;
00065 }
00066 }
00067
00068
00069
00070 thread.start(&MessageDemux::spin, *this);
00071 }
00072
00073 MessageDemux::MessageDemux(const MessageDemux& other) {
00074 socket = other.socket;
00075 name = other.name;
00076 verbosity = other.verbosity;
00077 shutdown_requested = other.shutdown_requested;
00078 endpoint_id = other.endpoint_id;
00079
00080 std::move(other.thread);
00081 }
00082
00083 MessageDemux::~MessageDemux()
00084 {
00085 mutex.lock();
00086
00087
00088
00089
00090 for (SubscriberMapIterator iter = subscribers.begin(); iter != subscribers.end(); ++iter) {
00091 delete iter->second;
00092 }
00093 subscribers.clear();
00094 mutex.unlock();
00095
00096 shutdown();
00097 }
00098
00099 void MessageDemux::spin() {
00100 while (!shutdown_requested)
00101 {
00102 unsigned char *buffer = NULL;
00103 int bytes = ::nn_recv(socket, &buffer, NN_MSG, 0);
00104 if ( bytes < 0 ) {
00105
00106 if (nn_errno() == EAGAIN) {
00107 continue;
00108 }
00109
00110 }
00111 mm_messages::PacketHeader header = mm_messages::Message<mm_messages::PacketHeader>::decode(buffer, mm_messages::PacketHeader::size);
00112 mm_messages::SubPacketHeader subheader = mm_messages::Message<mm_messages::SubPacketHeader>::decode(buffer + mm_messages::PacketHeader::size, mm_messages::SubPacketHeader::size);
00113 if ( verbosity > mm_messages::Verbosity::QUIET ) {
00114 std::cout << "[" << ecl::TimeStamp() << "] Demux: [" << subheader.id << "]";
00115 std::cout << "[" << bytes << "][";
00116 if ( verbosity > mm_messages::Verbosity::LOW ) {
00117 std::cout << std::hex;
00118 for(unsigned int i=0; i < bytes; ++i ) {
00119 std::cout << static_cast<unsigned int>(*(buffer+i)) << " ";
00120 }
00121 std::cout << std::dec;
00122 std::cout << "]";
00123 }
00124 std::cout << std::endl;
00125 }
00126 mutex.lock();
00127 SubscriberMapIterator iter = subscribers.find(subheader.id);
00128 if (iter != subscribers.end()) {
00129 (*(iter->second))(buffer + mm_messages::PacketHeader::size + mm_messages::SubPacketHeader::size, bytes - mm_messages::PacketHeader::size - mm_messages::SubPacketHeader::size);
00130 }
00131 mutex.unlock();
00132 nn_freemsg (buffer);
00133 }
00134 }
00140 void MessageDemux::shutdown() {
00141 if ( !shutdown_requested ) {
00142 shutdown_requested = true;
00143 thread.join();
00144 }
00145 if ( socket > 0 ) {
00146 int result = nn_shutdown (socket, endpoint_id);
00147 }
00148 }
00149
00150 void MessageDemux::unregisterSubscriber(const unsigned int& id)
00151 {
00152 mutex.lock();
00153 subscribers.erase(id);
00154 mutex.unlock();
00155 }
00156
00157 }
00158 }
00159
00160
00161
00162
00163
00164 namespace mm_mux_demux {
00165
00166
00167
00168
00169
00170 void MessageDemux::start(const std::string& name,
00171 const std::string& url,
00172 const mm_messages::Verbosity::Level& verbosity,
00173 const bool bind
00174 )
00175 {
00176 DemuxMapConstIterator iter = demultiplexers().find(name);
00177 if ( iter == demultiplexers().end() ) {
00178 std::pair<DemuxMapIterator,bool> result;
00179 result = demultiplexers().insert(
00180 DemuxMapPair(name, std::make_shared<impl::MessageDemux>(name, url, verbosity, bind))
00181 );
00182 } else if ( !url.empty() ) {
00183
00184 }
00185 }
00186
00187 void MessageDemux::shutdown() {
00188 DemuxMapIterator iter = demultiplexers().begin();
00189 for (iter; iter != demultiplexers().end(); ++iter) {
00190 (iter->second)->shutdown();
00191 }
00192
00193 }
00194
00195 void MessageDemux::shutdown(const std::string& name) {
00196 DemuxMapConstIterator iter = demultiplexers().find(name);
00197 if ( iter != demultiplexers().end() ) {
00198 (iter->second)->shutdown();
00199 }
00200
00201 }
00202
00203 MessageDemux::DemuxMap& MessageDemux::demultiplexers()
00204 {
00205 static MessageDemux::DemuxMap map;
00206 return map;
00207 }
00208
00209 void MessageDemux::unregisterSubscriber(const std::string& name, const unsigned int& id)
00210 {
00211 DemuxMapIterator iter = demultiplexers().find(name);
00212 if ( iter != demultiplexers().end() ) {
00213 (iter->second)->unregisterSubscriber(id);
00214 } else {
00215 std::cout << "Demux : no demux by that name found (while unregistering subscriber)"<< std::endl;
00216 }
00217 }
00218
00219
00220
00221 }