xsub.cpp
Go to the documentation of this file.
1 /* SPDX-License-Identifier: MPL-2.0 */
2 
3 #include "precompiled.hpp"
4 #include <string.h>
5 
6 #include "macros.hpp"
7 #include "xsub.hpp"
8 #include "err.hpp"
9 
// Constructs an XSUB socket object.
//
// parent_, tid_ and sid_ are forwarded unchanged to socket_base_t. All six
// boolean state members start out false, linger is forced to 0 and the cached
// _message is initialised.
10 zmq::xsub_t::xsub_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
11  socket_base_t (parent_, tid_, sid_),
12  _verbose_unsubs (false),
13  _has_message (false),
14  _more_send (false),
15  _more_recv (false),
16  _process_subscribe (false),
17  _only_first_subscribe (false)
18 {
    // NOTE(review): the embedded numbering jumps from 18 to 20, so one
    // statement is missing from this listing. The cross-reference index at
    // the end of the file lists zmq::options_t::type and ZMQ_XSUB, neither of
    // which appears in any visible line - presumably the dropped line sets
    // the socket type; confirm against the upstream file.
20 
21  // When socket is being closed down we don't want to wait till pending
22  // subscription commands are sent to the wire.
23  options.linger.store (0);
24 
    // _message is the buffer xhas_in fills and xrecv later hands out.
25  const int rc = _message.init ();
26  errno_assert (rc == 0);
27 }
28 
30 {
    // Destructor body. The signature line (embedded line 29) is missing from
    // this listing; the cross-reference index at the end of the file records
    // it as "~xsub_t() ZMQ_OVERRIDE".
    // Closes the cached _message and asserts that closing succeeded.
31  const int rc = _message.close ();
32  errno_assert (rc == 0);
33 }
34 
35 void zmq::xsub_t::xattach_pipe (pipe_t *pipe_,
36  bool subscribe_to_all_,
37  bool locally_initiated_)
38 {
39  LIBZMQ_UNUSED (subscribe_to_all_);
40  LIBZMQ_UNUSED (locally_initiated_);
41 
42  zmq_assert (pipe_);
43  _fq.attach (pipe_);
44  _dist.attach (pipe_);
45 
46  // Send all the cached subscriptions to the new upstream peer.
47  _subscriptions.apply (send_subscription, pipe_);
48  pipe_->flush ();
49 }
50 
51 void zmq::xsub_t::xread_activated (pipe_t *pipe_)
52 {
53  _fq.activated (pipe_);
54 }
55 
56 void zmq::xsub_t::xwrite_activated (pipe_t *pipe_)
57 {
58  _dist.activated (pipe_);
59 }
60 
61 void zmq::xsub_t::xpipe_terminated (pipe_t *pipe_)
62 {
63  _fq.pipe_terminated (pipe_);
64  _dist.pipe_terminated (pipe_);
65 }
66 
67 void zmq::xsub_t::xhiccuped (pipe_t *pipe_)
68 {
69  // Send all the cached subscriptions to the hiccuped pipe.
70  _subscriptions.apply (send_subscription, pipe_);
71  pipe_->flush ();
72 }
73 
74 int zmq::xsub_t::xsetsockopt (int option_,
75  const void *optval_,
76  size_t optvallen_)
77 {
78  if (option_ == ZMQ_ONLY_FIRST_SUBSCRIBE) {
79  if (optvallen_ != sizeof (int)
80  || *static_cast<const int *> (optval_) < 0) {
81  errno = EINVAL;
82  return -1;
83  }
84  _only_first_subscribe = (*static_cast<const int *> (optval_) != 0);
85  return 0;
86  }
87 #ifdef ZMQ_BUILD_DRAFT_API
88  else if (option_ == ZMQ_XSUB_VERBOSE_UNSUBSCRIBE) {
89  _verbose_unsubs = (*static_cast<const int *> (optval_) != 0);
90  return 0;
91  }
92 #endif
93  errno = EINVAL;
94  return -1;
95 }
96 
97 int zmq::xsub_t::xgetsockopt (int option_, void *optval_, size_t *optvallen_)
98 {
99  if (option_ == ZMQ_TOPICS_COUNT) {
100  // make sure to use a multi-thread safe function to avoid race conditions with I/O threads
101  // where subscriptions are processed:
102 #ifdef ZMQ_USE_RADIX_TREE
103  uint64_t num_subscriptions = _subscriptions.size ();
104 #else
105  uint64_t num_subscriptions = _subscriptions.num_prefixes ();
106 #endif
107 
108  return do_getsockopt<int> (optval_, optvallen_,
109  (int) num_subscriptions);
110  }
111 
112  // room for future options here
113 
114  errno = EINVAL;
115  return -1;
116 }
117 
119 {
    // Body of xsend. The signature line (embedded line 118) is missing from
    // this listing; the cross-reference index at the end of the file records
    // it as "int xsend(zmq::msg_t *msg_) ZMQ_OVERRIDE".
    //
    // Classifies the outgoing part in msg_ as a subscribe, an unsubscribe or
    // a plain user message, updates _subscriptions accordingly and forwards
    // the part through _dist. Returns the result of _dist.send_to_all, or 0
    // when an unsubscribe is swallowed (see the end of the function).
120  size_t size = msg_->size ();
121  unsigned char *data = static_cast<unsigned char *> (msg_->data ());
122 
    // _more_send still holds the "more" flag of the previous part, so it
    // tells whether this part starts a new message; it is then refreshed
    // from the current part.
123  const bool first_part = !_more_send;
124  _more_send = (msg_->flags () & msg_t::more) != 0;
125 
    // A first part is always inspected below. Later parts are inspected only
    // while _process_subscribe is set: it starts as !_only_first_subscribe
    // and is switched on once a (un)subscribe has been processed.
126  if (first_part) {
127  _process_subscribe = !_only_first_subscribe;
128  } else if (!_process_subscribe) {
129  // User message sent upstream to XPUB socket
130  return _dist.send_to_all (msg_);
131  }
132 
    // A subscribe is either flagged on the message itself or encoded as a
    // leading byte of value 1.
133  if (msg_->is_subscribe () || (size > 0 && *data == 1)) {
134  // Process subscribe message
135  // This used to filter out duplicate subscriptions,
136  // however this is already done on the XPUB side and
137  // doing it here as well breaks ZMQ_XPUB_VERBOSE
138  // when there are forwarding devices involved.
    // In the byte-encoded form the topic starts after the command byte.
139  if (!msg_->is_subscribe ()) {
140  data = data + 1;
141  size = size - 1;
142  }
143  _subscriptions.add (data, size);
144  _process_subscribe = true;
145  return _dist.send_to_all (msg_);
146  }
    // An unsubscribe is either flagged on the message itself or encoded as a
    // leading byte of value 0.
147  if (msg_->is_cancel () || (size > 0 && *data == 0)) {
148  // Process unsubscribe message
149  if (!msg_->is_cancel ()) {
150  data = data + 1;
151  size = size - 1;
152  }
153  _process_subscribe = true;
154  const bool rm_result = _subscriptions.rm (data, size);
    // Forward the unsubscribe only when rm () reported success or when
    // verbose unsubscribes are enabled; otherwise fall through to the
    // clean-up at the end of the function.
155  if (rm_result || _verbose_unsubs)
156  return _dist.send_to_all (msg_);
157  } else
158  // User message sent upstream to XPUB socket
159  return _dist.send_to_all (msg_);
160 
    // Only reached for an unsubscribe that was not forwarded: the message is
    // closed and re-initialised, and success is reported to the caller.
161  int rc = msg_->close ();
162  errno_assert (rc == 0);
163  rc = msg_->init ();
164  errno_assert (rc == 0);
165 
166  return 0;
167 }
168 
170 {
    // Body of xhas_out. The signature line (embedded line 169) is missing
    // from this listing; the cross-reference index at the end of the file
    // records it as "bool xhas_out() ZMQ_OVERRIDE".
    // Unconditionally reports that the socket can accept outgoing messages.
171  // Subscription can be added/removed anytime.
172  return true;
173 }
174 
176 {
    // Body of xrecv. The signature line (embedded line 175) is missing from
    // this listing; the cross-reference index at the end of the file records
    // it as "int xrecv(zmq::msg_t *msg_) ZMQ_FINAL".
    //
    // Delivers the next accepted message part into msg_. Returns 0 when a
    // part was delivered and -1 when _fq.recv reported no message or an
    // error. _more_recv is updated to the "more" flag of the delivered part.
177  // If there's already a message prepared by a previous call to zmq_poll,
178  // return it straight ahead.
    // (_has_message is set by xhas_in once it has cached a match in _message.)
179  if (_has_message) {
180  const int rc = msg_->move (_message);
181  errno_assert (rc == 0);
182  _has_message = false;
183  _more_recv = (msg_->flags () & msg_t::more) != 0;
184  return 0;
185  }
186 
187  // TODO: This can result in infinite loop in the case of continuous
188  // stream of non-matching messages which breaks the non-blocking recv
189  // semantics.
190  while (true) {
191  // Get a message using fair queueing algorithm.
192  int rc = _fq.recv (msg_);
193 
194  // If there's no message available, return immediately.
195  // The same when error occurs.
196  if (rc != 0)
197  return -1;
198 
199  // Check whether the message matches at least one subscription.
200  // Non-initial parts of the message are passed
    // Parts are also passed unchecked when options.filter is off.
201  if (_more_recv || !options.filter || match (msg_)) {
202  _more_recv = (msg_->flags () & msg_t::more) != 0;
203  return 0;
204  }
205 
206  // Message doesn't match. Pop any remaining parts of the message
207  // from the pipe.
    // The remaining parts are asserted to be readable right away.
208  while (msg_->flags () & msg_t::more) {
209  rc = _fq.recv (msg_);
210  errno_assert (rc == 0);
211  }
212  }
213 }
214 
216 {
    // Body of xhas_in. The signature line (embedded line 215) is missing
    // from this listing; the cross-reference index at the end of the file
    // records it as "bool xhas_in() ZMQ_FINAL".
    //
    // Reports whether a message part is ready to be received. To find out,
    // it may read a message from _fq into _message; an accepted message is
    // kept there (with _has_message set) for the next xrecv call.
217  // There are subsequent parts of the partly-read message available.
218  if (_more_recv)
219  return true;
220 
221  // If there's already a message prepared by a previous call to zmq_poll,
222  // return straight ahead.
223  if (_has_message)
224  return true;
225 
226  // TODO: This can result in infinite loop in the case of continuous
227  // stream of non-matching messages.
228  while (true) {
229  // Get a message using fair queueing algorithm.
230  int rc = _fq.recv (&_message);
231 
232  // If there's no message available, return immediately.
233  // The same when error occurs.
234  if (rc != 0) {
    // NOTE(review): the embedded numbering jumps from 234 to 236, so one
    // statement is missing from this listing. The cross-reference index
    // lists EAGAIN, which no visible line uses - presumably the dropped
    // line asserts on errno; confirm against the upstream file.
236  return false;
237  }
238 
239  // Check whether the message matches at least one subscription.
    // With options.filter off every message is accepted.
240  if (!options.filter || match (&_message)) {
241  _has_message = true;
242  return true;
243  }
244 
245  // Message doesn't match. Pop any remaining parts of the message
246  // from the pipe.
247  while (_message.flags () & msg_t::more) {
248  rc = _fq.recv (&_message);
249  errno_assert (rc == 0);
250  }
251  }
252 }
253 
255 {
    // Body of match. The signature line (embedded line 254) is missing from
    // this listing; the cross-reference index at the end of the file records
    // it as "bool match(zmq::msg_t *msg_)".
    //
    // Checks the payload of msg_ against the cached subscriptions.
256  const bool matching = _subscriptions.check (
257  static_cast<unsigned char *> (msg_->data ()), msg_->size ());
258 
    // XOR flips the verdict when options.invert_matching is set.
259  return matching ^ options.invert_matching;
260 }
261 
263  size_t size_,
264  void *arg_)
265 {
    // Body of send_subscription. The first signature line (embedded line
    // 262) is missing from this listing; the cross-reference index at the
    // end of the file records the full signature as "static void
    // send_subscription(unsigned char *data_, size_t size_, void *arg_)".
    //
    // Callback passed to _subscriptions.apply: builds a subscribe message
    // for the topic (data_, size_) and writes it to the pipe passed in arg_.
    // arg_ is cast straight to pipe_t *, so callers must pass a pipe.
266  pipe_t *pipe = static_cast<pipe_t *> (arg_);
267 
268  // Create the subscription message.
269  msg_t msg;
270  const int rc = msg.init_subscribe (size_, data_);
271  errno_assert (rc == 0);
272 
273  // Send it to the pipe.
274  const bool sent = pipe->write (&msg);
275  // If we reached the SNDHWM, and thus cannot send the subscription, drop
276  // the subscription message instead. This matches the behaviour of
277  // zmq_setsockopt(ZMQ_SUBSCRIBE, ...), which also drops subscriptions
278  // when the SNDHWM is reached.
    // The return value of close () is not checked on this path.
279  if (!sent)
280  msg.close ();
281 }
zmq::msg_t::init_subscribe
int init_subscribe(const size_t size_, const unsigned char *topic)
Definition: msg.cpp:212
data_
StringPiece data_
Definition: bytestream_unittest.cc:60
zmq::xsub_t::xhiccuped
void xhiccuped(pipe_t *pipe_) ZMQ_FINAL
Definition: xsub.cpp:67
options
Message * options
Definition: src/google/protobuf/descriptor.cc:3119
EINVAL
#define EINVAL
Definition: errno.hpp:25
ZMQ_XSUB_VERBOSE_UNSUBSCRIBE
#define ZMQ_XSUB_VERBOSE_UNSUBSCRIBE
Definition: zmq_draft.h:47
zmq::atomic_value_t::store
void store(const int value_) ZMQ_NOEXCEPT
Definition: atomic_ptr.hpp:221
EAGAIN
#define EAGAIN
Definition: errno.hpp:14
zmq::xsub_t::xhas_out
bool xhas_out() ZMQ_OVERRIDE
Definition: xsub.cpp:169
precompiled.hpp
zmq_assert
#define zmq_assert(x)
Definition: err.hpp:102
zmq::xsub_t::xwrite_activated
void xwrite_activated(zmq::pipe_t *pipe_) ZMQ_FINAL
Definition: xsub.cpp:56
errno
int errno
ZMQ_XSUB
#define ZMQ_XSUB
Definition: zmq.h:268
zmq::socket_base_t
Definition: socket_base.hpp:31
ZMQ_TOPICS_COUNT
#define ZMQ_TOPICS_COUNT
Definition: zmq_draft.h:48
zmq::msg_t::move
int move(msg_t &src_)
Definition: msg.cpp:305
zmq::xsub_t::~xsub_t
~xsub_t() ZMQ_OVERRIDE
Definition: xsub.cpp:29
zmq::xsub_t::xsetsockopt
int xsetsockopt(int option_, const void *optval_, size_t optvallen_) ZMQ_OVERRIDE
Definition: xsub.cpp:74
errno_assert
#define errno_assert(x)
Definition: err.hpp:113
macros.hpp
zmq::msg_t::flags
unsigned char flags
Definition: msg.hpp:232
LIBZMQ_UNUSED
#define LIBZMQ_UNUSED(object)
Definition: macros.hpp:6
zmq::msg_t::close
int close()
Definition: msg.cpp:242
zmq::xsub_t::xrecv
int xrecv(zmq::msg_t *msg_) ZMQ_FINAL
Definition: xsub.cpp:175
zmq::own_t::options
options_t options
Definition: own.hpp:78
zmq::xsub_t::xhas_in
bool xhas_in() ZMQ_FINAL
Definition: xsub.cpp:215
zmq::xsub_t::xsend
int xsend(zmq::msg_t *msg_) ZMQ_OVERRIDE
Definition: xsub.cpp:118
zmq::xsub_t::xgetsockopt
int xgetsockopt(int option_, void *optval_, size_t *optvallen_) ZMQ_FINAL
Definition: xsub.cpp:97
zmq::options_t::linger
atomic_value_t linger
Definition: options.hpp:83
zmq::xsub_t::send_subscription
static void send_subscription(unsigned char *data_, size_t size_, void *arg_)
Definition: xsub.cpp:262
zmq::msg_t::init
int init()
Definition: msg.cpp:50
zmq::msg_t::more
@ more
Definition: msg.hpp:55
ZMQ_ONLY_FIRST_SUBSCRIBE
#define ZMQ_ONLY_FIRST_SUBSCRIBE
Definition: zmq_draft.h:40
size
GLsizeiptr size
Definition: glcorearb.h:2943
err.hpp
zmq::xsub_t::xattach_pipe
void xattach_pipe(zmq::pipe_t *pipe_, bool subscribe_to_all_, bool locally_initiated_) ZMQ_FINAL
Definition: xsub.cpp:35
data
GLint GLenum GLsizei GLsizei GLsizei GLint GLsizei const GLvoid * data
Definition: glcorearb.h:2879
zmq::xsub_t::match
bool match(zmq::msg_t *msg_)
Definition: xsub.cpp:254
zmq::xsub_t::xsub_t
xsub_t(zmq::ctx_t *parent_, uint32_t tid_, int sid_)
Definition: xsub.cpp:10
zmq::xsub_t::xpipe_terminated
void xpipe_terminated(zmq::pipe_t *pipe_) ZMQ_FINAL
Definition: xsub.cpp:61
xsub.hpp
zmq::msg_t::size
unsigned char size
Definition: msg.hpp:240
zmq::msg_t::is_subscribe
bool is_subscribe() const
Definition: msg.hpp:113
zmq::msg_t::data
unsigned char data[max_vsm_size]
Definition: msg.hpp:239
zmq::xsub_t::xread_activated
void xread_activated(zmq::pipe_t *pipe_) ZMQ_FINAL
Definition: xsub.cpp:51
zmq::msg_t::is_cancel
bool is_cancel() const
Definition: msg.hpp:118
zmq::options_t::type
int8_t type
Definition: options.hpp:80
false
#define false
Definition: cJSON.c:70
zmq::xsub_t::_message
msg_t _message
Definition: xsub.hpp:75
zmq::msg_t
Definition: msg.hpp:33


libaditof
Author(s):
autogenerated on Wed May 21 2025 02:07:02