mux.cpp
Go to the documentation of this file.
1 
6 /*****************************************************************************
7 ** Includes
8 *****************************************************************************/
9 
10 #include <iostream>
11 #include <ecl/time/timestamp.hpp>
12 #include <mm_messages.hpp>
13 #include <nanomsg/pubsub.h>
14 #include <errno.h>
15 #include <nanomsg/utils/err.h>
16 #include "../../include/mm_mux_demux/mux.hpp"
17 
18 /*****************************************************************************
19 ** Namespaces
20 *****************************************************************************/
21 
22 namespace mm_mux_demux {
23 namespace impl {
24 
25 /*****************************************************************************
26  ** Implementation
27  *****************************************************************************/
28 
29 MessageMux::MessageMux(const std::string& name,
30  const std::string& url,
31  const mm_messages::Verbosity::Level& verbosity,
32  const bool bind) :
33  name(name),
34  url(url),
35  verbosity(verbosity)
36 {
37  socket = nn_socket (AF_SP, NN_PUB);
38  if ( socket < 0 ) {
39  // TODO : throw exception
40  std::cout << "Mux socket error: " << nn_strerror(errno) << " [" << nn_errno() << "][" << name << "][" << url << "]" << std::endl;
41  }
42  nn_setsockopt (socket, NN_SOL_SOCKET, NN_SOCKET_NAME, name.c_str(), name.size());
43  if ( bind ) {
44  endpoint_id = nn_bind(socket, url.c_str());
45  } else {
46  endpoint_id = nn_connect(socket, url.c_str());
47  }
48  if (endpoint_id < 0) {
49  // TODO : throw exception
50  std::cout << "Mux bind error: " << nn_strerror(errno) << " [" << nn_errno() << "][" << name << "][" << url << "]" << std::endl;
51  }
52  if (verbosity > mm_messages::Verbosity::QUIET) {
53  std::cout << "[" << ecl::TimeStamp() << "] MessageMux : [" << name << "][" << url << "][" << socket << "][" << endpoint_id << "]";
54  if ( bind ) {
55  std::cout << "[bind]" << std::endl;
56  } else {
57  std::cout << "[connect]" << std::endl;
58  }
59  }
60 }
61 
63  if ( socket > 0 ) {
64  nn_shutdown (socket, endpoint_id);
65  }
66 }
67 
/**
 * Append the message payload to the outgoing buffer and hand that buffer to
 * nn_send() on this mux's socket.
 *
 * NOTE(review): the declaration of 'buffer' and the opening of the debug
 * block below are not visible in this listing; presumably 'buffer' is
 * prefilled with packet headers and the dump is guarded by a verbosity
 * check - confirm against the real source.
 *
 * @param id : message identifier; in the visible code it is only used in the debug print.
 * @param msg_buffer : payload bytes, appended to the end of 'buffer'.
 * @return always 0 - the nn_send() result is not propagated.
 */
68 int MessageMux::send(const unsigned int& id, const mm_messages::ByteArray& msg_buffer) {
69  // this is identical to radio's send, could be collapsed
// payload goes after whatever 'buffer' already holds
73  buffer.insert(buffer.end(), msg_buffer.begin(), msg_buffer.end());
74 
// debug dump : timestamp, id, total size, then every byte in hex
76  std::cout << "[" << ecl::TimeStamp() << "] Mux: [" << id << "][" << buffer.size() << "][";
77  std::cout << std::hex;
78  for(unsigned int i=0; i < buffer.size(); ++i ) {
79  std::cout << static_cast<unsigned int>(buffer[i]) << " ";
80  }
// restore decimal formatting on std::cout
81  std::cout << std::dec;
82  std::cout << "]" << std::endl;
83  }
// flags == 0, i.e. NN_DONTWAIT is not requested
84  int result = ::nn_send(socket, buffer.data(), buffer.size(), 0); // last option is flags, only NN_DONTWAIT available
85  // TODO : lots of error flags to check here
// NOTE(review): 'result' is never inspected, so a failed send still returns 0
86  return 0;
87 }
88 
89 } // namespace impl
90 } // namespace mm_mux_demux
91 
92 
93 /*****************************************************************************
94 ** Namespaces
95 *****************************************************************************/
96 
97 namespace mm_mux_demux {
98 
99 /*****************************************************************************
100 ** Global Statics
101 *****************************************************************************/
107 void MessageMux::start(const std::string& name,
108  const std::string& url,
110  const bool bind) {
111  MuxMapConstIterator iter = multiplexers().find(name);
112  if ( iter == multiplexers().end() ) {
113  if (url.empty()) {
114  // TODO : throw an exception
115  } else {
116  std::pair<MuxMapIterator,bool> result;
117  result = multiplexers().insert(
118  MuxMapPair(name, std::make_shared<impl::MessageMux>(name, url, verbosity, bind)));
119  }
120  } else if ( !url.empty() ) {
121  // TODO : throw an exception, name-url already present.
122  }
123 }
124 
125 void MessageMux::shutdown(const std::string& name) {
126  multiplexers().erase(name);
127 }
128 
130  multiplexers().clear();
131 }
132 
134  static MessageMux::MuxMap map;
135  return map;
136 }
137 
138 int MessageMux::send(const std::string& name, const unsigned int& id, const mm_messages::ByteArray& msg_buffer) {
139  MuxMapIterator iter = multiplexers().find(name);
140  if ( iter != multiplexers().end() ) {
141  return (iter->second)->send(id, msg_buffer);
142  } else {
143  // exceptions exceptions...
144  std::cout << "Mux : no mux by that name found (while trying to send)"<< std::endl;
145  return NotAvailable;
146  }
147 }
148 
149 } // namespace mm_mux_demux
static MuxMap & multiplexers()
Definition: mux.cpp:133
MessageMux(const std::string &name, const std::string &url, const mm_messages::Verbosity::Level &verbosity=mm_messages::Verbosity::QUIET, const bool bind=true)
Definition: mux.cpp:29
static void shutdown()
Definition: mux.cpp:129
std::pair< std::string, std::shared_ptr< impl::MessageMux > > MuxMapPair
Definition: mux.hpp:65
int send(const unsigned int &id, const mm_messages::ByteArray &msg_buffer)
Definition: mux.cpp:68
static void encode(const T &msg, ByteArray &buffer)
mm_messages::Verbosity::Level verbosity
Definition: mux.hpp:46
std::map< std::string, std::shared_ptr< impl::MessageMux > > MuxMap
Definition: mux.hpp:64
std::map< std::string, std::shared_ptr< impl::MessageMux > >::const_iterator MuxMapConstIterator
Definition: mux.hpp:67
std::vector< unsigned char > ByteArray
std::map< std::string, std::shared_ptr< impl::MessageMux > >::iterator MuxMapIterator
Definition: mux.hpp:66
static void start(const std::string &name, const std::string &url, const mm_messages::Verbosity::Level &verbosity=mm_messages::Verbosity::QUIET, const bool bind=true)
Pre-establish named connections.
Definition: mux.cpp:107


mm_mux_demux
Author(s): Daniel Stonier
autogenerated on Mon Jun 10 2019 13:52:14