00001
00002
00019
00020
00021
00022
00023
00024
00025 #pragma once
00026
00027 #include "../pch.h"
00028 #include "cursor.h"
00029 #include "jsobj.h"
00030 #include "../util/message.h"
00031 #include "../util/background.h"
00032 #include "diskloc.h"
00033 #include "dbhelpers.h"
00034 #include "matcher.h"
00035 #include "../client/dbclient.h"
00036 #include "projection.h"
00037
00038 namespace mongo {
00039
00040 typedef long long CursorId;
00041 class Cursor;
00042 class ClientCursor;
00043 class ParsedQuery;
00044
00045 struct ByLocKey {
00046
00047 ByLocKey( const DiskLoc & l , const CursorId& i ) : loc(l), id(i) {}
00048
00049 static ByLocKey min( const DiskLoc& l ) { return ByLocKey( l , numeric_limits<long long>::min() ); }
00050 static ByLocKey max( const DiskLoc& l ) { return ByLocKey( l , numeric_limits<long long>::max() ); }
00051
00052 bool operator<( const ByLocKey &other ) const {
00053 int x = loc.compare( other.loc );
00054 if ( x )
00055 return x < 0;
00056 return id < other.id;
00057 }
00058
00059 DiskLoc loc;
00060 CursorId id;
00061
00062 };
00063
00064
00065
00066
00067 typedef map<CursorId, ClientCursor*> CCById;
00068 typedef map<ByLocKey, ClientCursor*> CCByLoc;
00069
00070 extern BSONObj id_obj;
00071
00072 class ClientCursor {
00073 friend class CmdCursorInfo;
00074 public:
00075 static void assertNoCursors();
00076
00077
00078
00079
00080
00081
00082
00083 class Pointer : boost::noncopyable {
00084 ClientCursor *_c;
00085 public:
00086 ClientCursor * c() { return _c; }
00087 void release() {
00088 if( _c ) {
00089 assert( _c->_pinValue >= 100 );
00090 _c->_pinValue -= 100;
00091 _c = 0;
00092 }
00093 }
00098 void deleted() {
00099 _c = 0;
00100 }
00101 ~Pointer() { release(); }
00102 Pointer(long long cursorid) {
00103 recursive_scoped_lock lock(ccmutex);
00104 _c = ClientCursor::find_inlock(cursorid, true);
00105 if( _c ) {
00106 if( _c->_pinValue >= 100 ) {
00107 _c = 0;
00108 uasserted(12051, "clientcursor already in use? driver problem?");
00109 }
00110 _c->_pinValue += 100;
00111 }
00112 }
00113 };
00114
00115
00116
00117
00118 class CleanupPointer : boost::noncopyable {
00119 public:
00120 CleanupPointer() : _c( 0 ), _id( -1 ) {}
00121 void reset( ClientCursor *c = 0 ) {
00122 if ( c == _c )
00123 return;
00124 if ( _c ) {
00125
00126 ClientCursor::erase( _id );
00127 }
00128 if ( c ) {
00129 _c = c;
00130 _id = c->_cursorid;
00131 }
00132 else {
00133 _c = 0;
00134 _id = -1;
00135 }
00136 }
00137 ~CleanupPointer() {
00138 DESTRUCTOR_GUARD ( reset(); );
00139 }
00140 operator bool() { return _c; }
00141 ClientCursor * operator-> () { return _c; }
00142 private:
00143 ClientCursor *_c;
00144 CursorId _id;
00145 };
00146
00147 ClientCursor(int queryOptions, const shared_ptr<Cursor>& c, const string& ns, BSONObj query = BSONObj() );
00148
00149 ~ClientCursor();
00150
00151
00152
00153 CursorId cursorid() const { return _cursorid; }
00154 string ns() const { return _ns; }
00155 Database * db() const { return _db; }
00156 const BSONObj& query() const { return _query; }
00157 int queryOptions() const { return _queryOptions; }
00158
00159 DiskLoc lastLoc() const { return _lastLoc; }
00160
00161
00162
00163
00164 static void invalidate(const char *nsPrefix);
00165
00177 bool yield( int microsToSleep = -1 );
00178
00182 bool yieldSometimes();
00183
00184 static int yieldSuggest();
00185 static void staticYield( int micros , const StringData& ns );
00186
00187 struct YieldData { CursorId _id; bool _doingDeletes; };
00188 bool prepareToYield( YieldData &data );
00189 static bool recoverFromYield( const YieldData &data );
00190
00191 struct YieldLock : boost::noncopyable {
00192 explicit YieldLock( ptr<ClientCursor> cc )
00193 : _canYield(cc->_c->supportYields()) {
00194 if ( _canYield ) {
00195 cc->prepareToYield( _data );
00196 _unlock.reset(new dbtempreleasecond());
00197 }
00198 }
00199 ~YieldLock() {
00200 if ( _unlock ) {
00201 log( LL_WARNING ) << "ClientCursor::YieldLock not closed properly" << endl;
00202 relock();
00203 }
00204 }
00205 bool stillOk() {
00206 if ( ! _canYield )
00207 return true;
00208 relock();
00209 return ClientCursor::recoverFromYield( _data );
00210 }
00211 void relock() {
00212 _unlock.reset();
00213 }
00214 private:
00215 const bool _canYield;
00216 YieldData _data;
00217 scoped_ptr<dbtempreleasecond> _unlock;
00218 };
00219
00220
00221
00222 Cursor* c() const { return _c.get(); }
00223 int pos() const { return _pos; }
00224
00225 void incPos( int n ) { _pos += n; }
00226 void setPos( int n ) { _pos = n; }
00227
00228 BSONObj indexKeyPattern() { return _c->indexKeyPattern(); }
00229 bool modifiedKeys() const { return _c->modifiedKeys(); }
00230 bool isMultiKey() const { return _c->isMultiKey(); }
00231
00232 bool ok() { return _c->ok(); }
00233 bool advance() { return _c->advance(); }
00234 BSONObj current() { return _c->current(); }
00235 DiskLoc currLoc() { return _c->currLoc(); }
00236 BSONObj currKey() const { return _c->currKey(); }
00237
00238
00244 bool getFieldsDotted( const string& name, BSONElementSet &ret );
00245
00251 BSONElement getFieldDotted( const string& name , bool * fromKey = 0 );
00252
00253 bool currentIsDup() { return _c->getsetdup( _c->currLoc() ); }
00254
00255 bool currentMatches() {
00256 if ( ! _c->matcher() )
00257 return true;
00258 return _c->matcher()->matchesCurrent( _c.get() );
00259 }
00260
00261 private:
00262 void setLastLoc_inlock(DiskLoc);
00263
00264 static ClientCursor* find_inlock(CursorId id, bool warn = true) {
00265 CCById::iterator it = clientCursorsById.find(id);
00266 if ( it == clientCursorsById.end() ) {
00267 if ( warn )
00268 OCCASIONALLY out() << "ClientCursor::find(): cursor not found in map " << id << " (ok after a drop)\n";
00269 return 0;
00270 }
00271 return it->second;
00272 }
00273 public:
00274 static ClientCursor* find(CursorId id, bool warn = true) {
00275 recursive_scoped_lock lock(ccmutex);
00276 ClientCursor *c = find_inlock(id, warn);
00277
00278
00279 massert( 12521, "internal error: use of an unlocked ClientCursor", c == 0 || c->_pinValue );
00280 return c;
00281 }
00282
00283 static bool erase(CursorId id) {
00284 recursive_scoped_lock lock(ccmutex);
00285 ClientCursor *cc = find_inlock(id);
00286 if ( cc ) {
00287 assert( cc->_pinValue < 100 );
00288 delete cc;
00289 return true;
00290 }
00291 return false;
00292 }
00293
00297 static int erase( int n , long long * ids );
00298
00299
00300
00301
00302
00303 void updateLocation();
00304
00305 void mayUpgradeStorage() {
00306
00307
00308
00309
00310
00311 }
00312
00316 bool shouldTimeout( unsigned millis );
00317
00318 void storeOpForSlave( DiskLoc last );
00319 void updateSlaveLocation( CurOp& curop );
00320
00321 unsigned idleTime() const { return _idleAgeMillis; }
00322
00323 void setDoingDeletes( bool doingDeletes ) {_doingDeletes = doingDeletes; }
00324
00325 void slaveReadTill( const OpTime& t ) { _slaveReadTill = t; }
00326
00327 public:
00328
00329 static void idleTimeReport(unsigned millis);
00330
00331 static void appendStats( BSONObjBuilder& result );
00332 static unsigned numCursors() { return clientCursorsById.size(); }
00333 static void informAboutToDeleteBucket(const DiskLoc& b);
00334 static void aboutToDelete(const DiskLoc& dl);
00335 static void find( const string& ns , set<CursorId>& all );
00336
00337
00338 private:
00339
00340
00341
00342 void noTimeout() { _pinValue++; }
00343
00344 CCByLoc& byLoc() { return _db->ccByLoc; }
00345
00346 private:
00347
00348 CursorId _cursorid;
00349
00350 const string _ns;
00351 Database * _db;
00352
00353 const shared_ptr<Cursor> _c;
00354 map<string,int> _indexedFields;
00355 int _pos;
00356
00357 const BSONObj _query;
00358 int _queryOptions;
00359
00360 OpTime _slaveReadTill;
00361
00362 DiskLoc _lastLoc;
00363 unsigned _idleAgeMillis;
00364
00365
00366
00367
00368
00369 unsigned _pinValue;
00370
00371 bool _doingDeletes;
00372 ElapsedTracker _yieldSometimesTracker;
00373
00374 public:
00375 shared_ptr<ParsedQuery> pq;
00376 shared_ptr<Projection> fields;
00377 Message originalMessage;
00378
00379
00380
00381 private:
00382
00383 static CCById clientCursorsById;
00384 static long long numberTimedOut;
00385 static boost::recursive_mutex ccmutex;
00386 static CursorId allocCursorId_inlock();
00387
00388 };
00389
00390 class ClientCursorMonitor : public BackgroundJob {
00391 public:
00392 string name() const { return "ClientCursorMonitor"; }
00393 void run();
00394 };
00395
00396 extern ClientCursorMonitor clientCursorMonitor;
00397
00398 }