stream_engine_base.hpp
Go to the documentation of this file.
1 /* SPDX-License-Identifier: MPL-2.0 */
2 
3 #ifndef __ZMQ_STREAM_ENGINE_BASE_HPP_INCLUDED__
4 #define __ZMQ_STREAM_ENGINE_BASE_HPP_INCLUDED__
5 
6 #include <stddef.h>
7 
8 #include "fd.hpp"
9 #include "i_engine.hpp"
10 #include "io_object.hpp"
11 #include "i_encoder.hpp"
12 #include "i_decoder.hpp"
13 #include "options.hpp"
14 #include "socket_base.hpp"
15 #include "metadata.hpp"
16 #include "msg.hpp"
17 #include "tcp.hpp"
18 
19 namespace zmq
20 {
21 class io_thread_t;
22 class session_base_t;
23 class mechanism_t;
24 
25 // This engine handles any socket with SOCK_STREAM semantics,
26 // e.g. a TCP socket or a UNIX domain socket.
27 
29 {
30  public:
32  const options_t &options_,
33  const endpoint_uri_pair_t &endpoint_uri_pair_,
34  bool has_handshake_stage_);
36 
37  // i_engine interface implementation.
39  void plug (zmq::io_thread_t *io_thread_,
40  zmq::session_base_t *session_) ZMQ_FINAL;
41  void terminate () ZMQ_FINAL;
42  bool restart_input () ZMQ_FINAL;
43  void restart_output () ZMQ_FINAL;
46 
47  // i_poll_events interface implementation.
48  void in_event () ZMQ_FINAL;
49  void out_event () ZMQ_OVERRIDE;
50  void timer_event (int id_) ZMQ_FINAL;
51 
52  protected:
53  typedef metadata_t::dict_t properties_t;
54  bool init_properties (properties_t &properties_);
55 
56  // Function to handle network disconnections.
57  virtual void error (error_reason_t reason_);
58 
59  int next_handshake_command (msg_t *msg_);
60  int process_handshake_command (msg_t *msg_);
61 
62  int pull_msg_from_session (msg_t *msg_);
63  int push_msg_to_session (msg_t *msg_);
64 
65  int pull_and_encode (msg_t *msg_);
66  virtual int decode_and_push (msg_t *msg_);
68 
69  void set_handshake_timer ();
70 
71  virtual bool handshake () { return true; };
72  virtual void plug_internal (){};
73 
74  virtual int process_command_message (msg_t *msg_)
75  {
76  LIBZMQ_UNUSED (msg_);
77  return -1;
78  };
79  virtual int produce_ping_message (msg_t *msg_)
80  {
81  LIBZMQ_UNUSED (msg_);
82  return -1;
83  };
84  virtual int process_heartbeat_message (msg_t *msg_)
85  {
86  LIBZMQ_UNUSED (msg_);
87  return -1;
88  };
89  virtual int produce_pong_message (msg_t *msg_)
90  {
91  LIBZMQ_UNUSED (msg_);
92  return -1;
93  };
94 
95  virtual int read (void *data, size_t size_);
96  virtual int write (const void *data_, size_t size_);
97 
102  socket_base_t *socket () { return _socket; }
103 
105 
106  unsigned char *_inpos;
107  size_t _insize;
109 
110  unsigned char *_outpos;
111  size_t _outsize;
113 
115 
118 
119  // Metadata to be attached to received messages. May be NULL.
121 
122  // True iff the engine couldn't consume the last decoded message.
124 
125  // True iff the engine doesn't have any message to encode.
127 
128  // Representation of the connected endpoints.
130 
131  // ID of the handshake timer
132  enum
133  {
135  };
136 
137  // True iff the handshake timer is running.
139 
140  // Heartbeat stuff
141  enum
142  {
146  };
150 
151 
153 
154  private:
155  bool in_event_internal ();
156 
157  // Unplug the engine from the session.
158  void unplug ();
159 
160  int write_credential (msg_t *msg_);
161 
162  void mechanism_ready ();
163 
164  // Underlying socket.
166 
168 
169  bool _plugged;
170 
171  // When true, we are still trying to determine whether
172  // the peer is using a versioned protocol, and if so, which
173  // version. When false, normal message flow has started.
175 
177 
178  bool _io_error;
179 
180  // The session this engine is attached to.
182 
183  // Socket
185 
186  // Indicates whether the engine has a handshake stage; if it does, the engine must call
187  // session.engine_ready when the handshake is completed.
189 
191 };
192 }
193 
194 #endif
zmq::ZMQ_FINAL
Definition: channel.hpp:17
zmq::stream_engine_base_t::session
session_base_t * session()
Definition: stream_engine_base.hpp:101
zmq::stream_engine_base_t::_handle
handle_t _handle
Definition: stream_engine_base.hpp:167
zmq::stream_engine_base_t::set_pollin
void set_pollin()
Definition: stream_engine_base.hpp:100
zmq::stream_engine_base_t::push_msg_to_session
int push_msg_to_session(msg_t *msg_)
Definition: stream_engine_base.cpp:662
zmq::session_base_t
Definition: session_base.hpp:21
zmq::stream_engine_base_t::_plugged
bool _plugged
Definition: stream_engine_base.hpp:169
zmq::stream_engine_base_t::_has_ttl_timer
bool _has_ttl_timer
Definition: stream_engine_base.hpp:147
data_
StringPiece data_
Definition: bytestream_unittest.cc:60
benchmarks.python.py_benchmark.const
const
Definition: py_benchmark.py:14
zmq::stream_engine_base_t::_socket
zmq::socket_base_t * _socket
Definition: stream_engine_base.hpp:184
zmq::stream_engine_base_t::read
virtual int read(void *data, size_t size_)
Definition: stream_engine_base.cpp:756
zmq::io_object_t
Definition: io_object.hpp:20
zmq::io_object_t::handle_t
poller_t::handle_t handle_t
Definition: io_object.hpp:32
zmq::stream_engine_base_t::socket
socket_base_t * socket()
Definition: stream_engine_base.hpp:102
zmq::stream_engine_base_t::init_properties
bool init_properties(properties_t &properties_)
Definition: stream_engine_base.cpp:719
zmq::options_t
Definition: options.hpp:34
zmq::stream_engine_base_t::has_handshake_stage
bool has_handshake_stage() ZMQ_FINAL
Definition: stream_engine_base.hpp:38
zmq::stream_engine_base_t::_encoder
i_encoder * _encoder
Definition: stream_engine_base.hpp:112
zmq::stream_engine_base_t::in_event
void in_event() ZMQ_FINAL
Definition: stream_engine_base.cpp:213
zmq::stream_engine_base_t::_outsize
size_t _outsize
Definition: stream_engine_base.hpp:111
zmq::stream_engine_base_t::_metadata
metadata_t * _metadata
Definition: stream_engine_base.hpp:120
zmq::i_engine::error_reason_t
error_reason_t
Definition: i_engine.hpp:17
zmq::stream_engine_base_t::_peer_address
const std::string _peer_address
Definition: stream_engine_base.hpp:152
zmq::stream_engine_base_t::restart_output
void restart_output() ZMQ_FINAL
Definition: stream_engine_base.cpp:383
zmq::stream_engine_base_t::produce_ping_message
virtual int produce_ping_message(msg_t *msg_)
Definition: stream_engine_base.hpp:79
string
GLsizei const GLchar *const * string
Definition: glcorearb.h:3083
zmq::stream_engine_base_t::next_handshake_command
int next_handshake_command(msg_t *msg_)
Definition: stream_engine_base.cpp:453
zmq::stream_engine_base_t::reset_pollout
void reset_pollout()
Definition: stream_engine_base.hpp:98
zmq::stream_engine_base_t::in_event_internal
bool in_event_internal()
Definition: stream_engine_base.cpp:220
zmq::stream_engine_base_t::decode_and_push
virtual int decode_and_push(msg_t *msg_)
Definition: stream_engine_base.cpp:618
zmq::stream_engine_base_t::_has_handshake_stage
bool _has_handshake_stage
Definition: stream_engine_base.hpp:188
zmq::socket_base_t
Definition: socket_base.hpp:31
zmq::stream_engine_base_t::process_heartbeat_message
virtual int process_heartbeat_message(msg_t *msg_)
Definition: stream_engine_base.hpp:84
zmq::stream_engine_base_t::_input_stopped
bool _input_stopped
Definition: stream_engine_base.hpp:123
zmq::stream_engine_base_t::restart_input
bool restart_input() ZMQ_FINAL
Definition: stream_engine_base.cpp:400
zmq::i_encoder
Definition: i_encoder.hpp:16
error
Definition: cJSON.c:88
zmq::stream_engine_base_t::_tx_msg
msg_t _tx_msg
Definition: stream_engine_base.hpp:176
zmq::stream_engine_base_t::heartbeat_ttl_timer_id
@ heartbeat_ttl_timer_id
Definition: stream_engine_base.hpp:145
zmq::fd_t
int fd_t
Definition: zmq.hpp:287
zmq::stream_engine_base_t::_session
zmq::session_base_t * _session
Definition: stream_engine_base.hpp:181
ZMQ_FINAL
#define ZMQ_FINAL
Definition: macros.hpp:35
zmq::stream_engine_base_t::pull_msg_from_session
int pull_msg_from_session(msg_t *msg_)
Definition: stream_engine_base.cpp:657
zmq::stream_engine_base_t::_has_handshake_timer
bool _has_handshake_timer
Definition: stream_engine_base.hpp:138
zmq
Definition: zmq.hpp:229
zmq::mechanism_t
Definition: mechanism.hpp:19
zmq::stream_engine_base_t::_io_error
bool _io_error
Definition: stream_engine_base.hpp:178
ZMQ_OVERRIDE
#define ZMQ_OVERRIDE
Definition: zmq.hpp:91
zmq::stream_engine_base_t::set_handshake_timer
void set_handshake_timer()
Definition: stream_engine_base.cpp:709
zmq::stream_engine_base_t::heartbeat_ivl_timer_id
@ heartbeat_ivl_timer_id
Definition: stream_engine_base.hpp:143
zmq::endpoint_uri_pair_t
Definition: endpoint.hpp:17
zmq::stream_engine_base_t::mechanism_ready
void mechanism_ready()
Definition: stream_engine_base.cpp:512
zmq::stream_engine_base_t::_decoder
i_decoder * _decoder
Definition: stream_engine_base.hpp:108
LIBZMQ_UNUSED
#define LIBZMQ_UNUSED(object)
Definition: macros.hpp:6
zmq::stream_engine_base_t::_has_heartbeat_timer
bool _has_heartbeat_timer
Definition: stream_engine_base.hpp:149
zmq::stream_engine_base_t::out_event
void out_event() ZMQ_OVERRIDE
Definition: stream_engine_base.cpp:314
zmq::io_object_t::reset_pollout
void reset_pollout(handle_t handle_)
Definition: io_object.cpp:61
zmq::stream_engine_base_t::unplug
void unplug()
Definition: stream_engine_base.cpp:172
ZMQ_NON_COPYABLE_NOR_MOVABLE
#define ZMQ_NON_COPYABLE_NOR_MOVABLE(classname)
Definition: macros.hpp:58
zmq::stream_engine_base_t::zap_msg_available
void zap_msg_available() ZMQ_FINAL
Definition: stream_engine_base.cpp:491
zmq::stream_engine_base_t::pull_and_encode
int pull_and_encode(msg_t *msg_)
Definition: stream_engine_base.cpp:607
fd.hpp
zmq::stream_engine_base_t::_outpos
unsigned char * _outpos
Definition: stream_engine_base.hpp:110
zmq::stream_engine_base_t::_insize
size_t _insize
Definition: stream_engine_base.hpp:107
zmq::stream_engine_base_t::get_endpoint
const endpoint_uri_pair_t & get_endpoint() const ZMQ_FINAL
Definition: stream_engine_base.cpp:507
zmq::stream_engine_base_t::_process_msg
int(stream_engine_base_t::* _process_msg)(msg_t *msg_)
Definition: stream_engine_base.hpp:117
zmq::stream_engine_base_t::_next_msg
int(stream_engine_base_t::* _next_msg)(msg_t *msg_)
Definition: stream_engine_base.hpp:116
zmq::stream_engine_base_t::stream_engine_base_t
stream_engine_base_t(fd_t fd_, const options_t &options_, const endpoint_uri_pair_t &endpoint_uri_pair_, bool has_handshake_stage_)
Definition: stream_engine_base.cpp:75
zmq::stream_engine_base_t
Definition: stream_engine_base.hpp:28
zmq::stream_engine_base_t::set_pollout
void set_pollout()
Definition: stream_engine_base.hpp:99
zmq::stream_engine_base_t::_options
const options_t _options
Definition: stream_engine_base.hpp:104
msg.hpp
zmq::metadata_t
Definition: metadata.hpp:13
zmq::i_decoder
Definition: i_decoder.hpp:15
zmq::stream_engine_base_t::_mechanism
mechanism_t * _mechanism
Definition: stream_engine_base.hpp:114
zmq::stream_engine_base_t::_handshaking
bool _handshaking
Definition: stream_engine_base.hpp:174
metadata.hpp
options.hpp
socket_base.hpp
zmq::stream_engine_base_t::process_command_message
virtual int process_command_message(msg_t *msg_)
Definition: stream_engine_base.hpp:74
zmq::stream_engine_base_t::plug_internal
virtual void plug_internal()
Definition: stream_engine_base.hpp:72
tcp.hpp
io_object.hpp
zmq::io_object_t::set_pollout
void set_pollout(handle_t handle_)
Definition: io_object.cpp:56
zmq::stream_engine_base_t::handshake_timer_id
@ handshake_timer_id
Definition: stream_engine_base.hpp:134
zmq::stream_engine_base_t::_endpoint_uri_pair
const endpoint_uri_pair_t _endpoint_uri_pair
Definition: stream_engine_base.hpp:129
zmq::stream_engine_base_t::_has_timeout_timer
bool _has_timeout_timer
Definition: stream_engine_base.hpp:148
zmq::stream_engine_base_t::_s
fd_t _s
Definition: stream_engine_base.hpp:165
zmq::stream_engine_base_t::push_one_then_decode_and_push
int push_one_then_decode_and_push(msg_t *msg_)
Definition: stream_engine_base.cpp:649
zmq::stream_engine_base_t::~stream_engine_base_t
~stream_engine_base_t() ZMQ_OVERRIDE
Definition: stream_engine_base.cpp:115
data
GLint GLenum GLsizei GLsizei GLsizei GLint GLsizei const GLvoid * data
Definition: glcorearb.h:2879
zmq::stream_engine_base_t::heartbeat_timeout_timer_id
@ heartbeat_timeout_timer_id
Definition: stream_engine_base.hpp:144
zmq::stream_engine_base_t::terminate
void terminate() ZMQ_FINAL
Definition: stream_engine_base.cpp:207
zmq::stream_engine_base_t::write_credential
int write_credential(msg_t *msg_)
Definition: stream_engine_base.cpp:584
zmq::stream_engine_base_t::produce_pong_message
virtual int produce_pong_message(msg_t *msg_)
Definition: stream_engine_base.hpp:89
zmq::i_engine
Definition: i_engine.hpp:15
zmq::stream_engine_base_t::process_handshake_command
int process_handshake_command(msg_t *msg_)
Definition: stream_engine_base.cpp:473
zmq::stream_engine_base_t::timer_event
void timer_event(int id_) ZMQ_FINAL
Definition: stream_engine_base.cpp:735
i_engine.hpp
zmq::stream_engine_base_t::write
virtual int write(const void *data_, size_t size_)
Definition: stream_engine_base.cpp:769
zmq::stream_engine_base_t::properties_t
metadata_t::dict_t properties_t
Definition: stream_engine_base.hpp:53
i_decoder.hpp
zmq::stream_engine_base_t::_output_stopped
bool _output_stopped
Definition: stream_engine_base.hpp:126
zmq::stream_engine_base_t::handshake
virtual bool handshake()
Definition: stream_engine_base.hpp:71
zmq::io_object_t::set_pollin
void set_pollin(handle_t handle_)
Definition: io_object.cpp:46
zmq::msg_t
Definition: msg.hpp:33
zmq::stream_engine_base_t::plug
void plug(zmq::io_thread_t *io_thread_, zmq::session_base_t *session_) ZMQ_FINAL
Definition: stream_engine_base.cpp:152
i_encoder.hpp
zmq::stream_engine_base_t::_inpos
unsigned char * _inpos
Definition: stream_engine_base.hpp:106
options_
DebugStringOptions options_
Definition: src/google/protobuf/descriptor.cc:2410


libaditof
Author(s):
autogenerated on Wed May 21 2025 02:06:59