mux.cpp
Go to the documentation of this file.
00001 
00006 /*****************************************************************************
00007 ** Includes
00008 *****************************************************************************/
00009 
00010 #include <iostream>
00011 #include <ecl/time/timestamp.hpp>
00012 #include <mm_messages.hpp>
00013 #include <nanomsg/pubsub.h>
00014 #include <errno.h>
00015 #include <nanomsg/utils/err.h>
00016 #include "../../include/mm_mux_demux/mux.hpp"
00017 
00018 /*****************************************************************************
00019 ** Namespaces
00020 *****************************************************************************/
00021 
00022 namespace mm_mux_demux {
00023 namespace impl {
00024 
00025 /*****************************************************************************
00026  ** Implementation
00027  *****************************************************************************/
00028 
00029 MessageMux::MessageMux(const std::string& name,
00030                        const std::string& url,
00031                        const mm_messages::Verbosity::Level& verbosity,
00032                        const bool bind) :
00033   name(name),
00034   url(url),
00035   verbosity(verbosity)
00036 {
00037   socket = nn_socket (AF_SP, NN_PUB);
00038   if ( socket < 0 ) {
00039     // TODO : throw exception
00040     std::cout << "Mux socket error: " << nn_strerror(errno) << " [" << nn_errno() << "][" << name << "][" << url << "]" << std::endl;
00041   }
00042   nn_setsockopt (socket, NN_SOL_SOCKET, NN_SOCKET_NAME, name.c_str(), name.size());
00043   if ( bind ) {
00044     endpoint_id = nn_bind(socket, url.c_str());
00045   } else {
00046     endpoint_id = nn_connect(socket, url.c_str());
00047   }
00048   if (endpoint_id < 0) {
00049     // TODO : throw exception
00050     std::cout << "Mux bind error: " << nn_strerror(errno) << " [" << nn_errno() << "][" << name << "][" << url << "]" << std::endl;
00051   }
00052   if (verbosity > mm_messages::Verbosity::QUIET) {
00053     std::cout << "[" << ecl::TimeStamp() << "] MessageMux : [" << name << "][" << url << "][" << socket << "][" << endpoint_id << "]";
00054     if ( bind ) {
00055       std::cout << "[bind]" << std::endl;
00056     } else {
00057       std::cout << "[connect]" << std::endl;
00058     }
00059   }
00060 }
00061 
00062 MessageMux::~MessageMux() {
00063   if ( socket > 0 ) {
00064     nn_shutdown (socket, endpoint_id);
00065   }
00066 }
00067 
00068 int MessageMux::send(const unsigned int& id, const mm_messages::ByteArray& msg_buffer) {
00069   // this is identical to radio's send, could be collapsed
00070   mm_messages::ByteArray buffer;
00071   mm_messages::Message<mm_messages::PacketHeader>::encode(mm_messages::PacketHeader(), buffer);
00072   mm_messages::Message<mm_messages::SubPacketHeader>::encode(mm_messages::SubPacketHeader(id, msg_buffer.size()), buffer);
00073   buffer.insert(buffer.end(), msg_buffer.begin(), msg_buffer.end());
00074 
00075   if (verbosity > mm_messages::Verbosity::LOW) {
00076     std::cout << "[" << ecl::TimeStamp() << "] Mux: [" << id << "][" << buffer.size() << "][";
00077     std::cout << std::hex;
00078     for(unsigned int i=0; i < buffer.size(); ++i ) {
00079       std::cout << static_cast<unsigned int>(buffer[i]) << " ";
00080     }
00081     std::cout << std::dec;
00082     std::cout << "]" << std::endl;
00083   }
00084   int result = ::nn_send(socket, buffer.data(), buffer.size(), 0); // last option is flags, only NN_DONTWAIT available
00085   // TODO : lots of error flags to check here
00086   return 0;
00087 }
00088 
00089 } // namespace impl
00090 } // mm_mux_demux
00091 
00092 
00093 /*****************************************************************************
00094 ** Namespaces
00095 *****************************************************************************/
00096 
00097 namespace mm_mux_demux {
00098 
00099 /*****************************************************************************
00100 ** Global Statics
00101 *****************************************************************************/
00107 void MessageMux::start(const std::string& name,
00108                              const std::string& url,
00109                              const mm_messages::Verbosity::Level& verbosity,
00110                              const bool bind) {
00111   MuxMapConstIterator iter = multiplexers().find(name);
00112   if ( iter == multiplexers().end() ) {
00113     if (url.empty()) {
00114       // TODO : throw an exception
00115     } else {
00116       std::pair<MuxMapIterator,bool> result;
00117       result = multiplexers().insert(
00118           MuxMapPair(name, std::make_shared<impl::MessageMux>(name, url, verbosity, bind)));
00119     }
00120   } else if ( !url.empty() ) {
00121     // TODO : throw an exception, name-url already present.
00122   }
00123 }
00124 
00125 void MessageMux::shutdown(const std::string& name) {
00126   multiplexers().erase(name);
00127 }
00128 
00129 void MessageMux::shutdown() {
00130   multiplexers().clear();
00131 }
00132 
00133 MessageMux::MuxMap& MessageMux::multiplexers() {
00134   static MessageMux::MuxMap map;
00135   return map;
00136 }
00137 
00138 int MessageMux::send(const std::string& name, const unsigned int& id, const mm_messages::ByteArray& msg_buffer) {
00139   MuxMapIterator iter = multiplexers().find(name);
00140   if ( iter != multiplexers().end() ) {
00141     return (iter->second)->send(id, msg_buffer);
00142   } else {
00143     // exceptions exceptions...
00144     std::cout << "Mux : no mux by that name found (while trying to send)"<< std::endl;
00145     return NotAvailable;
00146   }
00147 }
00148 
00149 } // namespace mm_mux_demux


mm_mux_demux
Author(s): Daniel Stonier
autogenerated on Thu Jun 6 2019 21:13:22