00001 // mvar.h 00002 00003 /* Copyright 2009 10gen Inc. 00004 * 00005 * Licensed under the Apache License, Version 2.0 (the "License"); 00006 * you may not use this file except in compliance with the License. 00007 * You may obtain a copy of the License at 00008 * 00009 * http://www.apache.org/licenses/LICENSE-2.0 00010 * 00011 * Unless required by applicable law or agreed to in writing, software 00012 * distributed under the License is distributed on an "AS IS" BASIS, 00013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 00014 * See the License for the specific language governing permissions and 00015 * limitations under the License. 00016 */ 00017 00018 namespace mongo { 00019 00020 /* This is based on haskell's MVar synchronization primitive: 00021 * http://www.haskell.org/ghc/docs/latest/html/libraries/base-4.2.0.0/Control-Concurrent-MVar.html 00022 * 00023 * It is a thread-safe queue that can hold at most one object. 00024 * You can also think of it as a box that can be either full or empty. 00025 */ 00026 00027 template <typename T> 00028 class MVar { 00029 public: 00030 enum State {EMPTY=0, FULL}; 00031 00032 // create an empty MVar 00033 MVar() 00034 : _state(EMPTY) 00035 {} 00036 00037 // creates a full MVar 00038 MVar(const T& val) 00039 : _state(FULL) 00040 , _value(val) 00041 {} 00042 00043 // puts val into the MVar and returns true or returns false if full 00044 // never blocks 00045 bool tryPut(const T& val) { 00046 // intentionally repeat test before and after lock 00047 if (_state == FULL) return false; 00048 Mutex::scoped_lock lock(_mutex); 00049 if (_state == FULL) return false; 00050 00051 _state = FULL; 00052 _value = val; 00053 00054 // unblock threads waiting to 'take' 00055 _condition.notify_all(); 00056 00057 return true; 00058 } 00059 00060 // puts val into the MVar 00061 // will block if the MVar is already full 00062 void put(const T& val) { 00063 Mutex::scoped_lock lock(_mutex); 00064 while (!tryPut(val)) { 00065 // unlocks lock while waiting and relocks before returning 00066 _condition.wait(lock); 00067 } 00068 } 00069 00070 // takes val out of the MVar and returns true or returns false if empty 00071 // never blocks 00072 bool tryTake(T& out) { 00073 // intentionally repeat test before and after lock 00074 if (_state == EMPTY) return false; 00075 Mutex::scoped_lock lock(_mutex); 00076 if (_state == EMPTY) return false; 00077 00078 _state = EMPTY; 00079 out = _value; 00080 00081 // unblock threads waiting to 'put' 00082 _condition.notify_all(); 00083 00084 return true; 00085 } 00086 00087 // takes val out of the MVar 00088 // will block if the MVar is empty 00089 T take() { 00090 T ret = T(); 00091 00092 Mutex::scoped_lock lock(_mutex); 00093 while (!tryTake(ret)) { 00094 // unlocks lock while waiting and relocks before returning 00095 _condition.wait(lock); 00096 } 00097 00098 return ret; 00099 } 00100 00101 00102 // Note: this is fast because there is no locking, but state could 00103 // change before you get a chance to act on it. 00104 // Mainly useful for sanity checks / asserts. 00105 State getState() { return _state; } 00106 00107 00108 private: 00109 State _state; 00110 T _value; 00111 typedef boost::recursive_mutex Mutex; 00112 Mutex _mutex; 00113 boost::condition _condition; 00114 }; 00115 00116 }