00001
00002
00019
00020
00021
00022
00023
00024 #pragma once
00025
00026 #include "pdfile.h"
00027 #include "db.h"
00028 #include "dbhelpers.h"
00029 #include "query.h"
00030 #include "queryoptimizer.h"
00031 #include "../client/dbclient.h"
00032 #include "../util/optime.h"
00033 #include "../util/timer.h"
00034
00035 namespace mongo {
00036
00037 void createOplog();
00038
00039 void _logOpObjRS(const BSONObj& op);
00040
00053 void logOp(const char *opstr, const char *ns, const BSONObj& obj, BSONObj *patt = 0, bool *b = 0);
00054
00055 void logKeepalive();
00056
00061 void logOpComment(const BSONObj& obj);
00062
00063 void oplogCheckCloseDatabase( Database * db );
00064
00065 extern int __findingStartInitialTimeout;
00066
00067 class FindingStartCursor {
00068 public:
00069 FindingStartCursor( const QueryPlan & qp ) :
00070 _qp( qp ),
00071 _findingStart( true ),
00072 _findingStartMode(),
00073 _findingStartTimer( 0 )
00074 { init(); }
00075 bool done() const { return !_findingStart; }
00076 shared_ptr<Cursor> cRelease() { return _c; }
00077 void next() {
00078 if ( !_findingStartCursor || !_findingStartCursor->ok() ) {
00079 _findingStart = false;
00080 _c = _qp.newCursor();
00081 destroyClientCursor();
00082 return;
00083 }
00084 switch( _findingStartMode ) {
00085 case Initial: {
00086 if ( !_matcher->matches( _findingStartCursor->currKey(), _findingStartCursor->currLoc() ) ) {
00087 _findingStart = false;
00088 _c = _qp.newCursor( _findingStartCursor->currLoc() );
00089 destroyClientCursor();
00090 return;
00091 }
00092 _findingStartCursor->advance();
00093 RARELY {
00094 if ( _findingStartTimer.seconds() >= __findingStartInitialTimeout ) {
00095 createClientCursor( startLoc( _findingStartCursor->currLoc() ) );
00096 _findingStartMode = FindExtent;
00097 return;
00098 }
00099 }
00100 return;
00101 }
00102 case FindExtent: {
00103 if ( !_matcher->matches( _findingStartCursor->currKey(), _findingStartCursor->currLoc() ) ) {
00104 _findingStartMode = InExtent;
00105 return;
00106 }
00107 DiskLoc prev = prevLoc( _findingStartCursor->currLoc() );
00108 if ( prev.isNull() ) {
00109 createClientCursor();
00110 _findingStartMode = InExtent;
00111 return;
00112 }
00113
00114
00115 createClientCursor( prev );
00116 return;
00117 }
00118 case InExtent: {
00119 if ( _matcher->matches( _findingStartCursor->currKey(), _findingStartCursor->currLoc() ) ) {
00120 _findingStart = false;
00121 _c = _qp.newCursor( _findingStartCursor->currLoc() );
00122 destroyClientCursor();
00123 return;
00124 }
00125 _findingStartCursor->advance();
00126 return;
00127 }
00128 default: {
00129 massert( 12600, "invalid _findingStartMode", false );
00130 }
00131 }
00132 }
00133 bool prepareToYield() {
00134 if ( _findingStartCursor ) {
00135 return _findingStartCursor->prepareToYield( _yieldData );
00136 }
00137 return true;
00138 }
00139 void recoverFromYield() {
00140 if ( _findingStartCursor ) {
00141 if ( !ClientCursor::recoverFromYield( _yieldData ) ) {
00142 _findingStartCursor.reset( 0 );
00143 }
00144 }
00145 }
00146 private:
00147 enum FindingStartMode { Initial, FindExtent, InExtent };
00148 const QueryPlan &_qp;
00149 bool _findingStart;
00150 FindingStartMode _findingStartMode;
00151 auto_ptr< CoveredIndexMatcher > _matcher;
00152 Timer _findingStartTimer;
00153 ClientCursor::CleanupPointer _findingStartCursor;
00154 shared_ptr<Cursor> _c;
00155 ClientCursor::YieldData _yieldData;
00156 DiskLoc startLoc( const DiskLoc &rec ) {
00157 Extent *e = rec.rec()->myExtent( rec );
00158 if ( !_qp.nsd()->capLooped() || ( e->myLoc != _qp.nsd()->capExtent ) )
00159 return e->firstRecord;
00160
00161
00162
00163 return _qp.nsd()->capFirstNewRecord;
00164 }
00165
00166
00167 DiskLoc prevLoc( const DiskLoc &rec ) {
00168 Extent *e = rec.rec()->myExtent( rec );
00169 if ( _qp.nsd()->capLooped() ) {
00170 if ( e->xprev.isNull() )
00171 e = _qp.nsd()->lastExtent.ext();
00172 else
00173 e = e->xprev.ext();
00174 if ( e->myLoc != _qp.nsd()->capExtent )
00175 return e->firstRecord;
00176 }
00177 else {
00178 if ( !e->xprev.isNull() ) {
00179 e = e->xprev.ext();
00180 return e->firstRecord;
00181 }
00182 }
00183 return DiskLoc();
00184 }
00185 void createClientCursor( const DiskLoc &startLoc = DiskLoc() ) {
00186 shared_ptr<Cursor> c = _qp.newCursor( startLoc );
00187 _findingStartCursor.reset( new ClientCursor(QueryOption_NoCursorTimeout, c, _qp.ns()) );
00188 }
00189 void destroyClientCursor() {
00190 _findingStartCursor.reset( 0 );
00191 }
00192 void init() {
00193
00194
00195 shared_ptr<Cursor> c = _qp.newReverseCursor();
00196 _findingStartCursor.reset( new ClientCursor(QueryOption_NoCursorTimeout, c, _qp.ns(), BSONObj()) );
00197 _findingStartTimer.reset();
00198 _findingStartMode = Initial;
00199 BSONElement tsElt = _qp.originalQuery()[ "ts" ];
00200 massert( 13044, "no ts field in query", !tsElt.eoo() );
00201 BSONObjBuilder b;
00202 b.append( tsElt );
00203 BSONObj tsQuery = b.obj();
00204 _matcher.reset(new CoveredIndexMatcher(tsQuery, _qp.indexKey()));
00205 }
00206 };
00207
00208 void pretouchOperation(const BSONObj& op);
00209 void pretouchN(vector<BSONObj>&, unsigned a, unsigned b);
00210
00216 void applyOperation_inlock(const BSONObj& op , bool fromRepl = true );
00217 }