00001 // Copyright 2017 The Abseil Authors. 00002 // 00003 // Licensed under the Apache License, Version 2.0 (the "License"); 00004 // you may not use this file except in compliance with the License. 00005 // You may obtain a copy of the License at 00006 // 00007 // https://www.apache.org/licenses/LICENSE-2.0 00008 // 00009 // Unless required by applicable law or agreed to in writing, software 00010 // distributed under the License is distributed on an "AS IS" BASIS, 00011 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 00012 // See the License for the specific language governing permissions and 00013 // limitations under the License. 00014 00015 #ifndef ABSL_SYNCHRONIZATION_INTERNAL_THREAD_POOL_H_ 00016 #define ABSL_SYNCHRONIZATION_INTERNAL_THREAD_POOL_H_ 00017 00018 #include <cassert> 00019 #include <cstddef> 00020 #include <functional> 00021 #include <queue> 00022 #include <thread> // NOLINT(build/c++11) 00023 #include <vector> 00024 00025 #include "absl/base/thread_annotations.h" 00026 #include "absl/synchronization/mutex.h" 00027 00028 namespace absl { 00029 namespace synchronization_internal { 00030 00031 // A simple ThreadPool implementation for tests. 00032 class ThreadPool { 00033 public: 00034 explicit ThreadPool(int num_threads) { 00035 for (int i = 0; i < num_threads; ++i) { 00036 threads_.push_back(std::thread(&ThreadPool::WorkLoop, this)); 00037 } 00038 } 00039 00040 ThreadPool(const ThreadPool &) = delete; 00041 ThreadPool &operator=(const ThreadPool &) = delete; 00042 00043 ~ThreadPool() { 00044 { 00045 absl::MutexLock l(&mu_); 00046 for (size_t i = 0; i < threads_.size(); i++) { 00047 queue_.push(nullptr); // Shutdown signal. 00048 } 00049 } 00050 for (auto &t : threads_) { 00051 t.join(); 00052 } 00053 } 00054 00055 // Schedule a function to be run on a ThreadPool thread immediately. 00056 void Schedule(std::function<void()> func) { 00057 assert(func != nullptr); 00058 absl::MutexLock l(&mu_); 00059 queue_.push(std::move(func)); 00060 } 00061 00062 private: 00063 bool WorkAvailable() const EXCLUSIVE_LOCKS_REQUIRED(mu_) { 00064 return !queue_.empty(); 00065 } 00066 00067 void WorkLoop() { 00068 while (true) { 00069 std::function<void()> func; 00070 { 00071 absl::MutexLock l(&mu_); 00072 mu_.Await(absl::Condition(this, &ThreadPool::WorkAvailable)); 00073 func = std::move(queue_.front()); 00074 queue_.pop(); 00075 } 00076 if (func == nullptr) { // Shutdown signal. 00077 break; 00078 } 00079 func(); 00080 } 00081 } 00082 00083 absl::Mutex mu_; 00084 std::queue<std::function<void()>> queue_ GUARDED_BY(mu_); 00085 std::vector<std::thread> threads_; 00086 }; 00087 00088 } // namespace synchronization_internal 00089 } // namespace absl 00090 00091 #endif // ABSL_SYNCHRONIZATION_INTERNAL_THREAD_POOL_H_