// Constructor of radio_t. Forwards parent_, tid_ and sid_ (plus a literal
// `true` as fourth argument) to the socket_base_t constructor and
// initialises _lossy to true.
// NOTE(review): the constructor body is not visible in this listing.
12 zmq::radio_t::radio_t (
class ctx_t *parent_, uint32_t tid_,
int sid_) :
13 socket_base_t (parent_, tid_, sid_,
true), _lossy (
true)
// Destructor of radio_t.
// NOTE(review): the body is not visible in this listing.
18 zmq::radio_t::~radio_t ()
// Handles a newly attached pipe. From the visible lines it:
//   - calls pipe_->set_nodelay ();
//   - appends pipe_ to _udp_pipes when subscribe_to_all_ is set;
//   - calls xread_activated (pipe_).
// NOTE(review): several lines are elided here; no use of locally_initiated_
// is visible, and whether the xread_activated call is conditional cannot be
// told from this listing - confirm against the full source.
22 void zmq::radio_t::xattach_pipe (pipe_t *pipe_,
23 bool subscribe_to_all_,
24 bool locally_initiated_)
33 pipe_->set_nodelay ();
// Pipes flagged subscribe_to_all_ are tracked separately in _udp_pipes.
37 if (subscribe_to_all_)
38 _udp_pipes.push_back (pipe_);
42 xread_activated (pipe_);
// Reads messages from pipe_ for as long as pipe_->read succeeds and gives
// special handling to messages for which is_join () or is_leave () is true.
// The visible lines walk a range of _subscriptions entries and erase an
// entry whose mapped pipe (it->second) equals pipe_.
// NOTE(review): how `range` is computed, the join-side handling, and the
// loop condition/increment are not visible in this listing; presumably the
// erase belongs to the leave path - confirm against the full source.
45 void zmq::radio_t::xread_activated (pipe_t *pipe_)
49 while (pipe_->read (&msg)) {
51 if (msg.is_join () || msg.is_leave ()) {
58 std::pair<subscriptions_t::iterator, subscriptions_t::iterator>
61 for (subscriptions_t::iterator
it =
range.first;
// Only the entry that maps to this particular pipe is removed.
63 if (
it->second == pipe_) {
64 _subscriptions.erase (
it);
// Write-activation hook: forwards pipe_ to _dist.activated ().
74 void zmq::radio_t::xwrite_activated (pipe_t *pipe_)
76 _dist.activated (pipe_);
// Socket-option setter. The visible lines validate that the option value is
// exactly sizeof (int) bytes and non-negative when read as an int, and then
// set _lossy to true when that int is 0 and to false otherwise.
// NOTE(review): the remaining parameters, the option_ value(s) this applies
// to, the failure path of the validation and the return values are not
// visible in this listing.
78 int zmq::radio_t::xsetsockopt (
int option_,
// Condition is true when the value is not int-sized or is negative.
82 if (optvallen_ !=
sizeof (
int) || *
static_cast<const int *
> (optval_) < 0) {
// A value of 0 means lossy; any other accepted value clears _lossy.
87 _lossy = (*
static_cast<const int *
> (optval_) == 0);
// Cleans up after a terminated pipe. From the visible lines it:
//   - iterates over _subscriptions and erases entries whose mapped pipe
//     (it->second) equals pipe_;
//   - looks pipe_ up in _udp_pipes with std::find and erases it;
//   - notifies _dist via _dist.pipe_terminated (pipe_).
// NOTE(review): the loop condition, the #else/#endif lines, the non-matching
// branch and any guard on the std::find result are not visible in this
// listing - confirm against the full source.
95 void zmq::radio_t::xpipe_terminated (pipe_t *pipe_)
97 for (subscriptions_t::iterator
it = _subscriptions.begin (),
98 end = _subscriptions.end ();
100 if (
it->second == pipe_) {
// With C++11 (or MSVC >= 1700) erase () returns the next iterator; the
// other visible form erases through a post-incremented iterator instead.
101 #if __cplusplus >= 201103L || (defined _MSC_VER && _MSC_VER >= 1700)
102 it = _subscriptions.erase (
it);
104 _subscriptions.erase (
it++);
// Locate the pipe in _udp_pipes as well.
112 const udp_pipes_t::iterator
end = _udp_pipes.end ();
113 const udp_pipes_t::iterator
it =
114 std::find (_udp_pipes.begin (),
end, pipe_);
116 _udp_pipes.erase (
it);
119 _dist.pipe_terminated (pipe_);
// Sends msg_ to matching pipes. From the visible lines it:
//   - tests whether msg_ carries the msg_t::more flag;
//   - marks pipes taken from subscription entries (it->second) as matching
//     via _dist.match ();
//   - iterates over _udp_pipes;
//   - calls _dist.send_to_matching (msg_) only when _lossy is set or
//     _dist.check_hwm () returns true, and tests its result against 0.
// NOTE(review): what happens for messages with the `more` flag, how the
// subscription range is obtained, the body of the _udp_pipes loop and all
// return values are not visible in this listing.
122 int zmq::radio_t::xsend (msg_t *msg_)
125 if (msg_->flags () & msg_t::more) {
132 const std::pair<subscriptions_t::iterator, subscriptions_t::iterator>
136 _dist.match (
it->second);
138 for (udp_pipes_t::iterator
it = _udp_pipes.begin (),
139 end = _udp_pipes.end ();
// In non-lossy mode the send is attempted only if check_hwm () passes.
144 if (_lossy || _dist.check_hwm ()) {
145 if (_dist.send_to_matching (msg_) == 0) {
// Returns the result of _dist.has_out ().
154 bool zmq::radio_t::xhas_out ()
156 return _dist.has_out ();
// Receive hook of radio_t.
// NOTE(review): the body is not visible in this listing.
159 int zmq::radio_t::xrecv (msg_t *msg_)
// Input-availability hook of radio_t.
// NOTE(review): the body is not visible in this listing.
167 bool zmq::radio_t::xhas_in ()
// Constructor of radio_session_t. Forwards io_thread_, connect_, socket_,
// options_ and addr_ to the session_base_t constructor.
// NOTE(review): part of the parameter list, any further member
// initialisers and the body are not visible in this listing.
172 zmq::radio_session_t::radio_session_t (io_thread_t *io_thread_,
174 socket_base_t *socket_,
177 session_base_t (io_thread_, connect_, socket_,
options_, addr_),
// Destructor of radio_session_t.
// NOTE(review): the body is not visible in this listing.
182 zmq::radio_session_t::~radio_session_t ()
// Pushes msg_ through session_base_t::push_msg, first rewriting JOIN/LEAVE
// command messages. From the visible lines:
//   - for a message with the msg_t::command flag, the payload is compared
//     against the prefixes "\4JOIN" (5 bytes) and "\5LEAVE" (6 bytes);
//   - on a match, a join or leave message is initialised, the bytes after
//     the prefix are set as its group, *msg_ is overwritten with it and the
//     result is passed to session_base_t::push_msg;
//   - other calls to session_base_t::push_msg (msg_) forward the message
//     as received.
// NOTE(review): the exact branch structure, the handling of `rc` and the
// cleanup of the original message are not visible in this listing;
// presumably unrecognised commands and non-command messages take the two
// pass-through calls - confirm against the full source.
186 int zmq::radio_session_t::push_msg (msg_t *msg_)
188 if (msg_->flags () & msg_t::command) {
189 char *command_data =
static_cast<char *
> (msg_->data ());
190 const size_t data_size = msg_->size ();
195 msg_t join_leave_msg;
// The size checks ensure memcmp never reads past the end of the payload.
199 if (data_size >= 5 && memcmp (command_data,
"\4JOIN", 5) == 0) {
200 group_length =
static_cast<int> (data_size) - 5;
201 group = command_data + 5;
202 rc = join_leave_msg.init_join ();
203 }
else if (data_size >= 6 && memcmp (command_data,
"\5LEAVE", 6) == 0) {
204 group_length =
static_cast<int> (data_size) - 6;
205 group = command_data + 6;
206 rc = join_leave_msg.init_leave ();
210 return session_base_t::push_msg (msg_);
// The group is the command payload that follows the matched prefix.
215 rc = join_leave_msg.set_group (
group, group_length);
223 *msg_ = join_leave_msg;
224 return session_base_t::push_msg (msg_);
226 return session_base_t::push_msg (msg_);
// Pulls the next outgoing message into msg_. From the visible lines, when
// _state equals `group` it:
//   - pulls a message from session_base_t into _pending_msg;
//   - reads that message's group string and measures it with strlen;
//   - initialises msg_ with exactly that many bytes and sets msg_t::more.
// A separate visible statement assigns _pending_msg to *msg_.
// NOTE(review): the copy of the group bytes into msg_, the state
// transitions, error handling and return values are not visible in this
// listing; presumably the *msg_ = _pending_msg assignment belongs to the
// other state - confirm against the full source.
229 int zmq::radio_session_t::pull_msg (msg_t *msg_)
231 if (_state ==
group) {
232 int rc = session_base_t::pull_msg (&_pending_msg);
// Note: this local `group` shadows the name compared against _state above.
236 const char *
group = _pending_msg.group ();
237 const int length =
static_cast<int> (strlen (
group));
240 rc = msg_->init_size (
length);
242 msg_->set_flags (msg_t::more);
249 *msg_ = _pending_msg;
// Resets the session; the visible line delegates to session_base_t::reset ().
// NOTE(review): any additional state reset is not visible in this listing.
254 void zmq::radio_session_t::reset ()
256 session_base_t::reset ();