xpub.hpp
Go to the documentation of this file.
1 /* SPDX-License-Identifier: MPL-2.0 */
2 
3 #ifndef __ZMQ_XPUB_HPP_INCLUDED__
4 #define __ZMQ_XPUB_HPP_INCLUDED__
5 
6 #include <deque>
7 
8 #include "socket_base.hpp"
9 #include "session_base.hpp"
10 #include "mtrie.hpp"
11 #include "dist.hpp"
12 
13 namespace zmq
14 {
15 class ctx_t;
16 class msg_t;
17 class pipe_t;
18 class io_thread_t;
19 
20 class xpub_t : public socket_base_t
21 {
22  public:
23  xpub_t (zmq::ctx_t *parent_, uint32_t tid_, int sid_);
24  ~xpub_t () ZMQ_OVERRIDE;
25 
26  // Implementations of virtual functions from socket_base_t.
27  void xattach_pipe (zmq::pipe_t *pipe_,
28  bool subscribe_to_all_ = false,
29  bool locally_initiated_ = false) ZMQ_OVERRIDE;
30  int xsend (zmq::msg_t *msg_) ZMQ_FINAL;
31  bool xhas_out () ZMQ_FINAL;
32  int xrecv (zmq::msg_t *msg_) ZMQ_OVERRIDE;
33  bool xhas_in () ZMQ_OVERRIDE;
34  void xread_activated (zmq::pipe_t *pipe_) ZMQ_FINAL;
35  void xwrite_activated (zmq::pipe_t *pipe_) ZMQ_FINAL;
36  int
37  xsetsockopt (int option_, const void *optval_, size_t optvallen_) ZMQ_FINAL;
38  int xgetsockopt (int option_, void *optval_, size_t *optvallen_) ZMQ_FINAL;
39  void xpipe_terminated (zmq::pipe_t *pipe_) ZMQ_FINAL;
40 
41  private:
42  // Function to be applied to the trie to send all the subscriptions
43  // upstream.
44  static void send_unsubscription (zmq::mtrie_t::prefix_t data_,
45  size_t size_,
46  xpub_t *self_);
47 
48  // Function to be applied to each matching pipe.
49  static void mark_as_matching (zmq::pipe_t *pipe_, xpub_t *self_);
50 
51  // List of all subscriptions mapped to corresponding pipes.
52  mtrie_t _subscriptions;
53 
54  // List of manual subscriptions mapped to corresponding pipes.
55  mtrie_t _manual_subscriptions;
56 
57  // Distributor of messages holding the list of outbound pipes.
58  dist_t _dist;
59 
60  // If true, send all subscription messages upstream, not just
61  // unique ones
62  bool _verbose_subs;
63 
64  // If true, send all unsubscription messages upstream, not just
65  // unique ones
66  bool _verbose_unsubs;
67 
68  // True if we are in the middle of sending a multi-part message.
69  bool _more_send;
70 
71  // True if we are in the middle of receiving a multi-part message.
72  bool _more_recv;
73 
74  // If true, subscribe and cancel messages are processed for the rest
75  // of multipart message.
76  bool _process_subscribe;
77 
78  // This option is enabled with ZMQ_ONLY_FIRST_SUBSCRIBE.
79  // If true, messages following subscribe/unsubscribe in a multipart
80  // message are treated as user data regardless of the first byte.
81  bool _only_first_subscribe;
82 
83  // Drop messages if HWM reached, otherwise return with EAGAIN
84  bool _lossy;
85 
86  // Subscriptions will not be added automatically, only after calling set option with ZMQ_SUBSCRIBE or ZMQ_UNSUBSCRIBE
87  bool _manual;
88 
89  // Send message to the last pipe, only used if xpub is on manual and after calling set option with ZMQ_SUBSCRIBE
90  bool _send_last_pipe;
91 
92  // Function to be applied to match the last pipe.
93  static void mark_last_pipe_as_matching (zmq::pipe_t *pipe_, xpub_t *self_);
94 
95  // Last pipe that sent subscription message, only used if xpub is on manual
96  pipe_t *_last_pipe;
97 
98  // Pipes that sent subscriptions messages that have not yet been processed, only used if xpub is on manual
99  std::deque<pipe_t *> _pending_pipes;
100 
101  // Welcome message to send to pipe when attached
102  msg_t _welcome_msg;
103 
104  // List of pending (un)subscriptions, i.e. those that were already
105  // applied to the trie, but not yet received by the user.
106  std::deque<blob_t> _pending_data;
107  std::deque<metadata_t *> _pending_metadata;
108  std::deque<unsigned char> _pending_flags;
109 
110  ZMQ_NON_COPYABLE_NOR_MOVABLE (xpub_t)
111 };
112 }
113 
114 #endif
zmq::xpub_t::xsend
int xsend(zmq::msg_t *msg_) ZMQ_FINAL
Definition: xpub.cpp:290
data_
StringPiece data_
Definition: bytestream_unittest.cc:60
zmq::xpub_t::xpipe_terminated
void xpipe_terminated(zmq::pipe_t *pipe_) ZMQ_FINAL
Definition: xpub.cpp:253
zmq::xpub_t::xrecv
int xrecv(zmq::msg_t *msg_) ZMQ_OVERRIDE
Definition: xpub.cpp:333
zmq::xpub_t::_more_send
bool _more_send
Definition: xpub.hpp:69
zmq::xpub_t::mark_last_pipe_as_matching
static void mark_last_pipe_as_matching(zmq::pipe_t *pipe_, xpub_t *self_)
Definition: xpub.cpp:284
zmq::xpub_t::_pending_flags
std::deque< unsigned char > _pending_flags
Definition: xpub.hpp:108
zmq::xpub_t::xpub_t
xpub_t(zmq::ctx_t *parent_, uint32_t tid_, int sid_)
Definition: xpub.cpp:13
dist.hpp
zmq::socket_base_t
Definition: socket_base.hpp:31
zmq::xpub_t::xsetsockopt
int xsetsockopt(int option_, const void *optval_, size_t optvallen_) ZMQ_FINAL
Definition: xpub.cpp:177
mtrie.hpp
ZMQ_FINAL
#define ZMQ_FINAL
Definition: macros.hpp:35
zmq::xpub_t::xhas_in
bool xhas_in() ZMQ_OVERRIDE
Definition: xpub.cpp:374
zmq::xpub_t::_pending_data
std::deque< blob_t > _pending_data
Definition: xpub.hpp:106
zmq
Definition: zmq.hpp:229
ZMQ_OVERRIDE
#define ZMQ_OVERRIDE
Definition: zmq.hpp:91
zmq::xpub_t::_only_first_subscribe
bool _only_first_subscribe
Definition: xpub.hpp:81
zmq::xpub_t::_verbose_unsubs
bool _verbose_unsubs
Definition: xpub.hpp:66
zmq::xpub_t::_dist
dist_t _dist
Definition: xpub.hpp:58
zmq::xpub_t::xgetsockopt
int xgetsockopt(int option_, void *optval_, size_t *optvallen_) ZMQ_FINAL
Definition: xpub.cpp:231
zmq::xpub_t::_last_pipe
pipe_t * _last_pipe
Definition: xpub.hpp:96
ZMQ_NON_COPYABLE_NOR_MOVABLE
#define ZMQ_NON_COPYABLE_NOR_MOVABLE(classname)
Definition: macros.hpp:58
zmq::xpub_t::_pending_metadata
std::deque< metadata_t * > _pending_metadata
Definition: xpub.hpp:107
zmq::xpub_t::_subscriptions
mtrie_t _subscriptions
Definition: xpub.hpp:52
zmq::xpub_t::_lossy
bool _lossy
Definition: xpub.hpp:84
zmq::xpub_t::send_unsubscription
static void send_unsubscription(zmq::mtrie_t::prefix_t data_, size_t size_, xpub_t *self_)
Definition: xpub.cpp:379
zmq::xpub_t::xattach_pipe
void xattach_pipe(zmq::pipe_t *pipe_, bool subscribe_to_all_=false, bool locally_initiated_=false) ZMQ_OVERRIDE
Definition: xpub.cpp:42
zmq::xpub_t::xread_activated
void xread_activated(zmq::pipe_t *pipe_) ZMQ_FINAL
Definition: xpub.cpp:72
zmq::xpub_t::_process_subscribe
bool _process_subscribe
Definition: xpub.hpp:76
zmq::xpub_t::_pending_pipes
std::deque< pipe_t * > _pending_pipes
Definition: xpub.hpp:99
zmq::xpub_t::_welcome_msg
msg_t _welcome_msg
Definition: xpub.hpp:102
zmq::xpub_t::_verbose_subs
bool _verbose_subs
Definition: xpub.hpp:62
zmq::generic_mtrie_t< pipe_t >::prefix_t
const typedef unsigned char * prefix_t
Definition: generic_mtrie.hpp:20
zmq::dist_t
Definition: dist.hpp:18
socket_base.hpp
zmq::xpub_t::xwrite_activated
void xwrite_activated(zmq::pipe_t *pipe_) ZMQ_FINAL
Definition: xpub.cpp:172
zmq::xpub_t::_send_last_pipe
bool _send_last_pipe
Definition: xpub.hpp:90
zmq::generic_mtrie_t< pipe_t >
zmq::xpub_t::mark_as_matching
static void mark_as_matching(zmq::pipe_t *pipe_, xpub_t *self_)
Definition: xpub.cpp:279
zmq::xpub_t::_manual_subscriptions
mtrie_t _manual_subscriptions
Definition: xpub.hpp:55
zmq::xpub_t::~xpub_t
~xpub_t() ZMQ_OVERRIDE
Definition: xpub.cpp:32
zmq::xpub_t::_more_recv
bool _more_recv
Definition: xpub.hpp:72
session_base.hpp
zmq::xpub_t::_manual
bool _manual
Definition: xpub.hpp:87
zmq::xpub_t::xhas_out
bool xhas_out() ZMQ_FINAL
Definition: xpub.cpp:328
zmq::msg_t
Definition: msg.hpp:33
zmq::xpub_t
Definition: xpub.hpp:20


libaditof
Author(s):
autogenerated on Wed May 21 2025 02:07:02