udp_engine.cpp
Go to the documentation of this file.
1 /* SPDX-License-Identifier: MPL-2.0 */
2 
3 #include "precompiled.hpp"
4 
5 #if !defined ZMQ_HAVE_WINDOWS
6 #include <sys/types.h>
7 #include <unistd.h>
8 #include <sys/socket.h>
9 #include <netinet/in.h>
10 #include <arpa/inet.h>
11 #ifdef ZMQ_HAVE_VXWORKS
12 #include <sockLib.h>
13 #endif
14 #endif
15 
16 #include "udp_address.hpp"
17 #include "udp_engine.hpp"
18 #include "session_base.hpp"
19 #include "err.hpp"
20 #include "ip.hpp"
21 
22 // OSX uses a different name for this socket option
23 #ifndef IPV6_ADD_MEMBERSHIP
24 #define IPV6_ADD_MEMBERSHIP IPV6_JOIN_GROUP
25 #endif
26 
27 #ifdef __APPLE__
28 #include <TargetConditionals.h>
29 #endif
30 
31 zmq::udp_engine_t::udp_engine_t (const options_t &options_) :
32  _plugged (false),
33  _fd (-1),
34  _session (NULL),
35  _handle (static_cast<handle_t> (NULL)),
36  _address (NULL),
37  _options (options_),
38  _send_enabled (false),
39  _recv_enabled (false)
40 {
41 }
42 
43 zmq::udp_engine_t::~udp_engine_t ()
44 {
45  zmq_assert (!_plugged);
46 
47  if (_fd != retired_fd) {
48 #ifdef ZMQ_HAVE_WINDOWS
49  const int rc = closesocket (_fd);
50  wsa_assert (rc != SOCKET_ERROR);
51 #else
52  int rc = close (_fd);
53  errno_assert (rc == 0);
54 #endif
55  _fd = retired_fd;
56  }
57 }
58 
59 int zmq::udp_engine_t::init (address_t *address_, bool send_, bool recv_)
60 {
61  zmq_assert (address_);
62  zmq_assert (send_ || recv_);
63  _send_enabled = send_;
64  _recv_enabled = recv_;
65  _address = address_;
66 
67  _fd = open_socket (_address->resolved.udp_addr->family (), SOCK_DGRAM,
68  IPPROTO_UDP);
69  if (_fd == retired_fd)
70  return -1;
71 
72  unblock_socket (_fd);
73 
74  return 0;
75 }
76 
77 void zmq::udp_engine_t::plug (io_thread_t *io_thread_, session_base_t *session_)
78 {
79  zmq_assert (!_plugged);
80  _plugged = true;
81 
82  zmq_assert (!_session);
83  zmq_assert (session_);
84  _session = session_;
85 
86  // Connect to I/O threads poller object.
87  io_object_t::plug (io_thread_);
88  _handle = add_fd (_fd);
89 
90  const udp_address_t *const udp_addr = _address->resolved.udp_addr;
91 
92  int rc = 0;
93 
94  // Bind the socket to a device if applicable
95  if (!_options.bound_device.empty ()) {
96  rc = rc | bind_to_device (_fd, _options.bound_device);
97  if (rc != 0) {
99  error (connection_error);
100  return;
101  }
102  }
103 
104  if (_send_enabled) {
105  if (!_options.raw_socket) {
106  const ip_addr_t *out = udp_addr->target_addr ();
107  _out_address = out->as_sockaddr ();
108  _out_address_len = out->sockaddr_len ();
109 
110  if (out->is_multicast ()) {
111  const bool is_ipv6 = (out->family () == AF_INET6);
112  rc = rc
113  | set_udp_multicast_loop (_fd, is_ipv6,
114  _options.multicast_loop);
115 
116  if (_options.multicast_hops > 0) {
117  rc = rc
118  | set_udp_multicast_ttl (_fd, is_ipv6,
119  _options.multicast_hops);
120  }
121 
122  rc = rc | set_udp_multicast_iface (_fd, is_ipv6, udp_addr);
123  }
124  } else {
126  _out_address = reinterpret_cast<sockaddr *> (&_raw_address);
127  _out_address_len =
128  static_cast<zmq_socklen_t> (sizeof (sockaddr_in));
129  }
130  }
131 
132  if (_recv_enabled) {
133  rc = rc | set_udp_reuse_address (_fd, true);
134 
135  const ip_addr_t *bind_addr = udp_addr->bind_addr ();
136  ip_addr_t any = ip_addr_t::any (bind_addr->family ());
137  const ip_addr_t *real_bind_addr;
138 
139  const bool multicast = udp_addr->is_mcast ();
140 
141  if (multicast) {
142  // Multicast addresses should be allowed to bind to more than
143  // one port as all ports should receive the message
144  rc = rc | set_udp_reuse_port (_fd, true);
145 
146  // In multicast we should bind ANY and use the mreq struct to
147  // specify the interface
148  any.set_port (bind_addr->port ());
149 
150  real_bind_addr = &any;
151  } else {
152  real_bind_addr = bind_addr;
153  }
154 
155  if (rc != 0) {
156  error (protocol_error);
157  return;
158  }
159 
160 #ifdef ZMQ_HAVE_VXWORKS
161  rc = rc
162  | bind (_fd, (sockaddr *) real_bind_addr->as_sockaddr (),
163  real_bind_addr->sockaddr_len ());
164 #else
165  rc = rc
166  | bind (_fd, real_bind_addr->as_sockaddr (),
167  real_bind_addr->sockaddr_len ());
168 #endif
169  if (rc != 0) {
171  error (protocol_error);
172  return;
173  }
174 
175  if (multicast) {
176  rc = rc | add_membership (_fd, udp_addr);
177  }
178  }
179 
180  if (rc != 0) {
181  error (protocol_error);
182  } else {
183  if (_send_enabled) {
184  set_pollout (_handle);
185  }
186 
187  if (_recv_enabled) {
188  set_pollin (_handle);
189 
190  // Call restart output to drop all join/leave commands
191  restart_output ();
192  }
193  }
194 }
195 
196 int zmq::udp_engine_t::set_udp_multicast_loop (fd_t s_,
197  bool is_ipv6_,
198  bool loop_)
199 {
200  int level;
201  int optname;
202 
203  if (is_ipv6_) {
204  level = IPPROTO_IPV6;
205  optname = IPV6_MULTICAST_LOOP;
206  } else {
207  level = IPPROTO_IP;
208  optname = IP_MULTICAST_LOOP;
209  }
210 
211  int loop = loop_ ? 1 : 0;
212  const int rc = setsockopt (s_, level, optname,
213  reinterpret_cast<char *> (&loop), sizeof (loop));
215  return rc;
216 }
217 
218 int zmq::udp_engine_t::set_udp_multicast_ttl (fd_t s_, bool is_ipv6_, int hops_)
219 {
220  int level;
221 
222  if (is_ipv6_) {
223  level = IPPROTO_IPV6;
224  } else {
225  level = IPPROTO_IP;
226  }
227 
228  const int rc =
229  setsockopt (s_, level, IP_MULTICAST_TTL,
230  reinterpret_cast<char *> (&hops_), sizeof (hops_));
232  return rc;
233 }
234 
235 int zmq::udp_engine_t::set_udp_multicast_iface (fd_t s_,
236  bool is_ipv6_,
237  const udp_address_t *addr_)
238 {
239  int rc = 0;
240 
241  if (is_ipv6_) {
242  int bind_if = addr_->bind_if ();
243 
244  if (bind_if > 0) {
245  // If a bind interface is provided we tell the
246  // kernel to use it to send multicast packets
247  rc = setsockopt (s_, IPPROTO_IPV6, IPV6_MULTICAST_IF,
248  reinterpret_cast<char *> (&bind_if),
249  sizeof (bind_if));
250  }
251  } else {
252  struct in_addr bind_addr = addr_->bind_addr ()->ipv4.sin_addr;
253 
254  if (bind_addr.s_addr != INADDR_ANY) {
255  rc = setsockopt (s_, IPPROTO_IP, IP_MULTICAST_IF,
256  reinterpret_cast<char *> (&bind_addr),
257  sizeof (bind_addr));
258  }
259  }
260 
262  return rc;
263 }
264 
265 int zmq::udp_engine_t::set_udp_reuse_address (fd_t s_, bool on_)
266 {
267  int on = on_ ? 1 : 0;
268  const int rc = setsockopt (s_, SOL_SOCKET, SO_REUSEADDR,
269  reinterpret_cast<char *> (&on), sizeof (on));
271  return rc;
272 }
273 
274 int zmq::udp_engine_t::set_udp_reuse_port (fd_t s_, bool on_)
275 {
276 #ifndef SO_REUSEPORT
277  return 0;
278 #else
279  int on = on_ ? 1 : 0;
280  int rc = setsockopt (s_, SOL_SOCKET, SO_REUSEPORT,
281  reinterpret_cast<char *> (&on), sizeof (on));
283  return rc;
284 #endif
285 }
286 
287 int zmq::udp_engine_t::add_membership (fd_t s_, const udp_address_t *addr_)
288 {
289  const ip_addr_t *mcast_addr = addr_->target_addr ();
290  int rc = 0;
291 
292  if (mcast_addr->family () == AF_INET) {
293  struct ip_mreq mreq;
294  mreq.imr_multiaddr = mcast_addr->ipv4.sin_addr;
295  mreq.imr_interface = addr_->bind_addr ()->ipv4.sin_addr;
296 
297  rc = setsockopt (s_, IPPROTO_IP, IP_ADD_MEMBERSHIP,
298  reinterpret_cast<char *> (&mreq), sizeof (mreq));
299 
300  } else if (mcast_addr->family () == AF_INET6) {
301  struct ipv6_mreq mreq;
302  const int iface = addr_->bind_if ();
303 
304  zmq_assert (iface >= -1);
305 
306  mreq.ipv6mr_multiaddr = mcast_addr->ipv6.sin6_addr;
307  mreq.ipv6mr_interface = iface;
308 
309  rc = setsockopt (s_, IPPROTO_IPV6, IPV6_ADD_MEMBERSHIP,
310  reinterpret_cast<char *> (&mreq), sizeof (mreq));
311  }
312 
314  return rc;
315 }
316 
317 void zmq::udp_engine_t::error (error_reason_t reason_)
318 {
319  zmq_assert (_session);
320  _session->engine_error (false, reason_);
321  terminate ();
322 }
323 
324 void zmq::udp_engine_t::terminate ()
325 {
326  zmq_assert (_plugged);
327  _plugged = false;
328 
329  rm_fd (_handle);
330 
331  // Disconnect from I/O threads poller object.
332  io_object_t::unplug ();
333 
334  delete this;
335 }
336 
337 void zmq::udp_engine_t::sockaddr_to_msg (zmq::msg_t *msg_,
338  const sockaddr_in *addr_)
339 {
340  const char *const name = inet_ntoa (addr_->sin_addr);
341 
342  char port[6];
343  const int port_len =
344  snprintf (port, 6, "%d", static_cast<int> (ntohs (addr_->sin_port)));
345  zmq_assert (port_len > 0 && port_len < 6);
346 
347  const size_t name_len = strlen (name);
348  const int size = static_cast<int> (name_len) + 1 /* colon */
349  + port_len + 1; // terminating NUL
350  const int rc = msg_->init_size (size);
351  errno_assert (rc == 0);
352  msg_->set_flags (msg_t::more);
353 
354  // use memcpy instead of strcpy/strcat, since this is more efficient when
355  // we already know the lengths, which we calculated above
356  char *address = static_cast<char *> (msg_->data ());
357  memcpy (address, name, name_len);
358  address += name_len;
359  *address++ = ':';
360  memcpy (address, port, static_cast<size_t> (port_len));
361  address += port_len;
362  *address = 0;
363 }
364 
365 int zmq::udp_engine_t::resolve_raw_address (const char *name_, size_t length_)
366 {
367  memset (&_raw_address, 0, sizeof _raw_address);
368 
369  const char *delimiter = NULL;
370 
371  // Find delimiter, cannot use memrchr as it is not supported on windows
372  if (length_ != 0) {
373  int chars_left = static_cast<int> (length_);
374  const char *current_char = name_ + length_;
375  do {
376  if (*(--current_char) == ':') {
377  delimiter = current_char;
378  break;
379  }
380  } while (--chars_left != 0);
381  }
382 
383  if (!delimiter) {
384  errno = EINVAL;
385  return -1;
386  }
387 
388  const std::string addr_str (name_, delimiter - name_);
389  const std::string port_str (delimiter + 1, name_ + length_ - delimiter - 1);
390 
391  // Parse the port number (0 is not a valid port).
392  const uint16_t port = static_cast<uint16_t> (atoi (port_str.c_str ()));
393  if (port == 0) {
394  errno = EINVAL;
395  return -1;
396  }
397 
398  _raw_address.sin_family = AF_INET;
399  _raw_address.sin_port = htons (port);
400  _raw_address.sin_addr.s_addr = inet_addr (addr_str.c_str ());
401 
402  if (_raw_address.sin_addr.s_addr == INADDR_NONE) {
403  errno = EINVAL;
404  return -1;
405  }
406 
407  return 0;
408 }
409 
410 void zmq::udp_engine_t::out_event ()
411 {
412  msg_t group_msg;
413  int rc = _session->pull_msg (&group_msg);
414  errno_assert (rc == 0 || (rc == -1 && errno == EAGAIN));
415 
416  if (rc == 0) {
417  msg_t body_msg;
418  rc = _session->pull_msg (&body_msg);
419  // If there's a group, there should also be a body
420  errno_assert (rc == 0);
421 
422  const size_t group_size = group_msg.size ();
423  const size_t body_size = body_msg.size ();
424  size_t size;
425 
426  if (_options.raw_socket) {
427  rc = resolve_raw_address (static_cast<char *> (group_msg.data ()),
428  group_size);
429 
430  // We discard the message if address is not valid
431  if (rc != 0) {
432  rc = group_msg.close ();
433  errno_assert (rc == 0);
434 
435  rc = body_msg.close ();
436  errno_assert (rc == 0);
437 
438  return;
439  }
440 
441  size = body_size;
442 
443  memcpy (_out_buffer, body_msg.data (), body_size);
444  } else {
445  size = group_size + body_size + 1;
446 
447  // TODO: check if larger than maximum size
448  _out_buffer[0] = static_cast<unsigned char> (group_size);
449  memcpy (_out_buffer + 1, group_msg.data (), group_size);
450  memcpy (_out_buffer + 1 + group_size, body_msg.data (), body_size);
451  }
452 
453  rc = group_msg.close ();
454  errno_assert (rc == 0);
455 
456  body_msg.close ();
457  errno_assert (rc == 0);
458 
459 #ifdef ZMQ_HAVE_WINDOWS
460  rc = sendto (_fd, _out_buffer, static_cast<int> (size), 0, _out_address,
461  _out_address_len);
462 #elif defined ZMQ_HAVE_VXWORKS
463  rc = sendto (_fd, reinterpret_cast<caddr_t> (_out_buffer), size, 0,
464  (sockaddr *) _out_address, _out_address_len);
465 #else
466  rc = sendto (_fd, _out_buffer, size, 0, _out_address, _out_address_len);
467 #endif
468  if (rc < 0) {
469 #ifdef ZMQ_HAVE_WINDOWS
470  if (WSAGetLastError () != WSAEWOULDBLOCK) {
472  error (connection_error);
473  }
474 #else
475  if (rc != EWOULDBLOCK) {
477  error (connection_error);
478  }
479 #endif
480  }
481  } else {
482  reset_pollout (_handle);
483  }
484 }
485 
486 const zmq::endpoint_uri_pair_t &zmq::udp_engine_t::get_endpoint () const
487 {
488  return _empty_endpoint;
489 }
490 
491 void zmq::udp_engine_t::restart_output ()
492 {
493  // If we don't support send we just drop all messages
494  if (!_send_enabled) {
495  msg_t msg;
496  while (_session->pull_msg (&msg) == 0)
497  msg.close ();
498  } else {
499  set_pollout (_handle);
500  out_event ();
501  }
502 }
503 
504 void zmq::udp_engine_t::in_event ()
505 {
506  sockaddr_storage in_address;
507  zmq_socklen_t in_addrlen =
508  static_cast<zmq_socklen_t> (sizeof (sockaddr_storage));
509 
510  const int nbytes =
511  recvfrom (_fd, _in_buffer, MAX_UDP_MSG, 0,
512  reinterpret_cast<sockaddr *> (&in_address), &in_addrlen);
513 
514  if (nbytes < 0) {
515 #ifdef ZMQ_HAVE_WINDOWS
516  if (WSAGetLastError () != WSAEWOULDBLOCK) {
517  assert_success_or_recoverable (_fd, nbytes);
518  error (connection_error);
519  }
520 #else
521  if (nbytes != EWOULDBLOCK) {
522  assert_success_or_recoverable (_fd, nbytes);
523  error (connection_error);
524  }
525 #endif
526  return;
527  }
528 
529  int rc;
530  int body_size;
531  int body_offset;
532  msg_t msg;
533 
534  if (_options.raw_socket) {
535  zmq_assert (in_address.ss_family == AF_INET);
536  sockaddr_to_msg (&msg, reinterpret_cast<sockaddr_in *> (&in_address));
537 
538  body_size = nbytes;
539  body_offset = 0;
540  } else {
541  // TODO in out_event, the group size is an *unsigned* char. what is
542  // the maximum value?
543  const char *group_buffer = _in_buffer + 1;
544  const int group_size = _in_buffer[0];
545 
546  rc = msg.init_size (group_size);
547  errno_assert (rc == 0);
548  msg.set_flags (msg_t::more);
549  memcpy (msg.data (), group_buffer, group_size);
550 
551  // This doesn't fit, just ignore
552  if (nbytes - 1 < group_size)
553  return;
554 
555  body_size = nbytes - 1 - group_size;
556  body_offset = 1 + group_size;
557  }
558  // Push group description to session
559  rc = _session->push_msg (&msg);
560  errno_assert (rc == 0 || (rc == -1 && errno == EAGAIN));
561 
562  // Group description message doesn't fit in the pipe, drop
563  if (rc != 0) {
564  rc = msg.close ();
565  errno_assert (rc == 0);
566 
567  reset_pollin (_handle);
568  return;
569  }
570 
571  rc = msg.close ();
572  errno_assert (rc == 0);
573  rc = msg.init_size (body_size);
574  errno_assert (rc == 0);
575  memcpy (msg.data (), _in_buffer + body_offset, body_size);
576 
577  // Push message body to session
578  rc = _session->push_msg (&msg);
579  // Message body doesn't fit in the pipe, drop and reset session state
580  if (rc != 0) {
581  rc = msg.close ();
582  errno_assert (rc == 0);
583 
584  _session->reset ();
585  reset_pollin (_handle);
586  return;
587  }
588 
589  rc = msg.close ();
590  errno_assert (rc == 0);
591  _session->flush ();
592 }
593 
594 bool zmq::udp_engine_t::restart_input ()
595 {
596  if (_recv_enabled) {
597  set_pollin (_handle);
598  in_event ();
599  }
600 
601  return true;
602 }
zmq::bind_to_device
int bind_to_device(fd_t s_, const std::string &bound_device_)
Definition: ip.cpp:244
init
WEPOLL_INTERNAL int init(void)
Definition: wepoll.c:858
closesocket
#define closesocket
Definition: unittest_poller.cpp:13
name
GLuint const GLchar * name
Definition: glcorearb.h:3055
ip.hpp
udp_address.hpp
NULL
NULL
Definition: test_security_zap.cpp:405
EINVAL
#define EINVAL
Definition: errno.hpp:25
EAGAIN
#define EAGAIN
Definition: errno.hpp:14
precompiled.hpp
zmq_assert
#define zmq_assert(x)
Definition: err.hpp:102
string
GLsizei const GLchar *const * string
Definition: glcorearb.h:3083
errno
int errno
address
const char * address
Definition: builds/zos/test_fork.cpp:6
udp_engine.hpp
error
Definition: cJSON.c:88
zmq::msg_t::init_size
int init_size(size_t size_)
Definition: msg.cpp:62
retired_fd
@ retired_fd
Definition: libzmq/tests/testutil.hpp:117
zmq::assert_success_or_recoverable
void assert_success_or_recoverable(fd_t s_, int rc_)
Definition: ip.cpp:830
IPV6_ADD_MEMBERSHIP
#define IPV6_ADD_MEMBERSHIP
Definition: udp_engine.cpp:24
snprintf
int snprintf(char *str, size_t size, const char *format,...)
Definition: port.cc:64
errno_assert
#define errno_assert(x)
Definition: err.hpp:113
zmq::endpoint_uri_pair_t
Definition: endpoint.hpp:17
s_
std::string s_
Definition: gmock-matchers_test.cc:4128
size
#define size
Definition: glcorearb.h:2944
name_
string name_
Definition: googletest.cc:182
zmq::unblock_socket
void unblock_socket(fd_t s_)
Definition: ip.cpp:107
zmq::open_socket
fd_t open_socket(int domain_, int type_, int protocol_)
Definition: ip.cpp:73
size
GLsizeiptr size
Definition: glcorearb.h:2943
name_len
static size_t name_len(const char *name_)
Definition: mechanism.cpp:104
zmq::zmq_socklen_t
socklen_t zmq_socklen_t
Definition: address.hpp:107
err.hpp
fd_t
zmq_fd_t fd_t
Definition: libzmq/tests/testutil.hpp:98
MAX_UDP_MSG
#define MAX_UDP_MSG
Definition: udp_engine.hpp:10
session_base.hpp
zmq::msg_t::data
unsigned char data[max_vsm_size]
Definition: msg.hpp:239
false
#define false
Definition: cJSON.c:70
level
GLint level
Definition: glcorearb.h:2773
zmq::msg_t::set_flags
void set_flags(unsigned char flags_)
Definition: msg.cpp:433
zmq::msg_t
Definition: msg.hpp:33
options_
DebugStringOptions options_
Definition: src/google/protobuf/descriptor.cc:2410


libaditof
Author(s):
autogenerated on Wed May 21 2025 02:07:00