socks_connecter.cpp
Go to the documentation of this file.
1 /* SPDX-License-Identifier: MPL-2.0 */
2 
3 #include "precompiled.hpp"
4 #include <new>
5 #include <string>
6 
7 #include "macros.hpp"
8 #include "socks_connecter.hpp"
9 #include "random.hpp"
10 #include "err.hpp"
11 #include "ip.hpp"
12 #include "tcp.hpp"
13 #include "address.hpp"
14 #include "tcp_address.hpp"
15 #include "session_base.hpp"
16 #include "socks.hpp"
17 
18 #ifndef ZMQ_HAVE_WINDOWS
19 #include <unistd.h>
20 #include <sys/types.h>
21 #include <sys/socket.h>
22 #if defined ZMQ_HAVE_VXWORKS
23 #include <sockLib.h>
24 #endif
25 #endif
26 
27 zmq::socks_connecter_t::socks_connecter_t (class io_thread_t *io_thread_,
28  class session_base_t *session_,
29  const options_t &options_,
30  address_t *addr_,
31  address_t *proxy_addr_,
32  bool delayed_start_) :
33  stream_connecter_base_t (
34  io_thread_, session_, options_, addr_, delayed_start_),
35  _proxy_addr (proxy_addr_),
36  _auth_method (socks_no_auth_required),
37  _status (unplugged)
38 {
39  zmq_assert (_addr->protocol == protocol_name::tcp);
40  _proxy_addr->to_string (_endpoint);
41 }
42 
43 zmq::socks_connecter_t::~socks_connecter_t ()
44 {
45  LIBZMQ_DELETE (_proxy_addr);
46 }
47 
48 void zmq::socks_connecter_t::set_auth_method_none ()
49 {
50  _auth_method = socks_no_auth_required;
51  _auth_username.clear ();
52  _auth_password.clear ();
53 }
54 
55 void zmq::socks_connecter_t::set_auth_method_basic (
56  const std::string &username_, const std::string &password_)
57 {
58  _auth_method = socks_basic_auth;
59  _auth_username = username_;
60  _auth_password = password_;
61 }
62 
63 void zmq::socks_connecter_t::in_event ()
64 {
65  int expected_status = -1;
66  zmq_assert (_status != unplugged);
67 
68  if (_status == waiting_for_choice) {
69  int rc = _choice_decoder.input (_s);
70  if (rc == 0 || rc == -1)
71  error ();
72  else if (_choice_decoder.message_ready ()) {
73  const socks_choice_t choice = _choice_decoder.decode ();
74  rc = process_server_response (choice);
75  if (rc == -1)
76  error ();
77  else {
78  if (choice.method == socks_basic_auth)
79  expected_status = sending_basic_auth_request;
80  else
81  expected_status = sending_request;
82  }
83  }
84  } else if (_status == waiting_for_auth_response) {
85  int rc = _auth_response_decoder.input (_s);
86  if (rc == 0 || rc == -1)
87  error ();
88  else if (_auth_response_decoder.message_ready ()) {
89  const socks_auth_response_t auth_response =
90  _auth_response_decoder.decode ();
91  rc = process_server_response (auth_response);
92  if (rc == -1)
93  error ();
94  else {
95  expected_status = sending_request;
96  }
97  }
98  } else if (_status == waiting_for_response) {
99  int rc = _response_decoder.input (_s);
100  if (rc == 0 || rc == -1)
101  error ();
102  else if (_response_decoder.message_ready ()) {
103  const socks_response_t response = _response_decoder.decode ();
104  rc = process_server_response (response);
105  if (rc == -1)
106  error ();
107  else {
108  rm_handle ();
109  create_engine (
110  _s, get_socket_name<tcp_address_t> (_s, socket_end_local));
111  _s = -1;
112  _status = unplugged;
113  }
114  }
115  } else
116  error ();
117 
118  if (expected_status == sending_basic_auth_request) {
119  _basic_auth_request_encoder.encode (
120  socks_basic_auth_request_t (_auth_username, _auth_password));
121  reset_pollin (_handle);
122  set_pollout (_handle);
123  _status = sending_basic_auth_request;
124  } else if (expected_status == sending_request) {
125  std::string hostname;
126  uint16_t port = 0;
127  if (parse_address (_addr->address, hostname, port) == -1)
128  error ();
129  else {
130  _request_encoder.encode (socks_request_t (1, hostname, port));
131  reset_pollin (_handle);
132  set_pollout (_handle);
133  _status = sending_request;
134  }
135  }
136 }
137 
138 void zmq::socks_connecter_t::out_event ()
139 {
140  zmq_assert (
141  _status == waiting_for_proxy_connection || _status == sending_greeting
142  || _status == sending_basic_auth_request || _status == sending_request);
143 
144  if (_status == waiting_for_proxy_connection) {
145  const int rc = static_cast<int> (check_proxy_connection ());
146  if (rc == -1)
147  error ();
148  else {
149  _greeting_encoder.encode (socks_greeting_t (_auth_method));
150  _status = sending_greeting;
151  }
152  } else if (_status == sending_greeting) {
153  zmq_assert (_greeting_encoder.has_pending_data ());
154  const int rc = _greeting_encoder.output (_s);
155  if (rc == -1 || rc == 0)
156  error ();
157  else if (!_greeting_encoder.has_pending_data ()) {
158  reset_pollout (_handle);
159  set_pollin (_handle);
160  _status = waiting_for_choice;
161  }
162  } else if (_status == sending_basic_auth_request) {
163  zmq_assert (_basic_auth_request_encoder.has_pending_data ());
164  const int rc = _basic_auth_request_encoder.output (_s);
165  if (rc == -1 || rc == 0)
166  error ();
167  else if (!_basic_auth_request_encoder.has_pending_data ()) {
168  reset_pollout (_handle);
169  set_pollin (_handle);
170  _status = waiting_for_auth_response;
171  }
172  } else {
173  zmq_assert (_request_encoder.has_pending_data ());
174  const int rc = _request_encoder.output (_s);
175  if (rc == -1 || rc == 0)
176  error ();
177  else if (!_request_encoder.has_pending_data ()) {
178  reset_pollout (_handle);
179  set_pollin (_handle);
180  _status = waiting_for_response;
181  }
182  }
183 }
184 
185 void zmq::socks_connecter_t::start_connecting ()
186 {
187  zmq_assert (_status == unplugged);
188 
189  // Open the connecting socket.
190  const int rc = connect_to_proxy ();
191 
192  // Connect may succeed in synchronous manner.
193  if (rc == 0) {
194  _handle = add_fd (_s);
195  set_pollout (_handle);
196  _status = sending_greeting;
197  }
198  // Connection establishment may be delayed. Poll for its completion.
199  else if (errno == EINPROGRESS) {
200  _handle = add_fd (_s);
201  set_pollout (_handle);
202  _status = waiting_for_proxy_connection;
203  _socket->event_connect_delayed (
205  }
206  // Handle any other error condition by eventual reconnect.
207  else {
208  if (_s != retired_fd)
209  close ();
210  add_reconnect_timer ();
211  }
212 }
213 
214 int zmq::socks_connecter_t::process_server_response (
215  const socks_choice_t &response_)
216 {
217  return response_.method == socks_no_auth_required
218  || response_.method == socks_basic_auth
219  ? 0
220  : -1;
221 }
222 
223 int zmq::socks_connecter_t::process_server_response (
224  const socks_response_t &response_)
225 {
226  return response_.response_code == 0 ? 0 : -1;
227 }
228 
229 int zmq::socks_connecter_t::process_server_response (
230  const socks_auth_response_t &response_)
231 {
232  return response_.response_code == 0 ? 0 : -1;
233 }
234 
235 void zmq::socks_connecter_t::error ()
236 {
237  rm_fd (_handle);
238  close ();
239  _greeting_encoder.reset ();
240  _choice_decoder.reset ();
241  _basic_auth_request_encoder.reset ();
242  _auth_response_decoder.reset ();
243  _request_encoder.reset ();
244  _response_decoder.reset ();
245  _status = unplugged;
246  add_reconnect_timer ();
247 }
248 
249 int zmq::socks_connecter_t::connect_to_proxy ()
250 {
251  zmq_assert (_s == retired_fd);
252 
253  // Resolve the address
254  if (_proxy_addr->resolved.tcp_addr != NULL) {
255  LIBZMQ_DELETE (_proxy_addr->resolved.tcp_addr);
256  }
257 
258  _proxy_addr->resolved.tcp_addr = new (std::nothrow) tcp_address_t ();
259  alloc_assert (_proxy_addr->resolved.tcp_addr);
260  // Automatic fallback to ipv4 is disabled here since this was the existing
261  // behaviour, however I don't see a real reason for this. Maybe this can
262  // be changed to true (and then the parameter can be removed entirely).
263  _s = tcp_open_socket (_proxy_addr->address.c_str (), options, false, false,
264  _proxy_addr->resolved.tcp_addr);
265  if (_s == retired_fd) {
266  // TODO we should emit some event in this case!
267  LIBZMQ_DELETE (_proxy_addr->resolved.tcp_addr);
268  return -1;
269  }
270  zmq_assert (_proxy_addr->resolved.tcp_addr != NULL);
271 
272  // Set the socket to non-blocking mode so that we get async connect().
273  unblock_socket (_s);
274 
275  const tcp_address_t *const tcp_addr = _proxy_addr->resolved.tcp_addr;
276 
277  int rc;
278 
279  // Set a source address for conversations
280  if (tcp_addr->has_src_addr ()) {
281 #if defined ZMQ_HAVE_VXWORKS
282  rc = ::bind (_s, (sockaddr *) tcp_addr->src_addr (),
283  tcp_addr->src_addrlen ());
284 #else
285  rc = ::bind (_s, tcp_addr->src_addr (), tcp_addr->src_addrlen ());
286 #endif
287  if (rc == -1) {
288  close ();
289  return -1;
290  }
291  }
292 
293  // Connect to the remote peer.
294 #if defined ZMQ_HAVE_VXWORKS
295  rc = ::connect (_s, (sockaddr *) tcp_addr->addr (), tcp_addr->addrlen ());
296 #else
297  rc = ::connect (_s, tcp_addr->addr (), tcp_addr->addrlen ());
298 #endif
299  // Connect was successful immediately.
300  if (rc == 0)
301  return 0;
302 
303  // Translate error codes indicating asynchronous connect has been
304  // launched to a uniform EINPROGRESS.
305 #ifdef ZMQ_HAVE_WINDOWS
306  const int last_error = WSAGetLastError ();
307  if (last_error == WSAEINPROGRESS || last_error == WSAEWOULDBLOCK)
308  errno = EINPROGRESS;
309  else {
310  errno = wsa_error_to_errno (last_error);
311  close ();
312  }
313 #else
314  if (errno == EINTR)
315  errno = EINPROGRESS;
316 #endif
317  return -1;
318 }
319 
320 zmq::fd_t zmq::socks_connecter_t::check_proxy_connection () const
321 {
322  // Async connect has finished. Check whether an error occurred
323  int err = 0;
324 #if defined ZMQ_HAVE_HPUX || defined ZMQ_HAVE_VXWORKS
325  int len = sizeof err;
326 #else
327  socklen_t len = sizeof err;
328 #endif
329 
330  int rc = getsockopt (_s, SOL_SOCKET, SO_ERROR,
331  reinterpret_cast<char *> (&err), &len);
332 
333  // Assert if the error was caused by 0MQ bug.
334  // Networking problems are OK. No need to assert.
335 #ifdef ZMQ_HAVE_WINDOWS
336  zmq_assert (rc == 0);
337  if (err != 0) {
338  wsa_assert (err == WSAECONNREFUSED || err == WSAETIMEDOUT
339  || err == WSAECONNABORTED || err == WSAEHOSTUNREACH
340  || err == WSAENETUNREACH || err == WSAENETDOWN
341  || err == WSAEACCES || err == WSAEINVAL
342  || err == WSAEADDRINUSE);
343  return -1;
344  }
345 #else
346  // Following code should handle both Berkeley-derived socket
347  // implementations and Solaris.
348  if (rc == -1)
349  err = errno;
350  if (err != 0) {
351  errno = err;
353  || errno == ETIMEDOUT || errno == EHOSTUNREACH
354  || errno == ENETUNREACH || errno == ENETDOWN
355  || errno == EINVAL);
356  return -1;
357  }
358 #endif
359 
360  rc = tune_tcp_socket (_s);
361  rc = rc
363  _s, options.tcp_keepalive, options.tcp_keepalive_cnt,
364  options.tcp_keepalive_idle, options.tcp_keepalive_intvl);
365  if (rc != 0)
366  return -1;
367 
368  return 0;
369 }
370 
371 int zmq::socks_connecter_t::parse_address (const std::string &address_,
372  std::string &hostname_,
373  uint16_t &port_)
374 {
375  // Find the ':' at end that separates address from the port number.
376  const size_t idx = address_.rfind (':');
377  if (idx == std::string::npos) {
378  errno = EINVAL;
379  return -1;
380  }
381 
382  // Extract hostname
383  if (idx < 2 || address_[0] != '[' || address_[idx - 1] != ']')
384  hostname_ = address_.substr (0, idx);
385  else
386  hostname_ = address_.substr (1, idx - 2);
387 
388  // Separate the hostname/port.
389  const std::string port_str = address_.substr (idx + 1);
390  // Parse the port number (0 is not a valid port).
391  port_ = static_cast<uint16_t> (atoi (port_str.c_str ()));
392  if (port_ == 0) {
393  errno = EINVAL;
394  return -1;
395  }
396  return 0;
397 }
response
const std::string response
LIBZMQ_DELETE
#define LIBZMQ_DELETE(p_object)
Definition: macros.hpp:7
ip.hpp
zmq::socket_end_local
@ socket_end_local
Definition: address.hpp:112
zmq::make_unconnected_connect_endpoint_pair
endpoint_uri_pair_t make_unconnected_connect_endpoint_pair(const std::string &endpoint_)
Definition: endpoint.cpp:7
NULL
NULL
Definition: test_security_zap.cpp:405
EINTR
#define EINTR
Definition: errno.hpp:7
zmq::retired_fd
@ retired_fd
Definition: fd.hpp:32
options
Message * options
Definition: src/google/protobuf/descriptor.cc:3119
zmq_errno
ZMQ_EXPORT int zmq_errno(void)
Definition: zmq.cpp:101
EINVAL
#define EINVAL
Definition: errno.hpp:25
zmq::protocol_name::tcp
static const char tcp[]
Definition: address.hpp:38
precompiled.hpp
zmq_assert
#define zmq_assert(x)
Definition: err.hpp:102
random.hpp
socks_connecter.hpp
string
GLsizei const GLchar *const * string
Definition: glcorearb.h:3083
errno
int errno
ECONNREFUSED
#define ECONNREFUSED
Definition: zmq.h:122
error
Definition: cJSON.c:88
idx
static uint32_t idx(tarjan *t, const upb_refcounted *r)
Definition: ruby/ext/google/protobuf_c/upb.c:5925
zmq::fd_t
int fd_t
Definition: zmq.hpp:287
socks.hpp
alloc_assert
#define alloc_assert(x)
Definition: err.hpp:146
errno_assert
#define errno_assert(x)
Definition: err.hpp:113
macros.hpp
tcp_address.hpp
err
static UPB_NORETURN void err(tarjan *t)
Definition: ruby/ext/google/protobuf_c/upb.c:5856
ENETDOWN
#define ENETDOWN
Definition: zmq.h:113
EINPROGRESS
#define EINPROGRESS
Definition: zmq.h:125
address.hpp
zmq::unblock_socket
void unblock_socket(fd_t s_)
Definition: ip.cpp:107
zmq::tune_tcp_keepalives
int tune_tcp_keepalives(fd_t s_, int keepalive_, int keepalive_cnt_, int keepalive_idle_, int keepalive_intvl_)
Definition: tcp.cpp:71
len
int len
Definition: php/ext/google/protobuf/map.c:206
tcp.hpp
zmq::tune_tcp_socket
int tune_tcp_socket(fd_t s_)
Definition: tcp.cpp:30
zmq::tcp_open_socket
fd_t tcp_open_socket(const char *address_, const options_t &options_, bool local_, bool fallback_to_ipv4_, tcp_address_t *out_tcp_addr_)
Definition: tcp.cpp:332
ECONNRESET
#define ECONNRESET
Definition: zmq.h:143
err.hpp
EHOSTUNREACH
#define EHOSTUNREACH
Definition: zmq.h:152
ENETUNREACH
#define ENETUNREACH
Definition: zmq.h:137
ETIMEDOUT
#define ETIMEDOUT
Definition: zmq.h:149
session_base.hpp
options_
DebugStringOptions options_
Definition: src/google/protobuf/descriptor.cc:2410


libaditof
Author(s):
autogenerated on Wed May 21 2025 02:06:58