pipe.hpp
Go to the documentation of this file.
1 /* SPDX-License-Identifier: MPL-2.0 */
2 
3 #ifndef __ZMQ_PIPE_HPP_INCLUDED__
4 #define __ZMQ_PIPE_HPP_INCLUDED__
5 
6 #include "ypipe_base.hpp"
7 #include "config.hpp"
8 #include "object.hpp"
9 #include "stdint.hpp"
10 #include "array.hpp"
11 #include "blob.hpp"
12 #include "options.hpp"
13 #include "endpoint.hpp"
14 #include "msg.hpp"
15 
16 namespace zmq
17 {
18 class pipe_t;
19 
20 // Create a pipepair for bi-directional transfer of messages.
21 // First HWM is for messages passed from first pipe to the second pipe.
22 // Second HWM is for messages passed from second pipe to the first pipe.
23 // Delay specifies how the pipe behaves when the peer terminates. If true
24 // pipe receives all the pending messages before terminating, otherwise it
25 // terminates straight away.
26 // If conflate is true, only the most recently arrived message can be
27 // read (older messages are discarded).
28 int pipepair (zmq::object_t *parents_[2],
29  zmq::pipe_t *pipes_[2],
30  const int hwms_[2],
31  const bool conflate_[2]);
32 
33 struct i_pipe_events
34 {
35  virtual ~i_pipe_events () ZMQ_DEFAULT;
36 
37  virtual void read_activated (zmq::pipe_t *pipe_) = 0;
38  virtual void write_activated (zmq::pipe_t *pipe_) = 0;
39  virtual void hiccuped (zmq::pipe_t *pipe_) = 0;
40  virtual void pipe_terminated (zmq::pipe_t *pipe_) = 0;
41 };
42 
43 // Note that pipe can be stored in three different arrays.
44 // The array of inbound pipes (1), the array of outbound pipes (2) and
45 // the generic array of pipes to be deallocated (3).
46 
47 class pipe_t ZMQ_FINAL : public object_t,
48  public array_item_t<1>,
49  public array_item_t<2>,
50  public array_item_t<3>
51 {
52  // This allows pipepair to create pipe objects.
53  friend int pipepair (zmq::object_t *parents_[2],
54  zmq::pipe_t *pipes_[2],
55  const int hwms_[2],
56  const bool conflate_[2]);
57 
58  public:
59  // Specifies the object to send events to.
60  void set_event_sink (i_pipe_events *sink_);
61 
62  // Pipe endpoint can store a routing ID to be used by its clients.
63  void set_server_socket_routing_id (uint32_t server_socket_routing_id_);
64  uint32_t get_server_socket_routing_id () const;
65 
66  // Pipe endpoint can store an opaque ID to be used by its clients.
67  void set_router_socket_routing_id (const blob_t &router_socket_routing_id_);
68  const blob_t &get_routing_id () const;
69 
70  // Returns true if there is at least one message to read in the pipe.
71  bool check_read ();
72 
73  // Reads a message from the underlying pipe.
74  bool read (msg_t *msg_);
75 
76  // Checks whether messages can be written to the pipe. If the pipe is
77  // closed or if writing the message would cause high watermark the
78  // function returns false.
79  bool check_write ();
80 
81  // Writes a message to the underlying pipe. Returns false if the
82  // message does not pass check_write. If false, the message object
83  // retains ownership of its message buffer.
84  bool write (const msg_t *msg_);
85 
86  // Remove unfinished parts of the outbound message from the pipe.
87  void rollback () const;
88 
89  // Flush the messages downstream.
90  void flush ();
91 
92  // Temporarily disconnects the inbound message stream and drops
93  // all the messages on the fly. Causes 'hiccuped' event to be generated
94  // in the peer.
95  void hiccup ();
96 
97  // Ensure the pipe won't block on receiving pipe_term.
98  void set_nodelay ();
99 
100  // Ask pipe to terminate. The termination will happen asynchronously
101  // and the user will be notified about actual deallocation by the
102  // 'pipe_terminated' event. If delay is true, the pending messages will be processed
103  // before actual shutdown.
104  void terminate (bool delay_);
105 
106  // Set the high water marks.
107  void set_hwms (int inhwm_, int outhwm_);
108 
109  // Set the boosts added to the high water marks. Used by inproc sockets so that the total HWM is the sum of the connecting and binding sockets' watermarks.
110  void set_hwms_boost (int inhwmboost_, int outhwmboost_);
111 
112  // Send a command to the peer to notify it of the change of HWMs.
113  void send_hwms_to_peer (int inhwm_, int outhwm_);
114 
115  // Returns true if HWM is not reached
116  bool check_hwm () const;
117 
118  void set_endpoint_pair (endpoint_uri_pair_t endpoint_pair_);
119  const endpoint_uri_pair_t &get_endpoint_pair () const;
120 
121  void send_stats_to_peer (own_t *socket_base_);
122 
123  void send_disconnect_msg ();
124  void set_disconnect_msg (const std::vector<unsigned char> &disconnect_);
125 
126  void send_hiccup_msg (const std::vector<unsigned char> &hiccup_);
127 
128  private:
129  // Type of the underlying lock-free pipe.
130  typedef ypipe_base_t<msg_t> upipe_t;
131 
132  // Command handlers.
133  void process_activate_read () ZMQ_OVERRIDE;
134  void process_activate_write (uint64_t msgs_read_) ZMQ_OVERRIDE;
135  void process_hiccup (void *pipe_) ZMQ_OVERRIDE;
136  void
137  process_pipe_peer_stats (uint64_t queue_count_,
138  own_t *socket_base_,
139  endpoint_uri_pair_t *endpoint_pair_) ZMQ_OVERRIDE;
140  void process_pipe_term () ZMQ_OVERRIDE;
141  void process_pipe_term_ack () ZMQ_OVERRIDE;
142  void process_pipe_hwm (int inhwm_, int outhwm_) ZMQ_OVERRIDE;
143 
144  // Handler for delimiter read from the pipe.
145  void process_delimiter ();
146 
147  // Constructor is private. Pipe can only be created using
148  // pipepair function.
149  pipe_t (object_t *parent_,
150  upipe_t *inpipe_,
151  upipe_t *outpipe_,
152  int inhwm_,
153  int outhwm_,
154  bool conflate_);
155 
156  // Pipepair uses this function to let us know about
157  // the peer pipe object.
158  void set_peer (pipe_t *peer_);
159 
160  // Destructor is private. Pipe objects destroy themselves.
161  ~pipe_t () ZMQ_OVERRIDE;
162 
163  // Underlying pipes for both directions.
164  upipe_t *_in_pipe;
165  upipe_t *_out_pipe;
166 
167  // Can the pipe be read from / written to?
168  bool _in_active;
169  bool _out_active;
170 
171  // High watermark for the outbound pipe.
172  int _hwm;
173 
174  // Low watermark for the inbound pipe.
175  int _lwm;
176 
177  // Boosts for the inbound and outbound high watermarks, used with inproc sockets so that the HWMs are the sum of the send and receive HWMs on each side of the pipe.
178  int _in_hwm_boost;
179  int _out_hwm_boost;
180 
181  // Number of messages read and written so far.
182  uint64_t _msgs_read;
183  uint64_t _msgs_written;
184 
185  // Last received peer's msgs_read. The actual number in the peer
186  // can be higher at the moment.
187  uint64_t _peers_msgs_read;
188 
189  // The pipe object on the other side of the pipepair.
190  pipe_t *_peer;
191 
192  // Sink to send events to.
194 
195  // States of the pipe endpoint:
196  // active: common state before any termination begins,
197  // delimiter_received: delimiter was read from pipe before
198  // term command was received,
199  // waiting_for_delimiter: term command was already received
200  // from the peer but there are still pending messages to read,
201  // term_ack_sent: all pending messages were already read and
202  // all we are waiting for is ack from the peer,
203  // term_req_sent1: 'terminate' was explicitly called by the user,
204  // term_req_sent2: user called 'terminate' and then we've got
205  // term command from the peer as well.
206  enum
207  {
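208  active,
209  delimiter_received,
210  waiting_for_delimiter,
211  term_ack_sent,
212  term_req_sent1,
213  term_req_sent2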
214  } _state;
215 
216  // If true, we receive all the pending inbound messages before
217  // terminating. If false, we terminate immediately when the peer
218  // asks us to.
219  bool _delay;
220 
221  // Routing id of the writer. Used uniquely by the reader side.
222  blob_t _router_socket_routing_id;
223 
224  // Routing id of the writer. Used uniquely by the reader side.
225  int _server_socket_routing_id;
226 
227  // Returns true if the message is delimiter; false otherwise.
228  static bool is_delimiter (const msg_t &msg_);
229 
230  // Computes appropriate low watermark from the given high watermark.
231  static int compute_lwm (int hwm_);
232 
233  const bool _conflate;
234 
235  // The endpoints of this pipe.
236  endpoint_uri_pair_t _endpoint_pair;
237 
238  // Disconnect msg
239  msg_t _disconnect_msg;
240 
241  ZMQ_NON_COPYABLE_NOR_MOVABLE (pipe_t)
242 };
243 
244 void send_routing_id (pipe_t *pipe_, const options_t &options_);
245 
246 void send_hello_msg (pipe_t *pipe_, const options_t &options_);
247 }
248 
249 #endif
zmq::ZMQ_FINAL
Definition: channel.hpp:17
zmq::ZMQ_FINAL::term_req_sent1
@ term_req_sent1
Definition: pipe.hpp:212
zmq::ZMQ_FINAL::_router_socket_routing_id
blob_t _router_socket_routing_id
Definition: pipe.hpp:222
zmq::options_t
Definition: options.hpp:34
config.hpp
zmq::i_pipe_events::hiccuped
virtual void hiccuped(zmq::pipe_t *pipe_)=0
zmq::ZMQ_FINAL::waiting_for_delimiter
@ waiting_for_delimiter
Definition: pipe.hpp:210
zmq::ZMQ_FINAL::_server_socket_routing_id
int _server_socket_routing_id
Definition: pipe.hpp:225
ZMQ_DEFAULT
#define ZMQ_DEFAULT
Definition: macros.hpp:43
zmq::ypipe_base_t
Definition: ypipe_base.hpp:15
zmq::ZMQ_FINAL::upipe_t
ypipe_base_t< msg_t > upipe_t
Definition: pipe.hpp:130
zmq::ZMQ_FINAL::_endpoint_pair
endpoint_uri_pair_t _endpoint_pair
Definition: pipe.hpp:236
array.hpp
zmq::array_item_t
Definition: array.hpp:26
ypipe_base.hpp
zmq::ZMQ_FINAL::term_ack_sent
@ term_ack_sent
Definition: pipe.hpp:211
zmq
Definition: zmq.hpp:229
zmq::ZMQ_FINAL::_delay
bool _delay
Definition: pipe.hpp:219
ZMQ_OVERRIDE
#define ZMQ_OVERRIDE
Definition: zmq.hpp:91
zmq::ZMQ_FINAL::active
@ active
Definition: pipe.hpp:208
zmq::endpoint_uri_pair_t
Definition: endpoint.hpp:17
stdint.hpp
zmq::ZMQ_FINAL::delimiter_received
@ delimiter_received
Definition: pipe.hpp:209
ZMQ_NON_COPYABLE_NOR_MOVABLE
#define ZMQ_NON_COPYABLE_NOR_MOVABLE(classname)
Definition: macros.hpp:58
zmq::i_pipe_events::write_activated
virtual void write_activated(zmq::pipe_t *pipe_)=0
endpoint.hpp
zmq::send_hello_msg
void send_hello_msg(pipe_t *pipe_, const options_t &options_)
Definition: pipe.cpp:64
zmq::i_pipe_events
Definition: pipe.hpp:33
zmq::i_pipe_events::read_activated
virtual void read_activated(zmq::pipe_t *pipe_)=0
zmq::i_pipe_events::~i_pipe_events
virtual ~i_pipe_events() ZMQ_DEFAULT
zmq::i_pipe_events::pipe_terminated
virtual void pipe_terminated(zmq::pipe_t *pipe_)=0
zmq::object_t
Definition: object.hpp:28
zmq::blob_t
Definition: blob.hpp:46
msg.hpp
options.hpp
blob.hpp
zmq::own_t
Definition: own.hpp:21
zmq::send_routing_id
void send_routing_id(pipe_t *pipe_, const options_t &options_)
Definition: pipe.cpp:52
zmq::pipepair
int pipepair(zmq::object_t *parents_[2], zmq::pipe_t *pipes_[2], const int hwms_[2], const bool conflate_[2])
object.hpp
zmq::ZMQ_FINAL::_conflate
const bool _conflate
Definition: pipe.hpp:233
zmq::ZMQ_FINAL::_disconnect_msg
msg_t _disconnect_msg
Definition: pipe.hpp:239
zmq::msg_t
Definition: msg.hpp:33
options_
DebugStringOptions options_
Definition: src/google/protobuf/descriptor.cc:2410


libaditof
Author(s):
autogenerated on Wed May 21 2025 02:06:57