12 _verbose_unsubs (
false),
16 _process_subscribe (
false),
17 _only_first_subscribe (
false)
31 const int rc = _message.close ();
36 bool subscribe_to_all_,
37 bool locally_initiated_)
47 _subscriptions.apply (send_subscription, pipe_);
53 _fq.activated (pipe_);
58 _dist.activated (pipe_);
63 _fq.pipe_terminated (pipe_);
64 _dist.pipe_terminated (pipe_);
70 _subscriptions.apply (send_subscription, pipe_);
79 if (optvallen_ !=
sizeof (
int)
80 || *
static_cast<const int *
> (optval_) < 0) {
84 _only_first_subscribe = (*
static_cast<const int *
> (optval_) != 0);
87 #ifdef ZMQ_BUILD_DRAFT_API
89 _verbose_unsubs = (*
static_cast<const int *
> (optval_) != 0);
102 #ifdef ZMQ_USE_RADIX_TREE
103 uint64_t num_subscriptions = _subscriptions.size ();
105 uint64_t num_subscriptions = _subscriptions.num_prefixes ();
108 return do_getsockopt<int> (optval_, optvallen_,
109 (
int) num_subscriptions);
121 unsigned char *
data =
static_cast<unsigned char *
> (msg_->
data ());
123 const bool first_part = !_more_send;
127 _process_subscribe = !_only_first_subscribe;
128 }
else if (!_process_subscribe) {
130 return _dist.send_to_all (msg_);
144 _process_subscribe =
true;
145 return _dist.send_to_all (msg_);
153 _process_subscribe =
true;
154 const bool rm_result = _subscriptions.rm (
data,
size);
155 if (rm_result || _verbose_unsubs)
156 return _dist.send_to_all (msg_);
159 return _dist.send_to_all (msg_);
161 int rc = msg_->
close ();
180 const int rc = msg_->
move (_message);
182 _has_message =
false;
192 int rc = _fq.recv (msg_);
201 if (_more_recv || !
options.filter || match (msg_)) {
209 rc = _fq.recv (msg_);
230 int rc = _fq.recv (&_message);
240 if (!
options.filter || match (&_message)) {
248 rc = _fq.recv (&_message);
256 const bool matching = _subscriptions.check (
257 static_cast<unsigned char *
> (msg_->
data ()), msg_->
size ());
259 return matching ^
options.invert_matching;
266 pipe_t *pipe =
static_cast<pipe_t *
> (arg_);
274 const bool sent = pipe->write (&msg);