00001
00002
00003
00020 #pragma once
00021
00022 #include "../util/alignedbuilder.h"
00023 #include "../util/mongoutils/hash.h"
00024 #include "../util/concurrency/synchronization.h"
00025 #include "cmdline.h"
00026 #include "durop.h"
00027 #include "dur.h"
00028 #include "taskqueue.h"
00029
00030
00031
00032 namespace mongo {
00033 namespace dur {
00034
00040 struct WriteIntent {
00041 WriteIntent() : w_ptr(0), p(0) { }
00042 WriteIntent(void *a, unsigned b) : w_ptr(0), p((char*)a+b), len(b) { }
00043
00044 void* start() const { return (char*)p - len; }
00045 void* end() const { return p; }
00046 unsigned length() const { return len; }
00047
00048 bool operator < (const WriteIntent& rhs) const { return end() < rhs.end(); }
00049
00050
00051 bool overlaps(const WriteIntent& rhs) const {
00052 return (start() <= rhs.end() && end() >= rhs.start());
00053 }
00054
00055
00056 bool contains(const WriteIntent& rhs) const {
00057 return (start() <= rhs.start() && end() >= rhs.end());
00058 }
00059
00060
00061 void absorb(const WriteIntent& other);
00062
00063 friend ostream& operator << (ostream& out, const WriteIntent& wi) {
00064 return (out << "p: " << wi.p << " end: " << wi.end() << " len: " << wi.len);
00065 }
00066
00067 mutable void *w_ptr;
00068
00069 #if defined(_EXPERIMENTAL)
00070 mutable unsigned ofsInJournalBuffer;
00071 #endif
00072 private:
00073 void *p;
00074 unsigned len;
00075 };
00076
00080 template<int Prime>
00081 class Already : boost::noncopyable {
00082 public:
00083 Already() { clear(); }
00084 void clear() { memset(this, 0, sizeof(*this)); }
00085
00086
00087
00088
00089
00090 bool checkAndSet(void* p, int len) {
00091 unsigned x = mongoutils::hashPointer(p);
00092 pair<void*, int> nd = nodes[x % N];
00093 if( nd.first == p ) {
00094 if( nd.second < len ) {
00095 nd.second = len;
00096 return false;
00097 }
00098 return true;
00099 }
00100 nd.first = p;
00101 nd.second = len;
00102 return false;
00103 }
00104
00105 private:
00106 enum { N = Prime };
00107 pair<void*,int> nodes[N];
00108 };
00109
00111 class Writes : boost::noncopyable {
00112 struct D {
00113 void *p;
00114 unsigned len;
00115 static void go(const D& d);
00116 };
00117 public:
00118 TaskQueue<D> _deferred;
00119 Already<127> _alreadyNoted;
00120 set<WriteIntent> _writes;
00121 vector< shared_ptr<DurOp> > _ops;
00122 bool _drained;
00123
00125 void clear();
00126
00128 void _insertWriteIntent(void* p, int len);
00129
00130 void insertWriteIntent(void* p, int len) {
00131 #if defined(DEBUG_WRITE_INTENT)
00132 if( _debug[p] < len )
00133 _debug[p] = len;
00134 #endif
00135 D d;
00136 d.p = p;
00137 d.len = len;
00138 _deferred.defer(d);
00139 }
00140
00141 #ifdef _DEBUG
00142 WriteIntent _last;
00143 #endif
00144 #if defined(DEBUG_WRITE_INTENT)
00145 map<void*,int> _debug;
00146 #endif
00147 };
00148
00149 #if defined(DEBUG_WRITE_INTENT)
00150 void assertAlreadyDeclared(void *, int len);
00151 #else
00152 inline void assertAlreadyDeclared(void *, int len) { }
00153 #endif
00154
00161 class CommitJob : boost::noncopyable {
00162 public:
00163 AlignedBuilder _ab;
00164
00165 CommitJob();
00166
00168 void note(void* p, int len);
00169
00171 void noteOp(shared_ptr<DurOp> p);
00172
00173 set<WriteIntent>& writes() {
00174 if( !_wi._drained ) {
00175
00176
00177 assert(false);
00178 }
00179 return _wi._writes;
00180 }
00181
00182 vector< shared_ptr<DurOp> >& ops() { return _wi._ops; }
00183
00187 bool hasWritten() const { return _hasWritten; }
00188
00190 void reset();
00191
00193 void notifyCommitted() { _notify.notifyAll(); }
00194
00196 void awaitNextCommit() {
00197 if( hasWritten() )
00198 _notify.wait();
00199 }
00200
00202 size_t bytes() const { return _bytes; }
00203
00204 #if defined(_DEBUG)
00205 const WriteIntent& lastWrite() const { return _wi._last; }
00206 #endif
00207
00208 Writes& wi() { return _wi; }
00209 private:
00210 bool _hasWritten;
00211 Writes _wi;
00212 size_t _bytes;
00213 NotifyAll _notify;
00214 public:
00215 unsigned _nSinceCommitIfNeededCall;
00216 };
00217
00218 extern CommitJob commitJob;
00219
00220 }
00221 }