Go to the documentation of this file.00001
00006
00007
00008
00009
00010 #include <iostream>
00011 #include <ecl/time/timestamp.hpp>
00012 #include <mm_messages.hpp>
00013 #include <nanomsg/pubsub.h>
00014 #include <errno.h>
00015 #include <nanomsg/utils/err.h>
00016 #include "../../include/mm_mux_demux/mux.hpp"
00017
00018
00019
00020
00021
00022 namespace mm_mux_demux {
00023 namespace impl {
00024
00025
00026
00027
00028
00029 MessageMux::MessageMux(const std::string& name,
00030 const std::string& url,
00031 const mm_messages::Verbosity::Level& verbosity,
00032 const bool bind) :
00033 name(name),
00034 url(url),
00035 verbosity(verbosity)
00036 {
00037 socket = nn_socket (AF_SP, NN_PUB);
00038 if ( socket < 0 ) {
00039
00040 std::cout << "Mux socket error: " << nn_strerror(errno) << " [" << nn_errno() << "][" << name << "][" << url << "]" << std::endl;
00041 }
00042 nn_setsockopt (socket, NN_SOL_SOCKET, NN_SOCKET_NAME, name.c_str(), name.size());
00043 if ( bind ) {
00044 endpoint_id = nn_bind(socket, url.c_str());
00045 } else {
00046 endpoint_id = nn_connect(socket, url.c_str());
00047 }
00048 if (endpoint_id < 0) {
00049
00050 std::cout << "Mux bind error: " << nn_strerror(errno) << " [" << nn_errno() << "][" << name << "][" << url << "]" << std::endl;
00051 }
00052 if (verbosity > mm_messages::Verbosity::QUIET) {
00053 std::cout << "[" << ecl::TimeStamp() << "] MessageMux : [" << name << "][" << url << "][" << socket << "][" << endpoint_id << "]";
00054 if ( bind ) {
00055 std::cout << "[bind]" << std::endl;
00056 } else {
00057 std::cout << "[connect]" << std::endl;
00058 }
00059 }
00060 }
00061
00062 MessageMux::~MessageMux() {
00063 if ( socket > 0 ) {
00064 nn_shutdown (socket, endpoint_id);
00065 }
00066 }
00067
00068 int MessageMux::send(const unsigned int& id, const mm_messages::ByteArray& msg_buffer) {
00069
00070 mm_messages::ByteArray buffer;
00071 mm_messages::Message<mm_messages::PacketHeader>::encode(mm_messages::PacketHeader(), buffer);
00072 mm_messages::Message<mm_messages::SubPacketHeader>::encode(mm_messages::SubPacketHeader(id, msg_buffer.size()), buffer);
00073 buffer.insert(buffer.end(), msg_buffer.begin(), msg_buffer.end());
00074
00075 if (verbosity > mm_messages::Verbosity::LOW) {
00076 std::cout << "[" << ecl::TimeStamp() << "] Mux: [" << id << "][" << buffer.size() << "][";
00077 std::cout << std::hex;
00078 for(unsigned int i=0; i < buffer.size(); ++i ) {
00079 std::cout << static_cast<unsigned int>(buffer[i]) << " ";
00080 }
00081 std::cout << std::dec;
00082 std::cout << "]" << std::endl;
00083 }
00084 int result = ::nn_send(socket, buffer.data(), buffer.size(), 0);
00085
00086 return 0;
00087 }
00088
00089 }
00090 }
00091
00092
00093
00094
00095
00096
00097 namespace mm_mux_demux {
00098
00099
00100
00101
00107 void MessageMux::start(const std::string& name,
00108 const std::string& url,
00109 const mm_messages::Verbosity::Level& verbosity,
00110 const bool bind) {
00111 MuxMapConstIterator iter = multiplexers().find(name);
00112 if ( iter == multiplexers().end() ) {
00113 if (url.empty()) {
00114
00115 } else {
00116 std::pair<MuxMapIterator,bool> result;
00117 result = multiplexers().insert(
00118 MuxMapPair(name, std::make_shared<impl::MessageMux>(name, url, verbosity, bind)));
00119 }
00120 } else if ( !url.empty() ) {
00121
00122 }
00123 }
00124
00125 void MessageMux::shutdown(const std::string& name) {
00126 multiplexers().erase(name);
00127 }
00128
00129 void MessageMux::shutdown() {
00130 multiplexers().clear();
00131 }
00132
00133 MessageMux::MuxMap& MessageMux::multiplexers() {
00134 static MessageMux::MuxMap map;
00135 return map;
00136 }
00137
00138 int MessageMux::send(const std::string& name, const unsigned int& id, const mm_messages::ByteArray& msg_buffer) {
00139 MuxMapIterator iter = multiplexers().find(name);
00140 if ( iter != multiplexers().end() ) {
00141 return (iter->second)->send(id, msg_buffer);
00142 } else {
00143
00144 std::cout << "Mux : no mux by that name found (while trying to send)"<< std::endl;
00145 return NotAvailable;
00146 }
00147 }
00148
00149 }