12 #include <nanomsg/nn.h> 13 #include <nanomsg/pair.h> 15 #include "../../include/mm_radio/radio.hpp" 29 const std::string& url,
36 shutdown_requested(false)
38 socket = nn_socket (AF_SP, NN_PAIR);
41 std::cout <<
"Radio socket error: " << nn_strerror(errno) <<
" [" << nn_errno() <<
"][" << name <<
"][" << url <<
"]" << std::endl;
43 int result = nn_setsockopt (
socket, NN_SOL_SOCKET, NN_SOCKET_NAME, name.c_str(), name.size());
46 std::cout <<
"Radio nn_setsockopt error: " << nn_strerror(errno) <<
" [" << nn_errno() <<
"][" << name <<
"][" << url <<
"]" << std::endl;
48 int send_timeout_ms = 5000;
49 result = nn_setsockopt (
socket, NN_SOL_SOCKET, NN_SNDTIMEO, &send_timeout_ms,
sizeof(send_timeout_ms));
58 std::cout <<
"Radio bind error: " << nn_strerror(errno) <<
" [" << nn_errno() <<
"][" << name <<
"][" << url <<
"]" << std::endl;
60 std::cout <<
"Radio connect error: " << nn_strerror(errno) <<
" [" << nn_errno() <<
"][" << name <<
"][" << url <<
"]" << std::endl;
66 std::cout <<
"[" << ecl::TimeStamp() <<
"] Radio : [" << name <<
"][" << url <<
"][" <<
socket <<
"][" <<
endpoint_id <<
"]";
68 std::cout <<
"[bind]" << std::endl;
70 std::cout <<
"[connect]" << std::endl;
111 unsigned char *buffer = NULL;
112 int bytes = ::nn_recv(
socket, &buffer, NN_MSG, 0);
115 if (nn_errno() == EAGAIN) {
123 std::cout <<
"[" << ecl::TimeStamp() <<
"] RadioDemux: [" << subheader.
id <<
"]";
124 std::cout <<
"[" << bytes <<
"]";
126 std::cout <<
"[" << std::hex;
127 for(
unsigned int i=0; i < bytes; ++i ) {
128 std::cout << static_cast<unsigned int>(*(buffer+i)) <<
" ";
130 std::cout << std::dec;
133 std::cout << std::endl;
151 int result = close (
socket);
171 buffer.insert(buffer.end(), msg_buffer.begin(), msg_buffer.end());
174 std::cout <<
"[" << ecl::TimeStamp() <<
"] RadioMux: [" <<
id <<
"][" << buffer.size() <<
"][";
175 std::cout << std::hex;
176 for(
unsigned int i=0; i < buffer.size(); ++i ) {
177 std::cout << static_cast<unsigned int>(buffer[i]) <<
" ";
179 std::cout << std::dec;
180 std::cout <<
"]" << std::endl;
182 int result = ::nn_send(
socket, buffer.data(), buffer.size(), 0);
183 if ( result == -1 ) {
184 int error_number = nn_errno();
186 if ( ( error_number == EAGAIN ) || ( error_number == ETIMEDOUT ) ) {
213 const std::string&
url,
216 if ( iter == radios().end() ) {
220 std::pair<RadioMapIterator,bool> result;
221 result = radios().insert(
222 RadioMapPair(name, std::make_shared<impl::Radio>(name, url,
true, verbosity)));
224 }
else if ( !url.empty() ) {
235 const std::string&
url,
238 if ( iter == radios().end() ) {
240 std::cout <<
"mm::Radio::startClient : url is empty" << std::endl;
243 std::pair<RadioMapIterator,bool> result;
244 result = radios().insert(
245 RadioMapPair(name, std::make_shared<impl::Radio>(name, url,
false, verbosity)));
247 }
else if ( !url.empty() ) {
248 std::cout <<
"mm::Radio::startClient : already radio in the map [" << name <<
"][" << url <<
"]" << std::endl;
260 if ( iter != radios().end() ) {
261 return (iter->second)->send(
id, msg_buffer);
264 std::cout <<
"Radio : no mux by that name found (while trying to send)"<< std::endl;
270 radios().erase(name);
280 if ( iter != radios().end() ) {
int send(const unsigned int &id, const mm_messages::ByteArray &msg_buffer)
void unregisterSubscriber(const unsigned int &id)
std::map< std::string, std::shared_ptr< impl::Radio > >::iterator RadioMapIterator
std::map< std::string, std::shared_ptr< impl::Radio > > RadioMap
std::pair< std::string, std::shared_ptr< impl::Radio > > RadioMapPair
static void encode(const T &msg, ByteArray &buffer)
std::map< unsigned int, BufferCallbackFunction >::iterator SubscriberMapIterator
static RadioMap & radios()
Radio(const std::string &name, const std::string &url, const bool bind, const mm_messages::Verbosity::Level &verbosity=mm_messages::Verbosity::QUIET)
mm_messages::Verbosity::Level verbosity
std::map< std::string, std::shared_ptr< impl::Radio > >::const_iterator RadioMapConstIterator
SubscriberMap subscribers
std::vector< unsigned char > ByteArray
static void startClient(const std::string &name, const std::string &url, const mm_messages::Verbosity::Level &verbosity=mm_messages::Verbosity::QUIET)
Pre-establish named connections for a client.
static void startServer(const std::string &name, const std::string &url, const mm_messages::Verbosity::Level &verbosity=mm_messages::Verbosity::QUIET)
Pre-establish named connections for a server.
static T decode(const unsigned char *buffer, const unsigned int &size)