zmq.cpp
Go to the documentation of this file.
1 /* SPDX-License-Identifier: MPL-2.0 */
2 
3 // "Tell them I was a writer.
4 // A maker of software.
5 // A humanist. A father.
6 // And many things.
7 // But above all, a writer.
8 // Thank You. :)"
9 // - Pieter Hintjens
10 
11 #include "precompiled.hpp"
12 #define ZMQ_TYPE_UNSAFE
13 
14 #include "macros.hpp"
15 #include "poller.hpp"
16 #include "peer.hpp"
17 
18 #if !defined ZMQ_HAVE_POLLER
19 // On AIX platform, poll.h has to be included first to get consistent
20 // definition of pollfd structure (AIX uses 'reqevents' and 'retnevents'
21 // instead of 'events' and 'revents' and defines macros to map from POSIX-y
22 // names to AIX-specific names).
23 #if defined ZMQ_POLL_BASED_ON_POLL && !defined ZMQ_HAVE_WINDOWS
24 #include <poll.h>
25 #endif
26 
27 #include "polling_util.hpp"
28 #endif
29 
30 // TODO: determine if this is an issue, since zmq.h is being loaded from pch.
31 // zmq.h must be included *after* poll.h for AIX to build properly
32 //#include "../include/zmq.h"
33 
34 #if !defined ZMQ_HAVE_WINDOWS
35 #include <unistd.h>
36 #ifdef ZMQ_HAVE_VXWORKS
37 #include <strings.h>
38 #endif
39 #endif
40 
41 // XSI vector I/O
42 #if defined ZMQ_HAVE_UIO
43 #include <sys/uio.h>
44 #else
45 struct iovec
46 {
47  void *iov_base;
48  size_t iov_len;
49 };
50 #endif
51 
52 #include <string.h>
53 #include <stdlib.h>
54 #include <new>
55 #include <climits>
56 
57 #include "proxy.hpp"
58 #include "socket_base.hpp"
59 #include "stdint.hpp"
60 #include "config.hpp"
61 #include "likely.hpp"
62 #include "clock.hpp"
63 #include "ctx.hpp"
64 #include "err.hpp"
65 #include "msg.hpp"
66 #include "fd.hpp"
67 #include "metadata.hpp"
68 #include "socket_poller.hpp"
69 #include "timers.hpp"
70 #include "ip.hpp"
71 #include "address.hpp"
72 
73 #ifdef ZMQ_HAVE_PPOLL
74 #include "polling_util.hpp"
75 #include <sys/select.h>
76 #endif
77 
78 #if defined ZMQ_HAVE_OPENPGM
79 #define __PGM_WININT_H__
80 #include <pgm/pgm.h>
81 #endif
82 
83 // Compile time check whether msg_t fits into zmq_msg_t.
84 typedef char
85  check_msg_t_size[sizeof (zmq::msg_t) == sizeof (zmq_msg_t) ? 1 : -1];
86 
87 
88 void zmq_version (int *major_, int *minor_, int *patch_)
89 {
90  *major_ = ZMQ_VERSION_MAJOR;
91  *minor_ = ZMQ_VERSION_MINOR;
92  *patch_ = ZMQ_VERSION_PATCH;
93 }
94 
95 
96 const char *zmq_strerror (int errnum_)
97 {
98  return zmq::errno_to_string (errnum_);
99 }
100 
101 int zmq_errno (void)
102 {
103  return errno;
104 }
105 
106 
107 // New context API
108 
109 void *zmq_ctx_new (void)
110 {
111  // We do this before the ctx constructor since its embedded mailbox_t
112  // object needs the network to be up and running (at least on Windows).
113  if (!zmq::initialize_network ()) {
114  return NULL;
115  }
116 
117  // Create 0MQ context.
118  zmq::ctx_t *ctx = new (std::nothrow) zmq::ctx_t;
119  if (ctx) {
120  if (!ctx->valid ()) {
121  delete ctx;
122  return NULL;
123  }
124  }
125  return ctx;
126 }
127 
128 int zmq_ctx_term (void *ctx_)
129 {
130  if (!ctx_ || !(static_cast<zmq::ctx_t *> (ctx_))->check_tag ()) {
131  errno = EFAULT;
132  return -1;
133  }
134 
135  const int rc = (static_cast<zmq::ctx_t *> (ctx_))->terminate ();
136  const int en = errno;
137 
138  // Shut down only if termination was not interrupted by a signal.
139  if (!rc || en != EINTR) {
141  }
142 
143  errno = en;
144  return rc;
145 }
146 
147 int zmq_ctx_shutdown (void *ctx_)
148 {
149  if (!ctx_ || !(static_cast<zmq::ctx_t *> (ctx_))->check_tag ()) {
150  errno = EFAULT;
151  return -1;
152  }
153  return (static_cast<zmq::ctx_t *> (ctx_))->shutdown ();
154 }
155 
156 int zmq_ctx_set (void *ctx_, int option_, int optval_)
157 {
158  return zmq_ctx_set_ext (ctx_, option_, &optval_, sizeof (int));
159 }
160 
161 int zmq_ctx_set_ext (void *ctx_,
162  int option_,
163  const void *optval_,
164  size_t optvallen_)
165 {
166  if (!ctx_ || !(static_cast<zmq::ctx_t *> (ctx_))->check_tag ()) {
167  errno = EFAULT;
168  return -1;
169  }
170  return (static_cast<zmq::ctx_t *> (ctx_))
171  ->set (option_, optval_, optvallen_);
172 }
173 
174 int zmq_ctx_get (void *ctx_, int option_)
175 {
176  if (!ctx_ || !(static_cast<zmq::ctx_t *> (ctx_))->check_tag ()) {
177  errno = EFAULT;
178  return -1;
179  }
180  return (static_cast<zmq::ctx_t *> (ctx_))->get (option_);
181 }
182 
183 int zmq_ctx_get_ext (void *ctx_, int option_, void *optval_, size_t *optvallen_)
184 {
185  if (!ctx_ || !(static_cast<zmq::ctx_t *> (ctx_))->check_tag ()) {
186  errno = EFAULT;
187  return -1;
188  }
189  return (static_cast<zmq::ctx_t *> (ctx_))
190  ->get (option_, optval_, optvallen_);
191 }
192 
193 
194 // Stable/legacy context API
195 
196 void *zmq_init (int io_threads_)
197 {
198  if (io_threads_ >= 0) {
199  void *ctx = zmq_ctx_new ();
200  zmq_ctx_set (ctx, ZMQ_IO_THREADS, io_threads_);
201  return ctx;
202  }
203  errno = EINVAL;
204  return NULL;
205 }
206 
207 int zmq_term (void *ctx_)
208 {
209  return zmq_ctx_term (ctx_);
210 }
211 
212 int zmq_ctx_destroy (void *ctx_)
213 {
214  return zmq_ctx_term (ctx_);
215 }
216 
217 
218 // Sockets
219 
221 {
222  zmq::socket_base_t *s = static_cast<zmq::socket_base_t *> (s_);
223  if (!s_ || !s->check_tag ()) {
224  errno = ENOTSOCK;
225  return NULL;
226  }
227  return s;
228 }
229 
230 void *zmq_socket (void *ctx_, int type_)
231 {
232  if (!ctx_ || !(static_cast<zmq::ctx_t *> (ctx_))->check_tag ()) {
233  errno = EFAULT;
234  return NULL;
235  }
236  zmq::ctx_t *ctx = static_cast<zmq::ctx_t *> (ctx_);
237  zmq::socket_base_t *s = ctx->create_socket (type_);
238  return static_cast<void *> (s);
239 }
240 
241 int zmq_close (void *s_)
242 {
244  if (!s)
245  return -1;
246  s->close ();
247  return 0;
248 }
249 
250 int zmq_setsockopt (void *s_,
251  int option_,
252  const void *optval_,
253  size_t optvallen_)
254 {
256  if (!s)
257  return -1;
258  return s->setsockopt (option_, optval_, optvallen_);
259 }
260 
261 int zmq_getsockopt (void *s_, int option_, void *optval_, size_t *optvallen_)
262 {
264  if (!s)
265  return -1;
266  return s->getsockopt (option_, optval_, optvallen_);
267 }
268 
270  void *s_, const char *addr_, uint64_t events_, int event_version_, int type_)
271 {
273  if (!s)
274  return -1;
275  return s->monitor (addr_, events_, event_version_, type_);
276 }
277 
278 int zmq_socket_monitor (void *s_, const char *addr_, int events_)
279 {
280  return zmq_socket_monitor_versioned (s_, addr_, events_, 1, ZMQ_PAIR);
281 }
282 
283 int zmq_join (void *s_, const char *group_)
284 {
286  if (!s)
287  return -1;
288  return s->join (group_);
289 }
290 
291 int zmq_leave (void *s_, const char *group_)
292 {
294  if (!s)
295  return -1;
296  return s->leave (group_);
297 }
298 
299 int zmq_bind (void *s_, const char *addr_)
300 {
302  if (!s)
303  return -1;
304  return s->bind (addr_);
305 }
306 
307 int zmq_connect (void *s_, const char *addr_)
308 {
310  if (!s)
311  return -1;
312  return s->connect (addr_);
313 }
314 
315 uint32_t zmq_connect_peer (void *s_, const char *addr_)
316 {
317  zmq::peer_t *s = static_cast<zmq::peer_t *> (s_);
318  if (!s_ || !s->check_tag ()) {
319  errno = ENOTSOCK;
320  return 0;
321  }
322 
323  int socket_type;
324  size_t socket_type_size = sizeof (socket_type);
325  if (s->getsockopt (ZMQ_TYPE, &socket_type, &socket_type_size) != 0)
326  return 0;
327 
328  if (socket_type != ZMQ_PEER) {
329  errno = ENOTSUP;
330  return 0;
331  }
332 
333  return s->connect_peer (addr_);
334 }
335 
336 
337 int zmq_unbind (void *s_, const char *addr_)
338 {
340  if (!s)
341  return -1;
342  return s->term_endpoint (addr_);
343 }
344 
345 int zmq_disconnect (void *s_, const char *addr_)
346 {
348  if (!s)
349  return -1;
350  return s->term_endpoint (addr_);
351 }
352 
353 // Sending functions.
354 
355 static inline int
357 {
358  size_t sz = zmq_msg_size (msg_);
359  const int rc = s_->send (reinterpret_cast<zmq::msg_t *> (msg_), flags_);
360  if (unlikely (rc < 0))
361  return -1;
362 
363  // This is what I'd like to do, my C++ fu is too weak -- PH 2016/02/09
364  // int max_msgsz = s_->parent->get (ZMQ_MAX_MSGSZ);
365  size_t max_msgsz = INT_MAX;
366 
367  // Truncate returned size to INT_MAX to avoid overflow to negative values
368  return static_cast<int> (sz < max_msgsz ? sz : max_msgsz);
369 }
370 
371 /* To be deprecated once zmq_msg_send() is stable */
372 int zmq_sendmsg (void *s_, zmq_msg_t *msg_, int flags_)
373 {
374  return zmq_msg_send (msg_, s_, flags_);
375 }
376 
377 int zmq_send (void *s_, const void *buf_, size_t len_, int flags_)
378 {
380  if (!s)
381  return -1;
382  zmq_msg_t msg;
383  int rc = zmq_msg_init_buffer (&msg, buf_, len_);
384  if (unlikely (rc < 0))
385  return -1;
386 
387  rc = s_sendmsg (s, &msg, flags_);
388  if (unlikely (rc < 0)) {
389  const int err = errno;
390  const int rc2 = zmq_msg_close (&msg);
391  errno_assert (rc2 == 0);
392  errno = err;
393  return -1;
394  }
395  // Note the optimisation here. We don't close the msg object as it is
396  // empty anyway. This may change when implementation of zmq_msg_t changes.
397  return rc;
398 }
399 
400 int zmq_send_const (void *s_, const void *buf_, size_t len_, int flags_)
401 {
403  if (!s)
404  return -1;
405  zmq_msg_t msg;
406  int rc =
407  zmq_msg_init_data (&msg, const_cast<void *> (buf_), len_, NULL, NULL);
408  if (rc != 0)
409  return -1;
410 
411  rc = s_sendmsg (s, &msg, flags_);
412  if (unlikely (rc < 0)) {
413  const int err = errno;
414  const int rc2 = zmq_msg_close (&msg);
415  errno_assert (rc2 == 0);
416  errno = err;
417  return -1;
418  }
419  // Note the optimisation here. We don't close the msg object as it is
420  // empty anyway. This may change when implementation of zmq_msg_t changes.
421  return rc;
422 }
423 
424 
425 // Send multiple messages.
426 // TODO: this function has no man page
427 //
428 // If flag bit ZMQ_SNDMORE is set the vector is treated as
429 // a single multi-part message, i.e. the last message has
430 // ZMQ_SNDMORE bit switched off.
431 //
432 int zmq_sendiov (void *s_, iovec *a_, size_t count_, int flags_)
433 {
435  if (!s)
436  return -1;
437  if (unlikely (count_ <= 0 || !a_)) {
438  errno = EINVAL;
439  return -1;
440  }
441 
442  int rc = 0;
443  zmq_msg_t msg;
444 
445  for (size_t i = 0; i < count_; ++i) {
446  rc = zmq_msg_init_size (&msg, a_[i].iov_len);
447  if (rc != 0) {
448  rc = -1;
449  break;
450  }
451  memcpy (zmq_msg_data (&msg), a_[i].iov_base, a_[i].iov_len);
452  if (i == count_ - 1)
453  flags_ = flags_ & ~ZMQ_SNDMORE;
454  rc = s_sendmsg (s, &msg, flags_);
455  if (unlikely (rc < 0)) {
456  const int err = errno;
457  const int rc2 = zmq_msg_close (&msg);
458  errno_assert (rc2 == 0);
459  errno = err;
460  rc = -1;
461  break;
462  }
463  }
464  return rc;
465 }
466 
467 // Receiving functions.
468 
469 static int s_recvmsg (zmq::socket_base_t *s_, zmq_msg_t *msg_, int flags_)
470 {
471  const int rc = s_->recv (reinterpret_cast<zmq::msg_t *> (msg_), flags_);
472  if (unlikely (rc < 0))
473  return -1;
474 
475  // Truncate returned size to INT_MAX to avoid overflow to negative values
476  const size_t sz = zmq_msg_size (msg_);
477  return static_cast<int> (sz < INT_MAX ? sz : INT_MAX);
478 }
479 
480 /* To be deprecated once zmq_msg_recv() is stable */
481 int zmq_recvmsg (void *s_, zmq_msg_t *msg_, int flags_)
482 {
483  return zmq_msg_recv (msg_, s_, flags_);
484 }
485 
486 
487 int zmq_recv (void *s_, void *buf_, size_t len_, int flags_)
488 {
490  if (!s)
491  return -1;
492  zmq_msg_t msg;
493  int rc = zmq_msg_init (&msg);
494  errno_assert (rc == 0);
495 
496  const int nbytes = s_recvmsg (s, &msg, flags_);
497  if (unlikely (nbytes < 0)) {
498  const int err = errno;
499  rc = zmq_msg_close (&msg);
500  errno_assert (rc == 0);
501  errno = err;
502  return -1;
503  }
504 
505  // An oversized message is silently truncated.
506  const size_t to_copy = size_t (nbytes) < len_ ? size_t (nbytes) : len_;
507 
508  // We explicitly allow a null buffer argument if len is zero
509  if (to_copy) {
510  assert (buf_);
511  memcpy (buf_, zmq_msg_data (&msg), to_copy);
512  }
513  rc = zmq_msg_close (&msg);
514  errno_assert (rc == 0);
515 
516  return nbytes;
517 }
518 
519 // Receive a multi-part message
520 //
521 // Receives up to *count_ parts of a multi-part message.
522 // Sets *count_ to the actual number of parts read.
523 // ZMQ_RCVMORE is set to indicate if a complete multi-part message was read.
524 // Returns number of message parts read, or -1 on error.
525 //
526 // Note: even if -1 is returned, some parts of the message
527 // may have been read. Therefore the client must consult
528 // *count_ to retrieve message parts successfully read,
529 // even if -1 is returned.
530 //
531 // The iov_base* buffers of each iovec *a_ filled in by this
532 // function may be freed using free().
533 // TODO: this function has no man page
534 //
535 int zmq_recviov (void *s_, iovec *a_, size_t *count_, int flags_)
536 {
538  if (!s)
539  return -1;
540  if (unlikely (!count_ || *count_ <= 0 || !a_)) {
541  errno = EINVAL;
542  return -1;
543  }
544 
545  const size_t count = *count_;
546  int nread = 0;
547  bool recvmore = true;
548 
549  *count_ = 0;
550 
551  for (size_t i = 0; recvmore && i < count; ++i) {
552  zmq_msg_t msg;
553  int rc = zmq_msg_init (&msg);
554  errno_assert (rc == 0);
555 
556  const int nbytes = s_recvmsg (s, &msg, flags_);
557  if (unlikely (nbytes < 0)) {
558  const int err = errno;
559  rc = zmq_msg_close (&msg);
560  errno_assert (rc == 0);
561  errno = err;
562  nread = -1;
563  break;
564  }
565 
566  a_[i].iov_len = zmq_msg_size (&msg);
567  a_[i].iov_base = static_cast<char *> (malloc (a_[i].iov_len));
568  if (unlikely (!a_[i].iov_base)) {
569  errno = ENOMEM;
570  return -1;
571  }
572  memcpy (a_[i].iov_base, static_cast<char *> (zmq_msg_data (&msg)),
573  a_[i].iov_len);
574  // Assume zmq_socket ZMQ_RVCMORE is properly set.
575  const zmq::msg_t *p_msg = reinterpret_cast<const zmq::msg_t *> (&msg);
576  recvmore = p_msg->flags () & zmq::msg_t::more;
577  rc = zmq_msg_close (&msg);
578  errno_assert (rc == 0);
579  ++*count_;
580  ++nread;
581  }
582  return nread;
583 }
584 
585 // Message manipulators.
586 
588 {
589  return (reinterpret_cast<zmq::msg_t *> (msg_))->init ();
590 }
591 
592 int zmq_msg_init_size (zmq_msg_t *msg_, size_t size_)
593 {
594  return (reinterpret_cast<zmq::msg_t *> (msg_))->init_size (size_);
595 }
596 
597 int zmq_msg_init_buffer (zmq_msg_t *msg_, const void *buf_, size_t size_)
598 {
599  return (reinterpret_cast<zmq::msg_t *> (msg_))->init_buffer (buf_, size_);
600 }
601 
603  zmq_msg_t *msg_, void *data_, size_t size_, zmq_free_fn *ffn_, void *hint_)
604 {
605  return (reinterpret_cast<zmq::msg_t *> (msg_))
606  ->init_data (data_, size_, ffn_, hint_);
607 }
608 
609 int zmq_msg_send (zmq_msg_t *msg_, void *s_, int flags_)
610 {
612  if (!s)
613  return -1;
614  return s_sendmsg (s, msg_, flags_);
615 }
616 
617 int zmq_msg_recv (zmq_msg_t *msg_, void *s_, int flags_)
618 {
620  if (!s)
621  return -1;
622  return s_recvmsg (s, msg_, flags_);
623 }
624 
626 {
627  return (reinterpret_cast<zmq::msg_t *> (msg_))->close ();
628 }
629 
630 int zmq_msg_move (zmq_msg_t *dest_, zmq_msg_t *src_)
631 {
632  return (reinterpret_cast<zmq::msg_t *> (dest_))
633  ->move (*reinterpret_cast<zmq::msg_t *> (src_));
634 }
635 
636 int zmq_msg_copy (zmq_msg_t *dest_, zmq_msg_t *src_)
637 {
638  return (reinterpret_cast<zmq::msg_t *> (dest_))
639  ->copy (*reinterpret_cast<zmq::msg_t *> (src_));
640 }
641 
642 void *zmq_msg_data (zmq_msg_t *msg_)
643 {
644  return (reinterpret_cast<zmq::msg_t *> (msg_))->data ();
645 }
646 
647 size_t zmq_msg_size (const zmq_msg_t *msg_)
648 {
649  return ((zmq::msg_t *) msg_)->size ();
650 }
651 
652 int zmq_msg_more (const zmq_msg_t *msg_)
653 {
654  return zmq_msg_get (msg_, ZMQ_MORE);
655 }
656 
657 int zmq_msg_get (const zmq_msg_t *msg_, int property_)
658 {
659  const char *fd_string;
660 
661  switch (property_) {
662  case ZMQ_MORE:
663  return (((zmq::msg_t *) msg_)->flags () & zmq::msg_t::more) ? 1 : 0;
664  case ZMQ_SRCFD:
665  fd_string = zmq_msg_gets (msg_, "__fd");
666  if (fd_string == NULL)
667  return -1;
668 
669  return atoi (fd_string);
670  case ZMQ_SHARED:
671  return (((zmq::msg_t *) msg_)->is_cmsg ())
672  || (((zmq::msg_t *) msg_)->flags () & zmq::msg_t::shared)
673  ? 1
674  : 0;
675  default:
676  errno = EINVAL;
677  return -1;
678  }
679 }
680 
681 int zmq_msg_set (zmq_msg_t *, int, int)
682 {
683  // No properties supported at present
684  errno = EINVAL;
685  return -1;
686 }
687 
688 int zmq_msg_set_routing_id (zmq_msg_t *msg_, uint32_t routing_id_)
689 {
690  return (reinterpret_cast<zmq::msg_t *> (msg_))
691  ->set_routing_id (routing_id_);
692 }
693 
695 {
696  return (reinterpret_cast<zmq::msg_t *> (msg_))->get_routing_id ();
697 }
698 
699 int zmq_msg_set_group (zmq_msg_t *msg_, const char *group_)
700 {
701  return (reinterpret_cast<zmq::msg_t *> (msg_))->set_group (group_);
702 }
703 
704 const char *zmq_msg_group (zmq_msg_t *msg_)
705 {
706  return (reinterpret_cast<zmq::msg_t *> (msg_))->group ();
707 }
708 
709 // Get message metadata string
710 
711 const char *zmq_msg_gets (const zmq_msg_t *msg_, const char *property_)
712 {
713  const zmq::metadata_t *metadata =
714  reinterpret_cast<const zmq::msg_t *> (msg_)->metadata ();
715  const char *value = NULL;
716  if (metadata)
717  value = metadata->get (std::string (property_));
718  if (value)
719  return value;
720 
721  errno = EINVAL;
722  return NULL;
723 }
724 
725 // Polling.
726 
727 #if defined ZMQ_HAVE_POLLER
728 static int zmq_poller_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
729 {
730  // implement zmq_poll on top of zmq_poller
731  int rc;
732  zmq_poller_event_t *events;
733  zmq::socket_poller_t poller;
734  events = new (std::nothrow) zmq_poller_event_t[nitems_];
735  alloc_assert (events);
736 
737  bool repeat_items = false;
738  // Register sockets with poller
739  for (int i = 0; i < nitems_; i++) {
740  items_[i].revents = 0;
741 
742  bool modify = false;
743  short e = items_[i].events;
744  if (items_[i].socket) {
745  // Poll item is a 0MQ socket.
746  for (int j = 0; j < i; ++j) {
747  // Check for repeat entries
748  if (items_[j].socket == items_[i].socket) {
749  repeat_items = true;
750  modify = true;
751  e |= items_[j].events;
752  }
753  }
754  if (modify) {
755  rc = zmq_poller_modify (&poller, items_[i].socket, e);
756  } else {
757  rc = zmq_poller_add (&poller, items_[i].socket, NULL, e);
758  }
759  if (rc < 0) {
760  delete[] events;
761  return rc;
762  }
763  } else {
764  // Poll item is a raw file descriptor.
765  for (int j = 0; j < i; ++j) {
766  // Check for repeat entries
767  if (!items_[j].socket && items_[j].fd == items_[i].fd) {
768  repeat_items = true;
769  modify = true;
770  e |= items_[j].events;
771  }
772  }
773  if (modify) {
774  rc = zmq_poller_modify_fd (&poller, items_[i].fd, e);
775  } else {
776  rc = zmq_poller_add_fd (&poller, items_[i].fd, NULL, e);
777  }
778  if (rc < 0) {
779  delete[] events;
780  return rc;
781  }
782  }
783  }
784 
785  // Wait for events
786  rc = zmq_poller_wait_all (&poller, events, nitems_, timeout_);
787  if (rc < 0) {
788  delete[] events;
789  if (zmq_errno () == EAGAIN) {
790  return 0;
791  }
792  return rc;
793  }
794 
795  // Transform poller events into zmq_pollitem events.
796  // items_ contains all items, while events only contains fired events.
797  // If no sockets are repeated (likely), the two are still co-ordered, so step through the items
798  // checking for matches only on the first event.
799  // If there are repeat items, they cannot be assumed to be co-ordered,
800  // so each pollitem must check fired events from the beginning.
801  int j_start = 0, found_events = rc;
802  for (int i = 0; i < nitems_; i++) {
803  for (int j = j_start; j < found_events; ++j) {
804  if ((items_[i].socket && items_[i].socket == events[j].socket)
805  || (!(items_[i].socket || events[j].socket)
806  && items_[i].fd == events[j].fd)) {
807  items_[i].revents = events[j].events & items_[i].events;
808  if (!repeat_items) {
809  // no repeats, we can ignore events we've already seen
810  j_start++;
811  }
812  break;
813  }
814  if (!repeat_items) {
815  // no repeats, never have to look at j > j_start
816  break;
817  }
818  }
819  }
820 
821  // Cleanup
822  delete[] events;
823  return rc;
824 }
825 #endif // ZMQ_HAVE_POLLER
826 
827 int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
828 {
829 #if defined ZMQ_HAVE_POLLER
830  // if poller is present, use that if there is at least 1 thread-safe socket,
831  // otherwise fall back to the previous implementation as it's faster.
832  for (int i = 0; i != nitems_; i++) {
833  if (items_[i].socket) {
834  zmq::socket_base_t *s = as_socket_base_t (items_[i].socket);
835  if (s) {
836  if (s->is_thread_safe ())
837  return zmq_poller_poll (items_, nitems_, timeout_);
838  } else {
839  //as_socket_base_t returned NULL : socket is invalid
840  return -1;
841  }
842  }
843  }
844 #endif // ZMQ_HAVE_POLLER
845 #if defined ZMQ_POLL_BASED_ON_POLL || defined ZMQ_POLL_BASED_ON_SELECT
846  if (unlikely (nitems_ < 0)) {
847  errno = EINVAL;
848  return -1;
849  }
850  if (unlikely (nitems_ == 0)) {
851  if (timeout_ == 0)
852  return 0;
853 #if defined ZMQ_HAVE_WINDOWS
854  Sleep (timeout_ > 0 ? timeout_ : INFINITE);
855  return 0;
856 #elif defined ZMQ_HAVE_VXWORKS
857  struct timespec ns_;
858  ns_.tv_sec = timeout_ / 1000;
859  ns_.tv_nsec = timeout_ % 1000 * 1000000;
860  return nanosleep (&ns_, 0);
861 #else
862  return usleep (timeout_ * 1000);
863 #endif
864  }
865  if (!items_) {
866  errno = EFAULT;
867  return -1;
868  }
869 
870  zmq::clock_t clock;
871  uint64_t now = 0;
872  uint64_t end = 0;
873 #if defined ZMQ_POLL_BASED_ON_POLL
875 
876  // Build pollset for poll () system call.
877  for (int i = 0; i != nitems_; i++) {
878  // If the poll item is a 0MQ socket, we poll on the file descriptor
879  // retrieved by the ZMQ_FD socket option.
880  if (items_[i].socket) {
881  size_t zmq_fd_size = sizeof (zmq::fd_t);
882  if (zmq_getsockopt (items_[i].socket, ZMQ_FD, &pollfds[i].fd,
883  &zmq_fd_size)
884  == -1) {
885  return -1;
886  }
887  pollfds[i].events = items_[i].events ? POLLIN : 0;
888  }
889  // Else, the poll item is a raw file descriptor. Just convert the
890  // events to normal POLLIN/POLLOUT for poll ().
891  else {
892  pollfds[i].fd = items_[i].fd;
893  pollfds[i].events =
894  (items_[i].events & ZMQ_POLLIN ? POLLIN : 0)
895  | (items_[i].events & ZMQ_POLLOUT ? POLLOUT : 0)
896  | (items_[i].events & ZMQ_POLLPRI ? POLLPRI : 0);
897  }
898  }
899 #else
900  // Ensure we do not attempt to select () on more than FD_SETSIZE
901  // file descriptors.
902  // TODO since this function is called by a client, we could return errno EINVAL/ENOMEM/... here
903  zmq_assert (nitems_ <= FD_SETSIZE);
904 
905  zmq::optimized_fd_set_t pollset_in (nitems_);
906  FD_ZERO (pollset_in.get ());
907  zmq::optimized_fd_set_t pollset_out (nitems_);
908  FD_ZERO (pollset_out.get ());
909  zmq::optimized_fd_set_t pollset_err (nitems_);
910  FD_ZERO (pollset_err.get ());
911 
912  zmq::fd_t maxfd = 0;
913 
914  // Build the fd_sets for passing to select ().
915  for (int i = 0; i != nitems_; i++) {
916  // If the poll item is a 0MQ socket we are interested in input on the
917  // notification file descriptor retrieved by the ZMQ_FD socket option.
918  if (items_[i].socket) {
919  size_t zmq_fd_size = sizeof (zmq::fd_t);
920  zmq::fd_t notify_fd;
921  if (zmq_getsockopt (items_[i].socket, ZMQ_FD, &notify_fd,
922  &zmq_fd_size)
923  == -1)
924  return -1;
925  if (items_[i].events) {
926  FD_SET (notify_fd, pollset_in.get ());
927  if (maxfd < notify_fd)
928  maxfd = notify_fd;
929  }
930  }
931  // Else, the poll item is a raw file descriptor. Convert the poll item
932  // events to the appropriate fd_sets.
933  else {
934  if (items_[i].events & ZMQ_POLLIN)
935  FD_SET (items_[i].fd, pollset_in.get ());
936  if (items_[i].events & ZMQ_POLLOUT)
937  FD_SET (items_[i].fd, pollset_out.get ());
938  if (items_[i].events & ZMQ_POLLERR)
939  FD_SET (items_[i].fd, pollset_err.get ());
940  if (maxfd < items_[i].fd)
941  maxfd = items_[i].fd;
942  }
943  }
944 
945  zmq::optimized_fd_set_t inset (nitems_);
946  zmq::optimized_fd_set_t outset (nitems_);
947  zmq::optimized_fd_set_t errset (nitems_);
948 #endif
949 
950  bool first_pass = true;
951  int nevents = 0;
952 
953  while (true) {
954 #if defined ZMQ_POLL_BASED_ON_POLL
955 
956  // Compute the timeout for the subsequent poll.
957  const zmq::timeout_t timeout =
958  zmq::compute_timeout (first_pass, timeout_, now, end);
959 
960  // Wait for events.
961  {
962  const int rc = poll (&pollfds[0], nitems_, timeout);
963  if (rc == -1 && errno == EINTR) {
964  return -1;
965  }
966  errno_assert (rc >= 0);
967  }
968  // Check for the events.
969  for (int i = 0; i != nitems_; i++) {
970  items_[i].revents = 0;
971 
972  // The poll item is a 0MQ socket. Retrieve pending events
973  // using the ZMQ_EVENTS socket option.
974  if (items_[i].socket) {
975  size_t zmq_events_size = sizeof (uint32_t);
976  uint32_t zmq_events;
977  if (zmq_getsockopt (items_[i].socket, ZMQ_EVENTS, &zmq_events,
978  &zmq_events_size)
979  == -1) {
980  return -1;
981  }
982  if ((items_[i].events & ZMQ_POLLOUT)
983  && (zmq_events & ZMQ_POLLOUT))
984  items_[i].revents |= ZMQ_POLLOUT;
985  if ((items_[i].events & ZMQ_POLLIN)
986  && (zmq_events & ZMQ_POLLIN))
987  items_[i].revents |= ZMQ_POLLIN;
988  }
989  // Else, the poll item is a raw file descriptor, simply convert
990  // the events to zmq_pollitem_t-style format.
991  else {
992  if (pollfds[i].revents & POLLIN)
993  items_[i].revents |= ZMQ_POLLIN;
994  if (pollfds[i].revents & POLLOUT)
995  items_[i].revents |= ZMQ_POLLOUT;
996  if (pollfds[i].revents & POLLPRI)
997  items_[i].revents |= ZMQ_POLLPRI;
998  if (pollfds[i].revents & ~(POLLIN | POLLOUT | POLLPRI))
999  items_[i].revents |= ZMQ_POLLERR;
1000  }
1001 
1002  if (items_[i].revents)
1003  nevents++;
1004  }
1005 
1006 #else
1007 
1008  // Compute the timeout for the subsequent poll.
1009  timeval timeout;
1010  timeval *ptimeout;
1011  if (first_pass) {
1012  timeout.tv_sec = 0;
1013  timeout.tv_usec = 0;
1014  ptimeout = &timeout;
1015  } else if (timeout_ < 0)
1016  ptimeout = NULL;
1017  else {
1018  timeout.tv_sec = static_cast<long> ((end - now) / 1000);
1019  timeout.tv_usec = static_cast<long> ((end - now) % 1000 * 1000);
1020  ptimeout = &timeout;
1021  }
1022 
1023  // Wait for events. Ignore interrupts if there's infinite timeout.
1024  while (true) {
1025  memcpy (inset.get (), pollset_in.get (),
1026  zmq::valid_pollset_bytes (*pollset_in.get ()));
1027  memcpy (outset.get (), pollset_out.get (),
1028  zmq::valid_pollset_bytes (*pollset_out.get ()));
1029  memcpy (errset.get (), pollset_err.get (),
1030  zmq::valid_pollset_bytes (*pollset_err.get ()));
1031 #if defined ZMQ_HAVE_WINDOWS
1032  int rc =
1033  select (0, inset.get (), outset.get (), errset.get (), ptimeout);
1034  if (unlikely (rc == SOCKET_ERROR)) {
1035  errno = zmq::wsa_error_to_errno (WSAGetLastError ());
1036  wsa_assert (errno == ENOTSOCK);
1037  return -1;
1038  }
1039 #else
1040  int rc = select (maxfd + 1, inset.get (), outset.get (),
1041  errset.get (), ptimeout);
1042  if (unlikely (rc == -1)) {
1043  errno_assert (errno == EINTR || errno == EBADF);
1044  return -1;
1045  }
1046 #endif
1047  break;
1048  }
1049 
1050  // Check for the events.
1051  for (int i = 0; i != nitems_; i++) {
1052  items_[i].revents = 0;
1053 
1054  // The poll item is a 0MQ socket. Retrieve pending events
1055  // using the ZMQ_EVENTS socket option.
1056  if (items_[i].socket) {
1057  size_t zmq_events_size = sizeof (uint32_t);
1058  uint32_t zmq_events;
1059  if (zmq_getsockopt (items_[i].socket, ZMQ_EVENTS, &zmq_events,
1060  &zmq_events_size)
1061  == -1)
1062  return -1;
1063  if ((items_[i].events & ZMQ_POLLOUT)
1064  && (zmq_events & ZMQ_POLLOUT))
1065  items_[i].revents |= ZMQ_POLLOUT;
1066  if ((items_[i].events & ZMQ_POLLIN)
1067  && (zmq_events & ZMQ_POLLIN))
1068  items_[i].revents |= ZMQ_POLLIN;
1069  }
1070  // Else, the poll item is a raw file descriptor, simply convert
1071  // the events to zmq_pollitem_t-style format.
1072  else {
1073  if (FD_ISSET (items_[i].fd, inset.get ()))
1074  items_[i].revents |= ZMQ_POLLIN;
1075  if (FD_ISSET (items_[i].fd, outset.get ()))
1076  items_[i].revents |= ZMQ_POLLOUT;
1077  if (FD_ISSET (items_[i].fd, errset.get ()))
1078  items_[i].revents |= ZMQ_POLLERR;
1079  }
1080 
1081  if (items_[i].revents)
1082  nevents++;
1083  }
1084 #endif
1085 
1086  // If timeout is zero, exit immediately whether there are events or not.
1087  if (timeout_ == 0)
1088  break;
1089 
1090  // If there are events to return, we can exit immediately.
1091  if (nevents)
1092  break;
1093 
1094  // At this point we are meant to wait for events but there are none.
1095  // If timeout is infinite we can just loop until we get some events.
1096  if (timeout_ < 0) {
1097  if (first_pass)
1098  first_pass = false;
1099  continue;
1100  }
1101 
1102  // The timeout is finite and there are no events. In the first pass
1103  // we get a timestamp of when the polling have begun. (We assume that
1104  // first pass have taken negligible time). We also compute the time
1105  // when the polling should time out.
1106  if (first_pass) {
1107  now = clock.now_ms ();
1108  end = now + timeout_;
1109  if (now == end)
1110  break;
1111  first_pass = false;
1112  continue;
1113  }
1114 
1115  // Find out whether timeout have expired.
1116  now = clock.now_ms ();
1117  if (now >= end)
1118  break;
1119  }
1120 
1121  return nevents;
1122 #else
1123  // Exotic platforms that support neither poll() nor select().
1124  errno = ENOTSUP;
1125  return -1;
1126 #endif
1127 }
1128 
1129 #ifdef ZMQ_HAVE_PPOLL
1130 // return values of 0 or -1 should be returned from zmq_poll; return value 1 means items passed checks
1131 int zmq_poll_check_items_ (zmq_pollitem_t *items_, int nitems_, long timeout_)
1132 {
1133  if (unlikely (nitems_ < 0)) {
1134  errno = EINVAL;
1135  return -1;
1136  }
1137  if (unlikely (nitems_ == 0)) {
1138  if (timeout_ == 0)
1139  return 0;
1140 #if defined ZMQ_HAVE_WINDOWS
1141  Sleep (timeout_ > 0 ? timeout_ : INFINITE);
1142  return 0;
1143 #elif defined ZMQ_HAVE_VXWORKS
1144  struct timespec ns_;
1145  ns_.tv_sec = timeout_ / 1000;
1146  ns_.tv_nsec = timeout_ % 1000 * 1000000;
1147  return nanosleep (&ns_, 0);
1148 #else
1149  return usleep (timeout_ * 1000);
1150 #endif
1151  }
1152  if (!items_) {
1153  errno = EFAULT;
1154  return -1;
1155  }
1156  return 1;
1157 }
1158 
1159 struct zmq_poll_select_fds_t_
1160 {
1161  explicit zmq_poll_select_fds_t_ (int nitems_) :
1162  pollset_in (nitems_),
1163  pollset_out (nitems_),
1164  pollset_err (nitems_),
1165  inset (nitems_),
1166  outset (nitems_),
1167  errset (nitems_),
1168  maxfd (0)
1169  {
1170  FD_ZERO (pollset_in.get ());
1171  FD_ZERO (pollset_out.get ());
1172  FD_ZERO (pollset_err.get ());
1173  }
1174 
1175  zmq::optimized_fd_set_t pollset_in;
1176  zmq::optimized_fd_set_t pollset_out;
1177  zmq::optimized_fd_set_t pollset_err;
1178  zmq::optimized_fd_set_t inset;
1179  zmq::optimized_fd_set_t outset;
1180  zmq::optimized_fd_set_t errset;
1181  zmq::fd_t maxfd;
1182 };
1183 
1184 zmq_poll_select_fds_t_
1185 zmq_poll_build_select_fds_ (zmq_pollitem_t *items_, int nitems_, int &rc)
1186 {
1187  // Ensure we do not attempt to select () on more than FD_SETSIZE
1188  // file descriptors.
1189  // TODO since this function is called by a client, we could return errno EINVAL/ENOMEM/... here
1190  zmq_assert (nitems_ <= FD_SETSIZE);
1191 
1192  zmq_poll_select_fds_t_ fds (nitems_);
1193 
1194  // Build the fd_sets for passing to select ().
1195  for (int i = 0; i != nitems_; i++) {
1196  // If the poll item is a 0MQ socket we are interested in input on the
1197  // notification file descriptor retrieved by the ZMQ_FD socket option.
1198  if (items_[i].socket) {
1199  size_t zmq_fd_size = sizeof (zmq::fd_t);
1200  zmq::fd_t notify_fd;
1201  if (zmq_getsockopt (items_[i].socket, ZMQ_FD, &notify_fd,
1202  &zmq_fd_size)
1203  == -1) {
1204  rc = -1;
1205  return fds;
1206  }
1207  if (items_[i].events) {
1208  FD_SET (notify_fd, fds.pollset_in.get ());
1209  if (fds.maxfd < notify_fd)
1210  fds.maxfd = notify_fd;
1211  }
1212  }
1213  // Else, the poll item is a raw file descriptor. Convert the poll item
1214  // events to the appropriate fd_sets.
1215  else {
1216  if (items_[i].events & ZMQ_POLLIN)
1217  FD_SET (items_[i].fd, fds.pollset_in.get ());
1218  if (items_[i].events & ZMQ_POLLOUT)
1219  FD_SET (items_[i].fd, fds.pollset_out.get ());
1220  if (items_[i].events & ZMQ_POLLERR)
1221  FD_SET (items_[i].fd, fds.pollset_err.get ());
1222  if (fds.maxfd < items_[i].fd)
1223  fds.maxfd = items_[i].fd;
1224  }
1225  }
1226 
1227  rc = 0;
1228  return fds;
1229 }
1230 
1231 timeval *zmq_poll_select_set_timeout_ (
1232  long timeout_, bool first_pass, uint64_t now, uint64_t end, timeval &timeout)
1233 {
1234  timeval *ptimeout;
1235  if (first_pass) {
1236  timeout.tv_sec = 0;
1237  timeout.tv_usec = 0;
1238  ptimeout = &timeout;
1239  } else if (timeout_ < 0)
1240  ptimeout = NULL;
1241  else {
1242  timeout.tv_sec = static_cast<long> ((end - now) / 1000);
1243  timeout.tv_usec = static_cast<long> ((end - now) % 1000 * 1000);
1244  ptimeout = &timeout;
1245  }
1246  return ptimeout;
1247 }
1248 
1249 timespec *zmq_poll_select_set_timeout_ (
1250  long timeout_, bool first_pass, uint64_t now, uint64_t end, timespec &timeout)
1251 {
1252  timespec *ptimeout;
1253  if (first_pass) {
1254  timeout.tv_sec = 0;
1255  timeout.tv_nsec = 0;
1256  ptimeout = &timeout;
1257  } else if (timeout_ < 0)
1258  ptimeout = NULL;
1259  else {
1260  timeout.tv_sec = static_cast<long> ((end - now) / 1000);
1261  timeout.tv_nsec = static_cast<long> ((end - now) % 1000 * 1000000);
1262  ptimeout = &timeout;
1263  }
1264  return ptimeout;
1265 }
1266 
1267 int zmq_poll_select_check_events_ (zmq_pollitem_t *items_,
1268  int nitems_,
1269  zmq_poll_select_fds_t_ &fds,
1270  int &nevents)
1271 {
1272  // Check for the events.
1273  for (int i = 0; i != nitems_; i++) {
1274  items_[i].revents = 0;
1275 
1276  // The poll item is a 0MQ socket. Retrieve pending events
1277  // using the ZMQ_EVENTS socket option.
1278  if (items_[i].socket) {
1279  size_t zmq_events_size = sizeof (uint32_t);
1280  uint32_t zmq_events;
1281  if (zmq_getsockopt (items_[i].socket, ZMQ_EVENTS, &zmq_events,
1282  &zmq_events_size)
1283  == -1)
1284  return -1;
1285  if ((items_[i].events & ZMQ_POLLOUT) && (zmq_events & ZMQ_POLLOUT))
1286  items_[i].revents |= ZMQ_POLLOUT;
1287  if ((items_[i].events & ZMQ_POLLIN) && (zmq_events & ZMQ_POLLIN))
1288  items_[i].revents |= ZMQ_POLLIN;
1289  }
1290  // Else, the poll item is a raw file descriptor, simply convert
1291  // the events to zmq_pollitem_t-style format.
1292  else {
1293  if (FD_ISSET (items_[i].fd, fds.inset.get ()))
1294  items_[i].revents |= ZMQ_POLLIN;
1295  if (FD_ISSET (items_[i].fd, fds.outset.get ()))
1296  items_[i].revents |= ZMQ_POLLOUT;
1297  if (FD_ISSET (items_[i].fd, fds.errset.get ()))
1298  items_[i].revents |= ZMQ_POLLERR;
1299  }
1300 
1301  if (items_[i].revents)
1302  nevents++;
1303  }
1304 
1305  return 0;
1306 }
1307 
1308 bool zmq_poll_must_break_loop_ (long timeout_,
1309  int nevents,
1310  bool &first_pass,
1311  zmq::clock_t &clock,
1312  uint64_t &now,
1313  uint64_t &end)
1314 {
1315  // If timeout is zero, exit immediately whether there are events or not.
1316  if (timeout_ == 0)
1317  return true;
1318 
1319  // If there are events to return, we can exit immediately.
1320  if (nevents)
1321  return true;
1322 
1323  // At this point we are meant to wait for events but there are none.
1324  // If timeout is infinite we can just loop until we get some events.
1325  if (timeout_ < 0) {
1326  if (first_pass)
1327  first_pass = false;
1328  return false;
1329  }
1330 
1331  // The timeout is finite and there are no events. In the first pass
1332  // we get a timestamp of when the polling have begun. (We assume that
1333  // first pass have taken negligible time). We also compute the time
1334  // when the polling should time out.
1335  if (first_pass) {
1336  now = clock.now_ms ();
1337  end = now + timeout_;
1338  if (now == end)
1339  return true;
1340  first_pass = false;
1341  return false;
1342  }
1343 
1344  // Find out whether timeout have expired.
1345  now = clock.now_ms ();
1346  if (now >= end)
1347  return true;
1348 
1349  // finally, in all other cases, we just continue
1350  return false;
1351 }
1352 #endif // ZMQ_HAVE_PPOLL
1353 
1354 #if !defined _WIN32
1356  int nitems_,
1357  long timeout_,
1358  const sigset_t *sigmask_)
1359 #else
1360 // Windows has no sigset_t
1361 int zmq_ppoll (zmq_pollitem_t *items_,
1362  int nitems_,
1363  long timeout_,
1364  const void *sigmask_)
1365 #endif
1366 {
1367 #ifdef ZMQ_HAVE_PPOLL
1368  int rc = zmq_poll_check_items_ (items_, nitems_, timeout_);
1369  if (rc <= 0) {
1370  return rc;
1371  }
1372 
1373  zmq::clock_t clock;
1374  uint64_t now = 0;
1375  uint64_t end = 0;
1376  zmq_poll_select_fds_t_ fds =
1377  zmq_poll_build_select_fds_ (items_, nitems_, rc);
1378  if (rc == -1) {
1379  return -1;
1380  }
1381 
1382  bool first_pass = true;
1383  int nevents = 0;
1384 
1385  while (true) {
1386  // Compute the timeout for the subsequent poll.
1387  timespec timeout;
1388  timespec *ptimeout = zmq_poll_select_set_timeout_ (timeout_, first_pass,
1389  now, end, timeout);
1390 
1391  // Wait for events. Ignore interrupts if there's infinite timeout.
1392  while (true) {
1393  memcpy (fds.inset.get (), fds.pollset_in.get (),
1394  zmq::valid_pollset_bytes (*fds.pollset_in.get ()));
1395  memcpy (fds.outset.get (), fds.pollset_out.get (),
1396  zmq::valid_pollset_bytes (*fds.pollset_out.get ()));
1397  memcpy (fds.errset.get (), fds.pollset_err.get (),
1398  zmq::valid_pollset_bytes (*fds.pollset_err.get ()));
1399  int rc =
1400  pselect (fds.maxfd + 1, fds.inset.get (), fds.outset.get (),
1401  fds.errset.get (), ptimeout, sigmask_);
1402  if (unlikely (rc == -1)) {
1403  errno_assert (errno == EINTR || errno == EBADF);
1404  return -1;
1405  }
1406  break;
1407  }
1408 
1409  rc = zmq_poll_select_check_events_ (items_, nitems_, fds, nevents);
1410  if (rc < 0) {
1411  return rc;
1412  }
1413 
1414  if (zmq_poll_must_break_loop_ (timeout_, nevents, first_pass, clock,
1415  now, end)) {
1416  break;
1417  }
1418  }
1419 
1420  return nevents;
1421 #else
1422  errno = ENOTSUP;
1423  return -1;
1424 #endif // ZMQ_HAVE_PPOLL
1425 }
1426 
1427 // The poller functionality
1428 
1429 void *zmq_poller_new (void)
1430 {
1431  zmq::socket_poller_t *poller = new (std::nothrow) zmq::socket_poller_t;
1432  if (!poller) {
1433  errno = ENOMEM;
1434  }
1435  return poller;
1436 }
1437 
1438 int zmq_poller_destroy (void **poller_p_)
1439 {
1440  if (poller_p_) {
1441  const zmq::socket_poller_t *const poller =
1442  static_cast<const zmq::socket_poller_t *> (*poller_p_);
1443  if (poller && poller->check_tag ()) {
1444  delete poller;
1445  *poller_p_ = NULL;
1446  return 0;
1447  }
1448  }
1449  errno = EFAULT;
1450  return -1;
1451 }
1452 
1453 
1454 static int check_poller (void *const poller_)
1455 {
1456  if (!poller_
1457  || !(static_cast<zmq::socket_poller_t *> (poller_))->check_tag ()) {
1458  errno = EFAULT;
1459  return -1;
1460  }
1461 
1462  return 0;
1463 }
1464 
1465 static int check_events (const short events_)
1466 {
1467  if (events_ & ~(ZMQ_POLLIN | ZMQ_POLLOUT | ZMQ_POLLERR | ZMQ_POLLPRI)) {
1468  errno = EINVAL;
1469  return -1;
1470  }
1471  return 0;
1472 }
1473 
1474 static int check_poller_registration_args (void *const poller_, void *const s_)
1475 {
1476  if (-1 == check_poller (poller_))
1477  return -1;
1478 
1479  if (!s_ || !(static_cast<zmq::socket_base_t *> (s_))->check_tag ()) {
1480  errno = ENOTSOCK;
1481  return -1;
1482  }
1483 
1484  return 0;
1485 }
1486 
1487 static int check_poller_fd_registration_args (void *const poller_,
1488  const zmq::fd_t fd_)
1489 {
1490  if (-1 == check_poller (poller_))
1491  return -1;
1492 
1493  if (fd_ == zmq::retired_fd) {
1494  errno = EBADF;
1495  return -1;
1496  }
1497 
1498  return 0;
1499 }
1500 
1501 int zmq_poller_size (void *poller_)
1502 {
1503  if (-1 == check_poller (poller_))
1504  return -1;
1505 
1506  return (static_cast<zmq::socket_poller_t *> (poller_))->size ();
1507 }
1508 
1509 int zmq_poller_add (void *poller_, void *s_, void *user_data_, short events_)
1510 {
1511  if (-1 == check_poller_registration_args (poller_, s_)
1512  || -1 == check_events (events_))
1513  return -1;
1514 
1515  zmq::socket_base_t *socket = static_cast<zmq::socket_base_t *> (s_);
1516 
1517  return (static_cast<zmq::socket_poller_t *> (poller_))
1518  ->add (socket, user_data_, events_);
1519 }
1520 
1521 int zmq_poller_add_fd (void *poller_,
1522  zmq::fd_t fd_,
1523  void *user_data_,
1524  short events_)
1525 {
1526  if (-1 == check_poller_fd_registration_args (poller_, fd_)
1527  || -1 == check_events (events_))
1528  return -1;
1529 
1530  return (static_cast<zmq::socket_poller_t *> (poller_))
1531  ->add_fd (fd_, user_data_, events_);
1532 }
1533 
1534 
1535 int zmq_poller_modify (void *poller_, void *s_, short events_)
1536 {
1537  if (-1 == check_poller_registration_args (poller_, s_)
1538  || -1 == check_events (events_))
1539  return -1;
1540 
1541  const zmq::socket_base_t *const socket =
1542  static_cast<const zmq::socket_base_t *> (s_);
1543 
1544  return (static_cast<zmq::socket_poller_t *> (poller_))
1545  ->modify (socket, events_);
1546 }
1547 
1548 int zmq_poller_modify_fd (void *poller_, zmq::fd_t fd_, short events_)
1549 {
1550  if (-1 == check_poller_fd_registration_args (poller_, fd_)
1551  || -1 == check_events (events_))
1552  return -1;
1553 
1554  return (static_cast<zmq::socket_poller_t *> (poller_))
1555  ->modify_fd (fd_, events_);
1556 }
1557 
1558 int zmq_poller_remove (void *poller_, void *s_)
1559 {
1560  if (-1 == check_poller_registration_args (poller_, s_))
1561  return -1;
1562 
1563  zmq::socket_base_t *socket = static_cast<zmq::socket_base_t *> (s_);
1564 
1565  return (static_cast<zmq::socket_poller_t *> (poller_))->remove (socket);
1566 }
1567 
1568 int zmq_poller_remove_fd (void *poller_, zmq::fd_t fd_)
1569 {
1570  if (-1 == check_poller_fd_registration_args (poller_, fd_))
1571  return -1;
1572 
1573  return (static_cast<zmq::socket_poller_t *> (poller_))->remove_fd (fd_);
1574 }
1575 
1576 int zmq_poller_wait (void *poller_, zmq_poller_event_t *event_, long timeout_)
1577 {
1578  const int rc = zmq_poller_wait_all (poller_, event_, 1, timeout_);
1579 
1580  if (rc < 0 && event_) {
1581  event_->socket = NULL;
1582  event_->fd = zmq::retired_fd;
1583  event_->user_data = NULL;
1584  event_->events = 0;
1585  }
1586  // wait_all returns number of events, but we return 0 for any success
1587  return rc >= 0 ? 0 : rc;
1588 }
1589 
1590 int zmq_poller_wait_all (void *poller_,
1591  zmq_poller_event_t *events_,
1592  int n_events_,
1593  long timeout_)
1594 {
1595  if (-1 == check_poller (poller_))
1596  return -1;
1597 
1598  if (!events_) {
1599  errno = EFAULT;
1600  return -1;
1601  }
1602  if (n_events_ < 0) {
1603  errno = EINVAL;
1604  return -1;
1605  }
1606 
1607  const int rc =
1608  (static_cast<zmq::socket_poller_t *> (poller_))
1609  ->wait (reinterpret_cast<zmq::socket_poller_t::event_t *> (events_),
1610  n_events_, timeout_);
1611 
1612  return rc;
1613 }
1614 
1615 int zmq_poller_fd (void *poller_, zmq_fd_t *fd_)
1616 {
1617  if (!poller_
1618  || !(static_cast<zmq::socket_poller_t *> (poller_)->check_tag ())) {
1619  errno = EFAULT;
1620  return -1;
1621  }
1622  return static_cast<zmq::socket_poller_t *> (poller_)->signaler_fd (fd_);
1623 }
1624 
1625 // Peer-specific state
1626 
1628  const void *routing_id_,
1629  size_t routing_id_size_)
1630 {
1631  const zmq::socket_base_t *const s = as_socket_base_t (s_);
1632  if (!s)
1633  return -1;
1634 
1635  return s->get_peer_state (routing_id_, routing_id_size_);
1636 }
1637 
1638 // Timers
1639 
1640 void *zmq_timers_new (void)
1641 {
1642  zmq::timers_t *timers = new (std::nothrow) zmq::timers_t;
1643  alloc_assert (timers);
1644  return timers;
1645 }
1646 
1647 int zmq_timers_destroy (void **timers_p_)
1648 {
1649  void *timers = *timers_p_;
1650  if (!timers || !(static_cast<zmq::timers_t *> (timers))->check_tag ()) {
1651  errno = EFAULT;
1652  return -1;
1653  }
1654  delete (static_cast<zmq::timers_t *> (timers));
1655  *timers_p_ = NULL;
1656  return 0;
1657 }
1658 
1659 int zmq_timers_add (void *timers_,
1660  size_t interval_,
1661  zmq_timer_fn handler_,
1662  void *arg_)
1663 {
1664  if (!timers_ || !(static_cast<zmq::timers_t *> (timers_))->check_tag ()) {
1665  errno = EFAULT;
1666  return -1;
1667  }
1668 
1669  return (static_cast<zmq::timers_t *> (timers_))
1670  ->add (interval_, handler_, arg_);
1671 }
1672 
1673 int zmq_timers_cancel (void *timers_, int timer_id_)
1674 {
1675  if (!timers_ || !(static_cast<zmq::timers_t *> (timers_))->check_tag ()) {
1676  errno = EFAULT;
1677  return -1;
1678  }
1679 
1680  return (static_cast<zmq::timers_t *> (timers_))->cancel (timer_id_);
1681 }
1682 
1683 int zmq_timers_set_interval (void *timers_, int timer_id_, size_t interval_)
1684 {
1685  if (!timers_ || !(static_cast<zmq::timers_t *> (timers_))->check_tag ()) {
1686  errno = EFAULT;
1687  return -1;
1688  }
1689 
1690  return (static_cast<zmq::timers_t *> (timers_))
1691  ->set_interval (timer_id_, interval_);
1692 }
1693 
1694 int zmq_timers_reset (void *timers_, int timer_id_)
1695 {
1696  if (!timers_ || !(static_cast<zmq::timers_t *> (timers_))->check_tag ()) {
1697  errno = EFAULT;
1698  return -1;
1699  }
1700 
1701  return (static_cast<zmq::timers_t *> (timers_))->reset (timer_id_);
1702 }
1703 
1704 long zmq_timers_timeout (void *timers_)
1705 {
1706  if (!timers_ || !(static_cast<zmq::timers_t *> (timers_))->check_tag ()) {
1707  errno = EFAULT;
1708  return -1;
1709  }
1710 
1711  return (static_cast<zmq::timers_t *> (timers_))->timeout ();
1712 }
1713 
1714 int zmq_timers_execute (void *timers_)
1715 {
1716  if (!timers_ || !(static_cast<zmq::timers_t *> (timers_))->check_tag ()) {
1717  errno = EFAULT;
1718  return -1;
1719  }
1720 
1721  return (static_cast<zmq::timers_t *> (timers_))->execute ();
1722 }
1723 
1724 // The proxy functionality
1725 
1726 int zmq_proxy (void *frontend_, void *backend_, void *capture_)
1727 {
1728  if (!frontend_ || !backend_) {
1729  errno = EFAULT;
1730  return -1;
1731  }
1732  // Runs zmq::proxy_steerable with a NULL control_.
1733  return zmq::proxy (static_cast<zmq::socket_base_t *> (frontend_),
1734  static_cast<zmq::socket_base_t *> (backend_),
1735  static_cast<zmq::socket_base_t *> (capture_));
1736 }
1737 
1738 int zmq_proxy_steerable (void *frontend_,
1739  void *backend_,
1740  void *capture_,
1741  void *control_)
1742 {
1743  if (!frontend_ || !backend_) {
1744  errno = EFAULT;
1745  return -1;
1746  }
1747  return zmq::proxy_steerable (static_cast<zmq::socket_base_t *> (frontend_),
1748  static_cast<zmq::socket_base_t *> (backend_),
1749  static_cast<zmq::socket_base_t *> (capture_),
1750  static_cast<zmq::socket_base_t *> (control_));
1751 }
1752 
1753 // The deprecated device functionality
1754 
1755 int zmq_device (int /* type */, void *frontend_, void *backend_)
1756 {
1757  return zmq::proxy (static_cast<zmq::socket_base_t *> (frontend_),
1758  static_cast<zmq::socket_base_t *> (backend_), NULL);
1759 }
1760 
1761 // Probe library capabilities; for now, reports on transport and security
1762 
1763 int zmq_has (const char *capability_)
1764 {
1765 #if defined(ZMQ_HAVE_IPC)
1766  if (strcmp (capability_, zmq::protocol_name::ipc) == 0)
1767  return true;
1768 #endif
1769 #if defined(ZMQ_HAVE_OPENPGM)
1770  if (strcmp (capability_, zmq::protocol_name::pgm) == 0)
1771  return true;
1772 #endif
1773 #if defined(ZMQ_HAVE_TIPC)
1774  if (strcmp (capability_, zmq::protocol_name::tipc) == 0)
1775  return true;
1776 #endif
1777 #if defined(ZMQ_HAVE_NORM)
1778  if (strcmp (capability_, zmq::protocol_name::norm) == 0)
1779  return true;
1780 #endif
1781 #if defined(ZMQ_HAVE_CURVE)
1782  if (strcmp (capability_, "curve") == 0)
1783  return true;
1784 #endif
1785 #if defined(HAVE_LIBGSSAPI_KRB5)
1786  if (strcmp (capability_, "gssapi") == 0)
1787  return true;
1788 #endif
1789 #if defined(ZMQ_HAVE_VMCI)
1790  if (strcmp (capability_, zmq::protocol_name::vmci) == 0)
1791  return true;
1792 #endif
1793 #if defined(ZMQ_BUILD_DRAFT_API)
1794  if (strcmp (capability_, "draft") == 0)
1795  return true;
1796 #endif
1797 #if defined(ZMQ_HAVE_WS)
1798  if (strcmp (capability_, "WS") == 0)
1799  return true;
1800 #endif
1801 #if defined(ZMQ_HAVE_WSS)
1802  if (strcmp (capability_, "WSS") == 0)
1803  return true;
1804 #endif
1805  // Whatever the application asked for, we don't have
1806  return false;
1807 }
1808 
1810 {
1812  if (!s)
1813  return -1;
1814  return s->query_pipes_stats ();
1815 }
zmq_msg_group
const char * zmq_msg_group(zmq_msg_t *msg_)
Definition: zmq.cpp:704
ZMQ_VERSION_PATCH
#define ZMQ_VERSION_PATCH
Definition: zmq.h:17
zmq_recvmsg
int zmq_recvmsg(void *s_, zmq_msg_t *msg_, int flags_)
Definition: zmq.cpp:481
ip.hpp
data_
StringPiece data_
Definition: bytestream_unittest.cc:60
zmq_proxy_steerable
int zmq_proxy_steerable(void *frontend_, void *backend_, void *capture_, void *control_)
Definition: zmq.cpp:1738
ZMQ_SHARED
#define ZMQ_SHARED
Definition: zmq.h:355
a_
int a_
Definition: common_unittest.cc:193
zmq::proxy
void proxy(void *frontend, void *backend, void *capture)
Definition: zmq.hpp:2274
zmq_send_const
int zmq_send_const(void *s_, const void *buf_, size_t len_, int flags_)
Definition: zmq.cpp:400
end
GLuint GLuint end
Definition: glcorearb.h:2858
zmq_poller_poll
static int zmq_poller_poll(zmq_pollitem_t *items_, int nitems_, long timeout_)
Definition: zmq.cpp:728
ENOTSUP
#define ENOTSUP
Definition: zmq.h:104
zmq_msg_set_group
int zmq_msg_set_group(zmq_msg_t *msg_, const char *group_)
Definition: zmq.cpp:699
zmq_pollitem_t::fd
zmq_fd_t fd
Definition: zmq.h:490
zmq::socket_poller_t
Definition: socket_poller.hpp:30
NULL
NULL
Definition: test_security_zap.cpp:405
EINTR
#define EINTR
Definition: errno.hpp:7
config.hpp
ZMQ_EVENTS
#define ZMQ_EVENTS
Definition: zmq.h:286
zmq_timers_execute
int zmq_timers_execute(void *timers_)
Definition: zmq.cpp:1714
zmq::retired_fd
@ retired_fd
Definition: fd.hpp:32
zmq_pollitem_t::events
short events
Definition: zmq.h:491
zmq_msg_init
int zmq_msg_init(zmq_msg_t *msg_)
Definition: zmq.cpp:587
EINVAL
#define EINVAL
Definition: errno.hpp:25
zmq_connect_peer
uint32_t zmq_connect_peer(void *s_, const char *addr_)
Definition: zmq.cpp:315
zmq_send
int zmq_send(void *s_, const void *buf_, size_t len_, int flags_)
Definition: zmq.cpp:377
zmq_disconnect
int zmq_disconnect(void *s_, const char *addr_)
Definition: zmq.cpp:345
s
XmlRpcServer s
check_poller
static int check_poller(void *const poller_)
Definition: zmq.cpp:1454
benchmarks.util.result_uploader.metadata
def metadata
Definition: result_uploader.py:97
EAGAIN
#define EAGAIN
Definition: errno.hpp:14
zmq_term
int zmq_term(void *ctx_)
Definition: zmq.cpp:207
zmq_msg_more
int zmq_msg_more(const zmq_msg_t *msg_)
Definition: zmq.cpp:652
ZMQ_TYPE
#define ZMQ_TYPE
Definition: zmq.h:287
zmq_timers_timeout
long zmq_timers_timeout(void *timers_)
Definition: zmq.cpp:1704
precompiled.hpp
zmq_assert
#define zmq_assert(x)
Definition: err.hpp:102
ZMQ_POLLPRI
#define ZMQ_POLLPRI
Definition: zmq.h:485
zmq_poller_remove_fd
int zmq_poller_remove_fd(void *poller_, zmq::fd_t fd_)
Definition: zmq.cpp:1568
zmq_msg_init_data
int zmq_msg_init_data(zmq_msg_t *msg_, void *data_, size_t size_, zmq_free_fn *ffn_, void *hint_)
Definition: zmq.cpp:602
polling_util.hpp
zmq_sendmsg
int zmq_sendmsg(void *s_, zmq_msg_t *msg_, int flags_)
Definition: zmq.cpp:372
zmq_socket
void * zmq_socket(void *ctx_, int type_)
Definition: zmq.cpp:230
zmq_ctx_new
void * zmq_ctx_new(void)
Definition: zmq.cpp:109
zmq_ctx_destroy
int zmq_ctx_destroy(void *ctx_)
Definition: zmq.cpp:212
zmq_timers_add
int zmq_timers_add(void *timers_, size_t interval_, zmq_timer_fn handler_, void *arg_)
Definition: zmq.cpp:1659
string
GLsizei const GLchar *const * string
Definition: glcorearb.h:3083
zmq_poller_fd
int zmq_poller_fd(void *poller_, zmq_fd_t *fd_)
Definition: zmq.cpp:1615
errno
int errno
clock.hpp
zmq_ctx_get_ext
int zmq_ctx_get_ext(void *ctx_, int option_, void *optval_, size_t *optvallen_)
Definition: zmq.cpp:183
zmq_device
int zmq_device(int, void *frontend_, void *backend_)
Definition: zmq.cpp:1755
zmq_ctx_shutdown
int zmq_ctx_shutdown(void *ctx_)
Definition: zmq.cpp:147
zmq_pollitem_t
Definition: zmq.h:487
zmq_msg_init_buffer
int zmq_msg_init_buffer(zmq_msg_t *msg_, const void *buf_, size_t size_)
Definition: zmq.cpp:597
zmq::socket_base_t
Definition: socket_base.hpp:31
ZMQ_MORE
#define ZMQ_MORE
Definition: zmq.h:354
zmq_msg_copy
int zmq_msg_copy(zmq_msg_t *dest_, zmq_msg_t *src_)
Definition: zmq.cpp:636
check_poller_registration_args
static int check_poller_registration_args(void *const poller_, void *const s_)
Definition: zmq.cpp:1474
zmq_has
int zmq_has(const char *capability_)
Definition: zmq.cpp:1763
zmq::fast_vector_t
Definition: polling_util.hpp:22
flags
GLbitfield flags
Definition: glcorearb.h:3585
zmq_timers_destroy
int zmq_timers_destroy(void **timers_p_)
Definition: zmq.cpp:1647
zmq::fd_t
int fd_t
Definition: zmq.hpp:287
zmq_poller_modify_fd
int zmq_poller_modify_fd(void *poller_, zmq::fd_t fd_, short events_)
Definition: zmq.cpp:1548
zmq_socket_get_peer_state
int zmq_socket_get_peer_state(void *s_, const void *routing_id_, size_t routing_id_size_)
Definition: zmq.cpp:1627
ZMQ_POLLIN
#define ZMQ_POLLIN
Definition: zmq.h:482
ctx.hpp
check_msg_t_size
char check_msg_t_size[sizeof(zmq::msg_t)==sizeof(zmq_msg_t) ? 1 :-1]
Definition: zmq.cpp:85
zmq_poller_event_t::events
short events
Definition: zmq_draft.h:119
zmq_ppoll
int zmq_ppoll(zmq_pollitem_t *items_, int nitems_, long timeout_, const sigset_t *sigmask_)
Definition: zmq.cpp:1355
zmq_msg_close
int zmq_msg_close(zmq_msg_t *msg_)
Definition: zmq.cpp:625
s_sendmsg
static int s_sendmsg(zmq::socket_base_t *s_, zmq_msg_t *msg_, int flags_)
Definition: zmq.cpp:356
ZMQ_POLLOUT
#define ZMQ_POLLOUT
Definition: zmq.h:483
zmq_version
void zmq_version(int *major_, int *minor_, int *patch_)
Definition: zmq.cpp:88
zmq_poller_event_t::socket
void * socket
Definition: zmq_draft.h:116
socket_poller.hpp
alloc_assert
#define alloc_assert(x)
Definition: err.hpp:146
errno_assert
#define errno_assert(x)
Definition: err.hpp:113
zmq_poller_modify
int zmq_poller_modify(void *poller_, void *s_, short events_)
Definition: zmq.cpp:1535
peer.hpp
zmq_msg_init_size
int zmq_msg_init_size(zmq_msg_t *msg_, size_t size_)
Definition: zmq.cpp:592
zmq::shutdown_network
void shutdown_network()
Definition: ip.cpp:305
macros.hpp
zmq::msg_t::flags
unsigned char flags
Definition: msg.hpp:232
stdint.hpp
poller.hpp
ZMQ_SRCFD
#define ZMQ_SRCFD
Definition: zmq.h:384
zmq_poller_event_t
Definition: zmq_draft.h:114
zmq_poller_new
void * zmq_poller_new(void)
Definition: zmq.cpp:1429
zmq_socket_monitor
int zmq_socket_monitor(void *s_, const char *addr_, int events_)
Definition: zmq.cpp:278
zmq_timer_fn
void() zmq_timer_fn(int timer_id, void *arg)
Definition: zmq.h:568
timers.hpp
zmq_timers_set_interval
int zmq_timers_set_interval(void *timers_, int timer_id_, size_t interval_)
Definition: zmq.cpp:1683
zmq::msg_t::shared
@ shared
Definition: msg.hpp:66
zmq_msg_t
Definition: zmq.h:218
s_
std::string s_
Definition: gmock-matchers_test.cc:4128
zmq_poller_wait
int zmq_poller_wait(void *poller_, zmq_poller_event_t *event_, long timeout_)
Definition: zmq.cpp:1576
EBADF
#define EBADF
Definition: errno.hpp:12
zmq_ctx_set_ext
int zmq_ctx_set_ext(void *ctx_, int option_, const void *optval_, size_t optvallen_)
Definition: zmq.cpp:161
zmq_close
int zmq_close(void *s_)
Definition: zmq.cpp:241
zmq_pollitem_t::revents
short revents
Definition: zmq.h:492
err
static UPB_NORETURN void err(tarjan *t)
Definition: ruby/ext/google/protobuf_c/upb.c:5856
ZMQ_POLLERR
#define ZMQ_POLLERR
Definition: zmq.h:484
zmq::detail::poll
int poll(zmq_pollitem_t *items_, size_t nitems_, long timeout_)
Definition: zmq.hpp:306
zmq_socket_monitor_versioned
int zmq_socket_monitor_versioned(void *s_, const char *addr_, uint64_t events_, int event_version_, int type_)
Definition: zmq.cpp:269
zmq_proxy
int zmq_proxy(void *frontend_, void *backend_, void *capture_)
Definition: zmq.cpp:1726
zmq_getsockopt
int zmq_getsockopt(void *s_, int option_, void *optval_, size_t *optvallen_)
Definition: zmq.cpp:261
timeout
GLbitfield GLuint64 timeout
Definition: glcorearb.h:3588
zmq_msg_set
int zmq_msg_set(zmq_msg_t *, int, int)
Definition: zmq.cpp:681
fd.hpp
ENOTSOCK
#define ENOTSOCK
Definition: zmq.h:128
zmq_fd_t
int zmq_fd_t
Definition: zmq.h:475
ZMQ_VERSION_MAJOR
#define ZMQ_VERSION_MAJOR
Definition: zmq.h:15
address.hpp
FD_SETSIZE
#define FD_SETSIZE
Definition: deprecated-msvc/vs2015_xp/platform.hpp:10
zmq_msg_get
int zmq_msg_get(const zmq_msg_t *msg_, int property_)
Definition: zmq.cpp:657
zmq_msg_size
size_t zmq_msg_size(const zmq_msg_t *msg_)
Definition: zmq.cpp:647
zmq_leave
int zmq_leave(void *s_, const char *group_)
Definition: zmq.cpp:291
zmq::proxy_steerable
void proxy_steerable(void *frontend, void *backend, void *capture, void *control)
Definition: zmq.hpp:2292
ZMQ_PEER
#define ZMQ_PEER
Definition: zmq_draft.h:21
zmq_free_fn
void() zmq_free_fn(void *data_, void *hint_)
Definition: zmq.h:234
i
int i
Definition: gmock-matchers_test.cc:764
zmq_msg_set_routing_id
int zmq_msg_set_routing_id(zmq_msg_t *msg_, uint32_t routing_id_)
Definition: zmq.cpp:688
msg.hpp
zmq_join
int zmq_join(void *s_, const char *group_)
Definition: zmq.cpp:283
zmq_socket_monitor_pipes_stats
int zmq_socket_monitor_pipes_stats(void *s_)
Definition: zmq.cpp:1809
zmq_ctx_get
int zmq_ctx_get(void *ctx_, int option_)
Definition: zmq.cpp:174
zmq::metadata_t
Definition: metadata.hpp:13
zmq_msg_send
int zmq_msg_send(zmq_msg_t *msg_, void *s_, int flags_)
Definition: zmq.cpp:609
socket_type
int socket_type
iovec
Definition: zmq.cpp:45
zmq_init
void * zmq_init(int io_threads_)
Definition: zmq.cpp:196
zmq_msg_recv
int zmq_msg_recv(zmq_msg_t *msg_, void *s_, int flags_)
Definition: zmq.cpp:617
zmq::msg_t::more
@ more
Definition: msg.hpp:55
metadata.hpp
zmq_bind
int zmq_bind(void *s_, const char *addr_)
Definition: zmq.cpp:299
ZMQ_PAIR
#define ZMQ_PAIR
Definition: zmq.h:258
iovec::iov_len
size_t iov_len
Definition: zmq.cpp:48
zmq::initialize_network
bool initialize_network()
Definition: ip.cpp:264
zmq_poller_size
int zmq_poller_size(void *poller_)
Definition: zmq.cpp:1501
zmq::socket_poller_t::check_tag
bool check_tag() const
Definition: socket_poller.cpp:69
zmq_poller_add
int zmq_poller_add(void *poller_, void *s_, void *user_data_, short events_)
Definition: zmq.cpp:1509
zmq_timers_reset
int zmq_timers_reset(void *timers_, int timer_id_)
Definition: zmq.cpp:1694
socket_base.hpp
zmq::clock_t
Definition: clock.hpp:27
zmq_msg_move
int zmq_msg_move(zmq_msg_t *dest_, zmq_msg_t *src_)
Definition: zmq.cpp:630
zmq_timers_new
void * zmq_timers_new(void)
Definition: zmq.cpp:1640
err.hpp
zmq_recv
int zmq_recv(void *s_, void *buf_, size_t len_, int flags_)
Definition: zmq.cpp:487
zmq::clock_t::now_ms
uint64_t now_ms()
Definition: clock.cpp:181
ZMQ_IO_THREADS
#define ZMQ_IO_THREADS
Definition: zmq.h:181
check_poller_fd_registration_args
static int check_poller_fd_registration_args(void *const poller_, const zmq::fd_t fd_)
Definition: zmq.cpp:1487
ZMQ_SNDMORE
#define ZMQ_SNDMORE
Definition: zmq.h:359
likely.hpp
zmq_msg_routing_id
uint32_t zmq_msg_routing_id(zmq_msg_t *msg_)
Definition: zmq.cpp:694
check_events
static int check_events(const short events_)
Definition: zmq.cpp:1465
zmq_ctx_term
int zmq_ctx_term(void *ctx_)
Definition: zmq.cpp:128
zmq_poller_remove
int zmq_poller_remove(void *poller_, void *s_)
Definition: zmq.cpp:1558
zmq_unbind
int zmq_unbind(void *s_, const char *addr_)
Definition: zmq.cpp:337
zmq_strerror
const char * zmq_strerror(int errnum_)
Definition: zmq.cpp:96
zmq_msg_gets
const char * zmq_msg_gets(const zmq_msg_t *msg_, const char *property_)
Definition: zmq.cpp:711
ZMQ_FD
#define ZMQ_FD
Definition: zmq.h:285
zmq_setsockopt
int zmq_setsockopt(void *s_, int option_, const void *optval_, size_t optvallen_)
Definition: zmq.cpp:250
zmq_poller_add_fd
int zmq_poller_add_fd(void *poller_, zmq::fd_t fd_, void *user_data_, short events_)
Definition: zmq.cpp:1521
value
GLsizei const GLfloat * value
Definition: glcorearb.h:3093
zmq::errno_to_string
const char * errno_to_string(int errno_)
Definition: err.cpp:7
iovec::iov_base
void * iov_base
Definition: zmq.cpp:47
zmq::timers_t
Definition: timers.hpp:16
zmq_sendiov
int zmq_sendiov(void *s_, iovec *a_, size_t count_, int flags_)
Definition: zmq.cpp:432
zmq_poller_event_t::fd
zmq_fd_t fd
Definition: zmq_draft.h:117
EFAULT
#define EFAULT
Definition: errno.hpp:17
count
GLint GLsizei count
Definition: glcorearb.h:2830
zmq_connect
int zmq_connect(void *s_, const char *addr_)
Definition: zmq.cpp:307
zmq_errno
int zmq_errno(void)
Definition: zmq.cpp:101
proxy.hpp
zmq_msg_data
void * zmq_msg_data(zmq_msg_t *msg_)
Definition: zmq.cpp:642
zmq_poller_event_t::user_data
void * user_data
Definition: zmq_draft.h:118
zmq_ctx_set
int zmq_ctx_set(void *ctx_, int option_, int optval_)
Definition: zmq.cpp:156
s_recvmsg
static int s_recvmsg(zmq::socket_base_t *s_, zmq_msg_t *msg_, int flags_)
Definition: zmq.cpp:469
as_socket_base_t
static zmq::socket_base_t * as_socket_base_t(void *s_)
Definition: zmq.cpp:220
ZMQ_VERSION_MINOR
#define ZMQ_VERSION_MINOR
Definition: zmq.h:16
zmq_poller_destroy
int zmq_poller_destroy(void **poller_p_)
Definition: zmq.cpp:1438
zmq::msg_t
Definition: msg.hpp:33
zmq_recviov
int zmq_recviov(void *s_, iovec *a_, size_t *count_, int flags_)
Definition: zmq.cpp:535
zmq_poller_wait_all
int zmq_poller_wait_all(void *poller_, zmq_poller_event_t *events_, int n_events_, long timeout_)
Definition: zmq.cpp:1590
unlikely
#define unlikely(x)
Definition: likely.hpp:11
zmq_timers_cancel
int zmq_timers_cancel(void *timers_, int timer_id_)
Definition: zmq.cpp:1673
zmq_poll
int zmq_poll(zmq_pollitem_t *items_, int nitems_, long timeout_)
Definition: zmq.cpp:827


libaditof
Author(s):
autogenerated on Wed May 21 2025 02:07:02