15 _verbose_subs (
false),
16 _verbose_unsubs (
false),
19 _process_subscribe (
false),
20 _only_first_subscribe (
false),
23 _send_last_pipe (
false),
34 _welcome_msg.close ();
35 for (std::deque<metadata_t *>::iterator
it = _pending_metadata.begin (),
36 end = _pending_metadata.end ();
38 if (*
it && (*it)->drop_ref ())
43 bool subscribe_to_all_,
44 bool locally_initiated_)
53 if (subscribe_to_all_)
54 _subscriptions.add (
NULL, 0, pipe_);
57 if (_welcome_msg.size () > 0) {
60 const int rc = copy.
copy (_welcome_msg);
62 const bool ok = pipe_->write (©);
69 xread_activated (pipe_);
76 while (pipe_->read (&msg)) {
78 unsigned char *msg_data =
static_cast<unsigned char *
> (msg.
data ()),
81 bool subscribe =
false;
82 bool is_subscribe_or_cancel =
false;
85 const bool first_part = !_more_recv;
88 if (first_part || _process_subscribe) {
94 is_subscribe_or_cancel =
true;
95 }
else if (msg.
size () > 0 && (*msg_data == 0 || *msg_data == 1)) {
98 subscribe = *msg_data == 1;
99 is_subscribe_or_cancel =
true;
105 !_only_first_subscribe || is_subscribe_or_cancel;
107 if (is_subscribe_or_cancel) {
111 _manual_subscriptions.rm (
data,
size, pipe_);
113 _manual_subscriptions.add (
data,
size, pipe_);
115 _pending_pipes.push_back (pipe_);
119 _subscriptions.rm (
data,
size, pipe_);
124 const bool first_added =
125 _subscriptions.add (
data,
size, pipe_);
126 notify = first_added || _verbose_subs;
146 *notification.
data () = 1;
148 *notification.
data () = 0;
151 _pending_data.push_back (
ZMQ_MOVE (notification));
154 _pending_metadata.push_back (
metadata);
155 _pending_flags.push_back (0);
161 _pending_data.push_back (
blob_t (msg_data, msg.
size ()));
164 _pending_metadata.push_back (
metadata);
165 _pending_flags.push_back (msg.
flags ());
174 _dist.activated (pipe_);
184 if (optvallen_ !=
sizeof (
int)
185 || *
static_cast<const int *
> (optval_) < 0) {
190 _verbose_subs = (*
static_cast<const int *
> (optval_) != 0);
191 _verbose_unsubs =
false;
193 _verbose_subs = (*
static_cast<const int *
> (optval_) != 0);
194 _verbose_unsubs = _verbose_subs;
196 _manual = (*
static_cast<const int *
> (optval_) != 0);
197 _send_last_pipe = _manual;
199 _lossy = (*
static_cast<const int *
> (optval_) == 0);
201 _manual = (*
static_cast<const int *
> (optval_) != 0);
203 _only_first_subscribe = (*
static_cast<const int *
> (optval_) != 0);
205 if (_last_pipe !=
NULL)
206 _subscriptions.add ((
unsigned char *) optval_, optvallen_,
209 if (_last_pipe !=
NULL)
210 _subscriptions.rm ((
unsigned char *) optval_, optvallen_,
213 _welcome_msg.close ();
215 if (optvallen_ > 0) {
216 const int rc = _welcome_msg.init_size (optvallen_);
219 unsigned char *
data =
220 static_cast<unsigned char *
> (_welcome_msg.data ());
221 memcpy (
data, optval_, optvallen_);
223 _welcome_msg.init ();
236 return do_getsockopt<int> (optval_, optvallen_,
237 (
int) _subscriptions.num_prefixes ());
258 _manual_subscriptions.rm (pipe_, send_unsubscription,
this,
false);
262 _subscriptions.rm (pipe_,
stub,
static_cast<void *
> (
NULL),
false);
266 if (pipe_ == _last_pipe) {
273 _subscriptions.rm (pipe_, send_unsubscription,
this, !_verbose_unsubs);
276 _dist.pipe_terminated (pipe_);
299 if (
unlikely (_manual && _last_pipe && _send_last_pipe)) {
300 _subscriptions.match (
static_cast<unsigned char *
> (msg_->
data ()),
301 msg_->
size (), mark_last_pipe_as_matching,
305 _subscriptions.match (
static_cast<unsigned char *
> (msg_->
data ()),
306 msg_->
size (), mark_as_matching,
this);
309 _dist.reverse_match ();
314 if (_lossy || _dist.check_hwm ()) {
315 if (_dist.send_to_matching (msg_) == 0) {
320 _more_send = msg_more;
330 return _dist.has_out ();
336 if (_pending_data.empty ()) {
342 if (_manual && !_pending_pipes.empty ()) {
343 _last_pipe = _pending_pipes.front ();
344 _pending_pipes.pop_front ();
348 if (_last_pipe !=
NULL && !_dist.has_pipe (_last_pipe)) {
353 int rc = msg_->
close ();
355 rc = msg_->
init_size (_pending_data.front ().size ());
357 memcpy (msg_->
data (), _pending_data.front ().data (),
358 _pending_data.front ().size ());
367 msg_->
set_flags (_pending_flags.front ());
368 _pending_data.pop_front ();
369 _pending_metadata.pop_front ();
370 _pending_flags.pop_front ();
376 return !_pending_data.empty ();
389 memcpy (unsub.
data () + 1,
data_, size_);