demux.cpp
Go to the documentation of this file.
00001 
00006 /*****************************************************************************
00007 ** Includes
00008 *****************************************************************************/
00009 
00010 #include <ecl/formatters.hpp>
00011 #include <iostream>
00012 #include <mm_messages.hpp>
00013 #include <nanomsg/nn.h>
00014 #include <nanomsg/pubsub.h>
00015 #include "../../include/mm_mux_demux/demux.hpp"
00016 
00017 /*****************************************************************************
00018 ** Namespaces
00019 *****************************************************************************/
00020 
00021 namespace mm_mux_demux {
00022 namespace impl {
00023 
00024 /*****************************************************************************
00025 ** Implementation
00026 *****************************************************************************/
00027 
00028 MessageDemux::MessageDemux(const std::string& name,
00029                            const std::string& url,
00030                            const mm_messages::Verbosity::Level& verbosity,
00031                            const bool bind
00032                           ) :
00033     name(name),
00034     url(url),
00035     socket(0),
00036     verbosity(verbosity),
00037     shutdown_requested(false),
00038     thread() // defer start of the thread
00039 {
00040   socket = nn_socket (AF_SP, NN_SUB);
00041   if ( socket < 0 ) {
00042     // TODO : throw exception
00043     std::cout << "Demux socket error: " << nn_strerror(errno) << " [" << nn_errno() << "][" << name << "][" << url << "]" << std::endl;
00044   }
00045   nn_setsockopt (socket, NN_SUB, NN_SUB_SUBSCRIBE, "", 0);
00046   nn_setsockopt (socket, NN_SOL_SOCKET, NN_SOCKET_NAME, name.c_str(), name.size());
00047   int timeout = 100; // timeout of 10ms (facilitates shutdown).
00048   nn_setsockopt (socket, NN_SOL_SOCKET, NN_RCVTIMEO, &timeout, sizeof(timeout));
00049   if ( bind ) {
00050     endpoint_id = nn_bind(socket, url.c_str());
00051   } else {
00052     endpoint_id = nn_connect(socket, url.c_str());
00053   }
00054   if (endpoint_id < 0) {
00055     // TODO : throw exception
00056     std::cout << "Demux connect error: " << nn_strerror(errno) << " [" << nn_errno() << "][" << name << "][" << url << "]" << std::endl;
00057   }
00058   if (verbosity > mm_messages::Verbosity::QUIET) {
00059 
00060     std::cout << "[" << ecl::TimeStamp() << "] MessageDemux : [" << name << "][" << url << "][" << socket << "][" << endpoint_id << "]";
00061     if ( bind ) {
00062       std::cout << "[bind]" << std::endl;
00063     } else {
00064       std::cout << "[connect]" << std::endl;
00065     }
00066   }
00067   // std::thread call, need c++11
00068   // thread = std::thread(&MessageDemux::spin, this);
00069   // ecl::Thread call
00070   thread.start(&MessageDemux::spin, *this);
00071 }
00072 
00073 MessageDemux::MessageDemux(const MessageDemux& other) {
00074   socket = other.socket;
00075   name = other.name;
00076   verbosity = other.verbosity;
00077   shutdown_requested = other.shutdown_requested;
00078   endpoint_id = other.endpoint_id;
00079   // this bugger forced us to write the copy constructor...get 'use of deleted function' otherwise
00080   std::move(other.thread);
00081 }
00082 
00083 MessageDemux::~MessageDemux()
00084 {
00085   mutex.lock();
00086   // c11 call
00087   // for(auto& pair : subscribers) {
00088   //     delete pair.second;
00089   // }
00090   for (SubscriberMapIterator iter = subscribers.begin(); iter != subscribers.end(); ++iter) {
00091     delete iter->second;
00092   }
00093   subscribers.clear();
00094   mutex.unlock();
00095 
00096   shutdown();
00097 }
00098 
00099 void MessageDemux::spin() {
00100   while (!shutdown_requested)
00101   {
00102     unsigned char *buffer = NULL;
00103     int bytes = ::nn_recv(socket, &buffer, NN_MSG, 0);
00104     if ( bytes < 0 ) {
00105       // We set socket options in the constructor to timeout as opposed to default infinite blocking
00106       if (nn_errno() == EAGAIN) {
00107         continue;
00108       }
00109       // TODO handle errors : http://nanomsg.org/v0.4/nn_recv.3.html
00110     }
00111     mm_messages::PacketHeader header = mm_messages::Message<mm_messages::PacketHeader>::decode(buffer, mm_messages::PacketHeader::size);
00112     mm_messages::SubPacketHeader subheader = mm_messages::Message<mm_messages::SubPacketHeader>::decode(buffer + mm_messages::PacketHeader::size, mm_messages::SubPacketHeader::size);
00113     if ( verbosity > mm_messages::Verbosity::QUIET ) {
00114       std::cout << "[" << ecl::TimeStamp() << "] Demux: [" << subheader.id << "]";
00115       std::cout << "[" << bytes << "][";
00116       if ( verbosity > mm_messages::Verbosity::LOW ) {
00117         std::cout << std::hex;
00118         for(unsigned int i=0; i < bytes; ++i ) {
00119           std::cout << static_cast<unsigned int>(*(buffer+i)) << " ";
00120         }
00121         std::cout << std::dec;
00122         std::cout << "]";
00123       }
00124       std::cout << std::endl;
00125     }
00126     mutex.lock();
00127     SubscriberMapIterator iter = subscribers.find(subheader.id);
00128     if (iter != subscribers.end()) {
00129       (*(iter->second))(buffer + mm_messages::PacketHeader::size + mm_messages::SubPacketHeader::size, bytes - mm_messages::PacketHeader::size - mm_messages::SubPacketHeader::size);
00130     }
00131     mutex.unlock();
00132     nn_freemsg (buffer);
00133   }
00134 }
00140 void MessageDemux::shutdown() {
00141   if ( !shutdown_requested ) {
00142     shutdown_requested = true;
00143     thread.join();
00144   }
00145   if ( socket > 0 ) {
00146     int result = nn_shutdown (socket, endpoint_id);
00147   }
00148 }
00149 
00150 void MessageDemux::unregisterSubscriber(const unsigned int& id)
00151 {
00152   mutex.lock();
00153   subscribers.erase(id);
00154   mutex.unlock();
00155 }
00156 
00157 } // namespace impl
00158 } // mm_mux_demux
00159 
00160 /*****************************************************************************
00161 ** Namespaces
00162 *****************************************************************************/
00163 
00164 namespace mm_mux_demux {
00165 
00166 /*****************************************************************************
00167 ** Globals
00168 *****************************************************************************/
00169 
00170 void MessageDemux::start(const std::string& name,
00171                          const std::string& url,
00172                          const mm_messages::Verbosity::Level& verbosity,
00173                          const bool bind
00174                         )
00175 {
00176   DemuxMapConstIterator iter = demultiplexers().find(name);
00177   if ( iter == demultiplexers().end() ) {
00178     std::pair<DemuxMapIterator,bool> result;
00179     result = demultiplexers().insert(
00180         DemuxMapPair(name, std::make_shared<impl::MessageDemux>(name, url, verbosity, bind))
00181     );
00182   } else if ( !url.empty() ) {
00183     // TODO : throw an exception, name-url already present.
00184   }
00185 }
00186 
00187 void MessageDemux::shutdown() {
00188   DemuxMapIterator iter = demultiplexers().begin();
00189   for (iter; iter != demultiplexers().end(); ++iter) {
00190     (iter->second)->shutdown();
00191   }
00192   // TODO : delete from map
00193 }
00194 
00195 void MessageDemux::shutdown(const std::string& name) {
00196   DemuxMapConstIterator iter = demultiplexers().find(name);
00197   if ( iter != demultiplexers().end() ) {
00198     (iter->second)->shutdown();
00199   }
00200   // TODO : delete from map
00201 }
00202 
00203 MessageDemux::DemuxMap& MessageDemux::demultiplexers()
00204 {
00205   static MessageDemux::DemuxMap map;  // string - demux storage in an invisible location
00206   return map;
00207 }
00208 
00209 void MessageDemux::unregisterSubscriber(const std::string& name, const unsigned int& id)
00210 {
00211   DemuxMapIterator iter = demultiplexers().find(name);
00212   if ( iter != demultiplexers().end() ) {
00213     (iter->second)->unregisterSubscriber(id);
00214   } else {
00215     std::cout << "Demux : no demux by that name found (while unregistering subscriber)"<< std::endl;
00216   }
00217 }
00218 
00219 
00220 
00221 } // mm_mux_demux


mm_mux_demux
Author(s): Daniel Stonier
autogenerated on Thu Jun 6 2019 21:13:22