thread.cpp
Go to the documentation of this file.
1 /* SPDX-License-Identifier: MPL-2.0 */
2 
3 #include "precompiled.hpp"
4 #include "macros.hpp"
5 #include "thread.hpp"
6 #include "err.hpp"
7 
8 #ifdef ZMQ_HAVE_WINDOWS
9 #include <winnt.h>
10 #endif
11 
12 #ifdef __MINGW32__
13 #include "pthread.h"
14 #endif
15 
17 {
18  return _started;
19 }
20 
21 #ifdef ZMQ_HAVE_WINDOWS
22 
23 extern "C" {
24 #if defined _WIN32_WCE
25 static DWORD thread_routine (LPVOID arg_)
26 #else
27 static unsigned int __stdcall thread_routine (void *arg_)
28 #endif
29 {
30  zmq::thread_t *self = static_cast<zmq::thread_t *> (arg_);
31  self->applyThreadName ();
32  self->_tfn (self->_arg);
33  return 0;
34 }
35 }
36 
37 void zmq::thread_t::start (thread_fn *tfn_, void *arg_, const char *name_)
38 {
39  _tfn = tfn_;
40  _arg = arg_;
41  if (name_)
42  strncpy (_name, name_, sizeof (_name) - 1);
43 
44  // set default stack size to 4MB to avoid std::map stack overflow on x64
45  unsigned int stack = 0;
46 #if defined _WIN64
47  stack = 0x400000;
48 #endif
49 
50 #if defined _WIN32_WCE
51  _descriptor = (HANDLE) CreateThread (NULL, stack, &::thread_routine, this,
52  0, &_thread_id);
53 #else
54  _descriptor = (HANDLE) _beginthreadex (NULL, stack, &::thread_routine, this,
55  0, &_thread_id);
56 #endif
57  win_assert (_descriptor != NULL);
58  _started = true;
59 }
60 
62 {
63  return GetCurrentThreadId () == _thread_id;
64 }
65 
66 void zmq::thread_t::stop ()
67 {
68  if (_started) {
69  const DWORD rc = WaitForSingleObject (_descriptor, INFINITE);
70  win_assert (rc != WAIT_FAILED);
71  const BOOL rc2 = CloseHandle (_descriptor);
72  win_assert (rc2 != 0);
73  }
74 }
75 
77  int priority_, int scheduling_policy_, const std::set<int> &affinity_cpus_)
78 {
79  // not implemented
80  LIBZMQ_UNUSED (priority_);
81  LIBZMQ_UNUSED (scheduling_policy_);
82  LIBZMQ_UNUSED (affinity_cpus_);
83 }
84 
85 void zmq::thread_t::
86  applySchedulingParameters () // to be called in secondary thread context
87 {
88  // not implemented
89 }
90 
91 #ifdef _MSC_VER
92 
93 namespace
94 {
95 #pragma pack(push, 8)
96 struct thread_info_t
97 {
98  DWORD _type;
99  LPCSTR _name;
100  DWORD _thread_id;
101  DWORD _flags;
102 };
103 #pragma pack(pop)
104 }
105 
106 #endif
107 
108 void zmq::thread_t::
109  applyThreadName () // to be called in secondary thread context
110 {
111  if (!_name[0] || !IsDebuggerPresent ())
112  return;
113 
114 #ifdef _MSC_VER
115 
116  thread_info_t thread_info;
117  thread_info._type = 0x1000;
118  thread_info._name = _name;
119  thread_info._thread_id = -1;
120  thread_info._flags = 0;
121 
122  __try {
123  const DWORD MS_VC_EXCEPTION = 0x406D1388;
124  RaiseException (MS_VC_EXCEPTION, 0,
125  sizeof (thread_info) / sizeof (ULONG_PTR),
126  (ULONG_PTR *) &thread_info);
127  }
128  __except (EXCEPTION_CONTINUE_EXECUTION) {
129  }
130 
131 #elif defined(__MINGW32__)
132 
133  int rc = pthread_setname_np (pthread_self (), _name);
134  if (rc)
135  return;
136 
137 #else
138 
139  // not implemented
140 
141 #endif
142 }
143 
144 
145 #elif defined ZMQ_HAVE_VXWORKS
146 
147 extern "C" {
148 static void *thread_routine (void *arg_)
149 {
150  zmq::thread_t *self = (zmq::thread_t *) arg_;
151  self->applySchedulingParameters ();
152  self->_tfn (self->_arg);
153  return NULL;
154 }
155 }
156 
157 void zmq::thread_t::start (thread_fn *tfn_, void *arg_, const char *name_)
158 {
160  _tfn = tfn_;
161  _arg = arg_;
162  _descriptor = taskSpawn (NULL, DEFAULT_PRIORITY, DEFAULT_OPTIONS,
163  DEFAULT_STACK_SIZE, (FUNCPTR) thread_routine,
164  (int) this, 0, 0, 0, 0, 0, 0, 0, 0, 0);
165  if (_descriptor != NULL || _descriptor > 0)
166  _started = true;
167 }
168 
169 void zmq::thread_t::stop ()
170 {
171  if (_started)
172  while ((_descriptor != NULL || _descriptor > 0)
173  && taskIdVerify (_descriptor) == 0) {
174  }
175 }
176 
178 {
179  return taskIdSelf () == _descriptor;
180 }
181 
183  int priority_, int schedulingPolicy_, const std::set<int> &affinity_cpus_)
184 {
185  _thread_priority = priority_;
186  _thread_sched_policy = schedulingPolicy_;
187  _thread_affinity_cpus = affinity_cpus_;
188 }
189 
190 void zmq::thread_t::
191  applySchedulingParameters () // to be called in secondary thread context
192 {
193  int priority =
194  (_thread_priority >= 0 ? _thread_priority : DEFAULT_PRIORITY);
195  priority = (priority < UCHAR_MAX ? priority : DEFAULT_PRIORITY);
196  if (_descriptor != NULL || _descriptor > 0) {
197  taskPrioritySet (_descriptor, priority);
198  }
199 }
200 
201 void zmq::thread_t::
202  applyThreadName () // to be called in secondary thread context
203 {
204  // not implemented
205 }
206 
207 #else
208 
209 #include <signal.h>
210 #include <unistd.h>
211 #include <sys/time.h>
212 #include <sys/resource.h>
213 
214 extern "C" {
215 static void *thread_routine (void *arg_)
216 {
217 #if !defined ZMQ_HAVE_OPENVMS && !defined ZMQ_HAVE_ANDROID
218  // Following code will guarantee more predictable latencies as it'll
219  // disallow any signal handling in the I/O thread.
220  sigset_t signal_set;
221  int rc = sigfillset (&signal_set);
222  errno_assert (rc == 0);
223  rc = pthread_sigmask (SIG_BLOCK, &signal_set, NULL);
224  posix_assert (rc);
225 #endif
226  zmq::thread_t *self = (zmq::thread_t *) arg_;
227  self->applySchedulingParameters ();
228  self->applyThreadName ();
229  self->_tfn (self->_arg);
230  return NULL;
231 }
232 }
233 
234 void zmq::thread_t::start (thread_fn *tfn_, void *arg_, const char *name_)
235 {
236  _tfn = tfn_;
237  _arg = arg_;
238  if (name_)
239  strncpy (_name, name_, sizeof (_name) - 1);
240  int rc = pthread_create (&_descriptor, NULL, thread_routine, this);
241  posix_assert (rc);
242  _started = true;
243 }
244 
246 {
247  if (_started) {
248  int rc = pthread_join (_descriptor, NULL);
249  posix_assert (rc);
250  }
251 }
252 
254 {
255  return bool (pthread_equal (pthread_self (), _descriptor));
256 }
257 
259  int priority_, int scheduling_policy_, const std::set<int> &affinity_cpus_)
260 {
261  _thread_priority = priority_;
262  _thread_sched_policy = scheduling_policy_;
263  _thread_affinity_cpus = affinity_cpus_;
264 }
265 
266 void zmq::thread_t::
267  applySchedulingParameters () // to be called in secondary thread context
268 {
269 #if defined _POSIX_THREAD_PRIORITY_SCHEDULING \
270  && _POSIX_THREAD_PRIORITY_SCHEDULING >= 0
271  int policy = 0;
272  struct sched_param param;
273 
274 #if _POSIX_THREAD_PRIORITY_SCHEDULING == 0 \
275  && defined _SC_THREAD_PRIORITY_SCHEDULING
276  if (sysconf (_SC_THREAD_PRIORITY_SCHEDULING) < 0) {
277  return;
278  }
279 #endif
280  int rc = pthread_getschedparam (pthread_self (), &policy, &param);
281  posix_assert (rc);
282 
283  if (_thread_sched_policy != ZMQ_THREAD_SCHED_POLICY_DFLT) {
284  policy = _thread_sched_policy;
285  }
286 
287  /* Quoting docs:
288  "Linux allows the static priority range 1 to 99 for the SCHED_FIFO and
289  SCHED_RR policies, and the priority 0 for the remaining policies."
290  Other policies may use the "nice value" in place of the priority:
291  */
292  bool use_nice_instead_priority =
293  (policy != SCHED_FIFO) && (policy != SCHED_RR);
294 
295  if (use_nice_instead_priority)
296  param.sched_priority =
297  0; // this is the only supported priority for most scheduling policies
298  else if (_thread_priority != ZMQ_THREAD_PRIORITY_DFLT)
299  param.sched_priority =
300  _thread_priority; // user should provide a value between 1 and 99
301 
302 #ifdef __NetBSD__
303  if (policy == SCHED_OTHER)
304  param.sched_priority = -1;
305 #endif
306 
307  rc = pthread_setschedparam (pthread_self (), policy, &param);
308 
309 #if defined(__FreeBSD_kernel__) || defined(__FreeBSD__)
310  // If this feature is unavailable at run-time, don't abort.
311  if (rc == ENOSYS)
312  return;
313 #endif
314 
315  posix_assert (rc);
316 
317 #if !defined ZMQ_HAVE_VXWORKS
318  if (use_nice_instead_priority
319  && _thread_priority != ZMQ_THREAD_PRIORITY_DFLT
320  && _thread_priority > 0) {
321  // assume the user wants to decrease the thread's nice value
322  // i.e., increase the chance of this thread being scheduled: try setting that to
323  // maximum priority.
324  rc = nice (-20);
325 
326  errno_assert (rc != -1);
327  // IMPORTANT: EPERM is typically returned for unprivileged processes: that's because
328  // CAP_SYS_NICE capability is required or RLIMIT_NICE resource limit should be changed to avoid EPERM!
329  }
330 #endif
331 
332 #ifdef ZMQ_HAVE_PTHREAD_SET_AFFINITY
333  if (!_thread_affinity_cpus.empty ()) {
334  cpu_set_t cpuset;
335  CPU_ZERO (&cpuset);
336  for (std::set<int>::const_iterator it = _thread_affinity_cpus.begin (),
337  end = _thread_affinity_cpus.end ();
338  it != end; it++) {
339  CPU_SET ((int) (*it), &cpuset);
340  }
341  rc =
342  pthread_setaffinity_np (pthread_self (), sizeof (cpu_set_t), &cpuset);
343  posix_assert (rc);
344  }
345 #endif
346 #endif
347 }
348 
349 void zmq::thread_t::
350  applyThreadName () // to be called in secondary thread context
351 {
352  /* The thread name is a cosmetic string, added to ease debugging of
353  * multi-threaded applications. It is not a big issue if this value
354  * can not be set for any reason (such as Permission denied in some
355  * cases where the application changes its EUID, etc.) The value of
356  * "int rc" is retained where available, to help debuggers stepping
357  * through code to see its value - but otherwise it is ignored.
358  */
359  if (!_name[0])
360  return;
361 
362  /* Fails with permission denied on Android 5/6 */
363 #if defined(ZMQ_HAVE_ANDROID)
364  return;
365 #endif
366 
367 #if defined(ZMQ_HAVE_PTHREAD_SETNAME_1)
368  int rc = pthread_setname_np (_name);
369  if (rc)
370  return;
371 #elif defined(ZMQ_HAVE_PTHREAD_SETNAME_2)
372  int rc = pthread_setname_np (pthread_self (), _name);
373  if (rc)
374  return;
375 #elif defined(ZMQ_HAVE_PTHREAD_SETNAME_3)
376  int rc = pthread_setname_np (pthread_self (), _name, NULL);
377  if (rc)
378  return;
379 #elif defined(ZMQ_HAVE_PTHREAD_SET_NAME)
380  pthread_set_name_np (pthread_self (), _name);
381 #endif
382 }
383 
384 #endif
zmq::thread_t::stop
void stop()
Definition: thread.cpp:245
end
GLuint GLuint end
Definition: glcorearb.h:2858
NULL
NULL
Definition: test_security_zap.cpp:405
self
PHP_PROTO_OBJECT_FREE_END PHP_PROTO_OBJECT_DTOR_END intern self
Definition: php/ext/google/protobuf/map.c:543
zmq::thread_fn
void() thread_fn(void *)
Definition: thread.hpp:17
precompiled.hpp
zmq::thread_t::start
void start(thread_fn *tfn_, void *arg_, const char *name_)
Definition: thread.cpp:234
param
GLenum GLfloat param
Definition: glcorearb.h:2769
ZMQ_THREAD_SCHED_POLICY_DFLT
#define ZMQ_THREAD_SCHED_POLICY_DFLT
Definition: zmq.h:196
errno_assert
#define errno_assert(x)
Definition: err.hpp:113
macros.hpp
LIBZMQ_UNUSED
#define LIBZMQ_UNUSED(object)
Definition: macros.hpp:6
thread.hpp
zmq::thread_t
Definition: thread.hpp:26
name_
string name_
Definition: googletest.cc:182
zmq::thread_t::applyThreadName
void applyThreadName()
Definition: thread.cpp:350
zmq::thread_t::is_current_thread
bool is_current_thread() const
Definition: thread.cpp:253
zmq::thread_t::_started
bool _started
Definition: thread.hpp:79
posix_assert
#define posix_assert(x)
Definition: err.hpp:124
thread_routine
static void * thread_routine(void *arg_)
Definition: thread.cpp:215
err.hpp
HANDLE
void * HANDLE
Definition: wepoll.c:70
zmq::thread_t::applySchedulingParameters
void applySchedulingParameters()
Definition: thread.cpp:267
zmq::thread_t::get_started
bool get_started() const
Definition: thread.cpp:16
it
MapIter it
Definition: php/ext/google/protobuf/map.c:205
ZMQ_THREAD_PRIORITY_DFLT
#define ZMQ_THREAD_PRIORITY_DFLT
Definition: zmq.h:195
zmq::thread_t::setSchedulingParameters
void setSchedulingParameters(int priority_, int scheduling_policy_, const std::set< int > &affinity_cpus_)
Definition: thread.cpp:258


libaditof
Author(s):
autogenerated on Wed May 21 2025 02:07:00