xsub.hpp
Go to the documentation of this file.
1 /* SPDX-License-Identifier: MPL-2.0 */
2 
3 #ifndef __ZMQ_XSUB_HPP_INCLUDED__
4 #define __ZMQ_XSUB_HPP_INCLUDED__
5 
6 #include "socket_base.hpp"
7 #include "session_base.hpp"
8 #include "dist.hpp"
9 #include "fq.hpp"
10 #ifdef ZMQ_USE_RADIX_TREE
11 #include "radix_tree.hpp"
12 #else
13 #include "trie.hpp"
14 #endif
15 
16 namespace zmq
17 {
18 class ctx_t;
19 class pipe_t;
20 class io_thread_t;
21 
22 class xsub_t : public socket_base_t
23 {
24  public:
25  xsub_t (zmq::ctx_t *parent_, uint32_t tid_, int sid_);
27 
28  protected:
29  // Overrides of functions from socket_base_t.
30  void xattach_pipe (zmq::pipe_t *pipe_,
31  bool subscribe_to_all_,
32  bool locally_initiated_) ZMQ_FINAL;
33  int xsetsockopt (int option_,
34  const void *optval_,
35  size_t optvallen_) ZMQ_OVERRIDE;
36  int xgetsockopt (int option_, void *optval_, size_t *optvallen_) ZMQ_FINAL;
37  int xsend (zmq::msg_t *msg_) ZMQ_OVERRIDE;
38  bool xhas_out () ZMQ_OVERRIDE;
39  int xrecv (zmq::msg_t *msg_) ZMQ_FINAL;
40  bool xhas_in () ZMQ_FINAL;
41  void xread_activated (zmq::pipe_t *pipe_) ZMQ_FINAL;
42  void xwrite_activated (zmq::pipe_t *pipe_) ZMQ_FINAL;
43  void xhiccuped (pipe_t *pipe_) ZMQ_FINAL;
44  void xpipe_terminated (zmq::pipe_t *pipe_) ZMQ_FINAL;
45 
46  private:
47  // Check whether the message matches at least one subscription.
48  bool match (zmq::msg_t *msg_);
49 
50  // Function to be applied to the trie to send all the subscriptions
51  // upstream.
52  static void
53  send_subscription (unsigned char *data_, size_t size_, void *arg_);
54 
55  // Fair queueing object for inbound pipes.
57 
58  // Object for distributing the subscriptions upstream.
60 
61  // The repository of subscriptions.
62 #ifdef ZMQ_USE_RADIX_TREE
64 #else
66 #endif
67 
68  // If true, send all unsubscription messages upstream, not just
69  // unique ones.
71 
72  // If true, '_message' contains a matching message to return on the
73  // next recv call.
76 
77  // If true, part of a multipart message was already sent, but
78  // there are following parts still waiting.
79  bool _more_send;
80 
81  // If true, part of a multipart message was already received, but
82  // there are following parts still waiting.
83  bool _more_recv;
84 
85  // If true, subscribe and cancel messages are processed for the rest
86  // of the multipart message.
88 
89  // This option is enabled with ZMQ_ONLY_FIRST_SUBSCRIBE.
90  // If true, messages following subscribe/unsubscribe in a multipart
91  // message are treated as user data regardless of the first byte.
93 
95 };
96 }
97 
98 #endif
zmq::xsub_t::_verbose_unsubs
bool _verbose_unsubs
Definition: xsub.hpp:70
data_
StringPiece data_
Definition: bytestream_unittest.cc:60
zmq::xsub_t::xhiccuped
void xhiccuped(pipe_t *pipe_) ZMQ_FINAL
Definition: xsub.cpp:67
zmq::xsub_t::xhas_out
bool xhas_out() ZMQ_OVERRIDE
Definition: xsub.cpp:169
zmq::xsub_t::_fq
fq_t _fq
Definition: xsub.hpp:56
dist.hpp
zmq::xsub_t::xwrite_activated
void xwrite_activated(zmq::pipe_t *pipe_) ZMQ_FINAL
Definition: xsub.cpp:56
zmq::xsub_t::_dist
dist_t _dist
Definition: xsub.hpp:59
zmq::socket_base_t
Definition: socket_base.hpp:31
zmq::xsub_t::_more_send
bool _more_send
Definition: xsub.hpp:79
zmq::fq_t
Definition: fq.hpp:18
radix_tree.hpp
ZMQ_FINAL
#define ZMQ_FINAL
Definition: macros.hpp:35
zmq::xsub_t::_subscriptions
trie_with_size_t _subscriptions
Definition: xsub.hpp:65
zmq
Definition: zmq.hpp:229
zmq::xsub_t::~xsub_t
~xsub_t() ZMQ_OVERRIDE
Definition: xsub.cpp:29
zmq::radix_tree_t
Definition: radix_tree.hpp:89
ZMQ_OVERRIDE
#define ZMQ_OVERRIDE
Definition: zmq.hpp:91
trie.hpp
zmq::xsub_t::xsetsockopt
int xsetsockopt(int option_, const void *optval_, size_t optvallen_) ZMQ_OVERRIDE
Definition: xsub.cpp:74
zmq::xsub_t::xrecv
int xrecv(zmq::msg_t *msg_) ZMQ_FINAL
Definition: xsub.cpp:175
ZMQ_NON_COPYABLE_NOR_MOVABLE
#define ZMQ_NON_COPYABLE_NOR_MOVABLE(classname)
Definition: macros.hpp:58
zmq::xsub_t::xhas_in
bool xhas_in() ZMQ_FINAL
Definition: xsub.cpp:215
zmq::xsub_t::xsend
int xsend(zmq::msg_t *msg_) ZMQ_OVERRIDE
Definition: xsub.cpp:118
zmq::xsub_t::xgetsockopt
int xgetsockopt(int option_, void *optval_, size_t *optvallen_) ZMQ_FINAL
Definition: xsub.cpp:97
zmq::xsub_t::_more_recv
bool _more_recv
Definition: xsub.hpp:83
zmq::trie_with_size_t
Definition: trie.hpp:60
zmq::xsub_t::send_subscription
static void send_subscription(unsigned char *data_, size_t size_, void *arg_)
Definition: xsub.cpp:262
fq.hpp
zmq::xsub_t
Definition: xsub.hpp:22
zmq::dist_t
Definition: dist.hpp:18
socket_base.hpp
zmq::xsub_t::_only_first_subscribe
bool _only_first_subscribe
Definition: xsub.hpp:92
zmq::xsub_t::xattach_pipe
void xattach_pipe(zmq::pipe_t *pipe_, bool subscribe_to_all_, bool locally_initiated_) ZMQ_FINAL
Definition: xsub.cpp:35
zmq::xsub_t::_has_message
bool _has_message
Definition: xsub.hpp:74
zmq::xsub_t::_process_subscribe
bool _process_subscribe
Definition: xsub.hpp:87
zmq::xsub_t::match
bool match(zmq::msg_t *msg_)
Definition: xsub.cpp:254
zmq::xsub_t::xsub_t
xsub_t(zmq::ctx_t *parent_, uint32_t tid_, int sid_)
Definition: xsub.cpp:10
zmq::xsub_t::xpipe_terminated
void xpipe_terminated(zmq::pipe_t *pipe_) ZMQ_FINAL
Definition: xsub.cpp:61
session_base.hpp
zmq::xsub_t::xread_activated
void xread_activated(zmq::pipe_t *pipe_) ZMQ_FINAL
Definition: xsub.cpp:51
zmq::xsub_t::_message
msg_t _message
Definition: xsub.hpp:75
zmq::msg_t
Definition: msg.hpp:33


libaditof
Author(s):
autogenerated on Wed May 21 2025 02:07:02