mailbox_safe.cpp
Go to the documentation of this file.
1 /* SPDX-License-Identifier: MPL-2.0 */
2 
3 #include "precompiled.hpp"
4 #include "mailbox_safe.hpp"
5 #include "clock.hpp"
6 #include "err.hpp"
7 
8 #include <algorithm>
9 
10 zmq::mailbox_safe_t::mailbox_safe_t (mutex_t *sync_) : _sync (sync_)
11 {
12  // Get the pipe into passive state. That way, if the users starts by
13  // polling on the associated file descriptor it will get woken up when
14  // new command is posted.
15  const bool ok = _cpipe.check_read ();
16  zmq_assert (!ok);
17 }
18 
19 zmq::mailbox_safe_t::~mailbox_safe_t ()
20 {
21  // TODO: Retrieve and deallocate commands inside the cpipe.
22 
23  // Work around problem that other threads might still be in our
24  // send() method, by waiting on the mutex before disappearing.
25  _sync->lock ();
26  _sync->unlock ();
27 }
28 
29 void zmq::mailbox_safe_t::add_signaler (signaler_t *signaler_)
30 {
31  _signalers.push_back (signaler_);
32 }
33 
34 void zmq::mailbox_safe_t::remove_signaler (signaler_t *signaler_)
35 {
36  // TODO: make a copy of array and signal outside the lock
37  const std::vector<zmq::signaler_t *>::iterator end = _signalers.end ();
38  const std::vector<signaler_t *>::iterator it =
39  std::find (_signalers.begin (), end, signaler_);
40 
41  if (it != end)
42  _signalers.erase (it);
43 }
44 
45 void zmq::mailbox_safe_t::clear_signalers ()
46 {
47  _signalers.clear ();
48 }
49 
50 void zmq::mailbox_safe_t::send (const command_t &cmd_)
51 {
52  _sync->lock ();
53  _cpipe.write (cmd_, false);
54  const bool ok = _cpipe.flush ();
55 
56  if (!ok) {
57  _cond_var.broadcast ();
58 
59  for (std::vector<signaler_t *>::iterator it = _signalers.begin (),
60  end = _signalers.end ();
61  it != end; ++it) {
62  (*it)->send ();
63  }
64  }
65 
66  _sync->unlock ();
67 }
68 
69 int zmq::mailbox_safe_t::recv (command_t *cmd_, int timeout_)
70 {
71  // Try to get the command straight away.
72  if (_cpipe.read (cmd_))
73  return 0;
74 
75  // If the timeout is zero, it will be quicker to release the lock, giving other a chance to send a command
76  // and immediately relock it.
77  if (timeout_ == 0) {
78  _sync->unlock ();
79  _sync->lock ();
80  } else {
81  // Wait for signal from the command sender.
82  const int rc = _cond_var.wait (_sync, timeout_);
83  if (rc == -1) {
84  errno_assert (errno == EAGAIN || errno == EINTR);
85  return -1;
86  }
87  }
88 
89  // Another thread may already fetch the command
90  const bool ok = _cpipe.read (cmd_);
91 
92  if (!ok) {
93  errno = EAGAIN;
94  return -1;
95  }
96 
97  return 0;
98 }
end
GLuint GLuint end
Definition: glcorearb.h:2858
EINTR
#define EINTR
Definition: errno.hpp:7
EAGAIN
#define EAGAIN
Definition: errno.hpp:14
precompiled.hpp
zmq_assert
#define zmq_assert(x)
Definition: err.hpp:102
mailbox_safe.hpp
errno
int errno
clock.hpp
send
void send(fd_t fd_, const char(&data_)[N])
Definition: test_security_curve.cpp:209
ok
ROSCPP_DECL bool ok()
errno_assert
#define errno_assert(x)
Definition: err.hpp:113
zmq::ZMQ_FINAL::_cpipe
cpipe_t _cpipe
Definition: mailbox.hpp:43
err.hpp
it
MapIter it
Definition: php/ext/google/protobuf/map.c:205


libaditof
Author(s):
autogenerated on Wed May 21 2025 02:06:55