5 #ifdef ZMQ_HAVE_OPENPGM
23 #define MSG_ERRQUEUE 0x2000
// Constructor (excerpt). Takes the receiver flag and a const reference to the
// socket options. Only one member initializer is visible in this excerpt:
// pgm_msgv_processed starts at 0.
26 zmq::pgm_socket_t::pgm_socket_t (
bool receiver_,
const options_t &
options_) :
34 pgm_msgv_processed (0)
// Splits network_ at its last ':' into a network part and a port part,
// writes the port through port_number, and resolves the network part with
// pgm_getaddrinfo, which fills *res.
// Returns int; init's caller treats a negative value as failure. The return
// statements themselves are not visible in this excerpt.
43 int zmq::pgm_socket_t::init_address (
const char *network_,
44 struct pgm_addrinfo_t **res,
45 uint16_t *port_number)
// strrchr picks the LAST ':' so any earlier colons stay in the network part.
48 const char *port_delim = strrchr (network_,
':');
// atoi has no error reporting (non-numeric text yields 0) and its int result
// is narrowed to uint16_t on assignment.
// NOTE(review): no range/format validation of the port is visible here.
54 *port_number = atoi (port_delim + 1);
// Guard: the network part must fit into the local buffer `network` with one
// byte left over for the terminating NUL.
57 if (port_delim - network_ >= (
int)
sizeof (network) - 1) {
// Zero-fill first so the memcpy'd prefix ends up NUL-terminated.
61 memset (network,
'\0',
sizeof (network));
62 memcpy (network, network_, port_delim - network_);
64 pgm_error_t *pgm_error =
NULL;
65 struct pgm_addrinfo_t hints;
67 memset (&hints, 0,
sizeof (hints));
68 hints.ai_family = AF_UNSPEC;
// NOTE(review): hints is prepared above, but the call below passes NULL as
// its second argument, so in the visible code hints is never handed to
// pgm_getaddrinfo -- confirm whether that is intended.
69 if (!pgm_getaddrinfo (network,
NULL, res, &pgm_error)) {
// Failure classification: true for interface-domain errors whose code is
// neither PGM_ERROR_SERVICE nor PGM_ERROR_SOCKTNOSUPPORT. What the branch
// does is not visible in this excerpt.
72 if (pgm_error->domain == PGM_ERROR_DOMAIN_IF &&
75 (pgm_error->code != PGM_ERROR_SERVICE
76 && pgm_error->code != PGM_ERROR_SOCKTNOSUPPORT)) {
// The error object is released with pgm_error_free.
78 pgm_error_free (pgm_error);
// Body of the socket set-up routine. Its signature is not visible in this
// excerpt; it reads udp_encapsulation_ and network_.
// Visible steps, in order: resolve the address, create the PGM socket
// (UDP-encapsulated or raw), apply buffer/MTU options, apply the receive-only
// or send-only parameter set, bind, join the multicast groups, set multicast
// options, connect, and allocate the pgm_msgv receive vector. The last lines
// close the socket and release the address info and error object.
// Most error branches, braces and return statements are missing from this
// excerpt, so the exact control flow between the steps cannot be read here.
99 pgm_msgv_processed = 0;
101 uint16_t port_number;
102 struct pgm_addrinfo_t *res =
NULL;
103 sa_family_t sa_family;
105 pgm_error_t *pgm_error =
NULL;
// A negative result from init_address is treated as failure.
107 if (init_address (network_, &res, &port_number) < 0) {
// The address family is taken from the first send address.
114 sa_family = res->ai_send_addrs[0].gsr_group.ss_family;
// UDP-encapsulated PGM: the socket is created with IPPROTO_UDP.
117 if (udp_encapsulation_) {
118 if (!pgm_socket (&
sock, sa_family, SOCK_SEQPACKET, IPPROTO_UDP,
// Failure classification: true for socket-domain errors whose code is none
// of BADF, FAULT, NOPROTOOPT, FAILED. The branch body is not visible here.
122 if (pgm_error->domain == PGM_ERROR_DOMAIN_SOCKET
123 && (pgm_error->code != PGM_ERROR_BADF
124 && pgm_error->code != PGM_ERROR_FAULT
125 && pgm_error->code != PGM_ERROR_NOPROTOOPT
126 && pgm_error->code != PGM_ERROR_FAILED))
// The port parsed from the address is used for both the unicast and the
// multicast encapsulation port.
136 const int encapsulation_port = port_number;
137 if (!pgm_setsockopt (
sock, IPPROTO_PGM, PGM_UDP_ENCAP_UCAST_PORT,
138 &encapsulation_port,
sizeof (encapsulation_port)))
140 if (!pgm_setsockopt (
sock, IPPROTO_PGM, PGM_UDP_ENCAP_MCAST_PORT,
141 &encapsulation_port,
sizeof (encapsulation_port)))
// Non-encapsulated variant: the socket is created with IPPROTO_PGM.
144 if (!pgm_socket (&
sock, sa_family, SOCK_SEQPACKET, IPPROTO_PGM,
// Same failure classification as for the UDP-encapsulated socket above.
148 if (pgm_error->domain == PGM_ERROR_DOMAIN_SOCKET
149 && (pgm_error->code != PGM_ERROR_BADF
150 && pgm_error->code != PGM_ERROR_FAULT
151 && pgm_error->code != PGM_ERROR_NOPROTOOPT
152 && pgm_error->code != PGM_ERROR_FAILED))
// Kernel receive buffer size, taken from options.rcvbuf.
163 const int rcvbuf = (int)
options.rcvbuf;
165 if (!pgm_setsockopt (
sock, SOL_SOCKET, SO_RCVBUF, &rcvbuf,
// Kernel send buffer size, taken from options.sndbuf.
170 const int sndbuf = (int)
options.sndbuf;
172 if (!pgm_setsockopt (
sock, SOL_SOCKET, SO_SNDBUF, &sndbuf,
// PGM_MTU is set from options.multicast_maxtpdu.
177 const int max_tpdu = (int)
options.multicast_maxtpdu;
178 if (!pgm_setsockopt (
sock, IPPROTO_PGM, PGM_MTU, &max_tpdu,
// Receive-only parameter set (the condition that selects it is not visible
// in this excerpt). The window size in sequence numbers comes from
// compute_sqns; the timers are 300 s peer expiry, 25 ms SPMR expiry,
// 50 ms NAK back-off, 200 ms NAK repeat, 200 ms NAK RDATA, and 50 retries
// each for NAK data and NAK NCF.
184 const int recv_only = 1, rxw_max_tpdu = (int)
options.multicast_maxtpdu,
185 rxw_sqns = compute_sqns (rxw_max_tpdu),
186 peer_expiry = pgm_secs (300), spmr_expiry = pgm_msecs (25),
187 nak_bo_ivl = pgm_msecs (50), nak_rpt_ivl = pgm_msecs (200),
188 nak_rdata_ivl = pgm_msecs (200), nak_data_retries = 50,
189 nak_ncf_retries = 50;
// The options are applied as one `||` chain of negated calls, so the
// condition becomes true as soon as any single pgm_setsockopt fails.
191 if (!pgm_setsockopt (
sock, IPPROTO_PGM, PGM_RECV_ONLY, &recv_only,
193 || !pgm_setsockopt (
sock, IPPROTO_PGM, PGM_RXW_SQNS, &rxw_sqns,
195 || !pgm_setsockopt (
sock, IPPROTO_PGM, PGM_PEER_EXPIRY,
196 &peer_expiry,
sizeof (peer_expiry))
197 || !pgm_setsockopt (
sock, IPPROTO_PGM, PGM_SPMR_EXPIRY,
198 &spmr_expiry,
sizeof (spmr_expiry))
199 || !pgm_setsockopt (
sock, IPPROTO_PGM, PGM_NAK_BO_IVL, &nak_bo_ivl,
201 || !pgm_setsockopt (
sock, IPPROTO_PGM, PGM_NAK_RPT_IVL,
202 &nak_rpt_ivl,
sizeof (nak_rpt_ivl))
203 || !pgm_setsockopt (
sock, IPPROTO_PGM, PGM_NAK_RDATA_IVL,
204 &nak_rdata_ivl,
sizeof (nak_rdata_ivl))
205 || !pgm_setsockopt (
sock, IPPROTO_PGM, PGM_NAK_DATA_RETRIES,
206 &nak_data_retries,
sizeof (nak_data_retries))
207 || !pgm_setsockopt (
sock, IPPROTO_PGM, PGM_NAK_NCF_RETRIES,
208 &nak_ncf_retries,
sizeof (nak_ncf_retries)))
// Send-only parameter set. max_rte is options.rate * 1000 / 8
// (presumably kilobits/s converted to bytes/s -- TODO confirm the unit of
// options.rate). The transmit window size comes from compute_sqns and the
// ambient SPM interval is 30 s. The values from pgm_msecs (100) through
// pgm_secs (30) are the tail of an initializer list whose declaration is not
// visible here; presumably it is the heartbeat_spm array passed to
// PGM_HEARTBEAT_SPM below.
211 const int send_only = 1, max_rte = (int) ((
options.rate * 1000) / 8),
212 txw_max_tpdu = (
int)
options.multicast_maxtpdu,
213 txw_sqns = compute_sqns (txw_max_tpdu),
214 ambient_spm = pgm_secs (30),
216 pgm_msecs (100), pgm_msecs (100), pgm_msecs (100),
217 pgm_msecs (100), pgm_msecs (1300), pgm_secs (7),
218 pgm_secs (16), pgm_secs (25), pgm_secs (30)};
// Same `||` chain pattern as the receive-only set: any failure trips the if.
220 if (!pgm_setsockopt (
sock, IPPROTO_PGM, PGM_SEND_ONLY, &send_only,
222 || !pgm_setsockopt (
sock, IPPROTO_PGM, PGM_ODATA_MAX_RTE, &max_rte,
224 || !pgm_setsockopt (
sock, IPPROTO_PGM, PGM_TXW_SQNS, &txw_sqns,
226 || !pgm_setsockopt (
sock, IPPROTO_PGM, PGM_AMBIENT_SPM,
227 &ambient_spm,
sizeof (ambient_spm))
228 || !pgm_setsockopt (
sock, IPPROTO_PGM, PGM_HEARTBEAT_SPM,
229 &heartbeat_spm,
sizeof (heartbeat_spm)))
// Bind address: zeroed, then given the parsed port and the default data
// source port.
234 struct pgm_sockaddr_t addr;
236 memset (&addr, 0,
sizeof (addr));
237 addr.sa_port = port_number;
238 addr.sa_addr.sport = DEFAULT_DATA_SOURCE_PORT;
// The GSI is derived from the first 8 bytes of buf. How buf is filled is not
// visible in this excerpt.
244 if (!pgm_gsi_create_from_data (&addr.sa_addr.gsi, (uint8_t *)
buf, 8))
// Interface request built from the first receive address. The scope id
// defaults to 0 and, for IPv6, is read from the group address after copying
// it into a local sockaddr_in6.
249 struct pgm_interface_req_t if_req;
250 memset (&if_req, 0,
sizeof (if_req));
251 if_req.ir_interface = res->ai_recv_addrs[0].gsr_interface;
252 if_req.ir_scope_id = 0;
253 if (AF_INET6 == sa_family) {
254 struct sockaddr_in6 sa6;
255 memcpy (&sa6, &res->ai_recv_addrs[0].gsr_group, sizeof (sa6));
256 if_req.ir_scope_id = sa6.sin6_scope_id;
// The same if_req is passed for both interface-request arguments.
258 if (!pgm_bind3 (
sock, &addr,
sizeof (addr), &if_req,
sizeof (if_req),
259 &if_req,
sizeof (if_req), &pgm_error)) {
// Failure classification: true for socket- or interface-domain errors whose
// code is none of INVAL, BADF, FAULT. The branch body is not visible here.
262 if ((pgm_error->domain == PGM_ERROR_DOMAIN_SOCKET
263 || pgm_error->domain == PGM_ERROR_DOMAIN_IF)
264 && (pgm_error->code != PGM_ERROR_INVAL
265 && pgm_error->code != PGM_ERROR_BADF
266 && pgm_error->code != PGM_ERROR_FAULT))
// Join every resolved receive group.
276 for (
unsigned i = 0;
i < res->ai_recv_addrs_len;
i++) {
277 if (!pgm_setsockopt (
sock, IPPROTO_PGM, PGM_JOIN_GROUP,
278 &res->ai_recv_addrs[
i], sizeof (
struct group_req)))
// The send group is the first resolved send address.
281 if (!pgm_setsockopt (
sock, IPPROTO_PGM, PGM_SEND_GROUP,
282 &res->ai_send_addrs[0], sizeof (
struct group_req)))
// res is not read again after this point in the visible code.
// NOTE(review): pgm_freeaddrinfo (res) appears a second time near the end of
// this body; whether res is reset in between is not visible -- confirm the
// two calls cannot both run on the same pointer.
285 pgm_freeaddrinfo (res);
// Multicast loopback is switched off.
291 const int multicast_loop = 0;
292 if (!pgm_setsockopt (
sock, IPPROTO_PGM, PGM_MULTICAST_LOOP,
293 &multicast_loop,
sizeof (multicast_loop)))
// Hop limit comes from options.multicast_hops.
296 const int multicast_hops =
options.multicast_hops;
297 if (!pgm_setsockopt (
sock, IPPROTO_PGM, PGM_MULTICAST_HOPS,
298 &multicast_hops,
sizeof (multicast_hops)))
// TOS value 0x2e << 2 (presumably the DSCP Expedited Forwarding code point
// shifted into TOS position -- TODO confirm). Applied only when the family
// is not IPv6, and the result of this pgm_setsockopt is not checked.
303 const int dscp = 0x2e << 2;
304 if (AF_INET6 != sa_family)
305 pgm_setsockopt (
sock, IPPROTO_PGM, PGM_TOS, &dscp,
sizeof (dscp));
// The socket is put into non-blocking mode.
307 const int nonblocking = 1;
308 if (!pgm_setsockopt (
sock, IPPROTO_PGM, PGM_NOBLOCK, &nonblocking,
309 sizeof (nonblocking)))
314 if (!pgm_connect (
sock, &pgm_error)) {
// Size the receive vector as in_batch_size / max TSDU size.
// The (int) cast binds only to options.in_batch_size; because max_tsdu_size
// is size_t, the division and the modulo below are carried out in size_t
// before the quotient is stored into pgm_msgv_len.
323 size_t max_tsdu_size = get_max_tsdu_size ();
324 pgm_msgv_len = (int)
options.in_batch_size / max_tsdu_size;
// Non-zero remainder; the statement it guards is not visible (presumably it
// rounds pgm_msgv_len up by one).
325 if ((
int)
options.in_batch_size % max_tsdu_size)
// NOTE(review): no check of the malloc result is visible in this excerpt.
329 pgm_msgv = (pgm_msgv_t *) malloc (
sizeof (pgm_msgv_t) * pgm_msgv_len);
// Tear-down lines (presumably the failure path; the enclosing labels and
// conditions are not visible): close the socket with FALSE as second
// argument, release the address info, and free the error object if one was
// set.
337 pgm_close (
sock, FALSE);
341 pgm_freeaddrinfo (res);
344 if (pgm_error !=
NULL) {
345 pgm_error_free (pgm_error);
// Destructor (excerpt). Closes the PGM socket, passing TRUE as the second
// argument of pgm_close (presumably the flush flag -- TODO confirm against
// the OpenPGM reference). Any guard around the call and any other clean-up
// are not visible in this excerpt.
352 zmq::pgm_socket_t::~pgm_socket_t ()
357 pgm_close (
sock, TRUE);
// Retrieves two descriptors from the PGM socket through pgm_getsockopt:
//   receive_fd_       <- PGM_RECV_SOCK
//   waiting_pipe_fd_  <- PGM_PENDING_SOCK
// Before each query socklen is set to the size of the destination, and after
// it the code asserts that the reported length still equals that size.
362 void zmq::pgm_socket_t::get_receiver_fds (
fd_t *receive_fd_,
363 fd_t *waiting_pipe_fd_)
371 socklen =
sizeof (*receive_fd_);
373 pgm_getsockopt (
sock, IPPROTO_PGM, PGM_RECV_SOCK, receive_fd_, &socklen);
375 zmq_assert (socklen ==
sizeof (*receive_fd_));
377 socklen =
sizeof (*waiting_pipe_fd_);
378 rc = pgm_getsockopt (
sock, IPPROTO_PGM, PGM_PENDING_SOCK, waiting_pipe_fd_,
381 zmq_assert (socklen ==
sizeof (*waiting_pipe_fd_));
// Retrieves four descriptors from the PGM socket through pgm_getsockopt:
//   send_fd_            <- PGM_SEND_SOCK
//   receive_fd_         <- PGM_RECV_SOCK
//   rdata_notify_fd_    <- PGM_REPAIR_SOCK
//   pending_notify_fd_  <- PGM_PENDING_SOCK
// Each query sets socklen to the destination size first and asserts the
// reported length afterwards. The declaration of receive_fd_ is not visible
// in this excerpt even though the body uses it.
389 void zmq::pgm_socket_t::get_sender_fds (
fd_t *send_fd_,
391 fd_t *rdata_notify_fd_,
392 fd_t *pending_notify_fd_)
402 socklen =
sizeof (*send_fd_);
403 rc = pgm_getsockopt (
sock, IPPROTO_PGM, PGM_SEND_SOCK, send_fd_, &socklen);
// NOTE(review): this check follows the PGM_SEND_SOCK query but compares
// against sizeof (*receive_fd_); sizeof (*send_fd_) looks like what was
// meant. Presumably harmless if both are fd_t (same size) -- confirm.
405 zmq_assert (socklen ==
sizeof (*receive_fd_));
407 socklen =
sizeof (*receive_fd_);
409 pgm_getsockopt (
sock, IPPROTO_PGM, PGM_RECV_SOCK, receive_fd_, &socklen);
411 zmq_assert (socklen ==
sizeof (*receive_fd_));
413 socklen =
sizeof (*rdata_notify_fd_);
414 rc = pgm_getsockopt (
sock, IPPROTO_PGM, PGM_REPAIR_SOCK, rdata_notify_fd_,
417 zmq_assert (socklen ==
sizeof (*rdata_notify_fd_));
419 socklen =
sizeof (*pending_notify_fd_);
420 rc = pgm_getsockopt (
sock, IPPROTO_PGM, PGM_PENDING_SOCK,
421 pending_notify_fd_, &socklen);
423 zmq_assert (socklen ==
sizeof (*pending_notify_fd_));
// Body of the send routine (its signature is not visible in this excerpt;
// it uses data_ and data_len_). Hands the buffer to pgm_send, which reports
// the number of bytes through nbytes.
432 const int status = pgm_send (
sock,
data_, data_len_, &nbytes);
// Asserts that the status is either RATE_LIMITED or WOULD_BLOCK. The
// condition under which this assert runs is not visible (presumably the
// branch where the send did not complete normally).
439 zmq_assert (status == PGM_IO_STATUS_RATE_LIMITED
440 || status == PGM_IO_STATUS_WOULD_BLOCK);
// Rate-limited case; the guarded statement is not visible in this excerpt.
442 if (status == PGM_IO_STATUS_RATE_LIMITED)
// The status is remembered; get_tx_timeout reads last_tx_status.
449 last_tx_status = status;
// Computes a receive-side timeout in milliseconds from the status recorded
// by the last receive call.
454 long zmq::pgm_socket_t::get_rx_timeout ()
// Only RATE_LIMITED and TIMER_PENDING take the query path below; what is
// returned for any other status is not visible in this excerpt.
456 if (last_rx_status != PGM_IO_STATUS_RATE_LIMITED
457 && last_rx_status != PGM_IO_STATUS_TIMER_PENDING)
461 socklen_t optlen =
sizeof (tv);
// The option queried depends on whether the last status was RATE_LIMITED;
// the two option names are not visible in this excerpt.
462 const bool rc = pgm_getsockopt (
sock, IPPROTO_PGM,
463 last_rx_status == PGM_IO_STATUS_RATE_LIMITED
// Seconds and microseconds are folded into whole milliseconds; the integer
// division drops the sub-millisecond part of tv_usec.
469 const long timeout = (tv.tv_sec * 1000) + (tv.tv_usec / 1000);
// Computes a send-side timeout in milliseconds from the status recorded by
// the last send call.
474 long zmq::pgm_socket_t::get_tx_timeout ()
// Only the RATE_LIMITED status takes the query path below; what is returned
// otherwise is not visible in this excerpt.
476 if (last_tx_status != PGM_IO_STATUS_RATE_LIMITED)
480 socklen_t optlen =
sizeof (tv);
// PGM_RATE_REMAIN is read into tv.
482 pgm_getsockopt (
sock, IPPROTO_PGM, PGM_RATE_REMAIN, &tv, &optlen);
// Seconds and microseconds are folded into whole milliseconds; the integer
// division drops the sub-millisecond part of tv_usec.
485 const long timeout = (tv.tv_sec * 1000) + (tv.tv_usec / 1000);
// Returns the value of the PGM_MSS socket option, read through
// pgm_getsockopt into max_tsdu and cast to size_t. The declaration of
// max_tsdu and any check of rc are not visible in this excerpt.
491 size_t zmq::pgm_socket_t::get_max_tsdu_size ()
494 socklen_t optlen =
sizeof (max_tsdu);
496 bool rc = pgm_getsockopt (
sock, IPPROTO_PGM, PGM_MSS, &max_tsdu, &optlen);
499 return (
size_t) max_tsdu;
// Body of the receive routine (its signature is not visible in this excerpt;
// it writes through raw_data_). It hands out one received buffer per call
// from the batch held in pgm_msgv, refilling the batch with pgm_recvmsgv
// once everything fetched so far has been consumed.
506 size_t raw_data_len = 0;
// A non-empty batch has been fully consumed: reset the progress counters.
// Other statements in this branch, if any, are not visible here.
510 if (nbytes_rec == nbytes_processed && nbytes_rec > 0) {
513 nbytes_processed = 0;
514 pgm_msgv_processed = 0;
// Nothing outstanding: fetch a new batch.
521 if (nbytes_rec == nbytes_processed) {
529 pgm_error_t *pgm_error =
NULL;
// Up to pgm_msgv_len entries are requested; the byte count is reported
// through nbytes_rec.
531 const int status = pgm_recvmsgv (
sock, pgm_msgv, pgm_msgv_len,
532 MSG_ERRQUEUE, &nbytes_rec, &pgm_error);
// The status is remembered; get_rx_timeout reads last_rx_status.
537 last_rx_status = status;
// Per-status handling follows. The bodies of these branches are not visible
// in this excerpt.
541 if (status == PGM_IO_STATUS_TIMER_PENDING) {
552 if (status == PGM_IO_STATUS_RATE_LIMITED) {
562 if (status == PGM_IO_STATUS_WOULD_BLOCK) {
// On RESET the first skb of the first vector entry is looked up; what is
// done with it is not visible here.
572 if (status == PGM_IO_STATUS_RESET) {
573 struct pgm_sk_buff_t *skb = pgm_msgv[0].msgv_skb[0];
// NOTE(review): `<=` admits pgm_msgv_processed == pgm_msgv_len, yet the
// lines below index pgm_msgv with pgm_msgv_processed. Confirm that an
// earlier check (not visible here) rules out equality at this point.
587 zmq_assert (pgm_msgv_processed <= pgm_msgv_len);
// Each vector entry is expected to carry exactly one skb.
594 zmq_assert (pgm_msgv[pgm_msgv_processed].msgv_len == 1);
596 struct pgm_sk_buff_t *skb = pgm_msgv[pgm_msgv_processed].msgv_skb[0];
// The caller receives a pointer into the skb's data, not a copy.
599 *raw_data_ = skb->data;
600 raw_data_len = skb->len;
// Advance to the next entry and account for the bytes just handed out.
606 pgm_msgv_processed++;
607 zmq_assert (pgm_msgv_processed <= pgm_msgv_len);
608 nbytes_processed += raw_data_len;
// Calls pgm_recvmsgv with a single throw-away message slot and records the
// resulting status in last_rx_status. The received message and byte count
// are not used in the visible code.
613 void zmq::pgm_socket_t::process_upstream ()
615 pgm_msgv_t dummy_msg;
617 size_t dummy_bytes = 0;
618 pgm_error_t *pgm_error =
NULL;
620 const int status = pgm_recvmsgv (
sock, &dummy_msg, 1, MSG_ERRQUEUE,
621 &dummy_bytes, &pgm_error);
// Tail of a check whose beginning is not visible in this excerpt: the status
// must be one of TIMER_PENDING, RATE_LIMITED or WOULD_BLOCK.
628 && (status == PGM_IO_STATUS_TIMER_PENDING
629 || status == PGM_IO_STATUS_RATE_LIMITED
630 || status == PGM_IO_STATUS_WOULD_BLOCK));
// The status is remembered; get_rx_timeout reads last_rx_status.
632 last_rx_status = status;
// Status-specific handling; the guarded statements are not visible here.
634 if (status == PGM_IO_STATUS_TIMER_PENDING)
636 else if (status == PGM_IO_STATUS_RATE_LIMITED)
// Derives a window size in sequence numbers for a given TPDU size:
//   rate = options.rate / 8
//   size = options.recovery_ivl * rate
//   sqns = size / tpdu_
// The operands are widened to uint64_t before the multiplication. The rest
// of the function (any clamping and the return) is not visible in this
// excerpt.
// NOTE(review): units of options.rate and options.recovery_ivl are not
// visible here -- confirm against the options documentation.
642 int zmq::pgm_socket_t::compute_sqns (
int tpdu_)
645 uint64_t rate = uint64_t (
options.rate) / 8;
648 uint64_t
size = uint64_t (
options.recovery_ivl) * rate;
// NOTE(review): no guard against tpdu_ == 0 is visible before this division.
651 uint64_t sqns =
size / tpdu_;