$search
00001 // 00002 // task_io_service_2lock.hpp 00003 // ~~~~~~~~~~~~~~~~~~~~~~~~~ 00004 // 00005 // Copyright (c) 2003-2008 Christopher M. Kohlhoff (chris at kohlhoff dot com) 00006 // 00007 // Distributed under the Boost Software License, Version 1.0. (See accompanying 00008 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) 00009 // 00010 00011 #ifndef ASIO_DETAIL_TASK_IO_SERVICE_2LOCK_HPP 00012 #define ASIO_DETAIL_TASK_IO_SERVICE_2LOCK_HPP 00013 00014 #if defined(_MSC_VER) && (_MSC_VER >= 1200) 00015 # pragma once 00016 #endif // defined(_MSC_VER) && (_MSC_VER >= 1200) 00017 00018 #include "asio/detail/push_options.hpp" 00019 00020 #include "asio/error_code.hpp" 00021 #include "asio/io_service.hpp" 00022 #include "asio/detail/call_stack.hpp" 00023 #include "asio/detail/event.hpp" 00024 #include "asio/detail/handler_alloc_helpers.hpp" 00025 #include "asio/detail/handler_invoke_helpers.hpp" 00026 #include "asio/detail/indirect_handler_queue.hpp" 00027 #include "asio/detail/mutex.hpp" 00028 #include "asio/detail/service_base.hpp" 00029 #include "asio/detail/task_io_service_fwd.hpp" 00030 00031 #include "asio/detail/push_options.hpp" 00032 #include <boost/detail/atomic_count.hpp> 00033 #include "asio/detail/pop_options.hpp" 00034 00035 namespace asio { 00036 namespace detail { 00037 00038 // An alternative task_io_service implementation based on a two-lock queue. 00039 00040 template <typename Task> 00041 class task_io_service 00042 : public asio::detail::service_base<task_io_service<Task> > 00043 { 00044 public: 00045 typedef indirect_handler_queue handler_queue; 00046 00047 // Constructor. 00048 task_io_service(asio::io_service& io_service) 00049 : asio::detail::service_base<task_io_service<Task> >(io_service), 00050 front_mutex_(), 00051 back_mutex_(), 00052 task_(use_service<Task>(io_service)), 00053 outstanding_work_(0), 00054 front_stopped_(false), 00055 back_stopped_(false), 00056 back_shutdown_(false), 00057 back_first_idle_thread_(0), 00058 back_task_thread_(0) 00059 { 00060 handler_queue_.push(&task_handler_); 00061 } 00062 00063 void init(size_t /*concurrency_hint*/) 00064 { 00065 } 00066 00067 // Destroy all user-defined handler objects owned by the service. 00068 void shutdown_service() 00069 { 00070 asio::detail::mutex::scoped_lock back_lock(back_mutex_); 00071 back_shutdown_ = true; 00072 back_lock.unlock(); 00073 00074 // Destroy handler objects. 00075 while (handler_queue::handler* h = handler_queue_.pop()) 00076 if (h != &task_handler_) 00077 h->destroy(); 00078 00079 // Reset handler queue to initial state. 00080 handler_queue_.push(&task_handler_); 00081 } 00082 00083 // Run the event loop until interrupted or no more work. 00084 size_t run(asio::error_code& ec) 00085 { 00086 if (outstanding_work_ == 0) 00087 { 00088 stop(); 00089 ec = asio::error_code(); 00090 return 0; 00091 } 00092 00093 typename call_stack<task_io_service>::context ctx(this); 00094 00095 idle_thread_info this_idle_thread; 00096 this_idle_thread.next = 0; 00097 00098 size_t n = 0; 00099 while (do_one(&this_idle_thread, ec)) 00100 if (n != (std::numeric_limits<size_t>::max)()) 00101 ++n; 00102 return n; 00103 } 00104 00105 // Run until interrupted or one operation is performed. 00106 size_t run_one(asio::error_code& ec) 00107 { 00108 if (outstanding_work_ == 0) 00109 { 00110 stop(); 00111 ec = asio::error_code(); 00112 return 0; 00113 } 00114 00115 typename call_stack<task_io_service>::context ctx(this); 00116 00117 idle_thread_info this_idle_thread; 00118 this_idle_thread.next = 0; 00119 00120 return do_one(&this_idle_thread, ec); 00121 } 00122 00123 // Poll for operations without blocking. 00124 size_t poll(asio::error_code& ec) 00125 { 00126 if (outstanding_work_ == 0) 00127 { 00128 stop(); 00129 ec = asio::error_code(); 00130 return 0; 00131 } 00132 00133 typename call_stack<task_io_service>::context ctx(this); 00134 00135 size_t n = 0; 00136 while (do_one(0, ec)) 00137 if (n != (std::numeric_limits<size_t>::max)()) 00138 ++n; 00139 return n; 00140 } 00141 00142 // Poll for one operation without blocking. 00143 size_t poll_one(asio::error_code& ec) 00144 { 00145 if (outstanding_work_ == 0) 00146 { 00147 stop(); 00148 ec = asio::error_code(); 00149 return 0; 00150 } 00151 00152 typename call_stack<task_io_service>::context ctx(this); 00153 00154 return do_one(0, ec); 00155 } 00156 00157 // Interrupt the event processing loop. 00158 void stop() 00159 { 00160 asio::detail::mutex::scoped_lock front_lock(front_mutex_); 00161 front_stopped_ = true; 00162 front_lock.unlock(); 00163 00164 asio::detail::mutex::scoped_lock back_lock(back_mutex_); 00165 back_stopped_ = true; 00166 interrupt_all_idle_threads(back_lock); 00167 } 00168 00169 // Reset in preparation for a subsequent run invocation. 00170 void reset() 00171 { 00172 asio::detail::mutex::scoped_lock front_lock(front_mutex_); 00173 front_stopped_ = false; 00174 front_lock.unlock(); 00175 00176 asio::detail::mutex::scoped_lock back_lock(back_mutex_); 00177 back_stopped_ = false; 00178 } 00179 00180 // Notify that some work has started. 00181 void work_started() 00182 { 00183 ++outstanding_work_; 00184 } 00185 00186 // Notify that some work has finished. 00187 void work_finished() 00188 { 00189 if (--outstanding_work_ == 0) 00190 stop(); 00191 } 00192 00193 // Request invocation of the given handler. 00194 template <typename Handler> 00195 void dispatch(Handler handler) 00196 { 00197 if (call_stack<task_io_service>::contains(this)) 00198 asio_handler_invoke_helpers::invoke(handler, &handler); 00199 else 00200 post(handler); 00201 } 00202 00203 // Request invocation of the given handler and return immediately. 00204 template <typename Handler> 00205 void post(Handler handler) 00206 { 00207 // Allocate and construct an operation to wrap the handler. 00208 handler_queue::scoped_ptr ptr(handler_queue::wrap(handler)); 00209 00210 asio::detail::mutex::scoped_lock back_lock(back_mutex_); 00211 00212 // If the service has been shut down we silently discard the handler. 00213 if (back_shutdown_) 00214 return; 00215 00216 // Add the handler to the end of the queue. 00217 handler_queue_.push(ptr.get()); 00218 ptr.release(); 00219 00220 // An undelivered handler is treated as unfinished work. 00221 ++outstanding_work_; 00222 00223 // Wake up a thread to execute the handler. 00224 interrupt_one_idle_thread(back_lock); 00225 } 00226 00227 private: 00228 struct idle_thread_info; 00229 00230 size_t do_one(idle_thread_info* this_idle_thread, 00231 asio::error_code& ec) 00232 { 00233 bool task_has_run = false; 00234 for (;;) 00235 { 00236 // The front lock must be held before we can pop items from the queue. 00237 asio::detail::mutex::scoped_lock front_lock(front_mutex_); 00238 if (front_stopped_) 00239 { 00240 ec = asio::error_code(); 00241 return 0; 00242 } 00243 00244 if (handler_queue::handler* h = handler_queue_.pop()) 00245 { 00246 if (h == &task_handler_) 00247 { 00248 bool more_handlers = handler_queue_.poppable(); 00249 unsigned long front_version = handler_queue_.front_version(); 00250 front_lock.unlock(); 00251 00252 // The task is always added to the back of the queue when we exit 00253 // this block. 00254 task_cleanup c(*this); 00255 00256 // If we're polling and the task has already run then we're done. 00257 bool polling = !this_idle_thread; 00258 if (task_has_run && polling) 00259 { 00260 ec = asio::error_code(); 00261 return 0; 00262 } 00263 00264 // If we're considering going idle we need to check whether the queue 00265 // is still empty. If it is, add the thread to the list of idle 00266 // threads. 00267 if (!more_handlers && !polling) 00268 { 00269 asio::detail::mutex::scoped_lock back_lock(back_mutex_); 00270 if (back_stopped_) 00271 { 00272 ec = asio::error_code(); 00273 return 0; 00274 } 00275 else if (front_version == handler_queue_.back_version()) 00276 { 00277 back_task_thread_ = this_idle_thread; 00278 } 00279 else 00280 { 00281 more_handlers = true; 00282 } 00283 } 00284 00285 // Run the task. May throw an exception. Only block if the handler 00286 // queue is empty and we're not polling, otherwise we want to return 00287 // as soon as possible. 00288 task_has_run = true; 00289 task_.run(!more_handlers && !polling); 00290 } 00291 else 00292 { 00293 front_lock.unlock(); 00294 handler_cleanup c(*this); 00295 00296 // Invoke the handler. May throw an exception. 00297 h->invoke(); // invoke() deletes the handler object 00298 00299 ec = asio::error_code(); 00300 return 1; 00301 } 00302 } 00303 else if (this_idle_thread) 00304 { 00305 unsigned long front_version = handler_queue_.front_version(); 00306 front_lock.unlock(); 00307 00308 // If we're considering going idle we need to check whether the queue 00309 // is still empty. If it is, add the thread to the list of idle 00310 // threads. 00311 asio::detail::mutex::scoped_lock back_lock(back_mutex_); 00312 if (back_stopped_) 00313 { 00314 ec = asio::error_code(); 00315 return 0; 00316 } 00317 else if (front_version == handler_queue_.back_version()) 00318 { 00319 this_idle_thread->next = back_first_idle_thread_; 00320 back_first_idle_thread_ = this_idle_thread; 00321 this_idle_thread->wakeup_event.clear(back_lock); 00322 this_idle_thread->wakeup_event.wait(back_lock); 00323 } 00324 } 00325 else 00326 { 00327 ec = asio::error_code(); 00328 return 0; 00329 } 00330 } 00331 } 00332 00333 // Interrupt a single idle thread. 00334 void interrupt_one_idle_thread( 00335 asio::detail::mutex::scoped_lock& back_lock) 00336 { 00337 if (back_first_idle_thread_) 00338 { 00339 idle_thread_info* idle_thread = back_first_idle_thread_; 00340 back_first_idle_thread_ = idle_thread->next; 00341 idle_thread->next = 0; 00342 idle_thread->wakeup_event.signal(back_lock); 00343 } 00344 else if (back_task_thread_) 00345 { 00346 back_task_thread_ = 0; 00347 task_.interrupt(); 00348 } 00349 } 00350 00351 // Interrupt all idle threads. 00352 void interrupt_all_idle_threads( 00353 asio::detail::mutex::scoped_lock& back_lock) 00354 { 00355 while (back_first_idle_thread_) 00356 { 00357 idle_thread_info* idle_thread = back_first_idle_thread_; 00358 back_first_idle_thread_ = idle_thread->next; 00359 idle_thread->next = 0; 00360 idle_thread->wakeup_event.signal(back_lock); 00361 } 00362 00363 if (back_task_thread_) 00364 { 00365 back_task_thread_ = 0; 00366 task_.interrupt(); 00367 } 00368 } 00369 00370 // Helper class to perform task-related operations on block exit. 00371 class task_cleanup; 00372 friend class task_cleanup; 00373 class task_cleanup 00374 { 00375 public: 00376 task_cleanup(task_io_service& task_io_svc) 00377 : task_io_service_(task_io_svc) 00378 { 00379 } 00380 00381 ~task_cleanup() 00382 { 00383 // Reinsert the task at the end of the handler queue. 00384 asio::detail::mutex::scoped_lock back_lock( 00385 task_io_service_.back_mutex_); 00386 task_io_service_.back_task_thread_ = 0; 00387 task_io_service_.handler_queue_.push(&task_io_service_.task_handler_); 00388 } 00389 00390 private: 00391 task_io_service& task_io_service_; 00392 }; 00393 00394 // Helper class to perform handler-related operations on block exit. 00395 class handler_cleanup 00396 { 00397 public: 00398 handler_cleanup(task_io_service& task_io_svc) 00399 : task_io_service_(task_io_svc) 00400 { 00401 } 00402 00403 ~handler_cleanup() 00404 { 00405 task_io_service_.work_finished(); 00406 } 00407 00408 private: 00409 task_io_service& task_io_service_; 00410 }; 00411 00412 // Mutexes to protect access to internal data. 00413 asio::detail::mutex front_mutex_; 00414 asio::detail::mutex back_mutex_; 00415 00416 // The task to be run by this service. 00417 Task& task_; 00418 00419 // Handler object to represent the position of the task in the queue. 00420 class task_handler 00421 : public handler_queue::handler 00422 { 00423 public: 00424 task_handler() 00425 : handler_queue::handler(0, 0) 00426 { 00427 } 00428 } task_handler_; 00429 00430 // The count of unfinished work. 00431 boost::detail::atomic_count outstanding_work_; 00432 00433 // The queue of handlers that are ready to be delivered. 00434 handler_queue handler_queue_; 00435 00436 // Flag to indicate that the dispatcher has been stopped. 00437 bool front_stopped_; 00438 bool back_stopped_; 00439 00440 // Flag to indicate that the dispatcher has been shut down. 00441 bool back_shutdown_; 00442 00443 // Structure containing information about an idle thread. 00444 struct idle_thread_info 00445 { 00446 event wakeup_event; 00447 idle_thread_info* next; 00448 }; 00449 00450 // The number of threads that are currently idle. 00451 idle_thread_info* back_first_idle_thread_; 00452 00453 // The thread that is currently blocked on the task. 00454 idle_thread_info* back_task_thread_; 00455 }; 00456 00457 } // namespace detail 00458 } // namespace asio 00459 00460 #include "asio/detail/pop_options.hpp" 00461 00462 #endif // ASIO_DETAIL_TASK_IO_SERVICE_2LOCK_HPP