Go to the documentation of this file.
88 typedef std::map<unsigned int, std::shared_ptr<PooledTask>>
TaskSet;
90 typedef std::vector<std::shared_ptr<PooledTask>>
TaskList;
98 ::GetSystemInfo(&sysinfo);
99 return (
int) sysinfo.dwNumberOfProcessors;
101 return sysconf(_SC_NPROCESSORS_ONLN);
262 bool complete =
false;
265 if (
m_task->m_task->exec())
270 else if (
m_task->m_canceling)
277 catch (std::exception& e)
279 const std::type_info& et =
typeid(e);
282 char* realname = abi::__cxa_demangle(et.name(), 0, 0, &status);
283 fprintf(stderr,
"ThreadPool: Caught an unhandled %s(\"%s\")\n", realname, e.what());
286 fprintf(stderr,
"ThreadPool: Caught an unhandled %s: %s\n", et.name(), e.what());
294 fprintf(stderr,
"ThreadPool: Caught an unhandled unknown exception\n");
296 m_task->m_task->onError();
319 return (
m_task ==
nullptr);
364 , m_terminating(false)
402 std::shared_ptr<PooledTask> tmp = std::make_shared<PooledTask>();
412 #ifndef ADD_TASK_EXECUTE_NOW
415 std::shared_ptr<PooledTask> after =
findTask(afterId);
418 after->m_dependentTasks.push_back(tmp);
428 task->signalCompleted();
466 thread->stopThread();
474 for (
unsigned int i = (
unsigned int)
m_threads.size(); i < poolsize; ++i)
480 sprintf(bufje,
"Pooled Thread %p", t);
482 const char bufje[] =
"Pooled Thread";
488 throw XsException(
XRV_ERROR,
"Could not start thread for ThreadPool");
519 for (
auto const& tsk :
m_tasks)
523 return std::shared_ptr<PooledTask>();
533 return it->second->m_threadId;
548 Lock safety(&m_safe);
549 TaskSet::iterator it = m_executing.find(
id);
550 if (it != m_executing.end())
552 it->second->m_canceling =
true;
555 waitForCompletion(
id);
559 it = m_delaying.find(
id);
560 if (it != m_delaying.end())
562 reportTaskComplete(it->second);
563 m_delaying.erase(it);
566 it = m_tasksSearch.find(
id);
567 if (it != m_tasksSearch.end())
569 reportTaskComplete(it->second);
570 m_tasksSearch.erase(it);
573 for (
auto tskIt = m_tasks.begin(); tskIt != m_tasks.end(); ++tskIt)
575 if ((*tskIt)->m_id ==
id)
577 reportTaskComplete(*tskIt);
578 m_tasks.erase(tskIt);
588 std::shared_ptr<PooledTask> task =
findTask(
id);
590 task->waitForCompletion();
601 TaskSet::iterator it =
m_executing.find(task->m_id);
606 for (TaskList::iterator dep = task->m_dependentTasks.begin(); dep != task->m_dependentTasks.end(); ++dep)
616 task->signalCompleted();
626 std::shared_ptr<PooledTask> tmp =
m_tasks.front();
633 return std::shared_ptr<PooledTask>();
649 TaskSet::iterator it =
m_executing.find(task->m_id);
654 unsigned int waitForId = task->m_task->needToWaitFor();
657 std::shared_ptr<PooledTask> after =
findTask(waitForId);
660 after->m_dependentTasks.push_back(task);
678 Lock safety(&m_safe);
685 for (ThreadSet::const_iterator it = m_threads.begin(); it != m_threads.end(); ++it)
686 while ((*it)->isBusy())
705 for (ThreadSet::const_iterator it =
m_threads.begin(); it !=
m_threads.end(); ++it, ++i)
707 return (*it)->executedCount();
716 for (ThreadSet::const_iterator it =
m_threads.begin(); it !=
m_threads.end(); ++it, ++i)
718 return (*it)->completedCount();
727 for (ThreadSet::const_iterator it =
m_threads.begin(); it !=
m_threads.end(); ++it, ++i)
729 return (*it)->failedCount();
816 , m_waitList(waitlist)
volatile std::atomic_bool m_completed
unsigned int m_executed
The number of tasks that this thread has executed s o far, including incomplete tasks.
A generic task implementation for the thread pool.
bool isCanceling() const
Returns true if the task has been told to cancel itself.
void suspend(bool wait=false) noexcept
Suspend execution of tasks, any currently executing tasks will run to completion, but queued tasks wi...
std::vector< std::shared_ptr< PooledTask > > TaskList
std::set< PooledThread * > ThreadSet
ThreadPool()
Construct a threadpool with a number of threads equal to the number of cores on the PC.
virtual unsigned int needToWaitFor()
If there are wait tasks left, this function will return the id of the first in the list.
bool waitForCompletion(uint32_t timeout=UINT32_MAX)
unsigned int completedCount(unsigned int thread) const
Return the number of tasks completed by the given thread.
XsThreadId taskThreadId(TaskId id)
Find an XsThread with the specified id.
bool doesTaskExist(TaskId id)
Check if a task with the supplied id exists.
int32_t innerFunction(void) override
The inner function of the pooled thread.
std::map< TaskId, std::shared_ptr< PooledTask > > m_tasksSearch
PooledThread(ThreadPool *pool)
Constructor.
WaitCondition m_completedCondition
A class that contains a thread that runs in a ThreadPool to execute tasks.
std::set< PooledThread * > m_threads
bool lock()
Locks the unlocked mutex.
virtual unsigned int needToWaitFor()
This function gets called by PooledThread when the exec() function returns false to determine if we s...
ThreadPoolTask * m_task
The task that is to be executed.
#define xsYield()
Release the remainder of the timeslice so other operations can run.
void setPoolSize(unsigned int poolsize)
Set the number of threads in the ThreadPool.
~PooledThread()
Destructor.
volatile std::atomic_bool m_terminating
@ XRV_ERROR
256: A generic error occurred
virtual ~TaskCompletionWaiter()
Destructor.
void broadcast()
Unblock all waiting threads.
bool startThread(const char *name=NULL)
Starts the thread.
void cancelTask(TaskId id, bool wait=true) noexcept
Remove the task with the supplied id if it exists, waits for the task to be finished.
void reportTaskPaused(std::shared_ptr< PooledTask >)
Called by PooledThread to notify the ThreadPool that its running task has to wait for something.
unsigned int TaskId
A type definition of a task ID.
std::map< TaskId, std::shared_ptr< PooledTask > > m_executing
virtual bool exec()
task function, checks if there are tasks left that we should be waiting for
bool wait()
Wait until we're signalled to continue.
unsigned int count()
Return the number of tasks that are currently in the queue or being executed.
std::shared_ptr< PooledTask > getNextTask()
Return the next task that should be run and mark it as executing.
int processorCount()
Returns the number of processor cores in the current system.
bool isTerminating() volatile const noexcept
Returns whether the thread should (have) terminate(d)
static void destroy()
Destroy the global thread pool object.
std::list< unsigned int > m_waitList
bool isBusy() const
Return whether the thread is currently executing a task (true) or not (false)
unsigned int m_id
The id that was assigned to the task by the ThreadPool.
volatile std::atomic< bool > m_canceling
void addWaitId(unsigned int id)
Add the id of a task to wiat for to the list.
void signalCompleted() noexcept
TaskId addTask(ThreadPoolTask *task, TaskId afterId=0)
Add a task to be executed by the threadpool.
std::deque< std::shared_ptr< PooledTask > > m_tasks
void resume()
Resume execution of tasks.
unsigned int completedCount() const
Return the number of tasks successfully executed by the thread.
unsigned int executedCount() const
Return the number of tasks successfully or partially executed by the thread.
unsigned int taskId() const
Returns the task ID of the task or 0 if it doesn't have a proper ID (yet)
ThreadPool * m_pool
The pool that contains this thread.
void waitForCompletion(TaskId id)
Wait for the task with the given ID to complete.
unsigned int failedCount(unsigned int thread) const
Return the number of tasks that failed to execute in the given thread.
static ThreadPool * instance() noexcept
Return the global thread pool object, it will be created if it did not yet exist.
bool unlock() noexcept
Unlocks the locked mutex.
unsigned int failedCount() const
Return the number of tasks that failed to execute properly.
std::shared_ptr< PooledTask > m_task
The task that is currently being executed or NULL if the thread is idle.
A class for a standard thread that has to perform the same action repeatedly.
void stopThread(void) noexcept
Tells the thread to stop and waits for it to end.
static void setPool(ThreadPool *pool)
Set the threadpool to use.
TaskCompletionWaiter(ThreadPool *pool=ThreadPool::instance())
Constructor, sets up an empty waiter.
std::vector< std::shared_ptr< PooledTask > > m_dependentTasks
A list of tasks that are waiting for this task to complete.
unsigned int m_failed
The number of tasks that this thread has failed to complete so far due to an exception.
std::shared_ptr< PooledTask > findTask(TaskId id)
Find a task with the supplied id.
A platform independent wait condition implementation.
This class creates and maintains a number of threads that can execute finite-length tasks.
bool isIdle() const
Return whether the thread is currently executing a task (false) or not (true)
XsThreadId getThreadId(void) const
unsigned int m_completed
The number of tasks that this thread has completed so far, excluding incomplete tasks.
std::map< unsigned int, std::shared_ptr< PooledTask > > TaskSet
void reportTaskComplete(std::shared_ptr< PooledTask >)
Called by PooledThread to notify the ThreadPool that a task was completed.
friend class PooledThread
A class that contains a task and some administrative stuff.
unsigned int poolSize() const
Return the number of threads in the pool.
~ThreadPool()
Destructor, clears any pending tasks and destroys the threads.
unsigned int executedCount(unsigned int thread) const
Return the number of tasks executed (including paused) by the given thread.
std::map< TaskId, std::shared_ptr< PooledTask > > m_delaying