00001
00006
00007
00008
00009
00010 #include <ecl/exceptions.hpp>
00011 #include <mm_messages.hpp>
00012 #include <nanomsg/nn.h>
00013 #include <nanomsg/pair.h>
00014 #include <string>
00015 #include "../../include/mm_radio/radio.hpp"
00016
00017
00018
00019
00020
00021 namespace mm_radio {
00022 namespace impl {
00023
00024
00025
00026
00027
00028 Radio::Radio(const std::string& name,
00029 const std::string& url,
00030 const bool bind,
00031 const mm_messages::Verbosity::Level& verbosity
00032 ) :
00033 name(name),
00034 url(url),
00035 verbosity(verbosity),
00036 shutdown_requested(false)
00037 {
00038 socket = nn_socket (AF_SP, NN_PAIR);
00039 if ( socket < 0 ) {
00040
00041 std::cout << "Radio socket error: " << nn_strerror(errno) << " [" << nn_errno() << "][" << name << "][" << url << "]" << std::endl;
00042 }
00043 int result = nn_setsockopt (socket, NN_SOL_SOCKET, NN_SOCKET_NAME, name.c_str(), name.size());
00044 if ( result < 0 ) {
00045
00046 std::cout << "Radio nn_setsockopt error: " << nn_strerror(errno) << " [" << nn_errno() << "][" << name << "][" << url << "]" << std::endl;
00047 }
00048 int send_timeout_ms = 5000;
00049 result = nn_setsockopt (socket, NN_SOL_SOCKET, NN_SNDTIMEO, &send_timeout_ms, sizeof(send_timeout_ms));
00050 if ( bind ) {
00051 endpoint_id = nn_bind(socket, url.c_str());
00052 } else {
00053 endpoint_id = nn_connect(socket, url.c_str());
00054 }
00055 if (endpoint_id < 0) {
00056
00057 if ( bind ) {
00058 std::cout << "Radio bind error: " << nn_strerror(errno) << " [" << nn_errno() << "][" << name << "][" << url << "]" << std::endl;
00059 } else {
00060 std::cout << "Radio connect error: " << nn_strerror(errno) << " [" << nn_errno() << "][" << name << "][" << url << "]" << std::endl;
00061 }
00062 }
00063
00064 if (verbosity > mm_messages::Verbosity::QUIET) {
00065
00066 std::cout << "[" << ecl::TimeStamp() << "] Radio : [" << name << "][" << url << "][" << socket << "][" << endpoint_id << "]";
00067 if ( bind ) {
00068 std::cout << "[bind]" << std::endl;
00069 } else {
00070 std::cout << "[connect]" << std::endl;
00071 }
00072 }
00073
00074
00075
00076 thread.start(&Radio::spin, *this);
00077 }
00078
00079 Radio::Radio(const Radio& other) {
00080 socket = other.socket;
00081 endpoint_id = other.endpoint_id;
00082 name = other.name;
00083 verbosity = other.verbosity;
00084 shutdown_requested = other.shutdown_requested;
00085
00086 std::move(other.thread);
00087 }
00088
00089 Radio::~Radio() {
00090 if ( socket >= 0 ) {
00091
00092
00093 nn_close(socket);
00094 }
00095 mutex.lock();
00096
00097
00098
00099
00100 for (SubscriberMapIterator iter = subscribers.begin(); iter != subscribers.end(); ++iter) {
00101 delete iter->second;
00102 }
00103 subscribers.clear();
00104 mutex.unlock();
00105 }
00106
00107 void Radio::spin() {
00108
00109 while (!shutdown_requested)
00110 {
00111 unsigned char *buffer = NULL;
00112 int bytes = ::nn_recv(socket, &buffer, NN_MSG, 0);
00113 if ( bytes < 0 ) {
00114
00115 if (nn_errno() == EAGAIN) {
00116 continue;
00117 }
00118
00119 }
00120 mm_messages::PacketHeader header = mm_messages::Message<mm_messages::PacketHeader>::decode(buffer, mm_messages::PacketHeader::size);
00121 mm_messages::SubPacketHeader subheader = mm_messages::Message<mm_messages::SubPacketHeader>::decode(buffer + mm_messages::PacketHeader::size, mm_messages::SubPacketHeader::size);
00122 if ( verbosity > mm_messages::Verbosity::QUIET ) {
00123 std::cout << "[" << ecl::TimeStamp() << "] RadioDemux: [" << subheader.id << "]";
00124 std::cout << "[" << bytes << "]";
00125 if ( verbosity > mm_messages::Verbosity::LOW ) {
00126 std::cout << "[" << std::hex;
00127 for(unsigned int i=0; i < bytes; ++i ) {
00128 std::cout << static_cast<unsigned int>(*(buffer+i)) << " ";
00129 }
00130 std::cout << std::dec;
00131 std::cout << "]";
00132 }
00133 std::cout << std::endl;
00134 }
00135 mutex.lock();
00136 SubscriberMapIterator iter = subscribers.find(subheader.id);
00137 if (iter != subscribers.end()) {
00138 (*(iter->second))(buffer + mm_messages::PacketHeader::size + mm_messages::SubPacketHeader::size, bytes - mm_messages::PacketHeader::size - mm_messages::SubPacketHeader::size);
00139 }
00140 mutex.unlock();
00141 nn_freemsg (buffer);
00142 }
00143 }
00149 void Radio::shutdown() {
00150 if ( socket > 0 ) {
00151 int result = close (socket);
00152 }
00153 if ( !shutdown_requested ) {
00154 shutdown_requested = true;
00155 thread.join();
00156 }
00157 }
00158
00159 void Radio::unregisterSubscriber(const unsigned int& id)
00160 {
00161 mutex.lock();
00162 subscribers.erase(id);
00163 mutex.unlock();
00164 }
00165
00166 int Radio::send(const unsigned int& id, const mm_messages::ByteArray& msg_buffer) {
00167
00168 mm_messages::ByteArray buffer;
00169 mm_messages::Message<mm_messages::PacketHeader>::encode(mm_messages::PacketHeader(), buffer);
00170 mm_messages::Message<mm_messages::SubPacketHeader>::encode(mm_messages::SubPacketHeader(id, msg_buffer.size()), buffer);
00171 buffer.insert(buffer.end(), msg_buffer.begin(), msg_buffer.end());
00172
00173 if (verbosity > mm_messages::Verbosity::LOW) {
00174 std::cout << "[" << ecl::TimeStamp() << "] RadioMux: [" << id << "][" << buffer.size() << "][";
00175 std::cout << std::hex;
00176 for(unsigned int i=0; i < buffer.size(); ++i ) {
00177 std::cout << static_cast<unsigned int>(buffer[i]) << " ";
00178 }
00179 std::cout << std::dec;
00180 std::cout << "]" << std::endl;
00181 }
00182 int result = ::nn_send(socket, buffer.data(), buffer.size(), 0);
00183 if ( result == -1 ) {
00184 int error_number = nn_errno();
00185
00186 if ( ( error_number == EAGAIN ) || ( error_number == ETIMEDOUT ) ) {
00187
00188 throw ecl::StandardException(LOC, ecl::TimeOutError, std::string("timed out trying to send a message [") + name + std::string("][") + url + std::string("]"));
00189 }
00190 }
00191 return 0;
00192 }
00193
00194 }
00195 }
00196
00197
00198
00199
00200
00201 namespace mm_radio {
00202
00203
00204
00205
00206
00212 void Radio::startServer(const std::string& name,
00213 const std::string& url,
00214 const mm_messages::Verbosity::Level& verbosity) {
00215 RadioMapConstIterator iter = radios().find(name);
00216 if ( iter == radios().end() ) {
00217 if (url.empty()) {
00218
00219 } else {
00220 std::pair<RadioMapIterator,bool> result;
00221 result = radios().insert(
00222 RadioMapPair(name, std::make_shared<impl::Radio>(name, url, true, verbosity)));
00223 }
00224 } else if ( !url.empty() ) {
00225
00226 }
00227 }
00228
00234 void Radio::startClient(const std::string& name,
00235 const std::string& url,
00236 const mm_messages::Verbosity::Level& verbosity) {
00237 RadioMapConstIterator iter = radios().find(name);
00238 if ( iter == radios().end() ) {
00239 if (url.empty()) {
00240 std::cout << "mm::Radio::startClient : url is empty" << std::endl;
00241
00242 } else {
00243 std::pair<RadioMapIterator,bool> result;
00244 result = radios().insert(
00245 RadioMapPair(name, std::make_shared<impl::Radio>(name, url, false, verbosity)));
00246 }
00247 } else if ( !url.empty() ) {
00248 std::cout << "mm::Radio::startClient : already radio in the map [" << name << "][" << url << "]" << std::endl;
00249
00250 }
00251 }
00252
00253 Radio::RadioMap& Radio::radios() {
00254 static Radio::RadioMap map;
00255 return map;
00256 }
00257
00258 int Radio::send(const std::string& name, const unsigned int& id, const mm_messages::ByteArray& msg_buffer) {
00259 RadioMapIterator iter = radios().find(name);
00260 if ( iter != radios().end() ) {
00261 return (iter->second)->send(id, msg_buffer);
00262 } else {
00263
00264 std::cout << "Radio : no mux by that name found (while trying to send)"<< std::endl;
00265 return NotAvailable;
00266 }
00267 }
00268
00269 void Radio::shutdown(const std::string& name) {
00270 radios().erase(name);
00271 }
00272
00273 void Radio::shutdown() {
00274 radios().clear();
00275 }
00276
00277 void Radio::unregisterSubscriber(const std::string& name, const unsigned int& id)
00278 {
00279 RadioMapIterator iter = radios().find(name);
00280 if ( iter != radios().end() ) {
00281 (iter->second)->unregisterSubscriber(id);
00282 } else {
00283
00284 }
00285 }
00286
00287 }