00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011 #ifndef ASIO_DETAIL_WIN_IOCP_IO_SERVICE_HPP
00012 #define ASIO_DETAIL_WIN_IOCP_IO_SERVICE_HPP
00013
00014 #if defined(_MSC_VER) && (_MSC_VER >= 1200)
00015 # pragma once
00016 #endif // defined(_MSC_VER) && (_MSC_VER >= 1200)
00017
00018 #include "asio/detail/push_options.hpp"
00019
00020 #include "asio/detail/win_iocp_io_service_fwd.hpp"
00021
00022 #if defined(ASIO_HAS_IOCP)
00023
00024 #include "asio/detail/push_options.hpp"
00025 #include <limits>
00026 #include <boost/throw_exception.hpp>
00027 #include "asio/detail/pop_options.hpp"
00028
00029 #include "asio/io_service.hpp"
00030 #include "asio/system_error.hpp"
00031 #include "asio/detail/call_stack.hpp"
00032 #include "asio/detail/handler_alloc_helpers.hpp"
00033 #include "asio/detail/handler_invoke_helpers.hpp"
00034 #include "asio/detail/service_base.hpp"
00035 #include "asio/detail/socket_types.hpp"
00036 #include "asio/detail/timer_queue.hpp"
00037 #include "asio/detail/mutex.hpp"
00038
00039 namespace asio {
00040 namespace detail {
00041
00042 class win_iocp_io_service
00043 : public asio::detail::service_base<win_iocp_io_service>
00044 {
00045 public:
00046
00047
00048
00049
00050
00051
00052 class operation
00053 : public OVERLAPPED
00054 {
00055 public:
00056 typedef void (*invoke_func_type)(operation*, DWORD, size_t);
00057 typedef void (*destroy_func_type)(operation*);
00058
00059 operation(win_iocp_io_service& iocp_service,
00060 invoke_func_type invoke_func, destroy_func_type destroy_func)
00061 : outstanding_operations_(&iocp_service.outstanding_operations_),
00062 invoke_func_(invoke_func),
00063 destroy_func_(destroy_func)
00064 {
00065 Internal = 0;
00066 InternalHigh = 0;
00067 Offset = 0;
00068 OffsetHigh = 0;
00069 hEvent = 0;
00070
00071 ::InterlockedIncrement(outstanding_operations_);
00072 }
00073
00074 void do_completion(DWORD last_error, size_t bytes_transferred)
00075 {
00076 invoke_func_(this, last_error, bytes_transferred);
00077 }
00078
00079 void destroy()
00080 {
00081 destroy_func_(this);
00082 }
00083
00084 protected:
00085
00086 ~operation()
00087 {
00088 ::InterlockedDecrement(outstanding_operations_);
00089 }
00090
00091 private:
00092 long* outstanding_operations_;
00093 invoke_func_type invoke_func_;
00094 destroy_func_type destroy_func_;
00095 };
00096
00097
00098
00099 win_iocp_io_service(asio::io_service& io_service)
00100 : asio::detail::service_base<win_iocp_io_service>(io_service),
00101 iocp_(),
00102 outstanding_work_(0),
00103 outstanding_operations_(0),
00104 stopped_(0),
00105 shutdown_(0),
00106 timer_thread_(0),
00107 timer_interrupt_issued_(false)
00108 {
00109 }
00110
00111 void init(size_t concurrency_hint)
00112 {
00113 iocp_.handle = ::CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0,
00114 static_cast<DWORD>((std::min<size_t>)(concurrency_hint, DWORD(~0))));
00115 if (!iocp_.handle)
00116 {
00117 DWORD last_error = ::GetLastError();
00118 asio::system_error e(
00119 asio::error_code(last_error,
00120 asio::error::get_system_category()),
00121 "iocp");
00122 boost::throw_exception(e);
00123 }
00124 }
00125
00126
00127 void shutdown_service()
00128 {
00129 ::InterlockedExchange(&shutdown_, 1);
00130
00131 while (::InterlockedExchangeAdd(&outstanding_operations_, 0) > 0)
00132 {
00133 DWORD bytes_transferred = 0;
00134 #if (WINVER < 0x0500)
00135 DWORD completion_key = 0;
00136 #else
00137 DWORD_PTR completion_key = 0;
00138 #endif
00139 LPOVERLAPPED overlapped = 0;
00140 ::GetQueuedCompletionStatus(iocp_.handle, &bytes_transferred,
00141 &completion_key, &overlapped, INFINITE);
00142 if (overlapped)
00143 static_cast<operation*>(overlapped)->destroy();
00144 }
00145
00146 for (std::size_t i = 0; i < timer_queues_.size(); ++i)
00147 timer_queues_[i]->destroy_timers();
00148 timer_queues_.clear();
00149 }
00150
00151
00152 asio::error_code register_handle(
00153 HANDLE handle, asio::error_code& ec)
00154 {
00155 if (::CreateIoCompletionPort(handle, iocp_.handle, 0, 0) == 0)
00156 {
00157 DWORD last_error = ::GetLastError();
00158 ec = asio::error_code(last_error,
00159 asio::error::get_system_category());
00160 }
00161 else
00162 {
00163 ec = asio::error_code();
00164 }
00165 return ec;
00166 }
00167
00168
00169 size_t run(asio::error_code& ec)
00170 {
00171 if (::InterlockedExchangeAdd(&outstanding_work_, 0) == 0)
00172 {
00173 ec = asio::error_code();
00174 return 0;
00175 }
00176
00177 call_stack<win_iocp_io_service>::context ctx(this);
00178
00179 size_t n = 0;
00180 while (do_one(true, ec))
00181 if (n != (std::numeric_limits<size_t>::max)())
00182 ++n;
00183 return n;
00184 }
00185
00186
00187 size_t run_one(asio::error_code& ec)
00188 {
00189 if (::InterlockedExchangeAdd(&outstanding_work_, 0) == 0)
00190 {
00191 ec = asio::error_code();
00192 return 0;
00193 }
00194
00195 call_stack<win_iocp_io_service>::context ctx(this);
00196
00197 return do_one(true, ec);
00198 }
00199
00200
00201 size_t poll(asio::error_code& ec)
00202 {
00203 if (::InterlockedExchangeAdd(&outstanding_work_, 0) == 0)
00204 {
00205 ec = asio::error_code();
00206 return 0;
00207 }
00208
00209 call_stack<win_iocp_io_service>::context ctx(this);
00210
00211 size_t n = 0;
00212 while (do_one(false, ec))
00213 if (n != (std::numeric_limits<size_t>::max)())
00214 ++n;
00215 return n;
00216 }
00217
00218
00219 size_t poll_one(asio::error_code& ec)
00220 {
00221 if (::InterlockedExchangeAdd(&outstanding_work_, 0) == 0)
00222 {
00223 ec = asio::error_code();
00224 return 0;
00225 }
00226
00227 call_stack<win_iocp_io_service>::context ctx(this);
00228
00229 return do_one(false, ec);
00230 }
00231
00232
00233 void stop()
00234 {
00235 if (::InterlockedExchange(&stopped_, 1) == 0)
00236 {
00237 if (!::PostQueuedCompletionStatus(iocp_.handle, 0, 0, 0))
00238 {
00239 DWORD last_error = ::GetLastError();
00240 asio::system_error e(
00241 asio::error_code(last_error,
00242 asio::error::get_system_category()),
00243 "pqcs");
00244 boost::throw_exception(e);
00245 }
00246 }
00247 }
00248
00249
00250 void reset()
00251 {
00252 ::InterlockedExchange(&stopped_, 0);
00253 }
00254
00255
00256 void work_started()
00257 {
00258 ::InterlockedIncrement(&outstanding_work_);
00259 }
00260
00261
00262 void work_finished()
00263 {
00264 if (::InterlockedDecrement(&outstanding_work_) == 0)
00265 stop();
00266 }
00267
00268
00269 template <typename Handler>
00270 void dispatch(Handler handler)
00271 {
00272 if (call_stack<win_iocp_io_service>::contains(this))
00273 asio_handler_invoke_helpers::invoke(handler, &handler);
00274 else
00275 post(handler);
00276 }
00277
00278
00279 template <typename Handler>
00280 void post(Handler handler)
00281 {
00282
00283 if (::InterlockedExchangeAdd(&shutdown_, 0) != 0)
00284 return;
00285
00286
00287 typedef handler_operation<Handler> value_type;
00288 typedef handler_alloc_traits<Handler, value_type> alloc_traits;
00289 raw_handler_ptr<alloc_traits> raw_ptr(handler);
00290 handler_ptr<alloc_traits> ptr(raw_ptr, *this, handler);
00291
00292
00293 if (!::PostQueuedCompletionStatus(iocp_.handle, 0, 0, ptr.get()))
00294 {
00295 DWORD last_error = ::GetLastError();
00296 asio::system_error e(
00297 asio::error_code(last_error,
00298 asio::error::get_system_category()),
00299 "pqcs");
00300 boost::throw_exception(e);
00301 }
00302
00303
00304 ptr.release();
00305 }
00306
00307
00308 void post_completion(operation* op, DWORD op_last_error,
00309 DWORD bytes_transferred)
00310 {
00311
00312 if (!::PostQueuedCompletionStatus(iocp_.handle,
00313 bytes_transferred, op_last_error, op))
00314 {
00315 DWORD last_error = ::GetLastError();
00316 asio::system_error e(
00317 asio::error_code(last_error,
00318 asio::error::get_system_category()),
00319 "pqcs");
00320 boost::throw_exception(e);
00321 }
00322 }
00323
00324
00325 template <typename Time_Traits>
00326 void add_timer_queue(timer_queue<Time_Traits>& timer_queue)
00327 {
00328 asio::detail::mutex::scoped_lock lock(timer_mutex_);
00329 timer_queues_.push_back(&timer_queue);
00330 }
00331
00332
00333 template <typename Time_Traits>
00334 void remove_timer_queue(timer_queue<Time_Traits>& timer_queue)
00335 {
00336 asio::detail::mutex::scoped_lock lock(timer_mutex_);
00337 for (std::size_t i = 0; i < timer_queues_.size(); ++i)
00338 {
00339 if (timer_queues_[i] == &timer_queue)
00340 {
00341 timer_queues_.erase(timer_queues_.begin() + i);
00342 return;
00343 }
00344 }
00345 }
00346
00347
00348
00349 template <typename Time_Traits, typename Handler>
00350 void schedule_timer(timer_queue<Time_Traits>& timer_queue,
00351 const typename Time_Traits::time_type& time, Handler handler, void* token)
00352 {
00353
00354 if (::InterlockedExchangeAdd(&shutdown_, 0) != 0)
00355 return;
00356
00357 asio::detail::mutex::scoped_lock lock(timer_mutex_);
00358 if (timer_queue.enqueue_timer(time, handler, token))
00359 {
00360 if (!timer_interrupt_issued_)
00361 {
00362 timer_interrupt_issued_ = true;
00363 lock.unlock();
00364 ::PostQueuedCompletionStatus(iocp_.handle,
00365 0, steal_timer_dispatching, 0);
00366 }
00367 }
00368 }
00369
00370
00371
00372 template <typename Time_Traits>
00373 std::size_t cancel_timer(timer_queue<Time_Traits>& timer_queue, void* token)
00374 {
00375
00376 if (::InterlockedExchangeAdd(&shutdown_, 0) != 0)
00377 return 0;
00378
00379 asio::detail::mutex::scoped_lock lock(timer_mutex_);
00380 std::size_t n = timer_queue.cancel_timer(token);
00381 if (n > 0 && !timer_interrupt_issued_)
00382 {
00383 timer_interrupt_issued_ = true;
00384 lock.unlock();
00385 ::PostQueuedCompletionStatus(iocp_.handle,
00386 0, steal_timer_dispatching, 0);
00387 }
00388 return n;
00389 }
00390
00391 private:
00392
00393
00394
00395 size_t do_one(bool block, asio::error_code& ec)
00396 {
00397 long this_thread_id = static_cast<long>(::GetCurrentThreadId());
00398
00399 for (;;)
00400 {
00401
00402 bool dispatching_timers = (::InterlockedCompareExchange(
00403 &timer_thread_, this_thread_id, 0) == 0);
00404
00405
00406 DWORD timeout = max_timeout;
00407 if (dispatching_timers)
00408 {
00409 asio::detail::mutex::scoped_lock lock(timer_mutex_);
00410 timer_interrupt_issued_ = false;
00411 timeout = get_timeout();
00412 }
00413
00414
00415 DWORD bytes_transferred = 0;
00416 #if (WINVER < 0x0500)
00417 DWORD completion_key = 0;
00418 #else
00419 DWORD_PTR completion_key = 0;
00420 #endif
00421 LPOVERLAPPED overlapped = 0;
00422 ::SetLastError(0);
00423 BOOL ok = ::GetQueuedCompletionStatus(iocp_.handle, &bytes_transferred,
00424 &completion_key, &overlapped, block ? timeout : 0);
00425 DWORD last_error = ::GetLastError();
00426
00427
00428 if (dispatching_timers)
00429 {
00430 try
00431 {
00432 asio::detail::mutex::scoped_lock lock(timer_mutex_);
00433 timer_queues_copy_ = timer_queues_;
00434 for (std::size_t i = 0; i < timer_queues_copy_.size(); ++i)
00435 {
00436 timer_queues_copy_[i]->dispatch_timers();
00437 timer_queues_copy_[i]->dispatch_cancellations();
00438 timer_queues_copy_[i]->complete_timers();
00439 }
00440 }
00441 catch (...)
00442 {
00443
00444 if (::InterlockedCompareExchange(&timer_thread_,
00445 0, this_thread_id) == this_thread_id)
00446 {
00447 ::PostQueuedCompletionStatus(iocp_.handle,
00448 0, transfer_timer_dispatching, 0);
00449 }
00450
00451 throw;
00452 }
00453 }
00454
00455 if (!ok && overlapped == 0)
00456 {
00457 if (block && last_error == WAIT_TIMEOUT)
00458 {
00459
00460 if (dispatching_timers)
00461 {
00462 ::InterlockedCompareExchange(&timer_thread_, 0, this_thread_id);
00463 }
00464
00465 continue;
00466 }
00467
00468
00469 if (dispatching_timers && ::InterlockedCompareExchange(
00470 &timer_thread_, 0, this_thread_id) == this_thread_id)
00471 {
00472 ::PostQueuedCompletionStatus(iocp_.handle,
00473 0, transfer_timer_dispatching, 0);
00474 }
00475
00476 ec = asio::error_code();
00477 return 0;
00478 }
00479 else if (overlapped)
00480 {
00481
00482 if (last_error == 0)
00483 {
00484 last_error = completion_key;
00485 }
00486
00487
00488 if (dispatching_timers && ::InterlockedCompareExchange(
00489 &timer_thread_, 0, this_thread_id) == this_thread_id)
00490 {
00491 ::PostQueuedCompletionStatus(iocp_.handle,
00492 0, transfer_timer_dispatching, 0);
00493 }
00494
00495
00496
00497 auto_work work(*this);
00498
00499
00500 operation* op = static_cast<operation*>(overlapped);
00501 op->do_completion(last_error, bytes_transferred);
00502
00503 ec = asio::error_code();
00504 return 1;
00505 }
00506 else if (completion_key == transfer_timer_dispatching)
00507 {
00508
00509 ::InterlockedCompareExchange(&timer_thread_, 0, this_thread_id);
00510 }
00511 else if (completion_key == steal_timer_dispatching)
00512 {
00513
00514 ::InterlockedExchange(&timer_thread_, 0);
00515 }
00516 else
00517 {
00518
00519
00520
00521 if (dispatching_timers)
00522 {
00523 ::InterlockedCompareExchange(&timer_thread_, 0, this_thread_id);
00524 }
00525
00526
00527
00528 if (::InterlockedExchangeAdd(&stopped_, 0) != 0)
00529 {
00530
00531 if (!::PostQueuedCompletionStatus(iocp_.handle, 0, 0, 0))
00532 {
00533 DWORD last_error = ::GetLastError();
00534 ec = asio::error_code(last_error,
00535 asio::error::get_system_category());
00536 return 0;
00537 }
00538
00539 ec = asio::error_code();
00540 return 0;
00541 }
00542 }
00543 }
00544 }
00545
00546
00547 bool all_timer_queues_are_empty() const
00548 {
00549 for (std::size_t i = 0; i < timer_queues_.size(); ++i)
00550 if (!timer_queues_[i]->empty())
00551 return false;
00552 return true;
00553 }
00554
00555
00556
00557
00558 DWORD get_timeout()
00559 {
00560 if (all_timer_queues_are_empty())
00561 return max_timeout;
00562
00563 boost::posix_time::time_duration minimum_wait_duration
00564 = boost::posix_time::milliseconds(max_timeout);
00565
00566 for (std::size_t i = 0; i < timer_queues_.size(); ++i)
00567 {
00568 boost::posix_time::time_duration wait_duration
00569 = timer_queues_[i]->wait_duration();
00570 if (wait_duration < minimum_wait_duration)
00571 minimum_wait_duration = wait_duration;
00572 }
00573
00574 if (minimum_wait_duration > boost::posix_time::time_duration())
00575 {
00576 int milliseconds = minimum_wait_duration.total_milliseconds();
00577 return static_cast<DWORD>(milliseconds > 0 ? milliseconds : 1);
00578 }
00579 else
00580 {
00581 return 0;
00582 }
00583 }
00584
00585 struct auto_work
00586 {
00587 auto_work(win_iocp_io_service& io_service)
00588 : io_service_(io_service)
00589 {
00590 io_service_.work_started();
00591 }
00592
00593 ~auto_work()
00594 {
00595 io_service_.work_finished();
00596 }
00597
00598 private:
00599 win_iocp_io_service& io_service_;
00600 };
00601
00602 template <typename Handler>
00603 struct handler_operation
00604 : public operation
00605 {
00606 handler_operation(win_iocp_io_service& io_service,
00607 Handler handler)
00608 : operation(io_service, &handler_operation<Handler>::do_completion_impl,
00609 &handler_operation<Handler>::destroy_impl),
00610 io_service_(io_service),
00611 handler_(handler)
00612 {
00613 io_service_.work_started();
00614 }
00615
00616 ~handler_operation()
00617 {
00618 io_service_.work_finished();
00619 }
00620
00621 private:
00622
00623 handler_operation(const handler_operation&);
00624 void operator=(const handler_operation&);
00625
00626 static void do_completion_impl(operation* op, DWORD, size_t)
00627 {
00628
00629 typedef handler_operation<Handler> op_type;
00630 op_type* handler_op(static_cast<op_type*>(op));
00631 typedef handler_alloc_traits<Handler, op_type> alloc_traits;
00632 handler_ptr<alloc_traits> ptr(handler_op->handler_, handler_op);
00633
00634
00635
00636 Handler handler(handler_op->handler_);
00637
00638
00639 ptr.reset();
00640
00641
00642 asio_handler_invoke_helpers::invoke(handler, &handler);
00643 }
00644
00645 static void destroy_impl(operation* op)
00646 {
00647
00648 typedef handler_operation<Handler> op_type;
00649 op_type* handler_op(static_cast<op_type*>(op));
00650 typedef handler_alloc_traits<Handler, op_type> alloc_traits;
00651 handler_ptr<alloc_traits> ptr(handler_op->handler_, handler_op);
00652
00653
00654
00655
00656
00657 Handler handler(handler_op->handler_);
00658 (void)handler;
00659
00660
00661 ptr.reset();
00662 }
00663
00664 win_iocp_io_service& io_service_;
00665 Handler handler_;
00666 };
00667
00668
00669 struct iocp_holder
00670 {
00671 HANDLE handle;
00672 iocp_holder() : handle(0) {}
00673 ~iocp_holder() { if (handle) ::CloseHandle(handle); }
00674 } iocp_;
00675
00676
00677 long outstanding_work_;
00678
00679
00680 long outstanding_operations_;
00681 friend class operation;
00682
00683
00684 long stopped_;
00685
00686
00687 long shutdown_;
00688
00689 enum
00690 {
00691
00692 max_timeout = 500,
00693
00694
00695
00696 transfer_timer_dispatching = 1,
00697
00698
00699
00700 steal_timer_dispatching = 2
00701 };
00702
00703
00704 long timer_thread_;
00705
00706
00707 mutex timer_mutex_;
00708
00709
00710 bool timer_interrupt_issued_;
00711
00712
00713 std::vector<timer_queue_base*> timer_queues_;
00714
00715
00716
00717
00718 std::vector<timer_queue_base*> timer_queues_copy_;
00719 };
00720
00721 }
00722 }
00723
00724 #endif // defined(ASIO_HAS_IOCP)
00725
00726 #include "asio/detail/pop_options.hpp"
00727
00728 #endif // ASIO_DETAIL_WIN_IOCP_IO_SERVICE_HPP