5 #if !defined ZMQ_HAVE_WINDOWS
8 #include <sys/socket.h>
9 #include <netinet/in.h>
10 #include <arpa/inet.h>
11 #ifdef ZMQ_HAVE_VXWORKS
23 #ifndef IPV6_ADD_MEMBERSHIP
24 #define IPV6_ADD_MEMBERSHIP IPV6_JOIN_GROUP
28 #include <TargetConditionals.h>
31 zmq::udp_engine_t::udp_engine_t (
const options_t &
options_) :
35 _handle (static_cast<handle_t> (
NULL)),
38 _send_enabled (
false),
43 zmq::udp_engine_t::~udp_engine_t ()
48 #ifdef ZMQ_HAVE_WINDOWS
50 wsa_assert (rc != SOCKET_ERROR);
63 _send_enabled = send_;
64 _recv_enabled = recv_;
67 _fd =
open_socket (_address->resolved.udp_addr->family (), SOCK_DGRAM,
77 void zmq::udp_engine_t::plug (io_thread_t *io_thread_, session_base_t *session_)
87 io_object_t::plug (io_thread_);
88 _handle = add_fd (_fd);
90 const udp_address_t *
const udp_addr = _address->resolved.udp_addr;
95 if (!_options.bound_device.empty ()) {
99 error (connection_error);
105 if (!_options.raw_socket) {
106 const ip_addr_t *out = udp_addr->target_addr ();
107 _out_address = out->as_sockaddr ();
108 _out_address_len = out->sockaddr_len ();
110 if (out->is_multicast ()) {
111 const bool is_ipv6 = (out->family () == AF_INET6);
113 | set_udp_multicast_loop (_fd, is_ipv6,
114 _options.multicast_loop);
116 if (_options.multicast_hops > 0) {
118 | set_udp_multicast_ttl (_fd, is_ipv6,
119 _options.multicast_hops);
122 rc = rc | set_udp_multicast_iface (_fd, is_ipv6, udp_addr);
126 _out_address =
reinterpret_cast<sockaddr *
> (&_raw_address);
133 rc = rc | set_udp_reuse_address (_fd,
true);
135 const ip_addr_t *bind_addr = udp_addr->bind_addr ();
136 ip_addr_t any = ip_addr_t::any (bind_addr->family ());
137 const ip_addr_t *real_bind_addr;
139 const bool multicast = udp_addr->is_mcast ();
144 rc = rc | set_udp_reuse_port (_fd,
true);
148 any.set_port (bind_addr->port ());
150 real_bind_addr = &any;
152 real_bind_addr = bind_addr;
156 error (protocol_error);
160 #ifdef ZMQ_HAVE_VXWORKS
162 | bind (_fd, (sockaddr *) real_bind_addr->as_sockaddr (),
163 real_bind_addr->sockaddr_len ());
166 | bind (_fd, real_bind_addr->as_sockaddr (),
167 real_bind_addr->sockaddr_len ());
171 error (protocol_error);
176 rc = rc | add_membership (_fd, udp_addr);
181 error (protocol_error);
184 set_pollout (_handle);
188 set_pollin (_handle);
196 int zmq::udp_engine_t::set_udp_multicast_loop (
fd_t s_,
204 level = IPPROTO_IPV6;
205 optname = IPV6_MULTICAST_LOOP;
208 optname = IP_MULTICAST_LOOP;
211 int loop = loop_ ? 1 : 0;
212 const int rc = setsockopt (
s_,
level, optname,
213 reinterpret_cast<char *
> (&loop),
sizeof (loop));
218 int zmq::udp_engine_t::set_udp_multicast_ttl (
fd_t s_,
bool is_ipv6_,
int hops_)
223 level = IPPROTO_IPV6;
229 setsockopt (
s_,
level, IP_MULTICAST_TTL,
230 reinterpret_cast<char *
> (&hops_),
sizeof (hops_));
235 int zmq::udp_engine_t::set_udp_multicast_iface (
fd_t s_,
237 const udp_address_t *addr_)
242 int bind_if = addr_->bind_if ();
247 rc = setsockopt (
s_, IPPROTO_IPV6, IPV6_MULTICAST_IF,
248 reinterpret_cast<char *
> (&bind_if),
252 struct in_addr bind_addr = addr_->bind_addr ()->ipv4.sin_addr;
254 if (bind_addr.s_addr != INADDR_ANY) {
255 rc = setsockopt (
s_, IPPROTO_IP, IP_MULTICAST_IF,
256 reinterpret_cast<char *
> (&bind_addr),
265 int zmq::udp_engine_t::set_udp_reuse_address (
fd_t s_,
bool on_)
267 int on = on_ ? 1 : 0;
268 const int rc = setsockopt (
s_, SOL_SOCKET, SO_REUSEADDR,
269 reinterpret_cast<char *
> (&on),
sizeof (on));
274 int zmq::udp_engine_t::set_udp_reuse_port (
fd_t s_,
bool on_)
279 int on = on_ ? 1 : 0;
280 int rc = setsockopt (
s_, SOL_SOCKET, SO_REUSEPORT,
281 reinterpret_cast<char *
> (&on),
sizeof (on));
287 int zmq::udp_engine_t::add_membership (
fd_t s_,
const udp_address_t *addr_)
289 const ip_addr_t *mcast_addr = addr_->target_addr ();
292 if (mcast_addr->family () == AF_INET) {
294 mreq.imr_multiaddr = mcast_addr->ipv4.sin_addr;
295 mreq.imr_interface = addr_->bind_addr ()->ipv4.sin_addr;
297 rc = setsockopt (
s_, IPPROTO_IP, IP_ADD_MEMBERSHIP,
298 reinterpret_cast<char *
> (&mreq),
sizeof (mreq));
300 }
else if (mcast_addr->family () == AF_INET6) {
301 struct ipv6_mreq mreq;
302 const int iface = addr_->bind_if ();
306 mreq.ipv6mr_multiaddr = mcast_addr->ipv6.sin6_addr;
307 mreq.ipv6mr_interface = iface;
310 reinterpret_cast<char *
> (&mreq),
sizeof (mreq));
317 void zmq::udp_engine_t::error (error_reason_t reason_)
320 _session->engine_error (
false, reason_);
324 void zmq::udp_engine_t::terminate ()
332 io_object_t::unplug ();
337 void zmq::udp_engine_t::sockaddr_to_msg (
zmq::msg_t *msg_,
338 const sockaddr_in *addr_)
340 const char *
const name = inet_ntoa (addr_->sin_addr);
344 snprintf (port, 6,
"%d",
static_cast<int> (ntohs (addr_->sin_port)));
356 char *
address =
static_cast<char *
> (msg_->
data ());
360 memcpy (
address, port,
static_cast<size_t> (port_len));
365 int zmq::udp_engine_t::resolve_raw_address (
const char *
name_,
size_t length_)
367 memset (&_raw_address, 0,
sizeof _raw_address);
369 const char *delimiter =
NULL;
373 int chars_left =
static_cast<int> (length_);
374 const char *current_char =
name_ + length_;
376 if (*(--current_char) ==
':') {
377 delimiter = current_char;
380 }
while (--chars_left != 0);
389 const std::string port_str (delimiter + 1,
name_ + length_ - delimiter - 1);
392 const uint16_t port =
static_cast<uint16_t
> (atoi (port_str.c_str ()));
398 _raw_address.sin_family = AF_INET;
399 _raw_address.sin_port = htons (port);
400 _raw_address.sin_addr.s_addr = inet_addr (addr_str.c_str ());
402 if (_raw_address.sin_addr.s_addr == INADDR_NONE) {
410 void zmq::udp_engine_t::out_event ()
413 int rc = _session->pull_msg (&group_msg);
418 rc = _session->pull_msg (&body_msg);
422 const size_t group_size = group_msg.size ();
423 const size_t body_size = body_msg.size ();
426 if (_options.raw_socket) {
427 rc = resolve_raw_address (
static_cast<char *
> (group_msg.data ()),
432 rc = group_msg.close ();
435 rc = body_msg.close ();
443 memcpy (_out_buffer, body_msg.data (), body_size);
445 size = group_size + body_size + 1;
448 _out_buffer[0] =
static_cast<unsigned char> (group_size);
449 memcpy (_out_buffer + 1, group_msg.data (), group_size);
450 memcpy (_out_buffer + 1 + group_size, body_msg.data (), body_size);
453 rc = group_msg.close ();
459 #ifdef ZMQ_HAVE_WINDOWS
460 rc = sendto (_fd, _out_buffer,
static_cast<int> (
size), 0, _out_address,
462 #elif defined ZMQ_HAVE_VXWORKS
463 rc = sendto (_fd,
reinterpret_cast<caddr_t
> (_out_buffer),
size, 0,
464 (sockaddr *) _out_address, _out_address_len);
466 rc = sendto (_fd, _out_buffer,
size, 0, _out_address, _out_address_len);
469 #ifdef ZMQ_HAVE_WINDOWS
470 if (WSAGetLastError () != WSAEWOULDBLOCK) {
472 error (connection_error);
475 if (rc != EWOULDBLOCK) {
477 error (connection_error);
482 reset_pollout (_handle);
488 return _empty_endpoint;
491 void zmq::udp_engine_t::restart_output ()
494 if (!_send_enabled) {
496 while (_session->pull_msg (&msg) == 0)
499 set_pollout (_handle);
504 void zmq::udp_engine_t::in_event ()
506 sockaddr_storage in_address;
512 reinterpret_cast<sockaddr *
> (&in_address), &in_addrlen);
515 #ifdef ZMQ_HAVE_WINDOWS
516 if (WSAGetLastError () != WSAEWOULDBLOCK) {
518 error (connection_error);
521 if (nbytes != EWOULDBLOCK) {
523 error (connection_error);
534 if (_options.raw_socket) {
536 sockaddr_to_msg (&msg,
reinterpret_cast<sockaddr_in *
> (&in_address));
543 const char *group_buffer = _in_buffer + 1;
544 const int group_size = _in_buffer[0];
546 rc = msg.init_size (group_size);
548 msg.set_flags (msg_t::more);
549 memcpy (msg.data (), group_buffer, group_size);
552 if (nbytes - 1 < group_size)
555 body_size = nbytes - 1 - group_size;
556 body_offset = 1 + group_size;
559 rc = _session->push_msg (&msg);
567 reset_pollin (_handle);
573 rc = msg.init_size (body_size);
575 memcpy (msg.data (), _in_buffer + body_offset, body_size);
578 rc = _session->push_msg (&msg);
585 reset_pollin (_handle);
594 bool zmq::udp_engine_t::restart_input ()
597 set_pollin (_handle);