session_base.hpp
Go to the documentation of this file.
1 /* SPDX-License-Identifier: MPL-2.0 */
2 
3 #ifndef __ZMQ_SESSION_BASE_HPP_INCLUDED__
4 #define __ZMQ_SESSION_BASE_HPP_INCLUDED__
5 
6 #include <stdarg.h>
7 
8 #include "own.hpp"
9 #include "io_object.hpp"
10 #include "pipe.hpp"
11 #include "socket_base.hpp"
12 #include "i_engine.hpp"
13 #include "msg.hpp"
14 
15 namespace zmq
16 {
17 class io_thread_t;
18 struct i_engine;
19 struct address_t;
20 
// Session object. It derives from own_t and io_object_t and implements
// i_pipe_events. According to the member comments below, it holds the pipe
// connecting it to its socket, an optional pipe to the ZAP socket, and the
// protocol I/O engine, and it exposes one set of functions towards the engine
// and another (the i_pipe_events callbacks) towards the pipes.
//
// NOTE(review): this is a rendered, line-numbered listing and it is lossy.
// Source lines 78, 89, 113, 120, 123, 132, 136, 139 and 147 are absent.
// The cross-reference index at the end of this page places the following
// declarations on some of them:
//   113: bool _incomplete_in          120: zmq::i_engine *_engine
//   123: zmq::socket_base_t *_socket  132: linger_timer_id (enumerator)
//   136: bool _has_linger_timer       139: address_t *_addr
// The index also lists `~session_base_t() ZMQ_OVERRIDE` and
// `process_conn_failed() ZMQ_OVERRIDE`; presumably these sat on lines 78
// and 89 respectively - confirm against the real header before relying on
// this listing.
21 class session_base_t : public own_t, public io_object_t, public i_pipe_events
22 {
23  public:
24  // Create a session of the particular type.
    // NOTE(review): the arguments presumably end up in the like-named members
    // documented further down (_active, _io_thread, _socket, _addr) - confirm
    // in session_base.cpp.
25  static session_base_t *create (zmq::io_thread_t *io_thread_,
26  bool active_,
27  zmq::socket_base_t *socket_,
28  const options_t &options_,
29  address_t *addr_);
30 
31  // To be used once only, when creating the session.
32  void attach_pipe (zmq::pipe_t *pipe_);
33 
34  // The following functions are the interface exposed towards the engine.
    // Only reset () is virtual; hello_msg_session_t redeclares it.
35  virtual void reset ();
36  void flush ();
37  void rollback ();
38  void engine_error (bool handshaked_, zmq::i_engine::error_reason_t reason_);
39  void engine_ready ();
40 
41  // i_pipe_events interface implementation.
42  void read_activated (zmq::pipe_t *pipe_) ZMQ_FINAL;
43  void write_activated (zmq::pipe_t *pipe_) ZMQ_FINAL;
44  void hiccuped (zmq::pipe_t *pipe_) ZMQ_FINAL;
45  void pipe_terminated (zmq::pipe_t *pipe_) ZMQ_FINAL;
46 
47  // Delivers a message. Returns 0 if successful; -1 otherwise.
48  // The function takes ownership of the message.
49  virtual int push_msg (msg_t *msg_);
50 
51  int zap_connect ();
52  bool zap_enabled () const;
53 
54  // Fetches a message. Returns 0 if successful; -1 otherwise.
55  // The caller is responsible for freeing the message when it is no
56  // longer used.
57  virtual int pull_msg (msg_t *msg_);
58 
59  // Receives a message from the ZAP socket.
60  // Returns 0 on success; -1 otherwise.
61  // The caller is responsible for freeing the message.
62  int read_zap_msg (msg_t *msg_);
63 
64  // Sends a message to the ZAP socket.
65  // Returns 0 on success; -1 otherwise.
66  // The function takes ownership of the message.
67  int write_zap_msg (msg_t *msg_);
68 
69  socket_base_t *get_socket () const;
70  const endpoint_uri_pair_t &get_endpoint () const;
71 
72  protected:
    // Protected constructor: same parameter list as create () above.
73  session_base_t (zmq::io_thread_t *io_thread_,
74  bool active_,
75  zmq::socket_base_t *socket_,
76  const options_t &options_,
77  address_t *addr_);
    // NOTE(review): source line 78 is absent from this listing (see the note
    // at the top of the class).
79 
80  private:
81  void start_connecting (bool wait_);
82 
83  void reconnect ();
84 
85  // Handlers for incoming commands.
86  void process_plug () ZMQ_FINAL;
87  void process_attach (zmq::i_engine *engine_) ZMQ_FINAL;
88  void process_term (int linger_) ZMQ_FINAL;
    // NOTE(review): source line 89 is absent from this listing (see the note
    // at the top of the class).
90 
91  // i_poll_events handlers.
92  void timer_event (int id_) ZMQ_FINAL;
93 
94  // Remove any half-processed messages. Flush unflushed messages.
95  // Call this function when the engine disconnects to get rid of leftovers.
96  void clean_pipes ();
97 
98  // If true, this session (re)connects to the peer. Otherwise, it's
99  // a transient session created by the listener.
100  const bool _active;
101 
102  // Pipe connecting the session to its socket.
103  zmq::pipe_t *_pipe;
104 
105  // Pipe used to exchange messages with ZAP socket.
106  zmq::pipe_t *_zap_pipe;
107 
108  // Set of pipes that we are disconnecting but whose termination has not yet completed.
109  std::set<pipe_t *> _terminating_pipes;
110 
111  // This flag is true if the remainder of the message being processed
112  // is still in the in pipe.
     // NOTE(review): declaration absent from this listing; the index gives
     // `bool _incomplete_in` at line 113.
114 
115  // True if termination has been suspended to push the pending
116  // messages to the network.
117  bool _pending;
118 
119  // The protocol I/O engine connected to the session.
     // NOTE(review): declaration absent from this listing; the index gives
     // `zmq::i_engine *_engine` at line 120.
121 
122  // The socket the session belongs to.
     // NOTE(review): declaration absent from this listing; the index gives
     // `zmq::socket_base_t *_socket` at line 123.
124 
125  // I/O thread the session is living in. It will be used to plug in
126  // the engines into the same thread.
127  zmq::io_thread_t *_io_thread;
128 
129  // ID of the linger timer
     // NOTE(review): the enumerator itself (`linger_timer_id`, line 132 per
     // the index) is absent from this listing and its value is not shown here.
130  enum
131  {
133  };
134 
135  // True if the linger timer is running.
     // NOTE(review): declaration absent from this listing; the index gives
     // `bool _has_linger_timer` at line 136.
137 
138  // Protocol and address to use when connecting.
     // NOTE(review): declaration absent from this listing; the index gives
     // `address_t *_addr` at line 139.
140 
141 #ifdef ZMQ_HAVE_WSS
142  // TLS handshake, we need to take a copy when the session is created,
143  // in order to maintain the value at the creation time
144  const std::string _wss_hostname;
145 #endif
146 
     // NOTE(review): source line 147 is absent from this listing.
148 };
149 
// Final subclass of session_base_t that redeclares pull_msg () and reset ()
// and keeps one extra flag, _new_pipe.
// NOTE(review): judging by the name, this presumably injects a "hello"
// message when a new pipe is in use - the logic is not visible in this
// header; confirm in session_base.cpp.
150 class hello_msg_session_t ZMQ_FINAL : public session_base_t
151 {
152  public:
     // Same parameter list as the session_base_t constructor, except that the
     // second argument is named connect_ rather than active_.
153  hello_msg_session_t (zmq::io_thread_t *io_thread_,
154  bool connect_,
155  zmq::socket_base_t *socket_,
156  const options_t &options_,
157  address_t *addr_);
158  ~hello_msg_session_t ();
159 
160  // Overrides of the functions from session_base_t.
     // NOTE(review): both are declared virtual in session_base_t but are not
     // marked ZMQ_OVERRIDE here; consider adding it so a signature drift in
     // the base class is caught at compile time.
161  int pull_msg (msg_t *msg_);
162  void reset ();
163 
164  private:
     // NOTE(review): presumably tells the overrides above whether the current
     // pipe is new - the semantics are not visible in this header; confirm
     // against session_base.cpp.
165  bool _new_pipe;
166 
167  ZMQ_NON_COPYABLE_NOR_MOVABLE (hello_msg_session_t)
168 };
169 }
170 
171 #endif
zmq::ZMQ_FINAL
Definition: channel.hpp:17
zmq::session_base_t
Definition: session_base.hpp:21
zmq::session_base_t::reset
virtual void reset()
Definition: session_base.cpp:202
zmq::session_base_t::push_msg
virtual int push_msg(msg_t *msg_)
Definition: session_base.cpp:156
zmq::io_object_t
Definition: io_object.hpp:20
zmq::options_t
Definition: options.hpp:34
zmq::session_base_t::_incomplete_in
bool _incomplete_in
Definition: session_base.hpp:113
zmq::session_base_t::process_attach
void process_attach(zmq::i_engine *engine_) ZMQ_FINAL
Definition: session_base.cpp:381
zmq::session_base_t::clean_pipes
void clean_pipes()
Definition: session_base.cpp:218
zmq::session_base_t::create
static session_base_t * create(zmq::io_thread_t *io_thread_, bool active_, zmq::socket_base_t *socket_, const options_t &options_, address_t *addr_)
Definition: session_base.cpp:27
zmq::session_base_t::start_connecting
void start_connecting(bool wait_)
Definition: session_base.cpp:584
zmq::i_engine::error_reason_t
error_reason_t
Definition: i_engine.hpp:17
zmq::session_base_t::get_endpoint
const endpoint_uri_pair_t & get_endpoint() const
Definition: session_base.cpp:112
zmq::ZMQ_FINAL::_new_pipe
bool _new_pipe
Definition: session_base.hpp:165
zmq::session_base_t::_terminating_pipes
std::set< pipe_t * > _terminating_pipes
Definition: session_base.hpp:109
zmq::session_base_t::_zap_pipe
zmq::pipe_t * _zap_pipe
Definition: session_base.hpp:106
zmq::session_base_t::attach_pipe
void attach_pipe(zmq::pipe_t *pipe_)
Definition: session_base.cpp:135
zmq::session_base_t::write_zap_msg
int write_zap_msg(msg_t *msg_)
Definition: session_base.cpp:187
string
GLsizei const GLchar *const * string
Definition: glcorearb.h:3083
zmq::socket_base_t
Definition: socket_base.hpp:31
zmq::session_base_t::_engine
zmq::i_engine * _engine
Definition: session_base.hpp:120
zmq::session_base_t::~session_base_t
~session_base_t() ZMQ_OVERRIDE
Definition: session_base.cpp:117
ZMQ_FINAL
#define ZMQ_FINAL
Definition: macros.hpp:35
zmq
Definition: zmq.hpp:229
zmq::session_base_t::process_plug
void process_plug() ZMQ_FINAL
Definition: session_base.cpp:321
ZMQ_OVERRIDE
#define ZMQ_OVERRIDE
Definition: zmq.hpp:91
zmq::session_base_t::engine_ready
void engine_ready()
Definition: session_base.cpp:394
zmq::session_base_t::get_socket
socket_base_t * get_socket() const
Definition: session_base.cpp:316
zmq::session_base_t::zap_enabled
bool zap_enabled() const
Definition: session_base.cpp:376
zmq::endpoint_uri_pair_t
Definition: endpoint.hpp:17
zmq::session_base_t::_pipe
zmq::pipe_t * _pipe
Definition: session_base.hpp:103
zmq::session_base_t::_socket
zmq::socket_base_t * _socket
Definition: session_base.hpp:123
zmq::session_base_t::flush
void flush()
Definition: session_base.cpp:206
ZMQ_NON_COPYABLE_NOR_MOVABLE
#define ZMQ_NON_COPYABLE_NOR_MOVABLE(classname)
Definition: macros.hpp:58
zmq::session_base_t::_active
const bool _active
Definition: session_base.hpp:100
pipe.hpp
zmq::session_base_t::linger_timer_id
@ linger_timer_id
Definition: session_base.hpp:132
zmq::i_pipe_events
Definition: pipe.hpp:33
zmq::session_base_t::read_activated
void read_activated(zmq::pipe_t *pipe_) ZMQ_FINAL
Definition: session_base.cpp:275
zmq::session_base_t::pull_msg
virtual int pull_msg(msg_t *msg_)
Definition: session_base.cpp:144
zmq::session_base_t::process_term
void process_term(int linger_) ZMQ_FINAL
Definition: session_base.cpp:483
msg.hpp
zmq::session_base_t::read_zap_msg
int read_zap_msg(msg_t *msg_)
Definition: session_base.cpp:172
zmq::session_base_t::write_activated
void write_activated(zmq::pipe_t *pipe_) ZMQ_FINAL
Definition: session_base.cpp:297
zmq::session_base_t::timer_event
void timer_event(int id_) ZMQ_FINAL
Definition: session_base.cpp:522
zmq::session_base_t::zap_connect
int zap_connect()
Definition: session_base.cpp:333
zmq::address_t
Definition: address.hpp:64
zmq::session_base_t::session_base_t
session_base_t(zmq::io_thread_t *io_thread_, bool active_, zmq::socket_base_t *socket_, const options_t &options_, address_t *addr_)
Definition: session_base.cpp:88
socket_base.hpp
zmq::session_base_t::rollback
void rollback()
Definition: session_base.cpp:212
io_object.hpp
zmq::session_base_t::process_conn_failed
void process_conn_failed() ZMQ_OVERRIDE
Definition: session_base.cpp:534
zmq::session_base_t::_addr
address_t * _addr
Definition: session_base.hpp:139
zmq::session_base_t::pipe_terminated
void pipe_terminated(zmq::pipe_t *pipe_) ZMQ_FINAL
Definition: session_base.cpp:239
zmq::own_t
Definition: own.hpp:21
zmq::session_base_t::_has_linger_timer
bool _has_linger_timer
Definition: session_base.hpp:136
zmq::session_base_t::_io_thread
zmq::io_thread_t * _io_thread
Definition: session_base.hpp:127
zmq::session_base_t::engine_error
void engine_error(bool handshaked_, zmq::i_engine::error_reason_t reason_)
Definition: session_base.cpp:426
zmq::session_base_t::reconnect
void reconnect()
Definition: session_base.cpp:541
zmq::i_engine
Definition: i_engine.hpp:15
i_engine.hpp
own.hpp
zmq::session_base_t::_pending
bool _pending
Definition: session_base.hpp:117
zmq::msg_t
Definition: msg.hpp:33
options_
DebugStringOptions options_
Definition: src/google/protobuf/descriptor.cc:2410
zmq::session_base_t::hiccuped
void hiccuped(zmq::pipe_t *pipe_) ZMQ_FINAL
Definition: session_base.cpp:309


libaditof
Author(s):
autogenerated on Wed May 21 2025 02:06:58