lib/radio.cpp
Go to the documentation of this file.
1 
6 /*****************************************************************************
7 ** Includes
8 *****************************************************************************/
9 
10 #include <ecl/exceptions.hpp>
11 #include <mm_messages.hpp>
12 #include <nanomsg/nn.h>
13 #include <nanomsg/pair.h>
14 #include <string>
15 #include "../../include/mm_radio/radio.hpp"
16 
17 /*****************************************************************************
18 ** Namespaces
19 *****************************************************************************/
20 
21 namespace mm_radio {
22 namespace impl {
23 
24 /*****************************************************************************
25 ** Implementation
26 *****************************************************************************/
27 
28 Radio::Radio(const std::string& name,
29  const std::string& url,
30  const bool bind,
31  const mm_messages::Verbosity::Level& verbosity
32  ) :
33  name(name),
34  url(url),
35  verbosity(verbosity),
36  shutdown_requested(false)
37 {
38  socket = nn_socket (AF_SP, NN_PAIR);
39  if ( socket < 0 ) {
40  // TODO : throw exception
41  std::cout << "Radio socket error: " << nn_strerror(errno) << " [" << nn_errno() << "][" << name << "][" << url << "]" << std::endl;
42  }
43  int result = nn_setsockopt (socket, NN_SOL_SOCKET, NN_SOCKET_NAME, name.c_str(), name.size());
44  if ( result < 0 ) {
45  // TODO : throw exception
46  std::cout << "Radio nn_setsockopt error: " << nn_strerror(errno) << " [" << nn_errno() << "][" << name << "][" << url << "]" << std::endl;
47  }
48  int send_timeout_ms = 5000;
49  result = nn_setsockopt (socket, NN_SOL_SOCKET, NN_SNDTIMEO, &send_timeout_ms, sizeof(send_timeout_ms));
50  if ( bind ) {
51  endpoint_id = nn_bind(socket, url.c_str());
52  } else {
53  endpoint_id = nn_connect(socket, url.c_str());
54  }
55  if (endpoint_id < 0) {
56  // TODO : throw exception
57  if ( bind ) {
58  std::cout << "Radio bind error: " << nn_strerror(errno) << " [" << nn_errno() << "][" << name << "][" << url << "]" << std::endl;
59  } else {
60  std::cout << "Radio connect error: " << nn_strerror(errno) << " [" << nn_errno() << "][" << name << "][" << url << "]" << std::endl;
61  }
62  }
63  // TODO : check the result, throw exceptions if necessary
64  if (verbosity > mm_messages::Verbosity::QUIET) {
65 
66  std::cout << "[" << ecl::TimeStamp() << "] Radio : [" << name << "][" << url << "][" << socket << "][" << endpoint_id << "]";
67  if ( bind ) {
68  std::cout << "[bind]" << std::endl;
69  } else {
70  std::cout << "[connect]" << std::endl;
71  }
72  }
73  // std::thread call, need c++11
74  // thread = std::thread(&MessageDemux::spin, this);
75  // ecl::Thread call
76  thread.start(&Radio::spin, *this);
77 }
78 
79 Radio::Radio(const Radio& other) {
80  socket = other.socket;
81  endpoint_id = other.endpoint_id;
82  name = other.name;
83  verbosity = other.verbosity;
85  // this bugger forced us to write the copy constructor...get 'use of deleted function' otherwise
86  std::move(other.thread);
87 }
88 
90  if ( socket >= 0 ) {
91  // only possible to have one connection to a pair at any one time
92  // so don't worry about using nn_shutdown with endpoint ids.
93  nn_close(socket);
94  }
95  mutex.lock();
96  // c11 call
97  // for(auto& pair : subscribers) {
98  // delete pair.second;
99  // }
100  for (SubscriberMapIterator iter = subscribers.begin(); iter != subscribers.end(); ++iter) {
101  delete iter->second;
102  }
103  subscribers.clear();
104  mutex.unlock();
105 }
106 
107 void Radio::spin() {
108  // this is almost identical to the Demux spin (only Subscriber type is different)
109  while (!shutdown_requested)
110  {
111  unsigned char *buffer = NULL;
112  int bytes = ::nn_recv(socket, &buffer, NN_MSG, 0);
113  if ( bytes < 0 ) {
114  // We set socket options in the constructor to timeout as opposed to default infinite blocking
115  if (nn_errno() == EAGAIN) {
116  continue;
117  }
118  // TODO handle errors : http://nanomsg.org/v0.4/nn_recv.3.html
119  }
123  std::cout << "[" << ecl::TimeStamp() << "] RadioDemux: [" << subheader.id << "]";
124  std::cout << "[" << bytes << "]";
126  std::cout << "[" << std::hex;
127  for(unsigned int i=0; i < bytes; ++i ) {
128  std::cout << static_cast<unsigned int>(*(buffer+i)) << " ";
129  }
130  std::cout << std::dec;
131  std::cout << "]";
132  }
133  std::cout << std::endl;
134  }
135  mutex.lock();
136  SubscriberMapIterator iter = subscribers.find(subheader.id);
137  if (iter != subscribers.end()) {
139  }
140  mutex.unlock();
141  nn_freemsg (buffer);
142  }
143 }
150  if ( socket > 0 ) {
151  int result = close (socket);
152  }
153  if ( !shutdown_requested ) {
154  shutdown_requested = true;
155  thread.join();
156  }
157 }
158 
159 void Radio::unregisterSubscriber(const unsigned int& id)
160 {
161  mutex.lock();
162  subscribers.erase(id);
163  mutex.unlock();
164 }
165 
166 int Radio::send(const unsigned int& id, const mm_messages::ByteArray& msg_buffer) {
167  // this is identical to MessageMux's send (could be collapsed)
168  mm_messages::ByteArray buffer;
171  buffer.insert(buffer.end(), msg_buffer.begin(), msg_buffer.end());
172 
174  std::cout << "[" << ecl::TimeStamp() << "] RadioMux: [" << id << "][" << buffer.size() << "][";
175  std::cout << std::hex;
176  for(unsigned int i=0; i < buffer.size(); ++i ) {
177  std::cout << static_cast<unsigned int>(buffer[i]) << " ";
178  }
179  std::cout << std::dec;
180  std::cout << "]" << std::endl;
181  }
182  int result = ::nn_send(socket, buffer.data(), buffer.size(), 0); // last option is flags, only NN_DONTWAIT available
183  if ( result == -1 ) {
184  int error_number = nn_errno();
185  // TODO : lots of error flags to check here
186  if ( ( error_number == EAGAIN ) || ( error_number == ETIMEDOUT ) ) {
187  //std::cout << "[" << ecl::TimeStamp() << "] RadioMux : timed out trying to send a message [" << name << "][" << url << "]" << std::endl;
188  throw ecl::StandardException(LOC, ecl::TimeOutError, std::string("timed out trying to send a message [") + name + std::string("][") + url + std::string("]"));
189  }
190  }
191  return 0;
192 }
193 
194 } // namespace impl
195 } // namespace mm_radio
196 
197 /*****************************************************************************
198 ** Namespaces
199 *****************************************************************************/
200 
201 namespace mm_radio {
202 
203 /*****************************************************************************
204 ** Global Statics
205 *****************************************************************************/
206 
212 void Radio::startServer(const std::string& name,
213  const std::string& url,
215  RadioMapConstIterator iter = radios().find(name);
216  if ( iter == radios().end() ) {
217  if (url.empty()) {
218  // TODO : throw an exception
219  } else {
220  std::pair<RadioMapIterator,bool> result;
221  result = radios().insert(
222  RadioMapPair(name, std::make_shared<impl::Radio>(name, url, true, verbosity)));
223  }
224  } else if ( !url.empty() ) {
225  // TODO : throw an exception, name-url already present.
226  }
227 }
228 
234 void Radio::startClient(const std::string& name,
235  const std::string& url,
237  RadioMapConstIterator iter = radios().find(name);
238  if ( iter == radios().end() ) {
239  if (url.empty()) {
240  std::cout << "mm::Radio::startClient : url is empty" << std::endl;
241  // TODO : throw an exception
242  } else {
243  std::pair<RadioMapIterator,bool> result;
244  result = radios().insert(
245  RadioMapPair(name, std::make_shared<impl::Radio>(name, url, false, verbosity)));
246  }
247  } else if ( !url.empty() ) {
248  std::cout << "mm::Radio::startClient : already radio in the map [" << name << "][" << url << "]" << std::endl;
249  // TODO : throw an exception, name-url already present.
250  }
251 }
252 
254  static Radio::RadioMap map;
255  return map;
256 }
257 
258 int Radio::send(const std::string& name, const unsigned int& id, const mm_messages::ByteArray& msg_buffer) {
259  RadioMapIterator iter = radios().find(name);
260  if ( iter != radios().end() ) {
261  return (iter->second)->send(id, msg_buffer);
262  } else {
263  // exceptions exceptions...
264  std::cout << "Radio : no mux by that name found (while trying to send)"<< std::endl;
265  return NotAvailable;
266  }
267 }
268 
269 void Radio::shutdown(const std::string& name) {
270  radios().erase(name);
271 }
272 
274  radios().clear();
275 }
276 
277 void Radio::unregisterSubscriber(const std::string& name, const unsigned int& id)
278 {
279  RadioMapIterator iter = radios().find(name);
280  if ( iter != radios().end() ) {
281  (iter->second)->unregisterSubscriber(id);
282  } else {
283  // quietly pass - this will only occur if a subscriber self destructs after the radio is shut down
284  }
285 }
286 
287 } // namespace mm_radio
int send(const unsigned int &id, const mm_messages::ByteArray &msg_buffer)
Definition: lib/radio.cpp:166
void unregisterSubscriber(const unsigned int &id)
Definition: lib/radio.cpp:159
std::map< std::string, std::shared_ptr< impl::Radio > >::iterator RadioMapIterator
Definition: radio.hpp:109
std::map< std::string, std::shared_ptr< impl::Radio > > RadioMap
Definition: radio.hpp:107
std::pair< std::string, std::shared_ptr< impl::Radio > > RadioMapPair
Definition: radio.hpp:108
#define LOC
static void encode(const T &msg, ByteArray &buffer)
static const unsigned int size
ecl::Thread thread
Definition: radio.hpp:87
std::map< unsigned int, BufferCallbackFunction >::iterator SubscriberMapIterator
Definition: radio.hpp:78
static RadioMap & radios()
Definition: lib/radio.cpp:253
Radio(const std::string &name, const std::string &url, const bool bind, const mm_messages::Verbosity::Level &verbosity=mm_messages::Verbosity::QUIET)
Definition: lib/radio.cpp:28
mm_messages::Verbosity::Level verbosity
Definition: radio.hpp:85
std::map< std::string, std::shared_ptr< impl::Radio > >::const_iterator RadioMapConstIterator
Definition: radio.hpp:110
SubscriberMap subscribers
Definition: radio.hpp:88
static const unsigned int size
std::vector< unsigned char > ByteArray
std::string name
Definition: radio.hpp:83
std::string url
Definition: radio.hpp:83
static void startClient(const std::string &name, const std::string &url, const mm_messages::Verbosity::Level &verbosity=mm_messages::Verbosity::QUIET)
Pre-establish named connections for a client.
Definition: lib/radio.cpp:234
static void startServer(const std::string &name, const std::string &url, const mm_messages::Verbosity::Level &verbosity=mm_messages::Verbosity::QUIET)
Pre-establish named connections for a server.
Definition: lib/radio.cpp:212
TimeOutError
static T decode(const unsigned char *buffer, const unsigned int &size)
ecl::Mutex mutex
Definition: radio.hpp:89


mm_radio
Author(s): Daniel Stonier
autogenerated on Mon Jun 10 2019 13:52:16