12 #define ZMQ_TYPE_UNSAFE
18 #if !defined ZMQ_HAVE_POLLER
23 #if defined ZMQ_POLL_BASED_ON_POLL && !defined ZMQ_HAVE_WINDOWS
34 #if !defined ZMQ_HAVE_WINDOWS
36 #ifdef ZMQ_HAVE_VXWORKS
42 #if defined ZMQ_HAVE_UIO
75 #include <sys/select.h>
78 #if defined ZMQ_HAVE_OPENPGM
79 #define __PGM_WININT_H__
118 zmq::ctx_t *ctx =
new (std::nothrow) zmq::ctx_t;
120 if (!ctx->valid ()) {
130 if (!ctx_ || !(
static_cast<zmq::ctx_t *
> (ctx_))->check_tag ()) {
135 const int rc = (
static_cast<zmq::ctx_t *
> (ctx_))->terminate ();
136 const int en =
errno;
139 if (!rc || en !=
EINTR) {
149 if (!ctx_ || !(
static_cast<zmq::ctx_t *
> (ctx_))->check_tag ()) {
153 return (
static_cast<zmq::ctx_t *
> (ctx_))->shutdown ();
166 if (!ctx_ || !(
static_cast<zmq::ctx_t *
> (ctx_))->check_tag ()) {
170 return (
static_cast<zmq::ctx_t *
> (ctx_))
171 ->set (option_, optval_, optvallen_);
176 if (!ctx_ || !(
static_cast<zmq::ctx_t *
> (ctx_))->check_tag ()) {
180 return (
static_cast<zmq::ctx_t *
> (ctx_))->get (option_);
185 if (!ctx_ || !(
static_cast<zmq::ctx_t *
> (ctx_))->check_tag ()) {
189 return (
static_cast<zmq::ctx_t *
> (ctx_))
190 ->get (option_, optval_, optvallen_);
198 if (io_threads_ >= 0) {
223 if (!
s_ || !
s->check_tag ()) {
232 if (!ctx_ || !(
static_cast<zmq::ctx_t *
> (ctx_))->check_tag ()) {
236 zmq::ctx_t *ctx =
static_cast<zmq::ctx_t *
> (ctx_);
238 return static_cast<void *
> (
s);
258 return s->setsockopt (option_, optval_, optvallen_);
266 return s->getsockopt (option_, optval_, optvallen_);
270 void *
s_,
const char *addr_, uint64_t events_,
int event_version_,
int type_)
275 return s->monitor (addr_, events_, event_version_, type_);
288 return s->join (group_);
296 return s->leave (group_);
304 return s->bind (addr_);
312 return s->connect (addr_);
317 zmq::peer_t *
s =
static_cast<zmq::peer_t *
> (
s_);
318 if (!
s_ || !
s->check_tag ()) {
333 return s->connect_peer (addr_);
342 return s->term_endpoint (addr_);
350 return s->term_endpoint (addr_);
359 const int rc =
s_->send (
reinterpret_cast<zmq::msg_t *
> (msg_), flags_);
365 size_t max_msgsz = INT_MAX;
368 return static_cast<int> (sz < max_msgsz ? sz : max_msgsz);
377 int zmq_send (
void *
s_,
const void *buf_,
size_t len_,
int flags_)
445 for (
size_t i = 0;
i < count_; ++
i) {
471 const int rc =
s_->recv (
reinterpret_cast<zmq::msg_t *
> (msg_), flags_);
477 return static_cast<int> (sz < INT_MAX ? sz : INT_MAX);
496 const int nbytes =
s_recvmsg (
s, &msg, flags_);
506 const size_t to_copy = size_t (nbytes) < len_ ? size_t (nbytes) : len_;
540 if (
unlikely (!count_ || *count_ <= 0 || !
a_)) {
545 const size_t count = *count_;
547 bool recvmore =
true;
551 for (
size_t i = 0; recvmore &&
i <
count; ++
i) {
556 const int nbytes =
s_recvmsg (
s, &msg, flags_);
567 a_[
i].iov_base =
static_cast<char *
> (malloc (
a_[
i].iov_len));
589 return (
reinterpret_cast<zmq::msg_t *
> (msg_))->init ();
594 return (
reinterpret_cast<zmq::msg_t *
> (msg_))->init_size (size_);
599 return (
reinterpret_cast<zmq::msg_t *
> (msg_))->init_buffer (buf_, size_);
605 return (
reinterpret_cast<zmq::msg_t *
> (msg_))
606 ->init_data (
data_, size_, ffn_, hint_);
627 return (
reinterpret_cast<zmq::msg_t *
> (msg_))->close ();
632 return (
reinterpret_cast<zmq::msg_t *
> (dest_))
633 ->move (*
reinterpret_cast<zmq::msg_t *
> (src_));
638 return (
reinterpret_cast<zmq::msg_t *
> (dest_))
639 ->copy (*
reinterpret_cast<zmq::msg_t *
> (src_));
644 return (
reinterpret_cast<zmq::msg_t *
> (msg_))->data ();
659 const char *fd_string;
666 if (fd_string ==
NULL)
669 return atoi (fd_string);
690 return (
reinterpret_cast<zmq::msg_t *
> (msg_))
691 ->set_routing_id (routing_id_);
696 return (
reinterpret_cast<zmq::msg_t *
> (msg_))->get_routing_id ();
701 return (
reinterpret_cast<zmq::msg_t *
> (msg_))->set_group (group_);
706 return (
reinterpret_cast<zmq::msg_t *
> (msg_))->group ();
727 #if defined ZMQ_HAVE_POLLER
737 bool repeat_items =
false;
739 for (
int i = 0;
i < nitems_;
i++) {
744 if (items_[
i].socket) {
746 for (
int j = 0; j <
i; ++j) {
748 if (items_[j].socket == items_[
i].socket) {
765 for (
int j = 0; j <
i; ++j) {
767 if (!items_[j].socket && items_[j].fd == items_[
i].fd) {
801 int j_start = 0, found_events = rc;
802 for (
int i = 0;
i < nitems_;
i++) {
803 for (
int j = j_start; j < found_events; ++j) {
804 if ((items_[
i].socket && items_[
i].socket == events[j].socket)
805 || (!(items_[
i].socket || events[j].socket)
806 && items_[
i].fd == events[j].fd)) {
825 #endif // ZMQ_HAVE_POLLER
829 #if defined ZMQ_HAVE_POLLER
832 for (
int i = 0;
i != nitems_;
i++) {
833 if (items_[
i].socket) {
836 if (
s->is_thread_safe ())
844 #endif // ZMQ_HAVE_POLLER
845 #if defined ZMQ_POLL_BASED_ON_POLL || defined ZMQ_POLL_BASED_ON_SELECT
853 #if defined ZMQ_HAVE_WINDOWS
854 Sleep (timeout_ > 0 ? timeout_ : INFINITE);
856 #elif defined ZMQ_HAVE_VXWORKS
858 ns_.tv_sec = timeout_ / 1000;
859 ns_.tv_nsec = timeout_ % 1000 * 1000000;
860 return nanosleep (&ns_, 0);
862 return usleep (timeout_ * 1000);
873 #if defined ZMQ_POLL_BASED_ON_POLL
877 for (
int i = 0;
i != nitems_;
i++) {
880 if (items_[
i].socket) {
887 pollfds[
i].events = items_[
i].
events ? POLLIN : 0;
892 pollfds[
i].fd = items_[
i].
fd;
905 zmq::optimized_fd_set_t pollset_in (nitems_);
906 FD_ZERO (pollset_in.get ());
907 zmq::optimized_fd_set_t pollset_out (nitems_);
908 FD_ZERO (pollset_out.get ());
909 zmq::optimized_fd_set_t pollset_err (nitems_);
910 FD_ZERO (pollset_err.get ());
915 for (
int i = 0;
i != nitems_;
i++) {
918 if (items_[
i].socket) {
925 if (items_[
i].events) {
926 FD_SET (notify_fd, pollset_in.get ());
927 if (maxfd < notify_fd)
935 FD_SET (items_[
i].fd, pollset_in.get ());
937 FD_SET (items_[
i].fd, pollset_out.get ());
939 FD_SET (items_[
i].fd, pollset_err.get ());
940 if (maxfd < items_[
i].fd)
941 maxfd = items_[
i].
fd;
945 zmq::optimized_fd_set_t inset (nitems_);
946 zmq::optimized_fd_set_t outset (nitems_);
947 zmq::optimized_fd_set_t errset (nitems_);
950 bool first_pass =
true;
954 #if defined ZMQ_POLL_BASED_ON_POLL
958 zmq::compute_timeout (first_pass, timeout_, now,
end);
962 const int rc =
poll (&pollfds[0], nitems_,
timeout);
969 for (
int i = 0;
i != nitems_;
i++) {
974 if (items_[
i].socket) {
975 size_t zmq_events_size =
sizeof (uint32_t);
992 if (pollfds[
i].revents & POLLIN)
994 if (pollfds[
i].revents & POLLOUT)
996 if (pollfds[
i].revents & POLLPRI)
998 if (pollfds[
i].revents & ~(POLLIN | POLLOUT | POLLPRI))
1002 if (items_[
i].revents)
1015 }
else if (timeout_ < 0)
1018 timeout.tv_sec =
static_cast<long> ((
end - now) / 1000);
1019 timeout.tv_usec =
static_cast<long> ((
end - now) % 1000 * 1000);
1025 memcpy (inset.get (), pollset_in.get (),
1026 zmq::valid_pollset_bytes (*pollset_in.get ()));
1027 memcpy (outset.get (), pollset_out.get (),
1028 zmq::valid_pollset_bytes (*pollset_out.get ()));
1029 memcpy (errset.get (), pollset_err.get (),
1030 zmq::valid_pollset_bytes (*pollset_err.get ()));
1031 #if defined ZMQ_HAVE_WINDOWS
1033 select (0, inset.get (), outset.get (), errset.get (), ptimeout);
1034 if (
unlikely (rc == SOCKET_ERROR)) {
1035 errno = zmq::wsa_error_to_errno (WSAGetLastError ());
1040 int rc = select (maxfd + 1, inset.get (), outset.get (),
1041 errset.get (), ptimeout);
1051 for (
int i = 0;
i != nitems_;
i++) {
1056 if (items_[
i].socket) {
1057 size_t zmq_events_size =
sizeof (uint32_t);
1058 uint32_t zmq_events;
1073 if (FD_ISSET (items_[
i].fd, inset.get ()))
1075 if (FD_ISSET (items_[
i].fd, outset.get ()))
1077 if (FD_ISSET (items_[
i].fd, errset.get ()))
1081 if (items_[
i].revents)
1108 end = now + timeout_;
1129 #ifdef ZMQ_HAVE_PPOLL
1131 int zmq_poll_check_items_ (
zmq_pollitem_t *items_,
int nitems_,
long timeout_)
1140 #if defined ZMQ_HAVE_WINDOWS
1141 Sleep (timeout_ > 0 ? timeout_ : INFINITE);
1143 #elif defined ZMQ_HAVE_VXWORKS
1144 struct timespec ns_;
1145 ns_.tv_sec = timeout_ / 1000;
1146 ns_.tv_nsec = timeout_ % 1000 * 1000000;
1147 return nanosleep (&ns_, 0);
1149 return usleep (timeout_ * 1000);
1159 struct zmq_poll_select_fds_t_
1161 explicit zmq_poll_select_fds_t_ (
int nitems_) :
1162 pollset_in (nitems_),
1163 pollset_out (nitems_),
1164 pollset_err (nitems_),
1170 FD_ZERO (pollset_in.get ());
1171 FD_ZERO (pollset_out.get ());
1172 FD_ZERO (pollset_err.get ());
1175 zmq::optimized_fd_set_t pollset_in;
1176 zmq::optimized_fd_set_t pollset_out;
1177 zmq::optimized_fd_set_t pollset_err;
1178 zmq::optimized_fd_set_t inset;
1179 zmq::optimized_fd_set_t outset;
1180 zmq::optimized_fd_set_t errset;
1184 zmq_poll_select_fds_t_
1185 zmq_poll_build_select_fds_ (
zmq_pollitem_t *items_,
int nitems_,
int &rc)
1192 zmq_poll_select_fds_t_ fds (nitems_);
1195 for (
int i = 0;
i != nitems_;
i++) {
1198 if (items_[
i].socket) {
1199 size_t zmq_fd_size =
sizeof (
zmq::fd_t);
1207 if (items_[
i].events) {
1208 FD_SET (notify_fd, fds.pollset_in.get ());
1209 if (fds.maxfd < notify_fd)
1210 fds.maxfd = notify_fd;
1217 FD_SET (items_[
i].fd, fds.pollset_in.get ());
1219 FD_SET (items_[
i].fd, fds.pollset_out.get ());
1221 FD_SET (items_[
i].fd, fds.pollset_err.get ());
1222 if (fds.maxfd < items_[
i].fd)
1223 fds.maxfd = items_[
i].
fd;
1231 timeval *zmq_poll_select_set_timeout_ (
1232 long timeout_,
bool first_pass, uint64_t now, uint64_t
end, timeval &
timeout)
1239 }
else if (timeout_ < 0)
1242 timeout.tv_sec =
static_cast<long> ((
end - now) / 1000);
1243 timeout.tv_usec =
static_cast<long> ((
end - now) % 1000 * 1000);
1249 timespec *zmq_poll_select_set_timeout_ (
1250 long timeout_,
bool first_pass, uint64_t now, uint64_t
end, timespec &
timeout)
1257 }
else if (timeout_ < 0)
1260 timeout.tv_sec =
static_cast<long> ((
end - now) / 1000);
1261 timeout.tv_nsec =
static_cast<long> ((
end - now) % 1000 * 1000000);
1269 zmq_poll_select_fds_t_ &fds,
1273 for (
int i = 0;
i != nitems_;
i++) {
1278 if (items_[
i].socket) {
1279 size_t zmq_events_size =
sizeof (uint32_t);
1280 uint32_t zmq_events;
1293 if (FD_ISSET (items_[
i].fd, fds.inset.get ()))
1295 if (FD_ISSET (items_[
i].fd, fds.outset.get ()))
1297 if (FD_ISSET (items_[
i].fd, fds.errset.get ()))
1301 if (items_[
i].revents)
1308 bool zmq_poll_must_break_loop_ (
long timeout_,
1337 end = now + timeout_;
1352 #endif // ZMQ_HAVE_PPOLL
1358 const sigset_t *sigmask_)
1364 const void *sigmask_)
1367 #ifdef ZMQ_HAVE_PPOLL
1368 int rc = zmq_poll_check_items_ (items_, nitems_, timeout_);
1376 zmq_poll_select_fds_t_ fds =
1377 zmq_poll_build_select_fds_ (items_, nitems_, rc);
1382 bool first_pass =
true;
1388 timespec *ptimeout = zmq_poll_select_set_timeout_ (timeout_, first_pass,
1393 memcpy (fds.inset.get (), fds.pollset_in.get (),
1394 zmq::valid_pollset_bytes (*fds.pollset_in.get ()));
1395 memcpy (fds.outset.get (), fds.pollset_out.get (),
1396 zmq::valid_pollset_bytes (*fds.pollset_out.get ()));
1397 memcpy (fds.errset.get (), fds.pollset_err.get (),
1398 zmq::valid_pollset_bytes (*fds.pollset_err.get ()));
1400 pselect (fds.maxfd + 1, fds.inset.get (), fds.outset.get (),
1401 fds.errset.get (), ptimeout, sigmask_);
1409 rc = zmq_poll_select_check_events_ (items_, nitems_, fds, nevents);
1414 if (zmq_poll_must_break_loop_ (timeout_, nevents, first_pass, clock,
1424 #endif // ZMQ_HAVE_PPOLL
1518 ->add (socket, user_data_, events_);
1531 ->add_fd (fd_, user_data_, events_);
1545 ->modify (socket, events_);
1555 ->modify_fd (fd_, events_);
1580 if (rc < 0 && event_) {
1587 return rc >= 0 ? 0 : rc;
1602 if (n_events_ < 0) {
1610 n_events_, timeout_);
1628 const void *routing_id_,
1629 size_t routing_id_size_)
1635 return s->get_peer_state (routing_id_, routing_id_size_);
1649 void *timers = *timers_p_;
1650 if (!timers || !(
static_cast<zmq::timers_t *
> (timers))->check_tag ()) {
1664 if (!timers_ || !(
static_cast<zmq::timers_t *
> (timers_))->check_tag ()) {
1670 ->add (interval_, handler_, arg_);
1675 if (!timers_ || !(
static_cast<zmq::timers_t *
> (timers_))->check_tag ()) {
1680 return (
static_cast<zmq::timers_t *
> (timers_))->cancel (timer_id_);
1685 if (!timers_ || !(
static_cast<zmq::timers_t *
> (timers_))->check_tag ()) {
1691 ->set_interval (timer_id_, interval_);
1696 if (!timers_ || !(
static_cast<zmq::timers_t *
> (timers_))->check_tag ()) {
1701 return (
static_cast<zmq::timers_t *
> (timers_))->reset (timer_id_);
1706 if (!timers_ || !(
static_cast<zmq::timers_t *
> (timers_))->check_tag ()) {
1711 return (
static_cast<zmq::timers_t *
> (timers_))->timeout ();
1716 if (!timers_ || !(
static_cast<zmq::timers_t *
> (timers_))->check_tag ()) {
1721 return (
static_cast<zmq::timers_t *
> (timers_))->execute ();
1726 int zmq_proxy (
void *frontend_,
void *backend_,
void *capture_)
1728 if (!frontend_ || !backend_) {
1743 if (!frontend_ || !backend_) {
1765 #if defined(ZMQ_HAVE_IPC)
1766 if (strcmp (capability_, zmq::protocol_name::ipc) == 0)
1769 #if defined(ZMQ_HAVE_OPENPGM)
1770 if (strcmp (capability_, zmq::protocol_name::pgm) == 0)
1773 #if defined(ZMQ_HAVE_TIPC)
1774 if (strcmp (capability_, zmq::protocol_name::tipc) == 0)
1777 #if defined(ZMQ_HAVE_NORM)
1778 if (strcmp (capability_, zmq::protocol_name::norm) == 0)
1781 #if defined(ZMQ_HAVE_CURVE)
1782 if (strcmp (capability_,
"curve") == 0)
1785 #if defined(HAVE_LIBGSSAPI_KRB5)
1786 if (strcmp (capability_,
"gssapi") == 0)
1789 #if defined(ZMQ_HAVE_VMCI)
1790 if (strcmp (capability_, zmq::protocol_name::vmci) == 0)
1793 #if defined(ZMQ_BUILD_DRAFT_API)
1794 if (strcmp (capability_,
"draft") == 0)
1797 #if defined(ZMQ_HAVE_WS)
1798 if (strcmp (capability_,
"WS") == 0)
1801 #if defined(ZMQ_HAVE_WSS)
1802 if (strcmp (capability_,
"WSS") == 0)
1814 return s->query_pipes_stats ();