reactor_op_queue.hpp
Go to the documentation of this file.
00001 //
00002 // reactor_op_queue.hpp
00003 // ~~~~~~~~~~~~~~~~~~~~
00004 //
00005 // Copyright (c) 2003-2008 Christopher M. Kohlhoff (chris at kohlhoff dot com)
00006 //
00007 // Distributed under the Boost Software License, Version 1.0. (See accompanying
00008 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
00009 //
00010 
00011 #ifndef ASIO_DETAIL_REACTOR_OP_QUEUE_HPP
00012 #define ASIO_DETAIL_REACTOR_OP_QUEUE_HPP
00013 
00014 #if defined(_MSC_VER) && (_MSC_VER >= 1200)
00015 # pragma once
00016 #endif // defined(_MSC_VER) && (_MSC_VER >= 1200)
00017 
00018 #include "asio/detail/push_options.hpp"
00019 
00020 #include "asio/detail/push_options.hpp"
00021 #include <memory>
00022 #include "asio/detail/pop_options.hpp"
00023 
00024 #include "asio/error.hpp"
00025 #include "asio/detail/handler_alloc_helpers.hpp"
00026 #include "asio/detail/hash_map.hpp"
00027 #include "asio/detail/noncopyable.hpp"
00028 
00029 namespace asio {
00030 namespace detail {
00031 
00032 template <typename Descriptor>
00033 class reactor_op_queue
00034   : private noncopyable
00035 {
00036 public:
00037   // Constructor.
00038   reactor_op_queue()
00039     : operations_(),
00040       cancelled_operations_(0),
00041       complete_operations_(0)
00042   {
00043   }
00044 
00045   // Add a new operation to the queue. Returns true if this is the only
00046   // operation for the given descriptor, in which case the reactor's event
00047   // demultiplexing function call may need to be interrupted and restarted.
00048   template <typename Operation>
00049   bool enqueue_operation(Descriptor descriptor, Operation operation)
00050   {
00051     // Allocate and construct an object to wrap the handler.
00052     typedef handler_alloc_traits<Operation, op<Operation> > alloc_traits;
00053     raw_handler_ptr<alloc_traits> raw_ptr(operation);
00054     handler_ptr<alloc_traits> ptr(raw_ptr, descriptor, operation);
00055 
00056     typedef typename operation_map::iterator iterator;
00057     typedef typename operation_map::value_type value_type;
00058     std::pair<iterator, bool> entry =
00059       operations_.insert(value_type(descriptor, ptr.get()));
00060     if (entry.second)
00061     {
00062       ptr.release();
00063       return true;
00064     }
00065 
00066     op_base* current_op = entry.first->second;
00067     while (current_op->next_)
00068       current_op = current_op->next_;
00069     current_op->next_ = ptr.release();
00070 
00071     return false;
00072   }
00073 
00074   // Cancel all operations associated with the descriptor. Any operations
00075   // pending for the descriptor will be notified that they have been cancelled
00076   // next time perform_cancellations is called. Returns true if any operations
00077   // were cancelled, in which case the reactor's event demultiplexing function
00078   // may need to be interrupted and restarted.
00079   bool cancel_operations(Descriptor descriptor)
00080   {
00081     typename operation_map::iterator i = operations_.find(descriptor);
00082     if (i != operations_.end())
00083     {
00084       op_base* last_op = i->second;
00085       while (last_op->next_)
00086         last_op = last_op->next_;
00087       last_op->next_ = cancelled_operations_;
00088       cancelled_operations_ = i->second;
00089       operations_.erase(i);
00090       return true;
00091     }
00092 
00093     return false;
00094   }
00095 
00096   // Whether there are no operations in the queue.
00097   bool empty() const
00098   {
00099     return operations_.empty();
00100   }
00101 
00102   // Determine whether there are any operations associated with the descriptor.
00103   bool has_operation(Descriptor descriptor) const
00104   {
00105     return operations_.find(descriptor) != operations_.end();
00106   }
00107 
00108   // Perform the first operation corresponding to the descriptor. Returns true
00109   // if there are more operations queued for the descriptor.
00110   bool perform_operation(Descriptor descriptor,
00111       const asio::error_code& result)
00112   {
00113     typename operation_map::iterator i = operations_.find(descriptor);
00114     if (i != operations_.end())
00115     {
00116       op_base* this_op = i->second;
00117       i->second = this_op->next_;
00118       this_op->next_ = complete_operations_;
00119       complete_operations_ = this_op;
00120       bool done = this_op->perform(result);
00121       if (done)
00122       {
00123         // Operation has finished.
00124         if (i->second)
00125         {
00126           return true;
00127         }
00128         else
00129         {
00130           operations_.erase(i);
00131           return false;
00132         }
00133       }
00134       else
00135       {
00136         // Operation wants to be called again. Leave it at the front of the
00137         // queue for this descriptor, and remove from the completed list.
00138         complete_operations_ = this_op->next_;
00139         this_op->next_ = i->second;
00140         i->second = this_op;
00141         return true;
00142       }
00143     }
00144     return false;
00145   }
00146 
00147   // Perform all operations corresponding to the descriptor.
00148   void perform_all_operations(Descriptor descriptor,
00149       const asio::error_code& result)
00150   {
00151     typename operation_map::iterator i = operations_.find(descriptor);
00152     if (i != operations_.end())
00153     {
00154       while (i->second)
00155       {
00156         op_base* this_op = i->second;
00157         i->second = this_op->next_;
00158         this_op->next_ = complete_operations_;
00159         complete_operations_ = this_op;
00160         bool done = this_op->perform(result);
00161         if (!done)
00162         {
00163           // Operation has not finished yet, so leave at front of queue, and
00164           // remove from the completed list.
00165           complete_operations_ = this_op->next_;
00166           this_op->next_ = i->second;
00167           i->second = this_op;
00168           return;
00169         }
00170       }
00171       operations_.erase(i);
00172     }
00173   }
00174 
00175   // Fill a descriptor set with the descriptors corresponding to each active
00176   // operation.
00177   template <typename Descriptor_Set>
00178   void get_descriptors(Descriptor_Set& descriptors)
00179   {
00180     typename operation_map::iterator i = operations_.begin();
00181     while (i != operations_.end())
00182     {
00183       Descriptor descriptor = i->first;
00184       ++i;
00185       if (!descriptors.set(descriptor))
00186       {
00187         asio::error_code ec(error::fd_set_failure);
00188         perform_all_operations(descriptor, ec);
00189       }
00190     }
00191   }
00192 
00193   // Perform the operations corresponding to the ready file descriptors
00194   // contained in the given descriptor set.
00195   template <typename Descriptor_Set>
00196   void perform_operations_for_descriptors(const Descriptor_Set& descriptors,
00197       const asio::error_code& result)
00198   {
00199     typename operation_map::iterator i = operations_.begin();
00200     while (i != operations_.end())
00201     {
00202       typename operation_map::iterator op_iter = i++;
00203       if (descriptors.is_set(op_iter->first))
00204       {
00205         op_base* this_op = op_iter->second;
00206         op_iter->second = this_op->next_;
00207         this_op->next_ = complete_operations_;
00208         complete_operations_ = this_op;
00209         bool done = this_op->perform(result);
00210         if (done)
00211         {
00212           if (!op_iter->second)
00213             operations_.erase(op_iter);
00214         }
00215         else
00216         {
00217           // Operation has not finished yet, so leave at front of queue, and
00218           // remove from the completed list.
00219           complete_operations_ = this_op->next_;
00220           this_op->next_ = op_iter->second;
00221           op_iter->second = this_op;
00222         }
00223       }
00224     }
00225   }
00226 
00227   // Perform any pending cancels for operations.
00228   void perform_cancellations()
00229   {
00230     while (cancelled_operations_)
00231     {
00232       op_base* this_op = cancelled_operations_;
00233       cancelled_operations_ = this_op->next_;
00234       this_op->next_ = complete_operations_;
00235       complete_operations_ = this_op;
00236       this_op->perform(asio::error::operation_aborted);
00237     }
00238   }
00239 
00240   // Complete all operations that are waiting to be completed.
00241   void complete_operations()
00242   {
00243     while (complete_operations_)
00244     {
00245       op_base* next_op = complete_operations_->next_;
00246       complete_operations_->next_ = 0;
00247       complete_operations_->complete();
00248       complete_operations_ = next_op;
00249     }
00250   }
00251 
00252   // Destroy all operations owned by the queue.
00253   void destroy_operations()
00254   {
00255     while (cancelled_operations_)
00256     {
00257       op_base* next_op = cancelled_operations_->next_;
00258       cancelled_operations_->next_ = 0;
00259       cancelled_operations_->destroy();
00260       cancelled_operations_ = next_op;
00261     }
00262 
00263     while (complete_operations_)
00264     {
00265       op_base* next_op = complete_operations_->next_;
00266       complete_operations_->next_ = 0;
00267       complete_operations_->destroy();
00268       complete_operations_ = next_op;
00269     }
00270 
00271     typename operation_map::iterator i = operations_.begin();
00272     while (i != operations_.end())
00273     {
00274       typename operation_map::iterator op_iter = i++;
00275       op_base* curr_op = op_iter->second;
00276       operations_.erase(op_iter);
00277       while (curr_op)
00278       {
00279         op_base* next_op = curr_op->next_;
00280         curr_op->next_ = 0;
00281         curr_op->destroy();
00282         curr_op = next_op;
00283       }
00284     }
00285   }
00286 
00287 private:
00288   // Base class for reactor operations. A function pointer is used instead of
00289   // virtual functions to avoid the associated overhead.
00290   class op_base
00291   {
00292   public:
00293     // Get the descriptor associated with the operation.
00294     Descriptor descriptor() const
00295     {
00296       return descriptor_;
00297     }
00298 
00299     // Perform the operation.
00300     bool perform(const asio::error_code& result)
00301     {
00302       result_ = result;
00303       return perform_func_(this, result_, bytes_transferred_);
00304     }
00305 
00306     // Destroy the operation and post the handler.
00307     void complete()
00308     {
00309       complete_func_(this, result_, bytes_transferred_);
00310     }
00311 
00312     // Destroy the operation.
00313     void destroy()
00314     {
00315       destroy_func_(this);
00316     }
00317 
00318   protected:
00319     typedef bool (*perform_func_type)(op_base*,
00320         asio::error_code&, std::size_t&);
00321     typedef void (*complete_func_type)(op_base*,
00322         const asio::error_code&, std::size_t);
00323     typedef void (*destroy_func_type)(op_base*);
00324 
00325     // Construct an operation for the given descriptor.
00326     op_base(perform_func_type perform_func, complete_func_type complete_func,
00327         destroy_func_type destroy_func, Descriptor descriptor)
00328       : perform_func_(perform_func),
00329         complete_func_(complete_func),
00330         destroy_func_(destroy_func),
00331         descriptor_(descriptor),
00332         result_(),
00333         bytes_transferred_(0),
00334         next_(0)
00335     {
00336     }
00337 
00338     // Prevent deletion through this type.
00339     ~op_base()
00340     {
00341     }
00342 
00343   private:
00344     friend class reactor_op_queue<Descriptor>;
00345 
00346     // The function to be called to perform the operation.
00347     perform_func_type perform_func_;
00348 
00349     // The function to be called to delete the operation and post the handler.
00350     complete_func_type complete_func_;
00351 
00352     // The function to be called to delete the operation.
00353     destroy_func_type destroy_func_;
00354 
00355     // The descriptor associated with the operation.
00356     Descriptor descriptor_;
00357 
00358     // The result of the operation.
00359     asio::error_code result_;
00360 
00361     // The number of bytes transferred in the operation.
00362     std::size_t bytes_transferred_;
00363 
00364     // The next operation for the same file descriptor.
00365     op_base* next_;
00366   };
00367 
00368   // Adaptor class template for operations.
00369   template <typename Operation>
00370   class op
00371     : public op_base
00372   {
00373   public:
00374     // Constructor.
00375     op(Descriptor descriptor, Operation operation)
00376       : op_base(&op<Operation>::do_perform, &op<Operation>::do_complete,
00377           &op<Operation>::do_destroy, descriptor),
00378         operation_(operation)
00379     {
00380     }
00381 
00382     // Perform the operation.
00383     static bool do_perform(op_base* base,
00384         asio::error_code& result, std::size_t& bytes_transferred)
00385     {
00386       return static_cast<op<Operation>*>(base)->operation_.perform(
00387           result, bytes_transferred);
00388     }
00389 
00390     // Destroy the operation and post the handler.
00391     static void do_complete(op_base* base,
00392         const asio::error_code& result, std::size_t bytes_transferred)
00393     {
00394       // Take ownership of the operation object.
00395       typedef op<Operation> this_type;
00396       this_type* this_op(static_cast<this_type*>(base));
00397       typedef handler_alloc_traits<Operation, this_type> alloc_traits;
00398       handler_ptr<alloc_traits> ptr(this_op->operation_, this_op);
00399 
00400       // Make a copy of the error_code and the operation so that the memory can
00401       // be deallocated before the upcall is made.
00402       asio::error_code ec(result);
00403       Operation operation(this_op->operation_);
00404 
00405       // Free the memory associated with the operation.
00406       ptr.reset();
00407 
00408       // Make the upcall.
00409       operation.complete(ec, bytes_transferred);
00410     }
00411 
00412     // Destroy the operation.
00413     static void do_destroy(op_base* base)
00414     {
00415       // Take ownership of the operation object.
00416       typedef op<Operation> this_type;
00417       this_type* this_op(static_cast<this_type*>(base));
00418       typedef handler_alloc_traits<Operation, this_type> alloc_traits;
00419       handler_ptr<alloc_traits> ptr(this_op->operation_, this_op);
00420 
00421       // A sub-object of the operation may be the true owner of the memory
00422       // associated with the operation. Consequently, a local copy of the
00423       // operation is required to ensure that any owning sub-object remains
00424       // valid until after we have deallocated the memory here.
00425       Operation operation(this_op->operation_);
00426       (void)operation;
00427 
00428       // Free the memory associated with the operation.
00429       ptr.reset();
00430     }
00431 
00432   private:
00433     Operation operation_;
00434   };
00435 
00436   // The type for a map of operations.
00437   typedef hash_map<Descriptor, op_base*> operation_map;
00438 
00439   // The operations that are currently executing asynchronously.
00440   operation_map operations_;
00441 
00442   // The list of operations that have been cancelled.
00443   op_base* cancelled_operations_;
00444 
00445   // The list of operations waiting to be completed.
00446   op_base* complete_operations_;
00447 };
00448 
00449 } // namespace detail
00450 } // namespace asio
00451 
00452 #include "asio/detail/pop_options.hpp"
00453 
00454 #endif // ASIO_DETAIL_REACTOR_OP_QUEUE_HPP
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines


Castor
Author(s): Carpe Noctem
autogenerated on Fri Nov 8 2013 11:05:39