// Constructor. Forwards parent_, tid_ and sid_ to socket_base_t (passing
// `true` as the fourth argument) and starts with _has_message set to false.
10 zmq::dish_t::dish_t (
class ctx_t *parent_, uint32_t tid_,
int sid_) :
11 socket_base_t (parent_, tid_, sid_,
true), _has_message (
false)
// Initialise the member message object; rc captures the result of init ().
// NOTE(review): how rc is checked is not visible in this excerpt.
19 const int rc = _message.init ();
// Destructor. Closes the member message object _message.
23 zmq::dish_t::~dish_t ()
// rc captures the result of close ().
// NOTE(review): how rc is checked is not visible in this excerpt.
25 const int rc = _message.close ();
// Pipe-attach hook taking the pipe plus the subscribe_to_all_ and
// locally_initiated_ flags.
// NOTE(review): only one statement of the body is visible in this excerpt;
// how the two flags are used cannot be determined from here.
29 void zmq::dish_t::xattach_pipe (pipe_t *pipe_,
30 bool subscribe_to_all_,
31 bool locally_initiated_)
// Run send_subscriptions () for the pipe that was passed in.
41 send_subscriptions (pipe_);
// Read-activation hook: forwards the activated pipe to _fq.
44 void zmq::dish_t::xread_activated (pipe_t *pipe_)
46 _fq.activated (pipe_);
// Write-activation hook: forwards the activated pipe to _dist.
49 void zmq::dish_t::xwrite_activated (pipe_t *pipe_)
51 _dist.activated (pipe_);
// Pipe-termination hook: notifies both helpers that hold pipes, _fq first and
// then _dist, that pipe_ has terminated.
54 void zmq::dish_t::xpipe_terminated (pipe_t *pipe_)
56 _fq.pipe_terminated (pipe_);
57 _dist.pipe_terminated (pipe_);
// Hiccup hook for pipe_.
// NOTE(review): only one statement of the body is visible in this excerpt.
60 void zmq::dish_t::xhiccuped (pipe_t *pipe_)
// Run send_subscriptions () for the affected pipe.
63 send_subscriptions (pipe_);
// Joins the group named by group_.
// Visible steps: record the group in _subscriptions, build a join message
// carrying group_, pass it to _dist.send_to_all (), then close the message.
// NOTE(review): `group` and `msg` are declared on lines not shown here
// (`group` is presumably a std::string built from group_ — confirm), and the
// validation / error handling between the steps is elided.
66 int zmq::dish_t::xjoin (
const char *group_)
// insert ().second is tested negated, so this branch is taken when the
// insertion did not happen (assuming std::set-like semantics: the group was
// already present).
76 if (!_subscriptions.insert (
group).second) {
// Initialise msg as a join message.
82 int rc = msg.init_join ();
// Attach the group name to the message.
85 rc = msg.set_group (group_);
// Hand the message to _dist.send_to_all ().
89 rc = _dist.send_to_all (&msg);
// Close the local message; its result is kept separately in rc2 so that rc
// from the send is not overwritten.
92 const int rc2 = msg.close ();
// Leaves the group named by group_.
// Visible steps: remove the group from _subscriptions, build a leave message
// carrying group_, pass it to _dist.send_to_all (), then close the message.
// NOTE(review): `group` and `msg` are declared on lines not shown here
// (`group` is presumably a std::string built from group_ — confirm), and the
// validation / error handling between the steps is elided.
99 int zmq::dish_t::xleave (
const char *group_)
// erase () returning 0 means nothing was removed, i.e. the group was not in
// _subscriptions; this branch handles that case.
108 if (0 == _subscriptions.erase (
group)) {
// Initialise msg as a leave message.
114 int rc = msg.init_leave ();
// Attach the group name to the message.
117 rc = msg.set_group (group_);
// Hand the message to _dist.send_to_all ().
121 rc = _dist.send_to_all (&msg);
// Close the local message; its result is kept separately in rc2 so that rc
// from the send is not overwritten.
124 const int rc2 = msg.close ();
// Send hook taking a message pointer and returning an int.
// NOTE(review): only the signature is visible in this excerpt; the body is
// not shown, so its behaviour cannot be documented from here.
131 int zmq::dish_t::xsend (msg_t *msg_)
// Reports a bool about outbound readiness.
// NOTE(review): only the signature is visible in this excerpt; the returned
// value is not shown.
138 bool zmq::dish_t::xhas_out ()
// Receives a message into msg_.
// The visible lines show two paths: handing over the buffered member
// _message, or delegating to xxrecv ().
// NOTE(review): the condition choosing between the paths is not visible —
// presumably a test of _has_message; confirm.
144 int zmq::dish_t::xrecv (msg_t *msg_)
// Move the buffered message into the caller's msg_ ...
149 const int rc = msg_->move (_message);
// ... and mark the buffer as empty.
151 _has_message =
false;
// Otherwise delegate to xxrecv () and return its result.
155 return xxrecv (msg_);
// Receives from _fq into msg_, filtering by group.
// NOTE(review): the `}` / `while (...)` pair below appears to be the tail of
// a do/while loop whose opening `do {` is not visible in this excerpt.
158 int zmq::dish_t::xxrecv (msg_t *msg_)
// Pull the next message from _fq.
162 const int rc = _fq.recv (msg_);
170 }
// Repeat while the received message's group has no entry in _subscriptions.
while (0 == _subscriptions.count (
std::string (msg_->group ())));
// Reports a bool about inbound availability.
// NOTE(review): only one statement of the body is visible; presumably
// _has_message is updated from the result — confirm against the full file.
176 bool zmq::dish_t::xhas_in ()
// Try to receive a message into the member buffer _message via xxrecv ().
183 const int rc = xxrecv (&_message);
// Walks every entry of _subscriptions and builds a join message for it.
// NOTE(review): the statements that hand each message to pipe_ are not
// visible in this excerpt.
194 void zmq::dish_t::send_subscriptions (pipe_t *pipe_)
// Iterate from begin () to end (); `end` is captured once up front.
196 for (subscriptions_t::iterator
it = _subscriptions.begin (),
197 end = _subscriptions.end ();
// Initialise msg as a join message.
200 int rc = msg.init_join ();
// Use the current entry's string as the message group.
203 rc = msg.set_group (
it->c_str ());
// Constructor. Forwards io_thread_, connect_, socket_, options_ and addr_ to
// the session_base_t base class.
// NOTE(review): part of the parameter list and the remaining member
// initialisers are not visible in this excerpt.
213 zmq::dish_session_t::dish_session_t (io_thread_t *io_thread_,
215 socket_base_t *socket_,
218 session_base_t (io_thread_, connect_, socket_,
options_, addr_),
// Destructor.
// NOTE(review): only the signature is visible in this excerpt.
223 zmq::dish_session_t::~dish_session_t ()
// Handles a message pushed into the session, branching on _state.
// The visible lines show a branch for _state == group and a second part that
// sets the group on msg_ from _group_msg and forwards the message to
// session_base_t::push_msg ().
// NOTE(review): several lines (state transitions, error returns, the
// declaration of rc used from line 253 on) are elided from this excerpt;
// confirm details against the full file.
227 int zmq::dish_session_t::push_msg (msg_t *msg_)
// Branch taken while the session is in the `group` state.
229 if (_state ==
group) {
// Taken when msg_ does NOT carry the `more` flag.
230 if ((msg_->flags () & msg_t::more) != msg_t::more) {
// Re-initialise msg_.
243 const int rc = msg_->init ();
// Read the group string currently set on msg_.
247 const char *group_setting = msg_->group ();
// Taken when that group string is non-empty (first byte is not NUL).
249 if (group_setting[0] != 0)
// Set the group on msg_ from the bytes held in _group_msg.
253 rc = msg_->set_group (
static_cast<char *
> (_group_msg.data ()),
// Close _group_msg.
258 rc = _group_msg.close ();
// Taken when msg_ DOES carry the `more` flag.
262 if ((msg_->flags () & msg_t::more) == msg_t::more) {
// Forward the message to the base-class implementation.
268 rc = session_base_t::push_msg (msg_);
// Pulls a message from the base session and, for join/leave messages, builds
// a command message holding a fixed prefix followed by the group name.
// NOTE(review): `command` and `offset` are declared on lines not shown here,
// and what is finally done with `command` is also not visible.
276 int zmq::dish_session_t::pull_msg (msg_t *msg_)
// Fetch the next message via the base-class implementation.
278 int rc = session_base_t::pull_msg (msg_);
// Branch for messages that are neither join nor leave.
// NOTE(review): its body is not visible — presumably an early return.
283 if (!msg_->is_join () && !msg_->is_leave ())
// Length of the group name, measured as a C string.
286 const int group_length =
static_cast<int> (strlen (msg_->group ()));
// Join: size the command for a 5-byte prefix plus the group name.
291 if (msg_->is_join ()) {
292 rc =
command.init_size (group_length + 5);
// Prefix is the byte \4 followed by "JOIN" (5 bytes copied).
295 memcpy (
command.data (),
"\4JOIN", 5);
// Size the command for a 6-byte prefix plus the group name.
// NOTE(review): presumably the `else` (leave) branch; the `else` itself is
// not visible in this excerpt.
297 rc =
command.init_size (group_length + 6);
// Prefix is the byte \5 followed by "LEAVE" (6 bytes copied).
300 memcpy (
command.data (),
"\5LEAVE", 6);
// Flag the message as a command.
303 command.set_flags (msg_t::command);
304 char *command_data =
static_cast<char *
> (
command.data ());
// Copy the group name (without a terminating NUL) at `offset`.
// NOTE(review): offset is presumably the prefix length (5 or 6) — confirm.
307 memcpy (command_data +
offset, msg_->group (), group_length);
// Resets the session; the visible statement delegates to the base class.
// NOTE(review): any dish-specific state reset is not visible in this excerpt.
318 void zmq::dish_session_t::reset ()
320 session_base_t::reset ();