router.hpp
Go to the documentation of this file.
1 /* SPDX-License-Identifier: MPL-2.0 */
2 
3 #ifndef __ZMQ_ROUTER_HPP_INCLUDED__
4 #define __ZMQ_ROUTER_HPP_INCLUDED__
5 
6 #include <map>
7 
8 #include "socket_base.hpp"
9 #include "session_base.hpp"
10 #include "stdint.hpp"
11 #include "blob.hpp"
12 #include "msg.hpp"
13 #include "fq.hpp"
14 
15 namespace zmq
16 {
17 class ctx_t;
18 class pipe_t;
19 
20 // TODO: This class uses O(n) scheduling. Rewrite it to use O(1) algorithm.
22 {
23  public:
24  router_t (zmq::ctx_t *parent_, uint32_t tid_, int sid_);
26 
27  // Overrides of functions from socket_base_t.
28  void xattach_pipe (zmq::pipe_t *pipe_,
29  bool subscribe_to_all_,
30  bool locally_initiated_) ZMQ_FINAL;
31  int
32  xsetsockopt (int option_, const void *optval_, size_t optvallen_) ZMQ_FINAL;
33  int xsend (zmq::msg_t *msg_) ZMQ_OVERRIDE;
34  int xrecv (zmq::msg_t *msg_) ZMQ_OVERRIDE;
35  bool xhas_in () ZMQ_OVERRIDE;
36  bool xhas_out () ZMQ_OVERRIDE;
37  void xread_activated (zmq::pipe_t *pipe_) ZMQ_FINAL;
38  void xpipe_terminated (zmq::pipe_t *pipe_) ZMQ_FINAL;
39  int get_peer_state (const void *routing_id_,
40  size_t routing_id_size_) const ZMQ_FINAL;
41 
42  protected:
43  // Rollback any message parts that were sent but not yet flushed.
44  int rollback ();
45 
46  private:
47  // Receive peer id and update lookup map
48  bool identify_peer (pipe_t *pipe_, bool locally_initiated_);
49 
50  // Fair queueing object for inbound pipes.
52 
53  // True iff there is a message held in the pre-fetch buffer.
55 
56  // If true, the receiver got the message part with
57  // the peer's identity.
59 
60  // Holds the prefetched identity.
62 
63  // Holds the prefetched message.
65 
66  // The pipe we are currently reading from
67  zmq::pipe_t *_current_in;
68 
69  // Should current_in should be terminate after all parts received?
71 
72  // If true, more incoming message parts are expected.
73  bool _more_in;
74 
75  // We keep a set of pipes that have not been identified yet.
76  std::set<pipe_t *> _anonymous_pipes;
77 
78  // The pipe we are currently writing to.
79  zmq::pipe_t *_current_out;
80 
81  // If true, more outgoing message parts are expected.
82  bool _more_out;
83 
84  // Routing IDs are generated. It's a simple increment and wrap-over
85  // algorithm. This value is the next ID to use (if not used already).
87 
88  // If true, report EAGAIN to the caller instead of silently dropping
89  // the message targeting an unknown peer.
90  bool _mandatory;
92 
93  // if true, send an empty message to every connected router peer
95 
96  // If true, the router will reassign an identity upon encountering a
97  // name collision. The new pipe will take the identity, the old pipe
98  // will be terminated.
99  bool _handover;
100 
102 };
103 }
104 
105 #endif
zmq::router_t::get_peer_state
int get_peer_state(const void *routing_id_, size_t routing_id_size_) const ZMQ_FINAL
Definition: router.cpp:404
zmq::router_t::xsend
int xsend(zmq::msg_t *msg_) ZMQ_OVERRIDE
Definition: router.cpp:161
zmq::router_t::_terminate_current_in
bool _terminate_current_in
Definition: router.hpp:70
zmq::router_t::~router_t
~router_t() ZMQ_OVERRIDE
Definition: router.cpp:38
zmq::router_t::_routing_id_sent
bool _routing_id_sent
Definition: router.hpp:58
zmq::router_t::rollback
int rollback()
Definition: router.cpp:334
zmq::router_t::router_t
router_t(zmq::ctx_t *parent_, uint32_t tid_, int sid_)
Definition: router.cpp:12
zmq::router_t::_current_out
zmq::pipe_t * _current_out
Definition: router.hpp:79
zmq::router_t::_prefetched_msg
msg_t _prefetched_msg
Definition: router.hpp:64
zmq::router_t::xhas_in
bool xhas_in() ZMQ_OVERRIDE
Definition: router.cpp:344
zmq::fq_t
Definition: fq.hpp:18
zmq::router_t::_mandatory
bool _mandatory
Definition: router.hpp:90
zmq::router_t::xattach_pipe
void xattach_pipe(zmq::pipe_t *pipe_, bool subscribe_to_all_, bool locally_initiated_) ZMQ_FINAL
Definition: router.cpp:45
ZMQ_FINAL
#define ZMQ_FINAL
Definition: macros.hpp:35
zmq
Definition: zmq.hpp:229
zmq::router_t::_more_in
bool _more_in
Definition: router.hpp:73
ZMQ_OVERRIDE
#define ZMQ_OVERRIDE
Definition: zmq.hpp:91
zmq::router_t
Definition: router.hpp:21
zmq::router_t::_fq
fq_t _fq
Definition: router.hpp:51
stdint.hpp
zmq::router_t::_next_integral_routing_id
uint32_t _next_integral_routing_id
Definition: router.hpp:86
zmq::router_t::_probe_router
bool _probe_router
Definition: router.hpp:94
zmq::router_t::xsetsockopt
int xsetsockopt(int option_, const void *optval_, size_t optvallen_) ZMQ_FINAL
Definition: router.cpp:75
ZMQ_NON_COPYABLE_NOR_MOVABLE
#define ZMQ_NON_COPYABLE_NOR_MOVABLE(classname)
Definition: macros.hpp:58
zmq::router_t::_handover
bool _handover
Definition: router.hpp:99
zmq::router_t::_anonymous_pipes
std::set< pipe_t * > _anonymous_pipes
Definition: router.hpp:76
zmq::routing_socket_base_t
Definition: socket_base.hpp:333
zmq::router_t::_raw_socket
bool _raw_socket
Definition: router.hpp:91
zmq::router_t::xhas_out
bool xhas_out() ZMQ_OVERRIDE
Definition: router.cpp:392
zmq::router_t::identify_peer
bool identify_peer(pipe_t *pipe_, bool locally_initiated_)
Definition: router.cpp:427
zmq::router_t::_current_in
zmq::pipe_t * _current_in
Definition: router.hpp:67
fq.hpp
msg.hpp
zmq::router_t::xrecv
int xrecv(zmq::msg_t *msg_) ZMQ_OVERRIDE
Definition: router.cpp:263
zmq::router_t::_prefetched_id
msg_t _prefetched_id
Definition: router.hpp:61
socket_base.hpp
blob.hpp
zmq::router_t::_prefetched
bool _prefetched
Definition: router.hpp:54
zmq::router_t::_more_out
bool _more_out
Definition: router.hpp:82
session_base.hpp
zmq::router_t::xread_activated
void xread_activated(zmq::pipe_t *pipe_) ZMQ_FINAL
Definition: router.cpp:147
zmq::msg_t
Definition: msg.hpp:33
zmq::router_t::xpipe_terminated
void xpipe_terminated(zmq::pipe_t *pipe_) ZMQ_FINAL
Definition: router.cpp:136


libaditof
Author(s):
autogenerated on Wed May 21 2025 02:06:58