io_thread.cpp
Go to the documentation of this file.
1 /* SPDX-License-Identifier: MPL-2.0 */
2 
3 #include "precompiled.hpp"
4 
5 #include <new>
6 
7 #include "macros.hpp"
8 #include "io_thread.hpp"
9 #include "err.hpp"
10 #include "ctx.hpp"
11 
12 zmq::io_thread_t::io_thread_t (ctx_t *ctx_, uint32_t tid_) :
13  object_t (ctx_, tid_),
14  _mailbox_handle (static_cast<poller_t::handle_t> (NULL))
15 {
16  _poller = new (std::nothrow) poller_t (*ctx_);
18 
19  if (_mailbox.get_fd () != retired_fd) {
20  _mailbox_handle = _poller->add_fd (_mailbox.get_fd (), this);
21  _poller->set_pollin (_mailbox_handle);
22  }
23 }
24 
25 zmq::io_thread_t::~io_thread_t ()
26 {
27  LIBZMQ_DELETE (_poller);
28 }
29 
31 {
32  char name[16] = "";
33  snprintf (name, sizeof (name), "IO/%u",
34  get_tid () - zmq::ctx_t::reaper_tid - 1);
35  // Start the underlying I/O thread.
36  _poller->start (name);
37 }
38 
39 void zmq::io_thread_t::stop ()
40 {
41  send_stop ();
42 }
43 
44 zmq::mailbox_t *zmq::io_thread_t::get_mailbox ()
45 {
46  return &_mailbox;
47 }
48 
49 int zmq::io_thread_t::get_load () const
50 {
51  return _poller->get_load ();
52 }
53 
54 void zmq::io_thread_t::in_event ()
55 {
56  // TODO: Do we want to limit number of commands I/O thread can
57  // process in a single go?
58 
59  command_t cmd;
60  int rc = _mailbox.recv (&cmd, 0);
61 
62  while (rc == 0 || errno == EINTR) {
63  if (rc == 0)
64  cmd.destination->process_command (cmd);
65  rc = _mailbox.recv (&cmd, 0);
66  }
67 
68  errno_assert (rc != 0 && errno == EAGAIN);
69 }
70 
71 void zmq::io_thread_t::out_event ()
72 {
73  // We are never polling for POLLOUT here. This function is never called.
74  zmq_assert (false);
75 }
76 
77 void zmq::io_thread_t::timer_event (int)
78 {
79  // No timers here. This function is never called.
80  zmq_assert (false);
81 }
82 
83 zmq::poller_t *zmq::io_thread_t::get_poller () const
84 {
85  zmq_assert (_poller);
86  return _poller;
87 }
88 
89 void zmq::io_thread_t::process_stop ()
90 {
91  zmq_assert (_mailbox_handle);
92  _poller->rm_fd (_mailbox_handle);
93  _poller->stop ();
94 }
LIBZMQ_DELETE
#define LIBZMQ_DELETE(p_object)
Definition: macros.hpp:7
name
GLuint const GLchar * name
Definition: glcorearb.h:3055
NULL
NULL
Definition: test_security_zap.cpp:405
EINTR
#define EINTR
Definition: errno.hpp:7
zmq::retired_fd
@ retired_fd
Definition: fd.hpp:32
EAGAIN
#define EAGAIN
Definition: errno.hpp:14
precompiled.hpp
zmq_assert
#define zmq_assert(x)
Definition: err.hpp:102
errno
int errno
ctx.hpp
snprintf
int snprintf(char *str, size_t size, const char *format,...)
Definition: port.cc:64
alloc_assert
#define alloc_assert(x)
Definition: err.hpp:146
errno_assert
#define errno_assert(x)
Definition: err.hpp:113
macros.hpp
start
GLuint start
Definition: glcorearb.h:2858
io_thread.hpp
err.hpp
zmq::io_object_t::_poller
poller_t * _poller
Definition: io_object.hpp:50


libaditof
Author(s):
autogenerated on Wed May 21 2025 02:06:54