10 zmq::dish_t::dish_t (
class ctx_t *parent_, uint32_t tid_, 
int sid_) :
 
   11     socket_base_t (parent_, tid_, sid_, 
true), _has_message (
false)
 
   19     const int rc = _message.init ();
 
   23 zmq::dish_t::~dish_t ()
 
   25     const int rc = _message.close ();
 
   29 void zmq::dish_t::xattach_pipe (pipe_t *pipe_,
 
   30                                 bool subscribe_to_all_,
 
   31                                 bool locally_initiated_)
 
   41     send_subscriptions (pipe_);
 
   44 void zmq::dish_t::xread_activated (pipe_t *pipe_)
 
   46     _fq.activated (pipe_);
 
   49 void zmq::dish_t::xwrite_activated (pipe_t *pipe_)
 
   51     _dist.activated (pipe_);
 
   54 void zmq::dish_t::xpipe_terminated (pipe_t *pipe_)
 
   56     _fq.pipe_terminated (pipe_);
 
   57     _dist.pipe_terminated (pipe_);
 
   60 void zmq::dish_t::xhiccuped (pipe_t *pipe_)
 
   63     send_subscriptions (pipe_);
 
   66 int zmq::dish_t::xjoin (
const char *group_)
 
   76     if (!_subscriptions.insert (
group).second) {
 
   82     int rc = msg.init_join ();
 
   85     rc = msg.set_group (group_);
 
   89     rc = _dist.send_to_all (&msg);
 
   92     const int rc2 = msg.close ();
 
   99 int zmq::dish_t::xleave (
const char *group_)
 
  108     if (0 == _subscriptions.erase (
group)) {
 
  114     int rc = msg.init_leave ();
 
  117     rc = msg.set_group (group_);
 
  121     rc = _dist.send_to_all (&msg);
 
  124     const int rc2 = msg.close ();
 
  131 int zmq::dish_t::xsend (msg_t *msg_)
 
  138 bool zmq::dish_t::xhas_out ()
 
  144 int zmq::dish_t::xrecv (msg_t *msg_)
 
  149         const int rc = msg_->move (_message);
 
  151         _has_message = 
false;
 
  155     return xxrecv (msg_);
 
  158 int zmq::dish_t::xxrecv (msg_t *msg_)
 
  162         const int rc = _fq.recv (msg_);
 
  170     } 
while (0 == _subscriptions.count (
std::string (msg_->group ())));
 
  176 bool zmq::dish_t::xhas_in ()
 
  183     const int rc = xxrecv (&_message);
 
  194 void zmq::dish_t::send_subscriptions (pipe_t *pipe_)
 
  196     for (subscriptions_t::iterator 
it = _subscriptions.begin (),
 
  197                                    end = _subscriptions.end ();
 
  200         int rc = msg.init_join ();
 
  203         rc = msg.set_group (
it->c_str ());
 
  213 zmq::dish_session_t::dish_session_t (io_thread_t *io_thread_,
 
  215                                      socket_base_t *socket_,
 
  218     session_base_t (io_thread_, connect_, socket_, 
options_, addr_),
 
  223 zmq::dish_session_t::~dish_session_t ()
 
  227 int zmq::dish_session_t::push_msg (msg_t *msg_)
 
  229     if (_state == 
group) {
 
  230         if ((msg_->flags () & msg_t::more) != msg_t::more) {
 
  243         const int rc = msg_->init ();
 
  247     const char *group_setting = msg_->group ();
 
  249     if (group_setting[0] != 0)
 
  253     rc = msg_->set_group (
static_cast<char *
> (_group_msg.data ()),
 
  258     rc = _group_msg.close ();
 
  262     if ((msg_->flags () & msg_t::more) == msg_t::more) {
 
  268     rc = session_base_t::push_msg (msg_);
 
  276 int zmq::dish_session_t::pull_msg (msg_t *msg_)
 
  278     int rc = session_base_t::pull_msg (msg_);
 
  283     if (!msg_->is_join () && !msg_->is_leave ())
 
  286     const int group_length = 
static_cast<int> (strlen (msg_->group ()));
 
  291     if (msg_->is_join ()) {
 
  292         rc = 
command.init_size (group_length + 5);
 
  295         memcpy (
command.data (), 
"\4JOIN", 5);
 
  297         rc = 
command.init_size (group_length + 6);
 
  300         memcpy (
command.data (), 
"\5LEAVE", 6);
 
  303     command.set_flags (msg_t::command);
 
  304     char *command_data = 
static_cast<char *
> (
command.data ());
 
  307     memcpy (command_data + 
offset, msg_->group (), group_length);
 
  318 void zmq::dish_session_t::reset ()
 
  320     session_base_t::reset ();