00001 // @file deferredinvoker.h 00002 00019 #pragma once 00020 00021 #include "mongomutex.h" 00022 00023 namespace mongo { 00024 00041 template< class MT > 00042 class TaskQueue { 00043 public: 00044 TaskQueue() : _which(0), _invokeMutex("deferredinvoker") { } 00045 00046 void defer(MT mt) { 00047 // only one writer allowed. however the invoke processing below can occur concurrently with 00048 // writes (for the most part) 00049 DEV dbMutex.assertWriteLocked(); 00050 00051 _queues[_which].push_back(mt); 00052 } 00053 00070 void invoke() { 00071 mutex::scoped_lock lk2(_invokeMutex); 00072 int toDrain = 0; 00073 { 00074 // flip queueing to the other queue (we are double buffered) 00075 readlocktry lk("", 5); 00076 if( !lk.got() ) 00077 return; 00078 toDrain = _which; 00079 _which = _which ^ 1; 00080 wassert( _queues[_which].empty() ); // we are in dbMutex, so it should be/stay empty til we exit dbMutex 00081 } 00082 00083 _drain( _queues[toDrain] ); 00084 assert( _queues[toDrain].empty() ); 00085 } 00086 00087 private: 00088 int _which; // 0 or 1 00089 typedef vector< MT > Queue; 00090 Queue _queues[2]; 00091 00092 // lock order when multiple locks: dbMutex, _invokeMutex 00093 mongo::mutex _invokeMutex; 00094 00095 void _drain(Queue& queue) { 00096 unsigned oldCap = queue.capacity(); 00097 for( typename Queue::iterator i = queue.begin(); i != queue.end(); i++ ) { 00098 const MT& v = *i; 00099 MT::go(v); 00100 } 00101 queue.clear(); 00102 DEV assert( queue.capacity() == oldCap ); // just checking that clear() doesn't deallocate, we don't want that 00103 } 00104 }; 00105 00106 }