13 #include <nanomsg/nn.h> 14 #include <nanomsg/pubsub.h> 15 #include "../../include/mm_mux_demux/demux.hpp" 29 const std::string& url,
37 shutdown_requested(false),
40 socket = nn_socket (AF_SP, NN_SUB);
43 std::cout <<
"Demux socket error: " << nn_strerror(errno) <<
" [" << nn_errno() <<
"][" << name <<
"][" << url <<
"]" << std::endl;
45 nn_setsockopt (
socket, NN_SUB, NN_SUB_SUBSCRIBE,
"", 0);
46 nn_setsockopt (
socket, NN_SOL_SOCKET, NN_SOCKET_NAME, name.c_str(), name.size());
48 nn_setsockopt (
socket, NN_SOL_SOCKET, NN_RCVTIMEO, &timeout,
sizeof(timeout));
56 std::cout <<
"Demux connect error: " << nn_strerror(errno) <<
" [" << nn_errno() <<
"][" << name <<
"][" << url <<
"]" << std::endl;
60 std::cout <<
"[" << ecl::TimeStamp() <<
"] MessageDemux : [" << name <<
"][" << url <<
"][" <<
socket <<
"][" <<
endpoint_id <<
"]";
62 std::cout <<
"[bind]" << std::endl;
64 std::cout <<
"[connect]" << std::endl;
102 unsigned char *buffer = NULL;
103 int bytes = ::nn_recv(
socket, &buffer, NN_MSG, 0);
106 if (nn_errno() == EAGAIN) {
114 std::cout <<
"[" << ecl::TimeStamp() <<
"] Demux: [" << subheader.
id <<
"]";
115 std::cout <<
"[" << bytes <<
"][";
117 std::cout << std::hex;
118 for(
unsigned int i=0; i < bytes; ++i ) {
119 std::cout << static_cast<unsigned int>(*(buffer+i)) <<
" ";
121 std::cout << std::dec;
124 std::cout << std::endl;
171 const std::string&
url,
177 if ( iter == demultiplexers().end() ) {
178 std::pair<DemuxMapIterator,bool> result;
179 result = demultiplexers().insert(
180 DemuxMapPair(name, std::make_shared<impl::MessageDemux>(name, url, verbosity, bind))
182 }
else if ( !url.empty() ) {
189 for (iter; iter != demultiplexers().end(); ++iter) {
197 if ( iter != demultiplexers().end() ) {
212 if ( iter != demultiplexers().end() ) {
215 std::cout <<
"Demux : no demux by that name found (while unregistering subscriber)"<< std::endl;
MessageDemux(const std::string &name, const std::string &url, const mm_messages::Verbosity::Level &verbosity=mm_messages::Verbosity::QUIET, const bool bind=false)
std::map< std::string, std::shared_ptr< impl::MessageDemux > >::iterator DemuxMapIterator
void unregisterSubscriber(const unsigned int &id)
static DemuxMap & demultiplexers()
Store demux's in an invisible storage location.
static void start(const std::string &name, const std::string &url, const mm_messages::Verbosity::Level &verbosity=mm_messages::Verbosity::QUIET, const bool bind=false)
mm_messages::Verbosity::Level verbosity
SubscriberMap subscribers
std::map< std::string, std::shared_ptr< impl::MessageDemux > > DemuxMap
std::pair< std::string, std::shared_ptr< impl::MessageDemux > > DemuxMapPair
std::map< std::string, std::shared_ptr< impl::MessageDemux > >::const_iterator DemuxMapConstIterator
static T decode(const unsigned char *buffer, const unsigned int &size)
std::map< unsigned int, BufferCallbackFunction >::iterator SubscriberMapIterator