radio.cpp
Go to the documentation of this file.
00001 
00006 /*****************************************************************************
00007 ** Includes
00008 *****************************************************************************/
00009 
00010 #include <ecl/exceptions.hpp>
00011 #include <mm_messages.hpp>
00012 #include <nanomsg/nn.h>
00013 #include <nanomsg/pair.h>
00014 #include <string>
00015 #include "../../include/mm_radio/radio.hpp"
00016 
00017 /*****************************************************************************
00018 ** Namespaces
00019 *****************************************************************************/
00020 
00021 namespace mm_radio {
00022 namespace impl {
00023 
00024 /*****************************************************************************
00025 ** Implementation
00026 *****************************************************************************/
00027 
00028 Radio::Radio(const std::string& name,
00029              const std::string& url,
00030              const bool bind,
00031              const mm_messages::Verbosity::Level& verbosity
00032             ) :
00033   name(name),
00034   url(url),
00035   verbosity(verbosity),
00036   shutdown_requested(false)
00037 {
00038   socket = nn_socket (AF_SP, NN_PAIR);
00039   if ( socket < 0 ) {
00040     // TODO : throw exception
00041     std::cout << "Radio socket error: " << nn_strerror(errno) << " [" << nn_errno() << "][" << name << "][" << url << "]" << std::endl;
00042   }
00043   int result = nn_setsockopt (socket, NN_SOL_SOCKET, NN_SOCKET_NAME, name.c_str(), name.size());
00044   if ( result < 0 ) {
00045     // TODO : throw exception
00046     std::cout << "Radio nn_setsockopt error: " <<  nn_strerror(errno) << " [" << nn_errno() << "][" << name << "][" << url << "]" << std::endl;
00047   }
00048   int send_timeout_ms = 5000;
00049   result = nn_setsockopt (socket, NN_SOL_SOCKET, NN_SNDTIMEO, &send_timeout_ms, sizeof(send_timeout_ms));
00050   if ( bind ) {
00051     endpoint_id = nn_bind(socket, url.c_str());
00052   } else {
00053     endpoint_id = nn_connect(socket, url.c_str());
00054   }
00055   if (endpoint_id < 0) {
00056     // TODO : throw exception
00057     if ( bind ) {
00058       std::cout << "Radio bind error: " << nn_strerror(errno) << " [" << nn_errno() << "][" << name << "][" << url << "]" << std::endl;
00059     } else {
00060       std::cout << "Radio connect error: " << nn_strerror(errno) << " [" << nn_errno() << "][" << name << "][" << url << "]" << std::endl;
00061     }
00062   }
00063   // TODO : check the result, throw exceptions if necessary
00064   if (verbosity > mm_messages::Verbosity::QUIET) {
00065 
00066     std::cout << "[" << ecl::TimeStamp() << "] Radio : [" << name << "][" << url << "][" << socket << "][" << endpoint_id << "]";
00067     if ( bind ) {
00068       std::cout << "[bind]" << std::endl;
00069     } else {
00070       std::cout << "[connect]" << std::endl;
00071     }
00072   }
00073   // std::thread call, need c++11
00074   // thread = std::thread(&MessageDemux::spin, this);
00075   // ecl::Thread call
00076   thread.start(&Radio::spin, *this);
00077 }
00078 
00079 Radio::Radio(const Radio& other) {
00080   socket = other.socket;
00081   endpoint_id = other.endpoint_id;
00082   name = other.name;
00083   verbosity = other.verbosity;
00084   shutdown_requested = other.shutdown_requested;
00085   // this bugger forced us to write the copy constructor...get 'use of deleted function' otherwise
00086   std::move(other.thread);
00087 }
00088 
00089 Radio::~Radio() {
00090   if ( socket >= 0 ) {
00091     // only possible to have one connection to a pair at any one time
00092     // so don't worry about using nn_shutdown with endpoint ids.
00093     nn_close(socket);
00094   }
00095   mutex.lock();
00096   // c11 call
00097   // for(auto& pair : subscribers) {
00098   //     delete pair.second;
00099   // }
00100   for (SubscriberMapIterator iter = subscribers.begin(); iter != subscribers.end(); ++iter) {
00101     delete iter->second;
00102   }
00103   subscribers.clear();
00104   mutex.unlock();
00105 }
00106 
00107 void Radio::spin() {
00108   // this is almost identical to the Demux spin (only Subscriber type is different)
00109   while (!shutdown_requested)
00110   {
00111     unsigned char *buffer = NULL;
00112     int bytes = ::nn_recv(socket, &buffer, NN_MSG, 0);
00113     if ( bytes < 0 ) {
00114       // We set socket options in the constructor to timeout as opposed to default infinite blocking
00115       if (nn_errno() == EAGAIN) {
00116         continue;
00117       }
00118       // TODO handle errors : http://nanomsg.org/v0.4/nn_recv.3.html
00119     }
00120     mm_messages::PacketHeader header = mm_messages::Message<mm_messages::PacketHeader>::decode(buffer, mm_messages::PacketHeader::size);
00121     mm_messages::SubPacketHeader subheader = mm_messages::Message<mm_messages::SubPacketHeader>::decode(buffer + mm_messages::PacketHeader::size, mm_messages::SubPacketHeader::size);
00122     if ( verbosity > mm_messages::Verbosity::QUIET ) {
00123       std::cout << "[" << ecl::TimeStamp() << "] RadioDemux: [" << subheader.id << "]";
00124       std::cout << "[" << bytes << "]";
00125       if ( verbosity > mm_messages::Verbosity::LOW ) {
00126         std::cout << "[" << std::hex;
00127         for(unsigned int i=0; i < bytes; ++i ) {
00128           std::cout << static_cast<unsigned int>(*(buffer+i)) << " ";
00129         }
00130         std::cout << std::dec;
00131         std::cout << "]";
00132       }
00133       std::cout << std::endl;
00134     }
00135     mutex.lock();
00136     SubscriberMapIterator iter = subscribers.find(subheader.id);
00137     if (iter != subscribers.end()) {
00138       (*(iter->second))(buffer + mm_messages::PacketHeader::size + mm_messages::SubPacketHeader::size, bytes - mm_messages::PacketHeader::size - mm_messages::SubPacketHeader::size);
00139     }
00140     mutex.unlock();
00141     nn_freemsg (buffer);
00142   }
00143 }
00149 void Radio::shutdown() {
00150   if ( socket > 0 ) {
00151     int result = close (socket);
00152   }
00153   if ( !shutdown_requested ) {
00154     shutdown_requested = true;
00155     thread.join();
00156   }
00157 }
00158 
00159 void Radio::unregisterSubscriber(const unsigned int& id)
00160 {
00161   mutex.lock();
00162   subscribers.erase(id);
00163   mutex.unlock();
00164 }
00165 
00166 int Radio::send(const unsigned int& id, const mm_messages::ByteArray& msg_buffer) {
00167   // this is identical to MessageMux's send (could be collapsed)
00168   mm_messages::ByteArray buffer;
00169   mm_messages::Message<mm_messages::PacketHeader>::encode(mm_messages::PacketHeader(), buffer);
00170   mm_messages::Message<mm_messages::SubPacketHeader>::encode(mm_messages::SubPacketHeader(id, msg_buffer.size()), buffer);
00171   buffer.insert(buffer.end(), msg_buffer.begin(), msg_buffer.end());
00172 
00173   if (verbosity > mm_messages::Verbosity::LOW) {
00174     std::cout << "[" << ecl::TimeStamp() << "] RadioMux: [" << id << "][" << buffer.size() << "][";
00175     std::cout << std::hex;
00176     for(unsigned int i=0; i < buffer.size(); ++i ) {
00177       std::cout << static_cast<unsigned int>(buffer[i]) << " ";
00178     }
00179     std::cout << std::dec;
00180     std::cout << "]" << std::endl;
00181   }
00182   int result = ::nn_send(socket, buffer.data(), buffer.size(), 0); // last option is flags, only NN_DONTWAIT available
00183   if ( result == -1 ) {
00184     int error_number = nn_errno();
00185     // TODO : lots of error flags to check here
00186     if ( ( error_number == EAGAIN ) || ( error_number == ETIMEDOUT ) ) {
00187       //std::cout << "[" << ecl::TimeStamp() << "] RadioMux : timed out trying to send a message [" << name << "][" << url << "]" << std::endl;
00188       throw ecl::StandardException(LOC, ecl::TimeOutError, std::string("timed out trying to send a message [") + name + std::string("][") + url + std::string("]"));
00189     }
00190   }
00191   return 0;
00192 }
00193 
00194 } // namespace impl
00195 } // namespace mm_radio
00196 
00197 /*****************************************************************************
00198 ** Namespaces
00199 *****************************************************************************/
00200 
00201 namespace mm_radio {
00202 
00203 /*****************************************************************************
00204 ** Global Statics
00205 *****************************************************************************/
00206 
00212 void Radio::startServer(const std::string& name,
00213                           const std::string& url,
00214                           const mm_messages::Verbosity::Level& verbosity) {
00215   RadioMapConstIterator iter = radios().find(name);
00216   if ( iter == radios().end() ) {
00217     if (url.empty()) {
00218       // TODO : throw an exception
00219     } else {
00220       std::pair<RadioMapIterator,bool> result;
00221       result = radios().insert(
00222           RadioMapPair(name, std::make_shared<impl::Radio>(name, url, true, verbosity)));
00223     }
00224   } else if ( !url.empty() ) {
00225     // TODO : throw an exception, name-url already present.
00226   }
00227 }
00228 
00234 void Radio::startClient(const std::string& name,
00235                           const std::string& url,
00236                           const mm_messages::Verbosity::Level& verbosity) {
00237   RadioMapConstIterator iter = radios().find(name);
00238   if ( iter == radios().end() ) {
00239     if (url.empty()) {
00240       std::cout << "mm::Radio::startClient : url is empty" << std::endl;
00241       // TODO : throw an exception
00242     } else {
00243       std::pair<RadioMapIterator,bool> result;
00244       result = radios().insert(
00245           RadioMapPair(name, std::make_shared<impl::Radio>(name, url, false, verbosity)));
00246     }
00247   } else if ( !url.empty() ) {
00248     std::cout << "mm::Radio::startClient : already radio in the map [" << name << "][" << url << "]" << std::endl;
00249     // TODO : throw an exception, name-url already present.
00250   }
00251 }
00252 
00253 Radio::RadioMap& Radio::radios() {
00254   static Radio::RadioMap map;
00255   return map;
00256 }
00257 
00258 int Radio::send(const std::string& name, const unsigned int& id, const mm_messages::ByteArray& msg_buffer) {
00259   RadioMapIterator iter = radios().find(name);
00260   if ( iter != radios().end() ) {
00261     return (iter->second)->send(id, msg_buffer);
00262   } else {
00263     // exceptions exceptions...
00264     std::cout << "Radio : no mux by that name found (while trying to send)"<< std::endl;
00265     return NotAvailable;
00266   }
00267 }
00268 
00269 void Radio::shutdown(const std::string& name) {
00270   radios().erase(name);
00271 }
00272 
00273 void Radio::shutdown() {
00274   radios().clear();
00275 }
00276 
00277 void Radio::unregisterSubscriber(const std::string& name, const unsigned int& id)
00278 {
00279   RadioMapIterator iter = radios().find(name);
00280   if ( iter != radios().end() ) {
00281     (iter->second)->unregisterSubscriber(id);
00282   } else {
00283     // quietly pass - this will only occur if a subscriber self destructs after the radio is shut down
00284   }
00285 }
00286 
00287 } // namespace mm_radio


mm_radio
Author(s): Daniel Stonier
autogenerated on Thu Jun 6 2019 21:13:24