xsens_threadpool.cpp
Go to the documentation of this file.
1 
2 // Copyright (c) 2003-2021 Xsens Technologies B.V. or subsidiaries worldwide.
3 // All rights reserved.
4 //
5 // Redistribution and use in source and binary forms, with or without modification,
6 // are permitted provided that the following conditions are met:
7 //
8 // 1. Redistributions of source code must retain the above copyright notice,
9 // this list of conditions, and the following disclaimer.
10 //
11 // 2. Redistributions in binary form must reproduce the above copyright notice,
12 // this list of conditions, and the following disclaimer in the documentation
13 // and/or other materials provided with the distribution.
14 //
15 // 3. Neither the names of the copyright holders nor the names of their contributors
16 // may be used to endorse or promote products derived from this software without
17 // specific prior written permission.
18 //
19 // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY
20 // EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
21 // MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
22 // THE COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
23 // SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT
24 // OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
25 // HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY OR
26 // TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
27 // SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.THE LAWS OF THE NETHERLANDS
28 // SHALL BE EXCLUSIVELY APPLICABLE AND ANY DISPUTES SHALL BE FINALLY SETTLED UNDER THE RULES
29 // OF ARBITRATION OF THE INTERNATIONAL CHAMBER OF COMMERCE IN THE HAGUE BY ONE OR MORE
30 // ARBITRATORS APPOINTED IN ACCORDANCE WITH SAID RULES.
31 //
32 
33 
34 // Copyright (c) 2003-2021 Xsens Technologies B.V. or subsidiaries worldwide.
35 // All rights reserved.
36 //
37 // Redistribution and use in source and binary forms, with or without modification,
38 // are permitted provided that the following conditions are met:
39 //
40 // 1. Redistributions of source code must retain the above copyright notice,
41 // this list of conditions, and the following disclaimer.
42 //
43 // 2. Redistributions in binary form must reproduce the above copyright notice,
44 // this list of conditions, and the following disclaimer in the documentation
45 // and/or other materials provided with the distribution.
46 //
47 // 3. Neither the names of the copyright holders nor the names of their contributors
48 // may be used to endorse or promote products derived from this software without
49 // specific prior written permission.
50 //
51 // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY
52 // EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
53 // MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
54 // THE COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
55 // SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT
56 // OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
57 // HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY OR
58 // TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
59 // SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.THE LAWS OF THE NETHERLANDS
60 // SHALL BE EXCLUSIVELY APPLICABLE AND ANY DISPUTES SHALL BE FINALLY SETTLED UNDER THE RULES
61 // OF ARBITRATION OF THE INTERNATIONAL CHAMBER OF COMMERCE IN THE HAGUE BY ONE OR MORE
62 // ARBITRATORS APPOINTED IN ACCORDANCE WITH SAID RULES.
63 //
64 
65 #include "xsens_threadpool.h"
66 #ifndef _MSC_VER
67  #include <unistd.h>
68  #include <cxxabi.h>
69 #endif
70 #ifdef XSENS_DEBUG
71  #include <typeinfo>
72 #endif
73 
74 #include <vector>
75 #include <xstypes/xsexception.h>
76 #include "threading.h"
77 #include <atomic>
78 
79 // Enable this if you want to disable multithreading
80 //#define ADD_TASK_EXECUTE_NOW
81 
82 namespace xsens
83 {
87 
88 typedef std::map<unsigned int, std::shared_ptr<PooledTask>> TaskSet;
89 typedef std::set<PooledThread*> ThreadSet;
90 typedef std::vector<std::shared_ptr<PooledTask>> TaskList;
91 
95 {
96 #if defined(_WIN32)
97  SYSTEM_INFO sysinfo;
98  ::GetSystemInfo(&sysinfo);
99  return (int) sysinfo.dwNumberOfProcessors;
100 #else
101  return sysconf(_SC_NPROCESSORS_ONLN);
102  // see http://stackoverflow.com/questions/150355/programmatically-find-the-number-of-cores-on-a-machine
103  // for OS-specific code
104 #endif
105 }
106 
110 
120 {
121  return 0;
122 }
123 
127 
131 {
132 public:
134  unsigned int m_id;
135  std::vector<std::shared_ptr<PooledTask>> m_dependentTasks;
137  volatile std::atomic<bool> m_canceling;
138 
140  : m_task(nullptr)
141  , m_id(0)
142  , m_threadId(0)
143  , m_canceling(false)
144  , m_completed(false)
145  , m_completedMutex()
147  {
148  }
149 
151  {
152  signalCompleted();
153  delete m_task;
154  }
155 
157  {
158  Lock locker(&m_completedMutex);
159  if (!m_completed)
160  {
161  // This may look like a dead-lock situation with signalCompleted(), but it isn't
162  if (timeout == UINT32_MAX)
164  else
165  m_completedCondition.wait(timeout);
166  }
167  return m_completed;
168  }
169 
170  void signalCompleted() noexcept
171  {
172  Lock locker(&m_completedMutex);
173  if (!m_completed)
174  {
175  m_completed = true;
176  locker.unlock();
178  }
179  }
180 
181 private:
182  volatile std::atomic_bool m_completed;
185 };
186 
189 {
190  if (m_container)
191  return m_container->m_canceling;
192  return false;
193 }
194 
196 unsigned int ThreadPoolTask::taskId() const
197 {
198  if (m_container)
199  return m_container->m_id;
200  return 0;
201 }
202 
206 
211 {
212 public:
213  PooledThread(ThreadPool* pool);
214  ~PooledThread();
215  bool isIdle() const;
216  bool isBusy() const;
217  unsigned int executedCount() const;
218  unsigned int completedCount() const;
219  unsigned int failedCount() const;
220 
221 protected:
223  std::shared_ptr<PooledTask> m_task;
224  unsigned int m_executed;
225  unsigned int m_completed;
226  unsigned int m_failed;
227 
228  int32_t innerFunction(void) override;
229 };
230 
233  : StandardThread()
234  , m_pool(pool)
235  , m_task(nullptr)
236  , m_executed(0)
237  , m_completed(0)
238  , m_failed(0)
239 {
240 }
241 
244 {
245  stopThread();
246  m_pool = nullptr;
247 }
248 
255 {
256  if (!m_task)
258 
259  while (m_task && !isTerminating())
260  {
261  m_task->m_threadId = getThreadId();
262  bool complete = false;
263  try
264  {
265  if (m_task->m_task->exec())
266  {
267  ++m_completed;
268  complete = true;
269  }
270  else if (m_task->m_canceling)
271  {
272  ++m_failed;
273  complete = true;
274  }
275  }
276 #ifdef XSENS_DEBUG
277  catch (std::exception& e)
278  {
279  const std::type_info& et = typeid(e);
280 #ifdef __GNUC__
281  int status;
282  char* realname = abi::__cxa_demangle(et.name(), 0, 0, &status);
283  fprintf(stderr, "ThreadPool: Caught an unhandled %s(\"%s\")\n", realname, e.what());
284  free(realname);
285 #else
286  fprintf(stderr, "ThreadPool: Caught an unhandled %s: %s\n", et.name(), e.what());
287 #endif
288  complete = true;
289  }
290 #endif
291  catch (...)
292  {
293 #ifdef XSENS_DEBUG
294  fprintf(stderr, "ThreadPool: Caught an unhandled unknown exception\n");
295 #endif
296  m_task->m_task->onError();
297  ++m_failed;
298  complete = true;
299  }
300 
301  ++m_executed;
302  if (complete)
304  else
305  {
306  m_task->m_threadId = 0;
308  }
309 
311  }
312  return 1;
313 }
314 
318 {
319  return (m_task == nullptr);
320 }
321 
325 {
326  return !isIdle();
327 }
328 
331 unsigned int PooledThread::executedCount() const
332 {
333  return m_executed;
334 }
335 
338 unsigned int PooledThread::completedCount() const
339 {
340  return m_completed;
341 }
342 
345 unsigned int PooledThread::failedCount() const
346 {
347  return m_failed;
348 }
349 
353 
362  : m_nextId(1)
363  , m_suspended(false)
364  , m_terminating(false)
365 {
366  setPoolSize(0);
367 }
368 
372 {
373  m_terminating = true;
374  suspend(true);
375 
376  // all threads should now be idle
377  m_tasks.clear();
378  m_tasksSearch.clear();
379  m_executing.clear();
380  m_delaying.clear();
381 
382  try
383  {
384  for (ThreadSet::iterator it = m_threads.begin(); it != m_threads.end(); ++it)
385  delete *it;
386  }
387  catch (...)
388  {
389  // nothing much we can do about this...
390  }
391 }
392 
400 {
401  assert(!m_terminating);
402  std::shared_ptr<PooledTask> tmp = std::make_shared<PooledTask>();
403  tmp->m_task = task;
404  Lock safety(&m_safe);
405  tmp->m_id = m_nextId;
406  task->m_container = tmp.get();
407 
408  // determine next ID
409  if (!++m_nextId)
410  m_nextId = 1;
411 
412 #ifndef ADD_TASK_EXECUTE_NOW
413  if (afterId)
414  {
415  std::shared_ptr<PooledTask> after = findTask(afterId);
416  if (after)
417  {
418  after->m_dependentTasks.push_back(tmp);
419  m_delaying[tmp->m_id] = tmp;
420  return tmp->m_id;
421  }
422  }
423 
424  m_tasks.push_back(tmp);
425  m_tasksSearch[tmp->m_id] = tmp;
426 #else
427  task->exec();
428  task->signalCompleted();
429 #endif
430 
431  return tmp->m_id;
432 }
433 
436 unsigned int ThreadPool::count()
437 {
438  Lock safety(&m_safe);
439  return (unsigned int)(m_tasks.size() + m_delaying.size() + m_executing.size());
440 }
441 
445 void ThreadPool::setPoolSize(unsigned int poolsize)
446 {
447  if (poolsize == 0)
448  {
449  //int pc = processorCount();
450  //poolsize = ::std::max(4, pc*2);
451  poolsize = 12; // fixed count to prevent thread starvation on low-core PCs and overthreading on high-core PCs
452  }
453 
454  bool wasSuspended = m_suspended;
455  suspend(poolsize < m_threads.size());
456  Lock safety(&m_safe);
457 
458  // reduce size if pool is too large
459  if (poolsize < m_threads.size())
460  {
461  while (poolsize < m_threads.size())
462  {
463  auto it = m_threads.begin();
464  auto thread = *it;
465  safety.unlock();
466  thread->stopThread();
467  safety.lock();
468  delete thread;
469  m_threads.erase(it);
470  }
471  }
472 
473  // increase size if pool is too small
474  for (unsigned int i = (unsigned int) m_threads.size(); i < poolsize; ++i)
475  {
476  PooledThread* t = new PooledThread(this);
477  m_threads.insert(t);
478 #ifdef XSENS_DEBUG
479  char bufje[64];
480  sprintf(bufje, "Pooled Thread %p", t);
481 #else
482  const char bufje[] = "Pooled Thread";
483 #endif
484  if (!t->startThread(bufje))
485  {
486  m_threads.erase(m_threads.find(t));
487  delete t;
488  throw XsException(XRV_ERROR, "Could not start thread for ThreadPool");
489  }
490  }
491 
492  if (!wasSuspended)
493  resume();
494 }
495 
497 unsigned int ThreadPool::poolSize() const
498 {
499  return (unsigned int) m_threads.size();
500 }
501 
504 std::shared_ptr<PooledTask> ThreadPool::findTask(ThreadPool::TaskId id)
505 {
506  Lock safety(&m_safe);
507  TaskSet::iterator it = m_executing.find(id);
508  if (it != m_executing.end())
509  return it->second;
510 
511  it = m_delaying.find(id);
512  if (it != m_delaying.end())
513  return it->second;
514 
515  it = m_tasksSearch.find(id);
516  if (it != m_tasksSearch.end())
517  return it->second;
518 
519  for (auto const& tsk : m_tasks)
520  if (tsk->m_id == id)
521  return tsk;
522 
523  return std::shared_ptr<PooledTask>();
524 }
525 
529 {
530  Lock safety(&m_safe);
531  TaskSet::iterator it = m_executing.find(id);
532  if (it != m_executing.end())
533  return it->second->m_threadId;
534  return 0;
535 }
536 
540 {
541  return findTask(id) != nullptr;
542 }
543 
546 void ThreadPool::cancelTask(ThreadPool::TaskId id, bool wait) noexcept
547 {
548  Lock safety(&m_safe);
549  TaskSet::iterator it = m_executing.find(id);
550  if (it != m_executing.end())
551  {
552  it->second->m_canceling = true;
553  safety.unlock();
554  if (wait)
555  waitForCompletion(id);
556  return;
557  }
558 
559  it = m_delaying.find(id);
560  if (it != m_delaying.end())
561  {
562  reportTaskComplete(it->second);
563  m_delaying.erase(it);
564  }
565 
566  it = m_tasksSearch.find(id);
567  if (it != m_tasksSearch.end())
568  {
569  reportTaskComplete(it->second);
570  m_tasksSearch.erase(it);
571  }
572 
573  for (auto tskIt = m_tasks.begin(); tskIt != m_tasks.end(); ++tskIt)
574  {
575  if ((*tskIt)->m_id == id)
576  {
577  reportTaskComplete(*tskIt);
578  m_tasks.erase(tskIt);
579  return;
580  }
581  }
582 }
583 
587 {
588  std::shared_ptr<PooledTask> task = findTask(id);
589  if (task != nullptr)
590  task->waitForCompletion();
591 }
592 
596 void ThreadPool::reportTaskComplete(std::shared_ptr<PooledTask> task)
597 {
598  Lock safety(&m_safe);
599 
600  // remove task from 'executing' list
601  TaskSet::iterator it = m_executing.find(task->m_id);
602  if (it != m_executing.end())
603  m_executing.erase(it);
604 
605  // notify dependent tasks that their dependency has been fulfilled
606  for (TaskList::iterator dep = task->m_dependentTasks.begin(); dep != task->m_dependentTasks.end(); ++dep)
607  {
608  it = m_delaying.find((*dep)->m_id);
609  if (it != m_delaying.end())
610  {
611  m_tasks.push_back(it->second);
612  m_tasksSearch[it->second->m_id] = it->second;
613  m_delaying.erase(it);
614  }
615  }
616  task->signalCompleted();
617 }
618 
621 std::shared_ptr<PooledTask> ThreadPool::getNextTask()
622 {
623  Lock safety(&m_safe);
624  if (!m_suspended && !m_tasks.empty())
625  {
626  std::shared_ptr<PooledTask> tmp = m_tasks.front();
627  m_tasks.pop_front();
628  m_tasksSearch.erase(tmp->m_id);
629  m_executing[tmp->m_id] = tmp;
630  return tmp;
631  }
632 
633  return std::shared_ptr<PooledTask>();
634 }
635 
644 void ThreadPool::reportTaskPaused(std::shared_ptr<PooledTask> task)
645 {
646  Lock safety(&m_safe);
647 
648  // remove task from 'executing' list
649  TaskSet::iterator it = m_executing.find(task->m_id);
650  if (it != m_executing.end())
651  m_executing.erase(it);
652 
653  // add task back into either the delaying list if it should wait for something
654  unsigned int waitForId = task->m_task->needToWaitFor();
655  if (waitForId)
656  {
657  std::shared_ptr<PooledTask> after = findTask(waitForId);
658  if (after)
659  {
660  after->m_dependentTasks.push_back(task);
661  m_delaying[task->m_id] = task;
662  return;
663  }
664  }
665 
666  // add task into ready queue
667  m_tasks.push_back(task);
668  m_tasksSearch[task->m_id] = task;
669 }
670 
676 void ThreadPool::suspend(bool wait) noexcept
677 {
678  Lock safety(&m_safe);
679  m_suspended = true;
680 
681  if (wait)
682  {
683  safety.unlock();
684 
685  for (ThreadSet::const_iterator it = m_threads.begin(); it != m_threads.end(); ++it)
686  while ((*it)->isBusy())
687  xsYield();
688  }
689 }
690 
695 {
696  Lock safety(&m_safe);
697  m_suspended = false;
698 }
699 
702 unsigned int ThreadPool::executedCount(unsigned int thread) const
703 {
704  unsigned int i = 0;
705  for (ThreadSet::const_iterator it = m_threads.begin(); it != m_threads.end(); ++it, ++i)
706  if (i == thread)
707  return (*it)->executedCount();
708  return 0;
709 }
710 
713 unsigned int ThreadPool::completedCount(unsigned int thread) const
714 {
715  unsigned int i = 0;
716  for (ThreadSet::const_iterator it = m_threads.begin(); it != m_threads.end(); ++it, ++i)
717  if (i == thread)
718  return (*it)->completedCount();
719  return 0;
720 }
721 
724 unsigned int ThreadPool::failedCount(unsigned int thread) const
725 {
726  unsigned int i = 0;
727  for (ThreadSet::const_iterator it = m_threads.begin(); it != m_threads.end(); ++it, ++i)
728  if (i == thread)
729  return (*it)->failedCount();
730  return 0;
731 }
732 
734 bool gManagePool = true;
735 
738 {
739  if (gPool)
740  return gPool;
741  try
742  {
743  gPool = new ThreadPool;
744  gManagePool = true;
745  }
746  catch (...)
747  {
748  gPool = nullptr;
749  gManagePool = false;
750  }
751  return gPool;
752 }
753 
761 {
762  if (gPool && gManagePool)
763  delete gPool;
764  gPool = NULL;
765  gManagePool = true;
766 }
767 
777 {
778  if (pool != gPool)
779  {
780  destroy();
781  if (pool)
782  {
783  gPool = pool;
784  gManagePool = false;
785  }
786  }
787 }
788 
792 
808  : ThreadPoolTask()
809  , m_pool(pool)
810 {}
811 
813 TaskCompletionWaiter::TaskCompletionWaiter(const std::list<unsigned int>& waitlist, ThreadPool* pool)
814  : ThreadPoolTask()
815  , m_pool(pool)
816  , m_waitList(waitlist)
817 {}
818 
821 {}
822 
828 {
829  while (m_waitList.size())
830  {
831  // if the task exists, reschedule this task to recheck after the task completes
832  if (m_pool->doesTaskExist(*m_waitList.begin()))
833  return false;
834 
835  m_waitList.erase(m_waitList.begin());
836  }
837  return true;
838 }
839 
844 {
845  if (m_waitList.size())
846  return *m_waitList.begin();
847  return 0;
848 }
849 
854 void TaskCompletionWaiter::addWaitId(unsigned int id)
855 {
856  m_waitList.push_back(id);
857 }
858 } // namespace xsens
xsens::PooledTask::m_completed
volatile std::atomic_bool m_completed
Definition: xsens_threadpool.cpp:182
xsens::PooledTask::m_threadId
XsThreadId m_threadId
Definition: xsens_threadpool.cpp:136
xsens::PooledThread::m_executed
unsigned int m_executed
The number of tasks that this thread has executed s o far, including incomplete tasks.
Definition: xsens_threadpool.cpp:224
xsens::ThreadPool::m_suspended
bool m_suspended
Definition: xsens_threadpool.h:117
xsexception.h
xsens::ThreadPoolTask
A generic task implementation for the thread pool.
Definition: xsens_threadpool.h:82
xsens::PooledTask::PooledTask
PooledTask()
Definition: xsens_threadpool.cpp:139
XsThreadId
pthread_t XsThreadId
Definition: xsthread.h:187
xsens::ThreadPoolTask::isCanceling
bool isCanceling() const
Returns true if the task has been told to cancel itself.
Definition: xsens_threadpool.cpp:188
xsens::ThreadPool::suspend
void suspend(bool wait=false) noexcept
Suspend execution of tasks, any currently executing tasks will run to completion, but queued tasks wi...
Definition: xsens_threadpool.cpp:676
xsens::TaskList
std::vector< std::shared_ptr< PooledTask > > TaskList
Definition: xsens_threadpool.cpp:90
xsens::ThreadSet
std::set< PooledThread * > ThreadSet
Definition: xsens_threadpool.cpp:89
xsens::ThreadPool::ThreadPool
ThreadPool()
Construct a threadpool with a number of threads equal to the number of cores on the PC.
Definition: xsens_threadpool.cpp:361
xsens::TaskCompletionWaiter::needToWaitFor
virtual unsigned int needToWaitFor()
If there are wait tasks left, this function will return the id of the first in the list.
Definition: xsens_threadpool.cpp:843
xsens::PooledTask::waitForCompletion
bool waitForCompletion(uint32_t timeout=UINT32_MAX)
Definition: xsens_threadpool.cpp:156
xsens::ThreadPool::completedCount
unsigned int completedCount(unsigned int thread) const
Return the number of tasks completed by the given thread.
Definition: xsens_threadpool.cpp:713
xsens::ThreadPool::taskThreadId
XsThreadId taskThreadId(TaskId id)
Find an XsThread with the specified id.
Definition: xsens_threadpool.cpp:528
xsens::ThreadPool::doesTaskExist
bool doesTaskExist(TaskId id)
Check if a task with the supplied id exists.
Definition: xsens_threadpool.cpp:539
xsens::PooledThread::innerFunction
int32_t innerFunction(void) override
The inner function of the pooled thread.
Definition: xsens_threadpool.cpp:254
xsens::ThreadPool::m_tasksSearch
std::map< TaskId, std::shared_ptr< PooledTask > > m_tasksSearch
Definition: xsens_threadpool.h:112
xsens::PooledThread::PooledThread
PooledThread(ThreadPool *pool)
Constructor.
Definition: xsens_threadpool.cpp:232
xsens::PooledTask::~PooledTask
~PooledTask()
Definition: xsens_threadpool.cpp:150
UINT32_MAX
#define UINT32_MAX
Definition: pstdint.h:481
xsens::PooledTask::m_completedCondition
WaitCondition m_completedCondition
Definition: xsens_threadpool.cpp:184
xsens::PooledThread
A class that contains a thread that runs in a ThreadPool to execute tasks.
Definition: xsens_threadpool.cpp:210
xsens::ThreadPool::m_threads
std::set< PooledThread * > m_threads
Definition: xsens_threadpool.h:110
xsens::Lock::lock
bool lock()
Locks the unlocked mutex.
Definition: xsens_mutex.h:1019
xsens::PooledTask::m_completedMutex
Mutex m_completedMutex
Definition: xsens_threadpool.cpp:183
xsens::ThreadPoolTask::needToWaitFor
virtual unsigned int needToWaitFor()
This function gets called by PooledThread when the exec() function returns false to determine if we s...
Definition: xsens_threadpool.cpp:119
xsens::PooledTask::m_task
ThreadPoolTask * m_task
The task that is to be executed.
Definition: xsens_threadpool.cpp:133
xsYield
#define xsYield()
Release the remainder of the timeslice so other operations can run.
Definition: xsthread.h:182
xsens::ThreadPool::setPoolSize
void setPoolSize(unsigned int poolsize)
Set the number of threads in the ThreadPool.
Definition: xsens_threadpool.cpp:445
xsens::PooledThread::~PooledThread
~PooledThread()
Destructor.
Definition: xsens_threadpool.cpp:243
xsens::ThreadPool::m_terminating
volatile std::atomic_bool m_terminating
Definition: xsens_threadpool.h:118
XRV_ERROR
@ XRV_ERROR
256: A generic error occurred
Definition: xsresultvalue.h:126
xsens::TaskCompletionWaiter::~TaskCompletionWaiter
virtual ~TaskCompletionWaiter()
Destructor.
Definition: xsens_threadpool.cpp:820
xsens::WaitCondition::broadcast
void broadcast()
Unblock all waiting threads.
Definition: threading.cpp:977
xsens::StandardThread::startThread
bool startThread(const char *name=NULL)
Starts the thread.
Definition: threading.cpp:281
xsens::ThreadPool::cancelTask
void cancelTask(TaskId id, bool wait=true) noexcept
Remove the task with the supplied id if it exists, waits for the task to be finished.
Definition: xsens_threadpool.cpp:546
xsens::ThreadPool::reportTaskPaused
void reportTaskPaused(std::shared_ptr< PooledTask >)
Called by PooledThread to notify the ThreadPool that its running task has to wait for something.
Definition: xsens_threadpool.cpp:644
xsens::ThreadPool::TaskId
unsigned int TaskId
A type definition of a task ID.
Definition: xsens_threadpool.h:102
xsens::TaskCompletionWaiter::m_pool
ThreadPool * m_pool
Definition: xsens_threadpool.h:158
xsens::ThreadPool::m_executing
std::map< TaskId, std::shared_ptr< PooledTask > > m_executing
Definition: xsens_threadpool.h:113
xsens::TaskCompletionWaiter::exec
virtual bool exec()
task function, checks if there are tasks left that we should be waiting for
Definition: xsens_threadpool.cpp:827
uint32_t
unsigned int uint32_t
Definition: pstdint.h:485
xsens::WaitCondition::wait
bool wait()
Wait until we're signalled to continue.
Definition: threading.cpp:994
xsens::ThreadPool::count
unsigned int count()
Return the number of tasks that are currently in the queue or being executed.
Definition: xsens_threadpool.cpp:436
xsens::ThreadPool::getNextTask
std::shared_ptr< PooledTask > getNextTask()
Return the next task that should be run and mark it as executing.
Definition: xsens_threadpool.cpp:621
xsens::ThreadPool::m_safe
Mutex m_safe
Definition: xsens_threadpool.h:115
xsens::processorCount
int processorCount()
Returns the number of processor cores in the current system.
Definition: xsens_threadpool.cpp:94
xsens::StandardThread::isTerminating
bool isTerminating() volatile const noexcept
Returns whether the thread should (have) terminate(d)
Definition: threading.cpp:182
xsens::ThreadPool::destroy
static void destroy()
Destroy the global thread pool object.
Definition: xsens_threadpool.cpp:760
xsens::TaskCompletionWaiter::m_waitList
std::list< unsigned int > m_waitList
Definition: xsens_threadpool.h:159
xsens::PooledThread::isBusy
bool isBusy() const
Return whether the thread is currently executing a task (true) or not (false)
Definition: xsens_threadpool.cpp:324
xsens::PooledTask::m_id
unsigned int m_id
The id that was assigned to the task by the ThreadPool.
Definition: xsens_threadpool.cpp:134
xsens::Mutex
A base mutex class.
Definition: xsens_mutex.h:132
xsens::PooledTask::m_canceling
volatile std::atomic< bool > m_canceling
Definition: xsens_threadpool.cpp:137
xsens::TaskCompletionWaiter::addWaitId
void addWaitId(unsigned int id)
Add the id of a task to wiat for to the list.
Definition: xsens_threadpool.cpp:854
xsens::PooledTask::signalCompleted
void signalCompleted() noexcept
Definition: xsens_threadpool.cpp:170
xsens::ThreadPool::addTask
TaskId addTask(ThreadPoolTask *task, TaskId afterId=0)
Add a task to be executed by the threadpool.
Definition: xsens_threadpool.cpp:399
xsens::ThreadPool::m_tasks
std::deque< std::shared_ptr< PooledTask > > m_tasks
Definition: xsens_threadpool.h:111
xsens::ThreadPool::resume
void resume()
Resume execution of tasks.
Definition: xsens_threadpool.cpp:694
xsens::PooledThread::completedCount
unsigned int completedCount() const
Return the number of tasks successfully executed by the thread.
Definition: xsens_threadpool.cpp:338
xsens::PooledThread::executedCount
unsigned int executedCount() const
Return the number of tasks successfully or partially executed by the thread.
Definition: xsens_threadpool.cpp:331
xsens::ThreadPoolTask::taskId
unsigned int taskId() const
Returns the task ID of the task or 0 if it doesn't have a proper ID (yet)
Definition: xsens_threadpool.cpp:196
xsens::PooledThread::m_pool
ThreadPool * m_pool
The pool that contains this thread.
Definition: xsens_threadpool.cpp:222
xsens::ThreadPool::waitForCompletion
void waitForCompletion(TaskId id)
Wait for the task with the given ID to complete.
Definition: xsens_threadpool.cpp:586
xsens::ThreadPool::failedCount
unsigned int failedCount(unsigned int thread) const
Return the number of tasks that failed to execute in the given thread.
Definition: xsens_threadpool.cpp:724
xsens::gManagePool
bool gManagePool
Definition: xsens_threadpool.cpp:734
xsens::ThreadPool::instance
static ThreadPool * instance() noexcept
Return the global thread pool object, it will be created if it did not yet exist.
Definition: xsens_threadpool.cpp:737
xsens::Lock::unlock
bool unlock() noexcept
Unlocks the locked mutex.
Definition: xsens_mutex.h:1032
threading.h
xsens::PooledThread::failedCount
unsigned int failedCount() const
Return the number of tasks that failed to execute properly.
Definition: xsens_threadpool.cpp:345
xsens::PooledThread::m_task
std::shared_ptr< PooledTask > m_task
The task that is currently being executed or NULL if the thread is idle.
Definition: xsens_threadpool.cpp:223
xsens::StandardThread
A class for a standard thread that has to perform the same action repeatedly.
Definition: threading.h:83
xsens::ThreadPoolTask::m_container
PooledTask * m_container
Definition: xsens_threadpool.h:95
xsens::StandardThread::stopThread
void stopThread(void) noexcept
Tells the thread to stop and waits for it to end.
Definition: threading.cpp:334
xsens::ThreadPool::setPool
static void setPool(ThreadPool *pool)
Set the threadpool to use.
Definition: xsens_threadpool.cpp:776
xsens::TaskCompletionWaiter::TaskCompletionWaiter
TaskCompletionWaiter(ThreadPool *pool=ThreadPool::instance())
Constructor, sets up an empty waiter.
Definition: xsens_threadpool.cpp:807
xsens::Lock
A base class for a Lock.
Definition: xsens_mutex.h:947
xsens::PooledTask::m_dependentTasks
std::vector< std::shared_ptr< PooledTask > > m_dependentTasks
A list of tasks that are waiting for this task to complete.
Definition: xsens_threadpool.cpp:135
int32_t
signed int int32_t
Definition: pstdint.h:515
xsens::PooledThread::m_failed
unsigned int m_failed
The number of tasks that this thread has failed to complete so far due to an exception.
Definition: xsens_threadpool.cpp:226
xsens::ThreadPool::findTask
std::shared_ptr< PooledTask > findTask(TaskId id)
Find a task with the supplied id.
Definition: xsens_threadpool.cpp:504
xsens::WaitCondition
A platform independent wait condition implementation.
Definition: xsens_mutex.h:1806
xsens::gPool
ThreadPool * gPool
Definition: xsens_threadpool.cpp:733
xsens::ThreadPool
This class creates and maintains a number of threads that can execute finite-length tasks.
Definition: xsens_threadpool.h:99
xsens_threadpool.h
xsens::PooledThread::isIdle
bool isIdle() const
Return whether the thread is currently executing a task (false) or not (true)
Definition: xsens_threadpool.cpp:317
xsens::StandardThread::getThreadId
XsThreadId getThreadId(void) const
Definition: threading.h:142
xsens::PooledThread::m_completed
unsigned int m_completed
The number of tasks that this thread has completed so far, excluding incomplete tasks.
Definition: xsens_threadpool.cpp:225
xsens::TaskSet
std::map< unsigned int, std::shared_ptr< PooledTask > > TaskSet
Definition: xsens_threadpool.cpp:88
xsens::ThreadPool::reportTaskComplete
void reportTaskComplete(std::shared_ptr< PooledTask >)
Called by PooledThread to notify the ThreadPool that a task was completed.
Definition: xsens_threadpool.cpp:596
xsens::ThreadPool::PooledThread
friend class PooledThread
Definition: xsens_threadpool.h:108
xsens::PooledTask
A class that contains a task and some administrative stuff.
Definition: xsens_threadpool.cpp:130
xsens::ThreadPoolTask::exec
virtual bool exec()=0
xsens
Definition: threading.cpp:78
xsens::ThreadPool::poolSize
unsigned int poolSize() const
Return the number of threads in the pool.
Definition: xsens_threadpool.cpp:497
xsens::ThreadPool::~ThreadPool
~ThreadPool()
Destructor, clears any pending tasks and destroys the threads.
Definition: xsens_threadpool.cpp:371
xsens::ThreadPool::executedCount
unsigned int executedCount(unsigned int thread) const
Return the number of tasks executed (including paused) by the given thread.
Definition: xsens_threadpool.cpp:702
xsens::ThreadPool::m_delaying
std::map< TaskId, std::shared_ptr< PooledTask > > m_delaying
Definition: xsens_threadpool.h:114
xsens::ThreadPool::m_nextId
TaskId m_nextId
Definition: xsens_threadpool.h:116


xsens_mti_driver
Author(s):
autogenerated on Sun Sep 3 2023 02:43:20