condition_variable.hpp
Go to the documentation of this file.
1 /* SPDX-License-Identifier: MPL-2.0 */
2 
3 #ifndef __ZMQ_CONDITON_VARIABLE_HPP_INCLUDED__
4 #define __ZMQ_CONDITON_VARIABLE_HPP_INCLUDED__
5 
6 #include "err.hpp"
7 #include "mutex.hpp"
8 
9 // Condition variable class encapsulates the OS condition variable in a platform-independent way.
10 
11 #if defined(ZMQ_USE_CV_IMPL_NONE)
12 
13 namespace zmq
14 {
15 class condition_variable_t
16 {
17  public:
18  inline condition_variable_t () { zmq_assert (false); }
19 
20  inline int wait (mutex_t *mutex_, int timeout_)
21  {
22  zmq_assert (false);
23  return -1;
24  }
25 
26  inline void broadcast () { zmq_assert (false); }
27 
28  ZMQ_NON_COPYABLE_NOR_MOVABLE (condition_variable_t)
29 };
30 }
31 
32 #elif defined(ZMQ_USE_CV_IMPL_WIN32API)
33 
34 #include "windows.hpp"
35 
36 namespace zmq
37 {
38 class condition_variable_t
39 {
40  public:
41  inline condition_variable_t () { InitializeConditionVariable (&_cv); }
42 
43  inline int wait (mutex_t *mutex_, int timeout_)
44  {
45  int rc = SleepConditionVariableCS (&_cv, mutex_->get_cs (), timeout_);
46 
47  if (rc != 0)
48  return 0;
49 
50  rc = GetLastError ();
51 
52  if (rc != ERROR_TIMEOUT)
53  win_assert (rc);
54 
55  errno = EAGAIN;
56  return -1;
57  }
58 
59  inline void broadcast () { WakeAllConditionVariable (&_cv); }
60 
61  private:
62  CONDITION_VARIABLE _cv;
63 
64  ZMQ_NON_COPYABLE_NOR_MOVABLE (condition_variable_t)
65 };
66 }
67 
68 #elif defined(ZMQ_USE_CV_IMPL_STL11)
69 
70 #include <condition_variable>
71 
72 namespace zmq
73 {
74 class condition_variable_t
75 {
76  public:
77  condition_variable_t () ZMQ_DEFAULT;
78 
79  int wait (mutex_t *mutex_, int timeout_)
80  {
81  // this assumes that the mutex mutex_ has been locked by the caller
82  int res = 0;
83  if (timeout_ == -1) {
84  _cv.wait (
85  *mutex_); // unlock mtx and wait cv.notify_all(), lock mtx after cv.notify_all()
86  } else if (_cv.wait_for (*mutex_, std::chrono::milliseconds (timeout_))
88  // time expired
89  errno = EAGAIN;
90  res = -1;
91  }
92  return res;
93  }
94 
95  void broadcast ()
96  {
97  // this assumes that the mutex associated with _cv has been locked by the caller
98  _cv.notify_all ();
99  }
100 
101  private:
102  std::condition_variable_any _cv;
103 
104  ZMQ_NON_COPYABLE_NOR_MOVABLE (condition_variable_t)
105 };
106 }
107 
108 #elif defined(ZMQ_USE_CV_IMPL_VXWORKS)
109 
110 #include <sysLib.h>
111 
112 namespace zmq
113 {
114 class condition_variable_t
115 {
116  public:
117  inline condition_variable_t () ZMQ_DEFAULT;
118 
119  inline ~condition_variable_t ()
120  {
121  scoped_lock_t l (_listenersMutex);
122  for (size_t i = 0; i < _listeners.size (); i++) {
123  semDelete (_listeners[i]);
124  }
125  }
126 
127  inline int wait (mutex_t *mutex_, int timeout_)
128  {
129  //Atomically releases lock, blocks the current executing thread,
130  //and adds it to the list of threads waiting on *this. The thread
131  //will be unblocked when broadcast() is executed.
132  //It may also be unblocked spuriously. When unblocked, regardless
133  //of the reason, lock is reacquired and wait exits.
134 
135  SEM_ID sem = semBCreate (SEM_Q_PRIORITY, SEM_EMPTY);
136  {
137  scoped_lock_t l (_listenersMutex);
138  _listeners.push_back (sem);
139  }
140  mutex_->unlock ();
141 
142  int rc;
143  if (timeout_ < 0)
144  rc = semTake (sem, WAIT_FOREVER);
145  else {
146  int ticksPerSec = sysClkRateGet ();
147  int timeoutTicks = (timeout_ * ticksPerSec) / 1000 + 1;
148  rc = semTake (sem, timeoutTicks);
149  }
150 
151  {
152  scoped_lock_t l (_listenersMutex);
153  // remove sem from listeners
154  for (size_t i = 0; i < _listeners.size (); i++) {
155  if (_listeners[i] == sem) {
156  _listeners.erase (_listeners.begin () + i);
157  break;
158  }
159  }
160  semDelete (sem);
161  }
162  mutex_->lock ();
163 
164  if (rc == 0)
165  return 0;
166 
167  if (rc == S_objLib_OBJ_TIMEOUT) {
168  errno = EAGAIN;
169  return -1;
170  }
171 
172  return -1;
173  }
174 
175  inline void broadcast ()
176  {
177  scoped_lock_t l (_listenersMutex);
178  for (size_t i = 0; i < _listeners.size (); i++) {
179  semGive (_listeners[i]);
180  }
181  }
182 
183  private:
184  mutex_t _listenersMutex;
185  std::vector<SEM_ID> _listeners;
186 
187  ZMQ_NON_COPYABLE_NOR_MOVABLE (condition_variable_t)
188 };
189 }
190 
191 #elif defined(ZMQ_USE_CV_IMPL_PTHREADS)
192 
193 #include <pthread.h>
194 
195 #if defined(__ANDROID_API__) && __ANDROID_API__ < 21
196 #define ANDROID_LEGACY
197 extern "C" int pthread_cond_timedwait_monotonic_np (pthread_cond_t *,
198  pthread_mutex_t *,
199  const struct timespec *);
200 #endif
201 
202 namespace zmq
203 {
204 class condition_variable_t
205 {
206  public:
207  inline condition_variable_t ()
208  {
209  pthread_condattr_t attr;
210  pthread_condattr_init (&attr);
211 #if !defined(ZMQ_HAVE_OSX) && !defined(ANDROID_LEGACY)
212  pthread_condattr_setclock (&attr, CLOCK_MONOTONIC);
213 #endif
214  int rc = pthread_cond_init (&_cond, &attr);
215  posix_assert (rc);
216  }
217 
218  inline ~condition_variable_t ()
219  {
220  int rc = pthread_cond_destroy (&_cond);
221  posix_assert (rc);
222  }
223 
224  inline int wait (mutex_t *mutex_, int timeout_)
225  {
226  int rc;
227 
228  if (timeout_ != -1) {
229  struct timespec timeout;
230 
231 #ifdef ZMQ_HAVE_OSX
232  timeout.tv_sec = 0;
233  timeout.tv_nsec = 0;
234 #else
235  rc = clock_gettime (CLOCK_MONOTONIC, &timeout);
236  posix_assert (rc);
237 #endif
238 
239  timeout.tv_sec += timeout_ / 1000;
240  timeout.tv_nsec += (timeout_ % 1000) * 1000000;
241 
242  if (timeout.tv_nsec >= 1000000000) {
243  timeout.tv_sec++;
244  timeout.tv_nsec -= 1000000000;
245  }
246 #ifdef ZMQ_HAVE_OSX
247  rc = pthread_cond_timedwait_relative_np (
248  &_cond, mutex_->get_mutex (), &timeout);
249 #elif defined(ANDROID_LEGACY)
250  rc = pthread_cond_timedwait_monotonic_np (
251  &_cond, mutex_->get_mutex (), &timeout);
252 #else
253  rc =
254  pthread_cond_timedwait (&_cond, mutex_->get_mutex (), &timeout);
255 #endif
256  } else
257  rc = pthread_cond_wait (&_cond, mutex_->get_mutex ());
258 
259  if (rc == 0)
260  return 0;
261 
262  if (rc == ETIMEDOUT) {
263  errno = EAGAIN;
264  return -1;
265  }
266 
267  posix_assert (rc);
268  return -1;
269  }
270 
271  inline void broadcast ()
272  {
273  int rc = pthread_cond_broadcast (&_cond);
274  posix_assert (rc);
275  }
276 
277  private:
278  pthread_cond_t _cond;
279 
280  ZMQ_NON_COPYABLE_NOR_MOVABLE (condition_variable_t)
281 };
282 }
283 
284 #endif
285 
286 #endif
EAGAIN
#define EAGAIN
Definition: errno.hpp:14
ZMQ_DEFAULT
#define ZMQ_DEFAULT
Definition: macros.hpp:43
zmq_assert
#define zmq_assert(x)
Definition: err.hpp:102
errno
int errno
zmq
Definition: zmq.hpp:229
windows.hpp
ZMQ_NON_COPYABLE_NOR_MOVABLE
#define ZMQ_NON_COPYABLE_NOR_MOVABLE(classname)
Definition: macros.hpp:58
mutex_
internal::WrappedMutex mutex_
Definition: src/google/protobuf/message.cc:579
timeout
GLbitfield GLuint64 timeout
Definition: glcorearb.h:3588
i
int i
Definition: gmock-matchers_test.cc:764
posix_assert
#define posix_assert(x)
Definition: err.hpp:124
err.hpp
ETIMEDOUT
#define ETIMEDOUT
Definition: zmq.h:149
mutex.hpp


libaditof
Author(s):
autogenerated on Wed May 21 2025 02:06:48