27 bool subscribe_to_all_,
28 bool locally_initiated_)
35 uint32_t routing_id = _next_routing_id++;
37 routing_id = _next_routing_id++;
39 pipe_->set_server_socket_routing_id (routing_id);
43 _out_pipes.ZMQ_MAP_INSERT_OR_EMPLACE (routing_id, outpipe).second;
51 const out_pipes_t::iterator
it =
52 _out_pipes.find (pipe_->get_server_socket_routing_id ());
54 _out_pipes.erase (
it);
55 _fq.pipe_terminated (pipe_);
60 _fq.activated (pipe_);
65 const out_pipes_t::iterator
end = _out_pipes.end ();
66 out_pipes_t::iterator
it;
67 for (
it = _out_pipes.begin ();
it !=
end; ++
it)
68 if (
it->second.pipe == pipe_)
73 it->second.active =
true;
85 out_pipes_t::iterator
it = _out_pipes.find (routing_id);
87 if (
it != _out_pipes.end ()) {
88 if (!
it->second.pipe->check_write ()) {
89 it->second.active =
false;
102 const bool ok =
it->second.pipe->write (msg_);
108 it->second.pipe->flush ();
120 int rc = _fq.recvpipe (msg_, &pipe);
125 rc = _fq.recvpipe (msg_,
NULL);
128 rc = _fq.recvpipe (msg_,
NULL);
132 rc = _fq.recvpipe (msg_, &pipe);
140 const uint32_t routing_id = pipe->get_server_socket_routing_id ();
148 return _fq.has_in ();