reaper.cpp
Go to the documentation of this file.
1 /* SPDX-License-Identifier: MPL-2.0 */
2 
3 #include "precompiled.hpp"
4 #include "macros.hpp"
5 #include "reaper.hpp"
6 #include "socket_base.hpp"
7 #include "err.hpp"
8 
9 zmq::reaper_t::reaper_t (class ctx_t *ctx_, uint32_t tid_) :
10  object_t (ctx_, tid_),
11  _mailbox_handle (static_cast<poller_t::handle_t> (NULL)),
12  _poller (NULL),
13  _sockets (0),
14  _terminating (false)
15 {
16  if (!_mailbox.valid ())
17  return;
18 
19  _poller = new (std::nothrow) poller_t (*ctx_);
20  alloc_assert (_poller);
21 
22  if (_mailbox.get_fd () != retired_fd) {
23  _mailbox_handle = _poller->add_fd (_mailbox.get_fd (), this);
24  _poller->set_pollin (_mailbox_handle);
25  }
26 
27 #ifdef HAVE_FORK
28  _pid = getpid ();
29 #endif
30 }
31 
32 zmq::reaper_t::~reaper_t ()
33 {
34  LIBZMQ_DELETE (_poller);
35 }
36 
37 zmq::mailbox_t *zmq::reaper_t::get_mailbox ()
38 {
39  return &_mailbox;
40 }
41 
43 {
44  zmq_assert (_mailbox.valid ());
45 
46  // Start the thread.
47  _poller->start ("Reaper");
48 }
49 
50 void zmq::reaper_t::stop ()
51 {
52  if (get_mailbox ()->valid ()) {
53  send_stop ();
54  }
55 }
56 
57 void zmq::reaper_t::in_event ()
58 {
59  while (true) {
60 #ifdef HAVE_FORK
61  if (unlikely (_pid != getpid ())) {
62  //printf("zmq::reaper_t::in_event return in child process %d\n", (int)getpid());
63  return;
64  }
65 #endif
66 
67  // Get the next command. If there is none, exit.
68  command_t cmd;
69  const int rc = _mailbox.recv (&cmd, 0);
70  if (rc != 0 && errno == EINTR)
71  continue;
72  if (rc != 0 && errno == EAGAIN)
73  break;
74  errno_assert (rc == 0);
75 
76  // Process the command.
77  cmd.destination->process_command (cmd);
78  }
79 }
80 
81 void zmq::reaper_t::out_event ()
82 {
83  zmq_assert (false);
84 }
85 
86 void zmq::reaper_t::timer_event (int)
87 {
88  zmq_assert (false);
89 }
90 
91 void zmq::reaper_t::process_stop ()
92 {
93  _terminating = true;
94 
95  // If there are no sockets being reaped finish immediately.
96  if (!_sockets) {
97  send_done ();
98  _poller->rm_fd (_mailbox_handle);
99  _poller->stop ();
100  }
101 }
102 
103 void zmq::reaper_t::process_reap (socket_base_t *socket_)
104 {
105  // Add the socket to the poller.
106  socket_->start_reaping (_poller);
107 
108  ++_sockets;
109 }
110 
111 void zmq::reaper_t::process_reaped ()
112 {
113  --_sockets;
114 
115  // If reaped was already asked to terminate and there are no more sockets,
116  // finish immediately.
117  if (!_sockets && _terminating) {
118  send_done ();
119  _poller->rm_fd (_mailbox_handle);
120  _poller->stop ();
121  }
122 }
LIBZMQ_DELETE
#define LIBZMQ_DELETE(p_object)
Definition: macros.hpp:7
NULL
NULL
Definition: test_security_zap.cpp:405
EINTR
#define EINTR
Definition: errno.hpp:7
EAGAIN
#define EAGAIN
Definition: errno.hpp:14
precompiled.hpp
zmq_assert
#define zmq_assert(x)
Definition: err.hpp:102
errno
int errno
retired_fd
@ retired_fd
Definition: libzmq/tests/testutil.hpp:117
alloc_assert
#define alloc_assert(x)
Definition: err.hpp:146
errno_assert
#define errno_assert(x)
Definition: err.hpp:113
macros.hpp
start
GLuint start
Definition: glcorearb.h:2858
reaper.hpp
socket_base.hpp
err.hpp
false
#define false
Definition: cJSON.c:70
unlikely
#define unlikely(x)
Definition: likely.hpp:11


libaditof
Author(s):
autogenerated on Wed May 21 2025 02:06:58