stream_connecter_base.cpp
Go to the documentation of this file.
1 /* SPDX-License-Identifier: MPL-2.0 */
2 
3 #include "precompiled.hpp"
5 #include "session_base.hpp"
6 #include "address.hpp"
7 #include "random.hpp"
8 #include "zmtp_engine.hpp"
9 #include "raw_engine.hpp"
10 
11 #ifndef ZMQ_HAVE_WINDOWS
12 #include <unistd.h>
13 #else
14 #include <winsock2.h>
15 #endif
16 
17 #include <limits>
18 
20  zmq::io_thread_t *io_thread_,
21  zmq::session_base_t *session_,
22  const zmq::options_t &options_,
23  zmq::address_t *addr_,
24  bool delayed_start_) :
25  own_t (io_thread_, options_),
26  io_object_t (io_thread_),
27  _addr (addr_),
28  _s (retired_fd),
29  _handle (static_cast<handle_t> (NULL)),
30  _socket (session_->get_socket ()),
31  _delayed_start (delayed_start_),
32  _reconnect_timer_started (false),
33  _current_reconnect_ivl (-1),
34  _session (session_)
35 {
36  zmq_assert (_addr);
38  // TODO the return value is unused! what if it fails? if this is impossible
39  // or does not matter, change such that endpoint in initialized using an
40  // initializer, and make endpoint const
41 }
42 
44 {
45  zmq_assert (!_reconnect_timer_started);
46  zmq_assert (!_handle);
47  zmq_assert (_s == retired_fd);
48 }
49 
51 {
52  if (_delayed_start)
53  add_reconnect_timer ();
54  else
55  start_connecting ();
56 }
57 
59 {
60  if (_reconnect_timer_started) {
61  cancel_timer (reconnect_timer_id);
62  _reconnect_timer_started = false;
63  }
64 
65  if (_handle) {
66  rm_handle ();
67  }
68 
69  if (_s != retired_fd)
70  close ();
71 
72  own_t::process_term (linger_);
73 }
74 
76 {
77  if (options.reconnect_ivl > 0) {
78  const int interval = get_new_reconnect_ivl ();
79  add_timer (interval, reconnect_timer_id);
80  _socket->event_connect_retried (
81  make_unconnected_connect_endpoint_pair (_endpoint), interval);
82  _reconnect_timer_started = true;
83  }
84 }
85 
87 {
88  if (options.reconnect_ivl_max > 0) {
89  int candidate_interval = 0;
90  if (_current_reconnect_ivl == -1)
91  candidate_interval = options.reconnect_ivl;
92  else if (_current_reconnect_ivl > std::numeric_limits<int>::max () / 2)
93  candidate_interval = std::numeric_limits<int>::max ();
94  else
95  candidate_interval = _current_reconnect_ivl * 2;
96 
97  if (candidate_interval > options.reconnect_ivl_max)
98  _current_reconnect_ivl = options.reconnect_ivl_max;
99  else
100  _current_reconnect_ivl = candidate_interval;
101  return _current_reconnect_ivl;
102  } else {
103  if (_current_reconnect_ivl == -1)
104  _current_reconnect_ivl = options.reconnect_ivl;
105  // The new interval is the base interval + random value.
106  const int random_jitter = generate_random () % options.reconnect_ivl;
107  const int interval =
108  _current_reconnect_ivl
109  < std::numeric_limits<int>::max () - random_jitter
110  ? _current_reconnect_ivl + random_jitter
111  : std::numeric_limits<int>::max ();
112  return interval;
113  }
114 }
115 
117 {
118  rm_fd (_handle);
119  _handle = static_cast<handle_t> (NULL);
120 }
121 
123 {
124  // TODO before, this was an assertion for _s != retired_fd, but this does not match usage of close
125  if (_s != retired_fd) {
126 #ifdef ZMQ_HAVE_WINDOWS
127  const int rc = closesocket (_s);
128  wsa_assert (rc != SOCKET_ERROR);
129 #else
130  const int rc = ::close (_s);
131  errno_assert (rc == 0);
132 #endif
133  _socket->event_closed (
135  _s = retired_fd;
136  }
137 }
138 
140 {
141  // We are not polling for incoming data, so we are actually called
142  // because of error here. However, we can get error on out event as well
143  // on some platforms, so we'll simply handle both events in the same way.
144  out_event ();
145 }
146 
148  fd_t fd_, const std::string &local_address_)
149 {
150  const endpoint_uri_pair_t endpoint_pair (local_address_, _endpoint,
152 
153  // Create the engine object for this connection.
154  i_engine *engine;
155  if (options.raw_socket)
156  engine = new (std::nothrow) raw_engine_t (fd_, options, endpoint_pair);
157  else
158  engine = new (std::nothrow) zmtp_engine_t (fd_, options, endpoint_pair);
159  alloc_assert (engine);
160 
161  // Attach the engine to the corresponding session object.
162  send_attach (_session, engine);
163 
164  // Shut the connecter down.
165  terminate ();
166 
167  _socket->event_connected (endpoint_pair, fd_);
168 }
169 
171 {
172  zmq_assert (id_ == reconnect_timer_id);
173  _reconnect_timer_started = false;
174  start_connecting ();
175 }
zmq::session_base_t
Definition: session_base.hpp:21
closesocket
#define closesocket
Definition: unittest_poller.cpp:13
zmq::stream_connecter_base_t::rm_handle
void rm_handle()
Definition: stream_connecter_base.cpp:116
zmq::make_unconnected_connect_endpoint_pair
endpoint_uri_pair_t make_unconnected_connect_endpoint_pair(const std::string &endpoint_)
Definition: endpoint.cpp:7
zmq::io_object_t
Definition: io_object.hpp:20
zmq::io_object_t::handle_t
poller_t::handle_t handle_t
Definition: io_object.hpp:32
NULL
NULL
Definition: test_security_zap.cpp:405
zmq::options_t
Definition: options.hpp:34
zmq::retired_fd
@ retired_fd
Definition: fd.hpp:32
options
Message * options
Definition: src/google/protobuf/descriptor.cc:3119
zmtp_engine.hpp
precompiled.hpp
zmq_assert
#define zmq_assert(x)
Definition: err.hpp:102
zmq::stream_connecter_base_t::stream_connecter_base_t
stream_connecter_base_t(zmq::io_thread_t *io_thread_, zmq::session_base_t *session_, const options_t &options_, address_t *addr_, bool delayed_start_)
Definition: stream_connecter_base.cpp:19
random.hpp
zmq::own_t::process_term
void process_term(int linger_) ZMQ_OVERRIDE
Definition: own.cpp:128
string
GLsizei const GLchar *const * string
Definition: glcorearb.h:3083
raw_engine.hpp
zmq::stream_connecter_base_t::~stream_connecter_base_t
~stream_connecter_base_t() ZMQ_OVERRIDE
Definition: stream_connecter_base.cpp:43
zmq::fd_t
int fd_t
Definition: zmq.hpp:287
retired_fd
@ retired_fd
Definition: libzmq/tests/testutil.hpp:117
alloc_assert
#define alloc_assert(x)
Definition: err.hpp:146
errno_assert
#define errno_assert(x)
Definition: err.hpp:113
zmq::endpoint_uri_pair_t
Definition: endpoint.hpp:17
zmq::endpoint_type_connect
@ endpoint_type_connect
Definition: endpoint.hpp:14
zmq::stream_connecter_base_t::_endpoint
std::string _endpoint
Definition: stream_connecter_base.hpp:62
address.hpp
zmq::stream_connecter_base_t::in_event
void in_event() ZMQ_OVERRIDE
Definition: stream_connecter_base.cpp:139
zmq::stream_connecter_base_t::timer_event
void timer_event(int id_) ZMQ_OVERRIDE
Definition: stream_connecter_base.cpp:170
zmq::address_t::to_string
int to_string(std::string &addr_) const
Definition: address.cpp:65
zmq::stream_connecter_base_t::get_new_reconnect_ivl
int get_new_reconnect_ivl()
Definition: stream_connecter_base.cpp:86
zmq::address_t
Definition: address.hpp:64
zmq::stream_connecter_base_t::process_plug
void process_plug() ZMQ_FINAL
Definition: stream_connecter_base.cpp:50
zmq::own_t
Definition: own.hpp:21
zmq::generate_random
uint32_t generate_random()
Definition: random.cpp:30
zmq::stream_connecter_base_t::create_engine
virtual void create_engine(fd_t fd, const std::string &local_address_)
Definition: stream_connecter_base.cpp:147
zmq::stream_connecter_base_t::close
void close()
Definition: stream_connecter_base.cpp:122
zmq::stream_connecter_base_t::process_term
void process_term(int linger_) ZMQ_OVERRIDE
Definition: stream_connecter_base.cpp:58
zmq::stream_connecter_base_t::_addr
address_t *const _addr
Definition: stream_connecter_base.hpp:52
stream_connecter_base.hpp
zmq::i_engine
Definition: i_engine.hpp:15
session_base.hpp
false
#define false
Definition: cJSON.c:70
zmq::stream_connecter_base_t::add_reconnect_timer
void add_reconnect_timer()
Definition: stream_connecter_base.cpp:75
options_
DebugStringOptions options_
Definition: src/google/protobuf/descriptor.cc:2410


libaditof
Author(s):
autogenerated on Wed May 21 2025 02:06:59