$search
00001 // 00002 // dev_poll_reactor.hpp 00003 // ~~~~~~~~~~~~~~~~~~~~ 00004 // 00005 // Copyright (c) 2003-2008 Christopher M. Kohlhoff (chris at kohlhoff dot com) 00006 // 00007 // Distributed under the Boost Software License, Version 1.0. (See accompanying 00008 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) 00009 // 00010 00011 #ifndef ASIO_DETAIL_DEV_POLL_REACTOR_HPP 00012 #define ASIO_DETAIL_DEV_POLL_REACTOR_HPP 00013 00014 #if defined(_MSC_VER) && (_MSC_VER >= 1200) 00015 # pragma once 00016 #endif // defined(_MSC_VER) && (_MSC_VER >= 1200) 00017 00018 #include "asio/detail/push_options.hpp" 00019 00020 #include "asio/detail/dev_poll_reactor_fwd.hpp" 00021 00022 #if defined(ASIO_HAS_DEV_POLL) 00023 00024 #include "asio/detail/push_options.hpp" 00025 #include <cstddef> 00026 #include <vector> 00027 #include <boost/config.hpp> 00028 #include <boost/date_time/posix_time/posix_time_types.hpp> 00029 #include <boost/throw_exception.hpp> 00030 #include <sys/devpoll.h> 00031 #include "asio/detail/pop_options.hpp" 00032 00033 #include "asio/error.hpp" 00034 #include "asio/io_service.hpp" 00035 #include "asio/system_error.hpp" 00036 #include "asio/detail/bind_handler.hpp" 00037 #include "asio/detail/hash_map.hpp" 00038 #include "asio/detail/mutex.hpp" 00039 #include "asio/detail/task_io_service.hpp" 00040 #include "asio/detail/thread.hpp" 00041 #include "asio/detail/reactor_op_queue.hpp" 00042 #include "asio/detail/select_interrupter.hpp" 00043 #include "asio/detail/service_base.hpp" 00044 #include "asio/detail/signal_blocker.hpp" 00045 #include "asio/detail/socket_types.hpp" 00046 #include "asio/detail/timer_queue.hpp" 00047 00048 namespace asio { 00049 namespace detail { 00050 00051 template <bool Own_Thread> 00052 class dev_poll_reactor 00053 : public asio::detail::service_base<dev_poll_reactor<Own_Thread> > 00054 { 00055 public: 00056 // Per-descriptor data. 00057 struct per_descriptor_data 00058 { 00059 }; 00060 00061 // Constructor. 00062 dev_poll_reactor(asio::io_service& io_service) 00063 : asio::detail::service_base< 00064 dev_poll_reactor<Own_Thread> >(io_service), 00065 mutex_(), 00066 dev_poll_fd_(do_dev_poll_create()), 00067 wait_in_progress_(false), 00068 interrupter_(), 00069 read_op_queue_(), 00070 write_op_queue_(), 00071 except_op_queue_(), 00072 pending_cancellations_(), 00073 stop_thread_(false), 00074 thread_(0), 00075 shutdown_(false) 00076 { 00077 // Start the reactor's internal thread only if needed. 00078 if (Own_Thread) 00079 { 00080 asio::detail::signal_blocker sb; 00081 thread_ = new asio::detail::thread( 00082 bind_handler(&dev_poll_reactor::call_run_thread, this)); 00083 } 00084 00085 // Add the interrupter's descriptor to /dev/poll. 00086 ::pollfd ev = { 0 }; 00087 ev.fd = interrupter_.read_descriptor(); 00088 ev.events = POLLIN | POLLERR; 00089 ev.revents = 0; 00090 ::write(dev_poll_fd_, &ev, sizeof(ev)); 00091 } 00092 00093 // Destructor. 00094 ~dev_poll_reactor() 00095 { 00096 shutdown_service(); 00097 ::close(dev_poll_fd_); 00098 } 00099 00100 // Destroy all user-defined handler objects owned by the service. 00101 void shutdown_service() 00102 { 00103 asio::detail::mutex::scoped_lock lock(mutex_); 00104 shutdown_ = true; 00105 stop_thread_ = true; 00106 lock.unlock(); 00107 00108 if (thread_) 00109 { 00110 interrupter_.interrupt(); 00111 thread_->join(); 00112 delete thread_; 00113 thread_ = 0; 00114 } 00115 00116 read_op_queue_.destroy_operations(); 00117 write_op_queue_.destroy_operations(); 00118 except_op_queue_.destroy_operations(); 00119 00120 for (std::size_t i = 0; i < timer_queues_.size(); ++i) 00121 timer_queues_[i]->destroy_timers(); 00122 timer_queues_.clear(); 00123 } 00124 00125 // Register a socket with the reactor. Returns 0 on success, system error 00126 // code on failure. 00127 int register_descriptor(socket_type, per_descriptor_data&) 00128 { 00129 return 0; 00130 } 00131 00132 // Start a new read operation. The handler object will be invoked when the 00133 // given descriptor is ready to be read, or an error has occurred. 00134 template <typename Handler> 00135 void start_read_op(socket_type descriptor, per_descriptor_data&, 00136 Handler handler, bool allow_speculative_read = true) 00137 { 00138 asio::detail::mutex::scoped_lock lock(mutex_); 00139 00140 if (shutdown_) 00141 return; 00142 00143 if (allow_speculative_read) 00144 { 00145 if (!read_op_queue_.has_operation(descriptor)) 00146 { 00147 asio::error_code ec; 00148 std::size_t bytes_transferred = 0; 00149 if (handler.perform(ec, bytes_transferred)) 00150 { 00151 handler.complete(ec, bytes_transferred); 00152 return; 00153 } 00154 } 00155 } 00156 00157 if (read_op_queue_.enqueue_operation(descriptor, handler)) 00158 { 00159 ::pollfd& ev = add_pending_event_change(descriptor); 00160 ev.events = POLLIN | POLLERR | POLLHUP; 00161 if (write_op_queue_.has_operation(descriptor)) 00162 ev.events |= POLLOUT; 00163 if (except_op_queue_.has_operation(descriptor)) 00164 ev.events |= POLLPRI; 00165 interrupter_.interrupt(); 00166 } 00167 } 00168 00169 // Start a new write operation. The handler object will be invoked when the 00170 // given descriptor is ready to be written, or an error has occurred. 00171 template <typename Handler> 00172 void start_write_op(socket_type descriptor, per_descriptor_data&, 00173 Handler handler, bool allow_speculative_write = true) 00174 { 00175 asio::detail::mutex::scoped_lock lock(mutex_); 00176 00177 if (shutdown_) 00178 return; 00179 00180 if (allow_speculative_write) 00181 { 00182 if (!write_op_queue_.has_operation(descriptor)) 00183 { 00184 asio::error_code ec; 00185 std::size_t bytes_transferred = 0; 00186 if (handler.perform(ec, bytes_transferred)) 00187 { 00188 handler.complete(ec, bytes_transferred); 00189 return; 00190 } 00191 } 00192 } 00193 00194 if (write_op_queue_.enqueue_operation(descriptor, handler)) 00195 { 00196 ::pollfd& ev = add_pending_event_change(descriptor); 00197 ev.events = POLLOUT | POLLERR | POLLHUP; 00198 if (read_op_queue_.has_operation(descriptor)) 00199 ev.events |= POLLIN; 00200 if (except_op_queue_.has_operation(descriptor)) 00201 ev.events |= POLLPRI; 00202 interrupter_.interrupt(); 00203 } 00204 } 00205 00206 // Start a new exception operation. The handler object will be invoked when 00207 // the given descriptor has exception information, or an error has occurred. 00208 template <typename Handler> 00209 void start_except_op(socket_type descriptor, 00210 per_descriptor_data&, Handler handler) 00211 { 00212 asio::detail::mutex::scoped_lock lock(mutex_); 00213 00214 if (shutdown_) 00215 return; 00216 00217 if (except_op_queue_.enqueue_operation(descriptor, handler)) 00218 { 00219 ::pollfd& ev = add_pending_event_change(descriptor); 00220 ev.events = POLLPRI | POLLERR | POLLHUP; 00221 if (read_op_queue_.has_operation(descriptor)) 00222 ev.events |= POLLIN; 00223 if (write_op_queue_.has_operation(descriptor)) 00224 ev.events |= POLLOUT; 00225 interrupter_.interrupt(); 00226 } 00227 } 00228 00229 // Start a new write operation. The handler object will be invoked when the 00230 // information available, or an error has occurred. 00231 template <typename Handler> 00232 void start_connect_op(socket_type descriptor, 00233 per_descriptor_data&, Handler handler) 00234 { 00235 asio::detail::mutex::scoped_lock lock(mutex_); 00236 00237 if (shutdown_) 00238 return; 00239 00240 if (write_op_queue_.enqueue_operation(descriptor, handler)) 00241 { 00242 ::pollfd& ev = add_pending_event_change(descriptor); 00243 ev.events = POLLOUT | POLLERR | POLLHUP; 00244 if (read_op_queue_.has_operation(descriptor)) 00245 ev.events |= POLLIN; 00246 if (except_op_queue_.has_operation(descriptor)) 00247 ev.events |= POLLPRI; 00248 interrupter_.interrupt(); 00249 } 00250 } 00251 00252 // Cancel all operations associated with the given descriptor. The 00253 // handlers associated with the descriptor will be invoked with the 00254 // operation_aborted error. 00255 void cancel_ops(socket_type descriptor, per_descriptor_data&) 00256 { 00257 asio::detail::mutex::scoped_lock lock(mutex_); 00258 cancel_ops_unlocked(descriptor); 00259 } 00260 00261 // Cancel any operations that are running against the descriptor and remove 00262 // its registration from the reactor. 00263 void close_descriptor(socket_type descriptor, per_descriptor_data&) 00264 { 00265 asio::detail::mutex::scoped_lock lock(mutex_); 00266 00267 // Remove the descriptor from /dev/poll. 00268 ::pollfd& ev = add_pending_event_change(descriptor); 00269 ev.events = POLLREMOVE; 00270 interrupter_.interrupt(); 00271 00272 // Cancel any outstanding operations associated with the descriptor. 00273 cancel_ops_unlocked(descriptor); 00274 } 00275 00276 // Add a new timer queue to the reactor. 00277 template <typename Time_Traits> 00278 void add_timer_queue(timer_queue<Time_Traits>& timer_queue) 00279 { 00280 asio::detail::mutex::scoped_lock lock(mutex_); 00281 timer_queues_.push_back(&timer_queue); 00282 } 00283 00284 // Remove a timer queue from the reactor. 00285 template <typename Time_Traits> 00286 void remove_timer_queue(timer_queue<Time_Traits>& timer_queue) 00287 { 00288 asio::detail::mutex::scoped_lock lock(mutex_); 00289 for (std::size_t i = 0; i < timer_queues_.size(); ++i) 00290 { 00291 if (timer_queues_[i] == &timer_queue) 00292 { 00293 timer_queues_.erase(timer_queues_.begin() + i); 00294 return; 00295 } 00296 } 00297 } 00298 00299 // Schedule a timer in the given timer queue to expire at the specified 00300 // absolute time. The handler object will be invoked when the timer expires. 00301 template <typename Time_Traits, typename Handler> 00302 void schedule_timer(timer_queue<Time_Traits>& timer_queue, 00303 const typename Time_Traits::time_type& time, Handler handler, void* token) 00304 { 00305 asio::detail::mutex::scoped_lock lock(mutex_); 00306 if (!shutdown_) 00307 if (timer_queue.enqueue_timer(time, handler, token)) 00308 interrupter_.interrupt(); 00309 } 00310 00311 // Cancel the timer associated with the given token. Returns the number of 00312 // handlers that have been posted or dispatched. 00313 template <typename Time_Traits> 00314 std::size_t cancel_timer(timer_queue<Time_Traits>& timer_queue, void* token) 00315 { 00316 asio::detail::mutex::scoped_lock lock(mutex_); 00317 std::size_t n = timer_queue.cancel_timer(token); 00318 if (n > 0) 00319 interrupter_.interrupt(); 00320 return n; 00321 } 00322 00323 private: 00324 friend class task_io_service<dev_poll_reactor<Own_Thread> >; 00325 00326 // Run /dev/poll once until interrupted or events are ready to be dispatched. 00327 void run(bool block) 00328 { 00329 asio::detail::mutex::scoped_lock lock(mutex_); 00330 00331 // Dispatch any operation cancellations that were made while the select 00332 // loop was not running. 00333 read_op_queue_.perform_cancellations(); 00334 write_op_queue_.perform_cancellations(); 00335 except_op_queue_.perform_cancellations(); 00336 for (std::size_t i = 0; i < timer_queues_.size(); ++i) 00337 timer_queues_[i]->dispatch_cancellations(); 00338 00339 // Check if the thread is supposed to stop. 00340 if (stop_thread_) 00341 { 00342 complete_operations_and_timers(lock); 00343 return; 00344 } 00345 00346 // We can return immediately if there's no work to do and the reactor is 00347 // not supposed to block. 00348 if (!block && read_op_queue_.empty() && write_op_queue_.empty() 00349 && except_op_queue_.empty() && all_timer_queues_are_empty()) 00350 { 00351 complete_operations_and_timers(lock); 00352 return; 00353 } 00354 00355 // Write the pending event registration changes to the /dev/poll descriptor. 00356 std::size_t events_size = sizeof(::pollfd) * pending_event_changes_.size(); 00357 errno = 0; 00358 int result = ::write(dev_poll_fd_, 00359 &pending_event_changes_[0], events_size); 00360 if (result != static_cast<int>(events_size)) 00361 { 00362 for (std::size_t i = 0; i < pending_event_changes_.size(); ++i) 00363 { 00364 int descriptor = pending_event_changes_[i].fd; 00365 asio::error_code ec = asio::error_code( 00366 errno, asio::error::get_system_category()); 00367 read_op_queue_.perform_all_operations(descriptor, ec); 00368 write_op_queue_.perform_all_operations(descriptor, ec); 00369 except_op_queue_.perform_all_operations(descriptor, ec); 00370 } 00371 } 00372 pending_event_changes_.clear(); 00373 pending_event_change_index_.clear(); 00374 00375 int timeout = block ? get_timeout() : 0; 00376 wait_in_progress_ = true; 00377 lock.unlock(); 00378 00379 // Block on the /dev/poll descriptor. 00380 ::pollfd events[128] = { { 0 } }; 00381 ::dvpoll dp = { 0 }; 00382 dp.dp_fds = events; 00383 dp.dp_nfds = 128; 00384 dp.dp_timeout = timeout; 00385 int num_events = ::ioctl(dev_poll_fd_, DP_POLL, &dp); 00386 00387 lock.lock(); 00388 wait_in_progress_ = false; 00389 00390 // Block signals while performing operations. 00391 asio::detail::signal_blocker sb; 00392 00393 // Dispatch the waiting events. 00394 for (int i = 0; i < num_events; ++i) 00395 { 00396 int descriptor = events[i].fd; 00397 if (descriptor == interrupter_.read_descriptor()) 00398 { 00399 interrupter_.reset(); 00400 } 00401 else 00402 { 00403 bool more_reads = false; 00404 bool more_writes = false; 00405 bool more_except = false; 00406 asio::error_code ec; 00407 00408 // Exception operations must be processed first to ensure that any 00409 // out-of-band data is read before normal data. 00410 if (events[i].events & (POLLPRI | POLLERR | POLLHUP)) 00411 more_except = except_op_queue_.perform_operation(descriptor, ec); 00412 else 00413 more_except = except_op_queue_.has_operation(descriptor); 00414 00415 if (events[i].events & (POLLIN | POLLERR | POLLHUP)) 00416 more_reads = read_op_queue_.perform_operation(descriptor, ec); 00417 else 00418 more_reads = read_op_queue_.has_operation(descriptor); 00419 00420 if (events[i].events & (POLLOUT | POLLERR | POLLHUP)) 00421 more_writes = write_op_queue_.perform_operation(descriptor, ec); 00422 else 00423 more_writes = write_op_queue_.has_operation(descriptor); 00424 00425 if ((events[i].events & (POLLERR | POLLHUP)) != 0 00426 && (events[i].events & ~(POLLERR | POLLHUP)) == 0 00427 && !more_except && !more_reads && !more_writes) 00428 { 00429 // If we have an event and no operations associated with the 00430 // descriptor then we need to delete the descriptor from /dev/poll. 00431 // The poll operation can produce POLLHUP or POLLERR events when there 00432 // is no operation pending, so if we do not remove the descriptor we 00433 // can end up in a tight polling loop. 00434 ::pollfd ev = { 0 }; 00435 ev.fd = descriptor; 00436 ev.events = POLLREMOVE; 00437 ev.revents = 0; 00438 ::write(dev_poll_fd_, &ev, sizeof(ev)); 00439 } 00440 else 00441 { 00442 ::pollfd ev = { 0 }; 00443 ev.fd = descriptor; 00444 ev.events = POLLERR | POLLHUP; 00445 if (more_reads) 00446 ev.events |= POLLIN; 00447 if (more_writes) 00448 ev.events |= POLLOUT; 00449 if (more_except) 00450 ev.events |= POLLPRI; 00451 ev.revents = 0; 00452 int result = ::write(dev_poll_fd_, &ev, sizeof(ev)); 00453 if (result != sizeof(ev)) 00454 { 00455 ec = asio::error_code(errno, 00456 asio::error::get_system_category()); 00457 read_op_queue_.perform_all_operations(descriptor, ec); 00458 write_op_queue_.perform_all_operations(descriptor, ec); 00459 except_op_queue_.perform_all_operations(descriptor, ec); 00460 } 00461 } 00462 } 00463 } 00464 read_op_queue_.perform_cancellations(); 00465 write_op_queue_.perform_cancellations(); 00466 except_op_queue_.perform_cancellations(); 00467 for (std::size_t i = 0; i < timer_queues_.size(); ++i) 00468 { 00469 timer_queues_[i]->dispatch_timers(); 00470 timer_queues_[i]->dispatch_cancellations(); 00471 } 00472 00473 // Issue any pending cancellations. 00474 for (size_t i = 0; i < pending_cancellations_.size(); ++i) 00475 cancel_ops_unlocked(pending_cancellations_[i]); 00476 pending_cancellations_.clear(); 00477 00478 complete_operations_and_timers(lock); 00479 } 00480 00481 // Run the select loop in the thread. 00482 void run_thread() 00483 { 00484 asio::detail::mutex::scoped_lock lock(mutex_); 00485 while (!stop_thread_) 00486 { 00487 lock.unlock(); 00488 run(true); 00489 lock.lock(); 00490 } 00491 } 00492 00493 // Entry point for the select loop thread. 00494 static void call_run_thread(dev_poll_reactor* reactor) 00495 { 00496 reactor->run_thread(); 00497 } 00498 00499 // Interrupt the select loop. 00500 void interrupt() 00501 { 00502 interrupter_.interrupt(); 00503 } 00504 00505 // Create the /dev/poll file descriptor. Throws an exception if the descriptor 00506 // cannot be created. 00507 static int do_dev_poll_create() 00508 { 00509 int fd = ::open("/dev/poll", O_RDWR); 00510 if (fd == -1) 00511 { 00512 boost::throw_exception( 00513 asio::system_error( 00514 asio::error_code(errno, 00515 asio::error::get_system_category()), 00516 "/dev/poll")); 00517 } 00518 return fd; 00519 } 00520 00521 // Check if all timer queues are empty. 00522 bool all_timer_queues_are_empty() const 00523 { 00524 for (std::size_t i = 0; i < timer_queues_.size(); ++i) 00525 if (!timer_queues_[i]->empty()) 00526 return false; 00527 return true; 00528 } 00529 00530 // Get the timeout value for the /dev/poll DP_POLL operation. The timeout 00531 // value is returned as a number of milliseconds. A return value of -1 00532 // indicates that the poll should block indefinitely. 00533 int get_timeout() 00534 { 00535 if (all_timer_queues_are_empty()) 00536 return -1; 00537 00538 // By default we will wait no longer than 5 minutes. This will ensure that 00539 // any changes to the system clock are detected after no longer than this. 00540 boost::posix_time::time_duration minimum_wait_duration 00541 = boost::posix_time::minutes(5); 00542 00543 for (std::size_t i = 0; i < timer_queues_.size(); ++i) 00544 { 00545 boost::posix_time::time_duration wait_duration 00546 = timer_queues_[i]->wait_duration(); 00547 if (wait_duration < minimum_wait_duration) 00548 minimum_wait_duration = wait_duration; 00549 } 00550 00551 if (minimum_wait_duration > boost::posix_time::time_duration()) 00552 { 00553 int milliseconds = minimum_wait_duration.total_milliseconds(); 00554 return milliseconds > 0 ? milliseconds : 1; 00555 } 00556 else 00557 { 00558 return 0; 00559 } 00560 } 00561 00562 // Cancel all operations associated with the given descriptor. The do_cancel 00563 // function of the handler objects will be invoked. This function does not 00564 // acquire the dev_poll_reactor's mutex. 00565 void cancel_ops_unlocked(socket_type descriptor) 00566 { 00567 bool interrupt = read_op_queue_.cancel_operations(descriptor); 00568 interrupt = write_op_queue_.cancel_operations(descriptor) || interrupt; 00569 interrupt = except_op_queue_.cancel_operations(descriptor) || interrupt; 00570 if (interrupt) 00571 interrupter_.interrupt(); 00572 } 00573 00574 // Clean up operations and timers. We must not hold the lock since the 00575 // destructors may make calls back into this reactor. We make a copy of the 00576 // vector of timer queues since the original may be modified while the lock 00577 // is not held. 00578 void complete_operations_and_timers( 00579 asio::detail::mutex::scoped_lock& lock) 00580 { 00581 timer_queues_for_cleanup_ = timer_queues_; 00582 lock.unlock(); 00583 read_op_queue_.complete_operations(); 00584 write_op_queue_.complete_operations(); 00585 except_op_queue_.complete_operations(); 00586 for (std::size_t i = 0; i < timer_queues_for_cleanup_.size(); ++i) 00587 timer_queues_for_cleanup_[i]->complete_timers(); 00588 } 00589 00590 // Add a pending event entry for the given descriptor. 00591 ::pollfd& add_pending_event_change(int descriptor) 00592 { 00593 hash_map<int, std::size_t>::iterator iter 00594 = pending_event_change_index_.find(descriptor); 00595 if (iter == pending_event_change_index_.end()) 00596 { 00597 std::size_t index = pending_event_changes_.size(); 00598 pending_event_changes_.reserve(pending_event_changes_.size() + 1); 00599 pending_event_change_index_.insert(std::make_pair(descriptor, index)); 00600 pending_event_changes_.push_back(::pollfd()); 00601 pending_event_changes_[index].fd = descriptor; 00602 pending_event_changes_[index].revents = 0; 00603 return pending_event_changes_[index]; 00604 } 00605 else 00606 { 00607 return pending_event_changes_[iter->second]; 00608 } 00609 } 00610 00611 // Mutex to protect access to internal data. 00612 asio::detail::mutex mutex_; 00613 00614 // The /dev/poll file descriptor. 00615 int dev_poll_fd_; 00616 00617 // Vector of /dev/poll events waiting to be written to the descriptor. 00618 std::vector< ::pollfd> pending_event_changes_; 00619 00620 // Hash map to associate a descriptor with a pending event change index. 00621 hash_map<int, std::size_t> pending_event_change_index_; 00622 00623 // Whether the DP_POLL operation is currently in progress 00624 bool wait_in_progress_; 00625 00626 // The interrupter is used to break a blocking DP_POLL operation. 00627 select_interrupter interrupter_; 00628 00629 // The queue of read operations. 00630 reactor_op_queue<socket_type> read_op_queue_; 00631 00632 // The queue of write operations. 00633 reactor_op_queue<socket_type> write_op_queue_; 00634 00635 // The queue of except operations. 00636 reactor_op_queue<socket_type> except_op_queue_; 00637 00638 // The timer queues. 00639 std::vector<timer_queue_base*> timer_queues_; 00640 00641 // A copy of the timer queues, used when cleaning up timers. The copy is 00642 // stored as a class data member to avoid unnecessary memory allocation. 00643 std::vector<timer_queue_base*> timer_queues_for_cleanup_; 00644 00645 // The descriptors that are pending cancellation. 00646 std::vector<socket_type> pending_cancellations_; 00647 00648 // Does the reactor loop thread need to stop. 00649 bool stop_thread_; 00650 00651 // The thread that is running the reactor loop. 00652 asio::detail::thread* thread_; 00653 00654 // Whether the service has been shut down. 00655 bool shutdown_; 00656 }; 00657 00658 } // namespace detail 00659 } // namespace asio 00660 00661 #endif // defined(ASIO_HAS_DEV_POLL) 00662 00663 #include "asio/detail/pop_options.hpp" 00664 00665 #endif // ASIO_DETAIL_DEV_POLL_REACTOR_HPP