demux.cpp
Go to the documentation of this file.
1 
6 /*****************************************************************************
7 ** Includes
8 *****************************************************************************/
9 
10 #include <ecl/formatters.hpp>
11 #include <iostream>
12 #include <mm_messages.hpp>
13 #include <nanomsg/nn.h>
14 #include <nanomsg/pubsub.h>
15 #include "../../include/mm_mux_demux/demux.hpp"
16 
17 /*****************************************************************************
18 ** Namespaces
19 *****************************************************************************/
20 
21 namespace mm_mux_demux {
22 namespace impl {
23 
24 /*****************************************************************************
25 ** Implementation
26 *****************************************************************************/
27 
28 MessageDemux::MessageDemux(const std::string& name,
29  const std::string& url,
30  const mm_messages::Verbosity::Level& verbosity,
31  const bool bind
32  ) :
33  name(name),
34  url(url),
35  socket(0),
36  verbosity(verbosity),
37  shutdown_requested(false),
38  thread() // defer start of the thread
39 {
40  socket = nn_socket (AF_SP, NN_SUB);
41  if ( socket < 0 ) {
42  // TODO : throw exception
43  std::cout << "Demux socket error: " << nn_strerror(errno) << " [" << nn_errno() << "][" << name << "][" << url << "]" << std::endl;
44  }
45  nn_setsockopt (socket, NN_SUB, NN_SUB_SUBSCRIBE, "", 0);
46  nn_setsockopt (socket, NN_SOL_SOCKET, NN_SOCKET_NAME, name.c_str(), name.size());
47  int timeout = 100; // timeout of 10ms (facilitates shutdown).
48  nn_setsockopt (socket, NN_SOL_SOCKET, NN_RCVTIMEO, &timeout, sizeof(timeout));
49  if ( bind ) {
50  endpoint_id = nn_bind(socket, url.c_str());
51  } else {
52  endpoint_id = nn_connect(socket, url.c_str());
53  }
54  if (endpoint_id < 0) {
55  // TODO : throw exception
56  std::cout << "Demux connect error: " << nn_strerror(errno) << " [" << nn_errno() << "][" << name << "][" << url << "]" << std::endl;
57  }
58  if (verbosity > mm_messages::Verbosity::QUIET) {
59 
60  std::cout << "[" << ecl::TimeStamp() << "] MessageDemux : [" << name << "][" << url << "][" << socket << "][" << endpoint_id << "]";
61  if ( bind ) {
62  std::cout << "[bind]" << std::endl;
63  } else {
64  std::cout << "[connect]" << std::endl;
65  }
66  }
67  // std::thread call, need c++11
68  // thread = std::thread(&MessageDemux::spin, this);
69  // ecl::Thread call
70  thread.start(&MessageDemux::spin, *this);
71 }
72 
74  socket = other.socket;
75  name = other.name;
76  verbosity = other.verbosity;
78  endpoint_id = other.endpoint_id;
79  // this bugger forced us to write the copy constructor...get 'use of deleted function' otherwise
80  std::move(other.thread);
81 }
82 
84 {
85  mutex.lock();
86  // c11 call
87  // for(auto& pair : subscribers) {
88  // delete pair.second;
89  // }
90  for (SubscriberMapIterator iter = subscribers.begin(); iter != subscribers.end(); ++iter) {
91  delete iter->second;
92  }
93  subscribers.clear();
94  mutex.unlock();
95 
96  shutdown();
97 }
98 
100  while (!shutdown_requested)
101  {
102  unsigned char *buffer = NULL;
103  int bytes = ::nn_recv(socket, &buffer, NN_MSG, 0);
104  if ( bytes < 0 ) {
105  // We set socket options in the constructor to timeout as opposed to default infinite blocking
106  if (nn_errno() == EAGAIN) {
107  continue;
108  }
109  // TODO handle errors : http://nanomsg.org/v0.4/nn_recv.3.html
110  }
114  std::cout << "[" << ecl::TimeStamp() << "] Demux: [" << subheader.id << "]";
115  std::cout << "[" << bytes << "][";
117  std::cout << std::hex;
118  for(unsigned int i=0; i < bytes; ++i ) {
119  std::cout << static_cast<unsigned int>(*(buffer+i)) << " ";
120  }
121  std::cout << std::dec;
122  std::cout << "]";
123  }
124  std::cout << std::endl;
125  }
126  mutex.lock();
127  SubscriberMapIterator iter = subscribers.find(subheader.id);
128  if (iter != subscribers.end()) {
130  }
131  mutex.unlock();
132  nn_freemsg (buffer);
133  }
134 }
141  if ( !shutdown_requested ) {
142  shutdown_requested = true;
143  thread.join();
144  }
145  if ( socket > 0 ) {
146  int result = nn_shutdown (socket, endpoint_id);
147  }
148 }
149 
150 void MessageDemux::unregisterSubscriber(const unsigned int& id)
151 {
152  mutex.lock();
153  subscribers.erase(id);
154  mutex.unlock();
155 }
156 
157 } // namespace impl
158 } // mm_mux_demux
159 
160 /*****************************************************************************
161 ** Namespaces
162 *****************************************************************************/
163 
164 namespace mm_mux_demux {
165 
166 /*****************************************************************************
167 ** Globals
168 *****************************************************************************/
169 
170 void MessageDemux::start(const std::string& name,
171  const std::string& url,
173  const bool bind
174  )
175 {
176  DemuxMapConstIterator iter = demultiplexers().find(name);
177  if ( iter == demultiplexers().end() ) {
178  std::pair<DemuxMapIterator,bool> result;
179  result = demultiplexers().insert(
180  DemuxMapPair(name, std::make_shared<impl::MessageDemux>(name, url, verbosity, bind))
181  );
182  } else if ( !url.empty() ) {
183  // TODO : throw an exception, name-url already present.
184  }
185 }
186 
188  DemuxMapIterator iter = demultiplexers().begin();
189  for (iter; iter != demultiplexers().end(); ++iter) {
190  (iter->second)->shutdown();
191  }
192  // TODO : delete from map
193 }
194 
195 void MessageDemux::shutdown(const std::string& name) {
196  DemuxMapConstIterator iter = demultiplexers().find(name);
197  if ( iter != demultiplexers().end() ) {
198  (iter->second)->shutdown();
199  }
200  // TODO : delete from map
201 }
202 
204 {
205  static MessageDemux::DemuxMap map; // string - demux storage in an invisible location
206  return map;
207 }
208 
209 void MessageDemux::unregisterSubscriber(const std::string& name, const unsigned int& id)
210 {
211  DemuxMapIterator iter = demultiplexers().find(name);
212  if ( iter != demultiplexers().end() ) {
213  (iter->second)->unregisterSubscriber(id);
214  } else {
215  std::cout << "Demux : no demux by that name found (while unregistering subscriber)"<< std::endl;
216  }
217 }
218 
219 
220 
221 } // mm_mux_demux
MessageDemux(const std::string &name, const std::string &url, const mm_messages::Verbosity::Level &verbosity=mm_messages::Verbosity::QUIET, const bool bind=false)
Definition: demux.cpp:28
std::map< std::string, std::shared_ptr< impl::MessageDemux > >::iterator DemuxMapIterator
Definition: demux.hpp:103
void unregisterSubscriber(const unsigned int &id)
Definition: demux.cpp:150
static const unsigned int size
static DemuxMap & demultiplexers()
Store demuxes in an invisible storage location.
Definition: demux.cpp:203
static void start(const std::string &name, const std::string &url, const mm_messages::Verbosity::Level &verbosity=mm_messages::Verbosity::QUIET, const bool bind=false)
Definition: demux.cpp:170
mm_messages::Verbosity::Level verbosity
Definition: demux.hpp:78
static const unsigned int size
std::map< std::string, std::shared_ptr< impl::MessageDemux > > DemuxMap
Definition: demux.hpp:101
std::pair< std::string, std::shared_ptr< impl::MessageDemux > > DemuxMapPair
Definition: demux.hpp:102
std::map< std::string, std::shared_ptr< impl::MessageDemux > >::const_iterator DemuxMapConstIterator
Definition: demux.hpp:104
static T decode(const unsigned char *buffer, const unsigned int &size)
std::map< unsigned int, BufferCallbackFunction >::iterator SubscriberMapIterator
Definition: demux.hpp:69


mm_mux_demux
Author(s): Daniel Stonier
autogenerated on Mon Jun 10 2019 13:52:14