$search
00001 // 00002 // task_io_service.hpp 00003 // ~~~~~~~~~~~~~~~~~~~ 00004 // 00005 // Copyright (c) 2003-2008 Christopher M. Kohlhoff (chris at kohlhoff dot com) 00006 // 00007 // Distributed under the Boost Software License, Version 1.0. (See accompanying 00008 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) 00009 // 00010 00011 #ifndef ASIO_DETAIL_TASK_IO_SERVICE_HPP 00012 #define ASIO_DETAIL_TASK_IO_SERVICE_HPP 00013 00014 #if defined(_MSC_VER) && (_MSC_VER >= 1200) 00015 # pragma once 00016 #endif // defined(_MSC_VER) && (_MSC_VER >= 1200) 00017 00018 #if defined(ASIO_ENABLE_TWO_LOCK_QUEUE) 00019 #include "asio/detail/task_io_service_2lock.hpp" 00020 #else // defined(ASIO_ENABLE_TWO_LOCK_QUEUE) 00021 00022 #include "asio/detail/push_options.hpp" 00023 00024 #include "asio/error_code.hpp" 00025 #include "asio/io_service.hpp" 00026 #include "asio/detail/call_stack.hpp" 00027 #include "asio/detail/event.hpp" 00028 #include "asio/detail/handler_alloc_helpers.hpp" 00029 #include "asio/detail/handler_invoke_helpers.hpp" 00030 #include "asio/detail/handler_queue.hpp" 00031 #include "asio/detail/mutex.hpp" 00032 #include "asio/detail/service_base.hpp" 00033 #include "asio/detail/task_io_service_fwd.hpp" 00034 00035 namespace asio { 00036 namespace detail { 00037 00038 template <typename Task> 00039 class task_io_service 00040 : public asio::detail::service_base<task_io_service<Task> > 00041 { 00042 public: 00043 // Constructor. 00044 task_io_service(asio::io_service& io_service) 00045 : asio::detail::service_base<task_io_service<Task> >(io_service), 00046 mutex_(), 00047 task_(use_service<Task>(io_service)), 00048 task_interrupted_(true), 00049 outstanding_work_(0), 00050 stopped_(false), 00051 shutdown_(false), 00052 first_idle_thread_(0) 00053 { 00054 handler_queue_.push(&task_handler_); 00055 } 00056 00057 void init(size_t /*concurrency_hint*/) 00058 { 00059 } 00060 00061 // Destroy all user-defined handler objects owned by the service. 00062 void shutdown_service() 00063 { 00064 asio::detail::mutex::scoped_lock lock(mutex_); 00065 shutdown_ = true; 00066 lock.unlock(); 00067 00068 // Destroy handler objects. 00069 while (!handler_queue_.empty()) 00070 { 00071 handler_queue::handler* h = handler_queue_.front(); 00072 handler_queue_.pop(); 00073 if (h != &task_handler_) 00074 h->destroy(); 00075 } 00076 00077 // Reset handler queue to initial state. 00078 handler_queue_.push(&task_handler_); 00079 } 00080 00081 // Run the event loop until interrupted or no more work. 00082 size_t run(asio::error_code& ec) 00083 { 00084 typename call_stack<task_io_service>::context ctx(this); 00085 00086 idle_thread_info this_idle_thread; 00087 this_idle_thread.next = 0; 00088 00089 asio::detail::mutex::scoped_lock lock(mutex_); 00090 00091 size_t n = 0; 00092 while (do_one(lock, &this_idle_thread, ec)) 00093 if (n != (std::numeric_limits<size_t>::max)()) 00094 ++n; 00095 return n; 00096 } 00097 00098 // Run until interrupted or one operation is performed. 00099 size_t run_one(asio::error_code& ec) 00100 { 00101 typename call_stack<task_io_service>::context ctx(this); 00102 00103 idle_thread_info this_idle_thread; 00104 this_idle_thread.next = 0; 00105 00106 asio::detail::mutex::scoped_lock lock(mutex_); 00107 00108 return do_one(lock, &this_idle_thread, ec); 00109 } 00110 00111 // Poll for operations without blocking. 00112 size_t poll(asio::error_code& ec) 00113 { 00114 typename call_stack<task_io_service>::context ctx(this); 00115 00116 asio::detail::mutex::scoped_lock lock(mutex_); 00117 00118 size_t n = 0; 00119 while (do_one(lock, 0, ec)) 00120 if (n != (std::numeric_limits<size_t>::max)()) 00121 ++n; 00122 return n; 00123 } 00124 00125 // Poll for one operation without blocking. 00126 size_t poll_one(asio::error_code& ec) 00127 { 00128 typename call_stack<task_io_service>::context ctx(this); 00129 00130 asio::detail::mutex::scoped_lock lock(mutex_); 00131 00132 return do_one(lock, 0, ec); 00133 } 00134 00135 // Interrupt the event processing loop. 00136 void stop() 00137 { 00138 asio::detail::mutex::scoped_lock lock(mutex_); 00139 stop_all_threads(lock); 00140 } 00141 00142 // Reset in preparation for a subsequent run invocation. 00143 void reset() 00144 { 00145 asio::detail::mutex::scoped_lock lock(mutex_); 00146 stopped_ = false; 00147 } 00148 00149 // Notify that some work has started. 00150 void work_started() 00151 { 00152 asio::detail::mutex::scoped_lock lock(mutex_); 00153 ++outstanding_work_; 00154 } 00155 00156 // Notify that some work has finished. 00157 void work_finished() 00158 { 00159 asio::detail::mutex::scoped_lock lock(mutex_); 00160 if (--outstanding_work_ == 0) 00161 stop_all_threads(lock); 00162 } 00163 00164 // Request invocation of the given handler. 00165 template <typename Handler> 00166 void dispatch(Handler handler) 00167 { 00168 if (call_stack<task_io_service>::contains(this)) 00169 asio_handler_invoke_helpers::invoke(handler, &handler); 00170 else 00171 post(handler); 00172 } 00173 00174 // Request invocation of the given handler and return immediately. 00175 template <typename Handler> 00176 void post(Handler handler) 00177 { 00178 // Allocate and construct an operation to wrap the handler. 00179 handler_queue::scoped_ptr ptr(handler_queue::wrap(handler)); 00180 00181 asio::detail::mutex::scoped_lock lock(mutex_); 00182 00183 // If the service has been shut down we silently discard the handler. 00184 if (shutdown_) 00185 return; 00186 00187 // Add the handler to the end of the queue. 00188 handler_queue_.push(ptr.get()); 00189 ptr.release(); 00190 00191 // An undelivered handler is treated as unfinished work. 00192 ++outstanding_work_; 00193 00194 // Wake up a thread to execute the handler. 00195 if (!interrupt_one_idle_thread(lock)) 00196 { 00197 if (!task_interrupted_) 00198 { 00199 task_interrupted_ = true; 00200 task_.interrupt(); 00201 } 00202 } 00203 } 00204 00205 private: 00206 struct idle_thread_info; 00207 00208 size_t do_one(asio::detail::mutex::scoped_lock& lock, 00209 idle_thread_info* this_idle_thread, asio::error_code& ec) 00210 { 00211 if (outstanding_work_ == 0 && !stopped_) 00212 { 00213 stop_all_threads(lock); 00214 ec = asio::error_code(); 00215 return 0; 00216 } 00217 00218 bool polling = !this_idle_thread; 00219 bool task_has_run = false; 00220 while (!stopped_) 00221 { 00222 if (!handler_queue_.empty()) 00223 { 00224 // Prepare to execute first handler from queue. 00225 handler_queue::handler* h = handler_queue_.front(); 00226 handler_queue_.pop(); 00227 00228 if (h == &task_handler_) 00229 { 00230 bool more_handlers = (!handler_queue_.empty()); 00231 task_interrupted_ = more_handlers || polling; 00232 00233 // If the task has already run and we're polling then we're done. 00234 if (task_has_run && polling) 00235 { 00236 task_interrupted_ = true; 00237 handler_queue_.push(&task_handler_); 00238 ec = asio::error_code(); 00239 return 0; 00240 } 00241 task_has_run = true; 00242 00243 lock.unlock(); 00244 task_cleanup c(lock, *this); 00245 00246 // Run the task. May throw an exception. Only block if the handler 00247 // queue is empty and we have an idle_thread_info object, otherwise 00248 // we want to return as soon as possible. 00249 task_.run(!more_handlers && !polling); 00250 } 00251 else 00252 { 00253 lock.unlock(); 00254 handler_cleanup c(lock, *this); 00255 00256 // Invoke the handler. May throw an exception. 00257 h->invoke(); // invoke() deletes the handler object 00258 00259 ec = asio::error_code(); 00260 return 1; 00261 } 00262 } 00263 else if (this_idle_thread) 00264 { 00265 // Nothing to run right now, so just wait for work to do. 00266 this_idle_thread->next = first_idle_thread_; 00267 first_idle_thread_ = this_idle_thread; 00268 this_idle_thread->wakeup_event.clear(lock); 00269 this_idle_thread->wakeup_event.wait(lock); 00270 } 00271 else 00272 { 00273 ec = asio::error_code(); 00274 return 0; 00275 } 00276 } 00277 00278 ec = asio::error_code(); 00279 return 0; 00280 } 00281 00282 // Stop the task and all idle threads. 00283 void stop_all_threads( 00284 asio::detail::mutex::scoped_lock& lock) 00285 { 00286 stopped_ = true; 00287 interrupt_all_idle_threads(lock); 00288 if (!task_interrupted_) 00289 { 00290 task_interrupted_ = true; 00291 task_.interrupt(); 00292 } 00293 } 00294 00295 // Interrupt a single idle thread. Returns true if a thread was interrupted, 00296 // false if no running thread could be found to interrupt. 00297 bool interrupt_one_idle_thread( 00298 asio::detail::mutex::scoped_lock& lock) 00299 { 00300 if (first_idle_thread_) 00301 { 00302 idle_thread_info* idle_thread = first_idle_thread_; 00303 first_idle_thread_ = idle_thread->next; 00304 idle_thread->next = 0; 00305 idle_thread->wakeup_event.signal(lock); 00306 return true; 00307 } 00308 return false; 00309 } 00310 00311 // Interrupt all idle threads. 00312 void interrupt_all_idle_threads( 00313 asio::detail::mutex::scoped_lock& lock) 00314 { 00315 while (first_idle_thread_) 00316 { 00317 idle_thread_info* idle_thread = first_idle_thread_; 00318 first_idle_thread_ = idle_thread->next; 00319 idle_thread->next = 0; 00320 idle_thread->wakeup_event.signal(lock); 00321 } 00322 } 00323 00324 // Helper class to perform task-related operations on block exit. 00325 class task_cleanup; 00326 friend class task_cleanup; 00327 class task_cleanup 00328 { 00329 public: 00330 task_cleanup(asio::detail::mutex::scoped_lock& lock, 00331 task_io_service& task_io_svc) 00332 : lock_(lock), 00333 task_io_service_(task_io_svc) 00334 { 00335 } 00336 00337 ~task_cleanup() 00338 { 00339 // Reinsert the task at the end of the handler queue. 00340 lock_.lock(); 00341 task_io_service_.task_interrupted_ = true; 00342 task_io_service_.handler_queue_.push(&task_io_service_.task_handler_); 00343 } 00344 00345 private: 00346 asio::detail::mutex::scoped_lock& lock_; 00347 task_io_service& task_io_service_; 00348 }; 00349 00350 // Helper class to perform handler-related operations on block exit. 00351 class handler_cleanup; 00352 friend class handler_cleanup; 00353 class handler_cleanup 00354 { 00355 public: 00356 handler_cleanup(asio::detail::mutex::scoped_lock& lock, 00357 task_io_service& task_io_svc) 00358 : lock_(lock), 00359 task_io_service_(task_io_svc) 00360 { 00361 } 00362 00363 ~handler_cleanup() 00364 { 00365 lock_.lock(); 00366 if (--task_io_service_.outstanding_work_ == 0) 00367 task_io_service_.stop_all_threads(lock_); 00368 } 00369 00370 private: 00371 asio::detail::mutex::scoped_lock& lock_; 00372 task_io_service& task_io_service_; 00373 }; 00374 00375 // Mutex to protect access to internal data. 00376 asio::detail::mutex mutex_; 00377 00378 // The task to be run by this service. 00379 Task& task_; 00380 00381 // Handler object to represent the position of the task in the queue. 00382 class task_handler 00383 : public handler_queue::handler 00384 { 00385 public: 00386 task_handler() 00387 : handler_queue::handler(0, 0) 00388 { 00389 } 00390 } task_handler_; 00391 00392 // Whether the task has been interrupted. 00393 bool task_interrupted_; 00394 00395 // The count of unfinished work. 00396 int outstanding_work_; 00397 00398 // The queue of handlers that are ready to be delivered. 00399 handler_queue handler_queue_; 00400 00401 // Flag to indicate that the dispatcher has been stopped. 00402 bool stopped_; 00403 00404 // Flag to indicate that the dispatcher has been shut down. 00405 bool shutdown_; 00406 00407 // Structure containing information about an idle thread. 00408 struct idle_thread_info 00409 { 00410 event wakeup_event; 00411 idle_thread_info* next; 00412 }; 00413 00414 // The number of threads that are currently idle. 00415 idle_thread_info* first_idle_thread_; 00416 }; 00417 00418 } // namespace detail 00419 } // namespace asio 00420 00421 #include "asio/detail/pop_options.hpp" 00422 00423 #endif // defined(ASIO_ENABLE_TWO_LOCK_QUEUE) 00424 00425 #endif // ASIO_DETAIL_TASK_IO_SERVICE_HPP