xpub.cpp
Go to the documentation of this file.
1 /* SPDX-License-Identifier: MPL-2.0 */
2 
3 #include "precompiled.hpp"
4 #include <string.h>
5 
6 #include "xpub.hpp"
7 #include "pipe.hpp"
8 #include "err.hpp"
9 #include "msg.hpp"
10 #include "macros.hpp"
11 #include "generic_mtrie_impl.hpp"
12 
13 zmq::xpub_t::xpub_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
14  socket_base_t (parent_, tid_, sid_),
15  _verbose_subs (false),
16  _verbose_unsubs (false),
17  _more_send (false),
18  _more_recv (false),
19  _process_subscribe (false),
20  _only_first_subscribe (false),
21  _lossy (true),
22  _manual (false),
23  _send_last_pipe (false),
24  _pending_pipes (),
25  _welcome_msg ()
26 {
27  _last_pipe = NULL;
29  _welcome_msg.init ();
30 }
31 
33 {
34  _welcome_msg.close ();
35  for (std::deque<metadata_t *>::iterator it = _pending_metadata.begin (),
36  end = _pending_metadata.end ();
37  it != end; ++it)
38  if (*it && (*it)->drop_ref ())
39  LIBZMQ_DELETE (*it);
40 }
41 
42 void zmq::xpub_t::xattach_pipe (pipe_t *pipe_,
43  bool subscribe_to_all_,
44  bool locally_initiated_)
45 {
46  LIBZMQ_UNUSED (locally_initiated_);
47 
48  zmq_assert (pipe_);
49  _dist.attach (pipe_);
50 
51  // If subscribe_to_all_ is specified, the caller would like to subscribe
52  // to all data on this pipe, implicitly.
53  if (subscribe_to_all_)
54  _subscriptions.add (NULL, 0, pipe_);
55 
56  // if welcome message exists, send a copy of it
57  if (_welcome_msg.size () > 0) {
58  msg_t copy;
59  copy.init ();
60  const int rc = copy.copy (_welcome_msg);
61  errno_assert (rc == 0);
62  const bool ok = pipe_->write (&copy);
63  zmq_assert (ok);
64  pipe_->flush ();
65  }
66 
67  // The pipe is active when attached. Let's read the subscriptions from
68  // it, if any.
69  xread_activated (pipe_);
70 }
71 
72 void zmq::xpub_t::xread_activated (pipe_t *pipe_)
73 {
74  // There are some subscriptions waiting. Let's process them.
75  msg_t msg;
76  while (pipe_->read (&msg)) {
77  metadata_t *metadata = msg.metadata ();
78  unsigned char *msg_data = static_cast<unsigned char *> (msg.data ()),
79  *data = NULL;
80  size_t size = 0;
81  bool subscribe = false;
82  bool is_subscribe_or_cancel = false;
83  bool notify = false;
84 
85  const bool first_part = !_more_recv;
86  _more_recv = (msg.flags () & msg_t::more) != 0;
87 
88  if (first_part || _process_subscribe) {
89  // Apply the subscription to the trie
90  if (msg.is_subscribe () || msg.is_cancel ()) {
91  data = static_cast<unsigned char *> (msg.command_body ());
92  size = msg.command_body_size ();
93  subscribe = msg.is_subscribe ();
94  is_subscribe_or_cancel = true;
95  } else if (msg.size () > 0 && (*msg_data == 0 || *msg_data == 1)) {
96  data = msg_data + 1;
97  size = msg.size () - 1;
98  subscribe = *msg_data == 1;
99  is_subscribe_or_cancel = true;
100  }
101  }
102 
103  if (first_part)
104  _process_subscribe =
105  !_only_first_subscribe || is_subscribe_or_cancel;
106 
107  if (is_subscribe_or_cancel) {
108  if (_manual) {
109  // Store manual subscription to use on termination
110  if (!subscribe)
111  _manual_subscriptions.rm (data, size, pipe_);
112  else
113  _manual_subscriptions.add (data, size, pipe_);
114 
115  _pending_pipes.push_back (pipe_);
116  } else {
117  if (!subscribe) {
118  const mtrie_t::rm_result rm_result =
119  _subscriptions.rm (data, size, pipe_);
120  // TODO reconsider what to do if rm_result == mtrie_t::not_found
121  notify =
122  rm_result != mtrie_t::values_remain || _verbose_unsubs;
123  } else {
124  const bool first_added =
125  _subscriptions.add (data, size, pipe_);
126  notify = first_added || _verbose_subs;
127  }
128  }
129 
130  // If the request was a new subscription, or the subscription
131  // was removed, or verbose mode or manual mode are enabled, store it
132  // so that it can be passed to the user on next recv call.
133  if (_manual || (options.type == ZMQ_XPUB && notify)) {
134  // ZMTP 3.1 hack: we need to support sub/cancel commands, but
135  // we can't give them back to userspace as it would be an API
136  // breakage since the payload of the message is completely
137  // different. Manually craft an old-style message instead.
138  // Although with other transports it would be possible to simply
139  // reuse the same buffer and prefix a 0/1 byte to the topic, with
140  // inproc the subscribe/cancel command string is not present in
141  // the message, so this optimization is not possible.
142  // The pushback makes a copy of the data array anyway, so the
143  // number of buffer copies does not change.
144  blob_t notification (size + 1);
145  if (subscribe)
146  *notification.data () = 1;
147  else
148  *notification.data () = 0;
149  memcpy (notification.data () + 1, data, size);
150 
151  _pending_data.push_back (ZMQ_MOVE (notification));
152  if (metadata)
153  metadata->add_ref ();
154  _pending_metadata.push_back (metadata);
155  _pending_flags.push_back (0);
156  }
157  } else if (options.type != ZMQ_PUB) {
158  // Process user message coming upstream from xsub socket,
159  // but not if the type is PUB, which never processes user
160  // messages
161  _pending_data.push_back (blob_t (msg_data, msg.size ()));
162  if (metadata)
163  metadata->add_ref ();
164  _pending_metadata.push_back (metadata);
165  _pending_flags.push_back (msg.flags ());
166  }
167 
168  msg.close ();
169  }
170 }
171 
172 void zmq::xpub_t::xwrite_activated (pipe_t *pipe_)
173 {
174  _dist.activated (pipe_);
175 }
176 
177 int zmq::xpub_t::xsetsockopt (int option_,
178  const void *optval_,
179  size_t optvallen_)
180 {
181  if (option_ == ZMQ_XPUB_VERBOSE || option_ == ZMQ_XPUB_VERBOSER
182  || option_ == ZMQ_XPUB_MANUAL_LAST_VALUE || option_ == ZMQ_XPUB_NODROP
183  || option_ == ZMQ_XPUB_MANUAL || option_ == ZMQ_ONLY_FIRST_SUBSCRIBE) {
184  if (optvallen_ != sizeof (int)
185  || *static_cast<const int *> (optval_) < 0) {
186  errno = EINVAL;
187  return -1;
188  }
189  if (option_ == ZMQ_XPUB_VERBOSE) {
190  _verbose_subs = (*static_cast<const int *> (optval_) != 0);
191  _verbose_unsubs = false;
192  } else if (option_ == ZMQ_XPUB_VERBOSER) {
193  _verbose_subs = (*static_cast<const int *> (optval_) != 0);
194  _verbose_unsubs = _verbose_subs;
195  } else if (option_ == ZMQ_XPUB_MANUAL_LAST_VALUE) {
196  _manual = (*static_cast<const int *> (optval_) != 0);
197  _send_last_pipe = _manual;
198  } else if (option_ == ZMQ_XPUB_NODROP)
199  _lossy = (*static_cast<const int *> (optval_) == 0);
200  else if (option_ == ZMQ_XPUB_MANUAL)
201  _manual = (*static_cast<const int *> (optval_) != 0);
202  else if (option_ == ZMQ_ONLY_FIRST_SUBSCRIBE)
203  _only_first_subscribe = (*static_cast<const int *> (optval_) != 0);
204  } else if (option_ == ZMQ_SUBSCRIBE && _manual) {
205  if (_last_pipe != NULL)
206  _subscriptions.add ((unsigned char *) optval_, optvallen_,
207  _last_pipe);
208  } else if (option_ == ZMQ_UNSUBSCRIBE && _manual) {
209  if (_last_pipe != NULL)
210  _subscriptions.rm ((unsigned char *) optval_, optvallen_,
211  _last_pipe);
212  } else if (option_ == ZMQ_XPUB_WELCOME_MSG) {
213  _welcome_msg.close ();
214 
215  if (optvallen_ > 0) {
216  const int rc = _welcome_msg.init_size (optvallen_);
217  errno_assert (rc == 0);
218 
219  unsigned char *data =
220  static_cast<unsigned char *> (_welcome_msg.data ());
221  memcpy (data, optval_, optvallen_);
222  } else
223  _welcome_msg.init ();
224  } else {
225  errno = EINVAL;
226  return -1;
227  }
228  return 0;
229 }
230 
231 int zmq::xpub_t::xgetsockopt (int option_, void *optval_, size_t *optvallen_)
232 {
233  if (option_ == ZMQ_TOPICS_COUNT) {
234  // make sure to use a multi-thread safe function to avoid race conditions with I/O threads
235  // where subscriptions are processed:
236  return do_getsockopt<int> (optval_, optvallen_,
237  (int) _subscriptions.num_prefixes ());
238  }
239 
240  // room for future options here
241 
242  errno = EINVAL;
243  return -1;
244 }
245 
246 static void stub (zmq::mtrie_t::prefix_t data_, size_t size_, void *arg_)
247 {
249  LIBZMQ_UNUSED (size_);
250  LIBZMQ_UNUSED (arg_);
251 }
252 
253 void zmq::xpub_t::xpipe_terminated (pipe_t *pipe_)
254 {
255  if (_manual) {
256  // Remove the pipe from the trie and send corresponding manual
257  // unsubscriptions upstream.
258  _manual_subscriptions.rm (pipe_, send_unsubscription, this, false);
259  // Remove pipe without actually sending the message as it was taken
260  // care of by the manual call above. subscriptions is the real mtrie,
261  // so the pipe must be removed from there or it will be left over.
262  _subscriptions.rm (pipe_, stub, static_cast<void *> (NULL), false);
263 
264  // In case the pipe is currently set as last we must clear it to prevent
265  // subscriptions from being re-added.
266  if (pipe_ == _last_pipe) {
267  _last_pipe = NULL;
268  }
269  } else {
270  // Remove the pipe from the trie. If there are topics that nobody
271  // is interested in anymore, send corresponding unsubscriptions
272  // upstream.
273  _subscriptions.rm (pipe_, send_unsubscription, this, !_verbose_unsubs);
274  }
275 
276  _dist.pipe_terminated (pipe_);
277 }
278 
279 void zmq::xpub_t::mark_as_matching (pipe_t *pipe_, xpub_t *self_)
280 {
281  self_->_dist.match (pipe_);
282 }
283 
285 {
286  if (self_->_last_pipe == pipe_)
287  self_->_dist.match (pipe_);
288 }
289 
291 {
292  const bool msg_more = (msg_->flags () & msg_t::more) != 0;
293 
294  // For the first part of multi-part message, find the matching pipes.
295  if (!_more_send) {
296  // Ensure nothing from previous failed attempt to send is left matched
297  _dist.unmatch ();
298 
299  if (unlikely (_manual && _last_pipe && _send_last_pipe)) {
300  _subscriptions.match (static_cast<unsigned char *> (msg_->data ()),
301  msg_->size (), mark_last_pipe_as_matching,
302  this);
303  _last_pipe = NULL;
304  } else
305  _subscriptions.match (static_cast<unsigned char *> (msg_->data ()),
306  msg_->size (), mark_as_matching, this);
307  // If inverted matching is used, reverse the selection now
308  if (options.invert_matching) {
309  _dist.reverse_match ();
310  }
311  }
312 
313  int rc = -1; // Assume we fail
314  if (_lossy || _dist.check_hwm ()) {
315  if (_dist.send_to_matching (msg_) == 0) {
316  // If we are at the end of multi-part message we can mark
317  // all the pipes as non-matching.
318  if (!msg_more)
319  _dist.unmatch ();
320  _more_send = msg_more;
321  rc = 0; // Yay, sent successfully
322  }
323  } else
324  errno = EAGAIN;
325  return rc;
326 }
327 
329 {
330  return _dist.has_out ();
331 }
332 
334 {
335  // If there is at least one
336  if (_pending_data.empty ()) {
337  errno = EAGAIN;
338  return -1;
339  }
340 
341  // User is reading a message, set last_pipe and remove it from the deque
342  if (_manual && !_pending_pipes.empty ()) {
343  _last_pipe = _pending_pipes.front ();
344  _pending_pipes.pop_front ();
345 
346  // If the distributor doesn't know about this pipe it must have already
347  // been terminated and thus we can't allow manual subscriptions.
348  if (_last_pipe != NULL && !_dist.has_pipe (_last_pipe)) {
349  _last_pipe = NULL;
350  }
351  }
352 
353  int rc = msg_->close ();
354  errno_assert (rc == 0);
355  rc = msg_->init_size (_pending_data.front ().size ());
356  errno_assert (rc == 0);
357  memcpy (msg_->data (), _pending_data.front ().data (),
358  _pending_data.front ().size ());
359 
360  // set metadata only if there is some
361  if (metadata_t *metadata = _pending_metadata.front ()) {
362  msg_->set_metadata (metadata);
363  // Remove ref corresponding to vector placement
364  metadata->drop_ref ();
365  }
366 
367  msg_->set_flags (_pending_flags.front ());
368  _pending_data.pop_front ();
369  _pending_metadata.pop_front ();
370  _pending_flags.pop_front ();
371  return 0;
372 }
373 
375 {
376  return !_pending_data.empty ();
377 }
378 
380  size_t size_,
381  xpub_t *self_)
382 {
383  if (self_->options.type != ZMQ_PUB) {
384  // Place the unsubscription to the queue of pending (un)subscriptions
385  // to be retrieved by the user later on.
386  blob_t unsub (size_ + 1);
387  *unsub.data () = 0;
388  if (size_ > 0)
389  memcpy (unsub.data () + 1, data_, size_);
390  self_->_pending_data.ZMQ_PUSH_OR_EMPLACE_BACK (ZMQ_MOVE (unsub));
391  self_->_pending_metadata.push_back (NULL);
392  self_->_pending_flags.push_back (0);
393 
394  if (self_->_manual) {
395  self_->_last_pipe = NULL;
396  self_->_pending_pipes.push_back (NULL);
397  }
398  }
399 }
LIBZMQ_DELETE
#define LIBZMQ_DELETE(p_object)
Definition: macros.hpp:7
zmq::xpub_t::xsend
int xsend(zmq::msg_t *msg_) ZMQ_FINAL
Definition: xpub.cpp:290
data_
StringPiece data_
Definition: bytestream_unittest.cc:60
zmq::xpub_t::xpipe_terminated
void xpipe_terminated(zmq::pipe_t *pipe_) ZMQ_FINAL
Definition: xpub.cpp:253
ZMQ_XPUB_MANUAL
#define ZMQ_XPUB_MANUAL
Definition: zmq.h:332
end
GLuint GLuint end
Definition: glcorearb.h:2858
zmq::xpub_t::xrecv
int xrecv(zmq::msg_t *msg_) ZMQ_OVERRIDE
Definition: xpub.cpp:333
NULL
NULL
Definition: test_security_zap.cpp:405
ZMQ_XPUB
#define ZMQ_XPUB
Definition: zmq.h:267
ZMQ_PUB
#define ZMQ_PUB
Definition: zmq.h:259
options
Message * options
Definition: src/google/protobuf/descriptor.cc:3119
EINVAL
#define EINVAL
Definition: errno.hpp:25
benchmarks.util.result_uploader.metadata
def metadata
Definition: result_uploader.py:97
EAGAIN
#define EAGAIN
Definition: errno.hpp:14
zmq::msg_t::copy
int copy(msg_t &src_)
Definition: msg.cpp:326
precompiled.hpp
zmq::xpub_t::mark_last_pipe_as_matching
static void mark_last_pipe_as_matching(zmq::pipe_t *pipe_, xpub_t *self_)
Definition: xpub.cpp:284
zmq_assert
#define zmq_assert(x)
Definition: err.hpp:102
ZMQ_XPUB_MANUAL_LAST_VALUE
#define ZMQ_XPUB_MANUAL_LAST_VALUE
Definition: zmq_draft.h:30
zmq::xpub_t::_pending_flags
std::deque< unsigned char > _pending_flags
Definition: xpub.hpp:108
zmq::xpub_t::xpub_t
xpub_t(zmq::ctx_t *parent_, uint32_t tid_, int sid_)
Definition: xpub.cpp:13
errno
int errno
ZMQ_SUBSCRIBE
#define ZMQ_SUBSCRIBE
Definition: zmq.h:278
zmq::socket_base_t
Definition: socket_base.hpp:31
zmq::xpub_t::xsetsockopt
int xsetsockopt(int option_, const void *optval_, size_t optvallen_) ZMQ_FINAL
Definition: xpub.cpp:177
ZMQ_TOPICS_COUNT
#define ZMQ_TOPICS_COUNT
Definition: zmq_draft.h:48
zmq::msg_t::init_size
int init_size(size_t size_)
Definition: msg.cpp:62
ok
ROSCPP_DECL bool ok()
zmq::xpub_t::xhas_in
bool xhas_in() ZMQ_OVERRIDE
Definition: xpub.cpp:374
zmq::xpub_t::_pending_data
std::deque< blob_t > _pending_data
Definition: xpub.hpp:106
errno_assert
#define errno_assert(x)
Definition: err.hpp:113
ZMQ_XPUB_WELCOME_MSG
#define ZMQ_XPUB_WELCOME_MSG
Definition: zmq.h:333
macros.hpp
zmq::msg_t::flags
unsigned char flags
Definition: msg.hpp:232
zmq::generic_mtrie_t< pipe_t >::rm_result
rm_result
Definition: generic_mtrie.hpp:22
LIBZMQ_UNUSED
#define LIBZMQ_UNUSED(object)
Definition: macros.hpp:6
zmq::msg_t::close
int close()
Definition: msg.cpp:242
zmq::blob_t::data
const unsigned char * data() const
Definition: blob.hpp:86
zmq::xpub_t::_dist
dist_t _dist
Definition: xpub.hpp:58
zmq::own_t::options
options_t options
Definition: own.hpp:78
zmq::xpub_t::xgetsockopt
int xgetsockopt(int option_, void *optval_, size_t *optvallen_) ZMQ_FINAL
Definition: xpub.cpp:231
zmq::xpub_t::_last_pipe
pipe_t * _last_pipe
Definition: xpub.hpp:96
zmq::xpub_t::_pending_metadata
std::deque< metadata_t * > _pending_metadata
Definition: xpub.hpp:107
zmq::generic_mtrie_t< pipe_t >::values_remain
@ values_remain
Definition: generic_mtrie.hpp:26
pipe.hpp
zmq::msg_t::set_metadata
void set_metadata(metadata_t *metadata_)
Definition: msg.cpp:448
zmq::xpub_t::send_unsubscription
static void send_unsubscription(zmq::mtrie_t::prefix_t data_, size_t size_, xpub_t *self_)
Definition: xpub.cpp:379
zmq::xpub_t::xattach_pipe
void xattach_pipe(zmq::pipe_t *pipe_, bool subscribe_to_all_=false, bool locally_initiated_=false) ZMQ_OVERRIDE
Definition: xpub.cpp:42
zmq::xpub_t::xread_activated
void xread_activated(zmq::pipe_t *pipe_) ZMQ_FINAL
Definition: xpub.cpp:72
zmq::msg_t::init
int init()
Definition: msg.cpp:50
zmq::blob_t
Definition: blob.hpp:46
msg.hpp
stub
static void stub(zmq::mtrie_t::prefix_t data_, size_t size_, void *arg_)
Definition: xpub.cpp:246
zmq::xpub_t::_pending_pipes
std::deque< pipe_t * > _pending_pipes
Definition: xpub.hpp:99
zmq::metadata_t
Definition: metadata.hpp:13
ZMQ_XPUB_VERBOSE
#define ZMQ_XPUB_VERBOSE
Definition: zmq.h:305
zmq::msg_t::command_body
void * command_body()
Definition: msg.cpp:541
zmq::msg_t::more
@ more
Definition: msg.hpp:55
ZMQ_ONLY_FIRST_SUBSCRIBE
#define ZMQ_ONLY_FIRST_SUBSCRIBE
Definition: zmq_draft.h:40
ZMQ_XPUB_VERBOSER
#define ZMQ_XPUB_VERBOSER
Definition: zmq.h:339
zmq::xpub_t::_welcome_msg
msg_t _welcome_msg
Definition: xpub.hpp:102
zmq::msg_t::command_body_size
size_t command_body_size() const
Definition: msg.cpp:526
zmq::generic_mtrie_t< pipe_t >::prefix_t
const typedef unsigned char * prefix_t
Definition: generic_mtrie.hpp:20
zmq::xpub_t::xwrite_activated
void xwrite_activated(zmq::pipe_t *pipe_) ZMQ_FINAL
Definition: xpub.cpp:172
zmq::dist_t::match
void match(zmq::pipe_t *pipe_)
Definition: dist.cpp:49
size
GLsizeiptr size
Definition: glcorearb.h:2943
ZMQ_UNSUBSCRIBE
#define ZMQ_UNSUBSCRIBE
Definition: zmq.h:279
xpub.hpp
zmq::msg_t::metadata
metadata_t * metadata
Definition: msg.hpp:227
err.hpp
zmq::xpub_t::mark_as_matching
static void mark_as_matching(zmq::pipe_t *pipe_, xpub_t *self_)
Definition: xpub.cpp:279
ZMQ_MOVE
#define ZMQ_MOVE(x)
Definition: blob.hpp:33
generic_mtrie_impl.hpp
data
GLint GLenum GLsizei GLsizei GLsizei GLint GLsizei const GLvoid * data
Definition: glcorearb.h:2879
true
#define true
Definition: cJSON.c:65
zmq::xpub_t::~xpub_t
~xpub_t() ZMQ_OVERRIDE
Definition: xpub.cpp:32
zmq::msg_t::size
unsigned char size
Definition: msg.hpp:240
zmq::msg_t::is_subscribe
bool is_subscribe() const
Definition: msg.hpp:113
zmq::msg_t::data
unsigned char data[max_vsm_size]
Definition: msg.hpp:239
zmq::msg_t::is_cancel
bool is_cancel() const
Definition: msg.hpp:118
zmq::options_t::type
int8_t type
Definition: options.hpp:80
false
#define false
Definition: cJSON.c:70
zmq::xpub_t::_manual
bool _manual
Definition: xpub.hpp:87
zmq::msg_t::set_flags
void set_flags(unsigned char flags_)
Definition: msg.cpp:433
it
MapIter it
Definition: php/ext/google/protobuf/map.c:205
zmq::xpub_t::xhas_out
bool xhas_out() ZMQ_FINAL
Definition: xpub.cpp:328
zmq::msg_t
Definition: msg.hpp:33
unlikely
#define unlikely(x)
Definition: likely.hpp:11
ZMQ_XPUB_NODROP
#define ZMQ_XPUB_NODROP
Definition: zmq.h:330
zmq::xpub_t
Definition: xpub.hpp:20


libaditof
Author(s):
autogenerated on Wed May 21 2025 02:07:02