00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020 #pragma once
00021
00022 #include "namespace-inl.h"
00023 #include "client.h"
00024 #include "../bson/util/atomic_int.h"
00025 #include "../util/concurrency/spin_lock.h"
00026 #include "../util/time_support.h"
00027 #include "db.h"
00028 #include "../scripting/engine.h"
00029
00030 namespace mongo {
00031
00032
00033 class OpDebug {
00034 public:
00035 StringBuilder str;
00036 void reset() { str.reset(); }
00037 };
00038
00044 class CachedBSONObj {
00045 public:
00046 enum { TOO_BIG_SENTINEL = 1 } ;
00047 static BSONObj _tooBig;
00048
00049 CachedBSONObj() {
00050 _size = (int*)_buf;
00051 reset();
00052 }
00053
00054 void reset( int sz = 0 ) {
00055 _lock.lock();
00056 _reset( sz );
00057 _lock.unlock();
00058 }
00059
00060 void set( const BSONObj& o ) {
00061 _lock.lock();
00062 try {
00063 int sz = o.objsize();
00064
00065 if ( sz > (int) sizeof(_buf) ) {
00066 _reset(TOO_BIG_SENTINEL);
00067 }
00068 else {
00069 memcpy(_buf, o.objdata(), sz );
00070 }
00071
00072 _lock.unlock();
00073 }
00074 catch ( ... ) {
00075 _lock.unlock();
00076 throw;
00077 }
00078
00079 }
00080
00081 int size() const { return *_size; }
00082 bool have() const { return size() > 0; }
00083
00084 BSONObj get() {
00085 _lock.lock();
00086 BSONObj o;
00087 try {
00088 o = _get();
00089 _lock.unlock();
00090 }
00091 catch ( ... ) {
00092 _lock.unlock();
00093 throw;
00094 }
00095 return o;
00096 }
00097
00098 void append( BSONObjBuilder& b , const StringData& name ) {
00099 _lock.lock();
00100 try {
00101 BSONObj temp = _get();
00102 b.append( name , temp );
00103 _lock.unlock();
00104 }
00105 catch ( ... ) {
00106 _lock.unlock();
00107 throw;
00108 }
00109 }
00110
00111 private:
00113 BSONObj _get() {
00114 int sz = size();
00115 if ( sz == 0 )
00116 return BSONObj();
00117 if ( sz == TOO_BIG_SENTINEL )
00118 return _tooBig;
00119 return BSONObj( _buf ).copy();
00120 }
00121
00123 void _reset( int sz ) { _size[0] = sz; }
00124
00125 SpinLock _lock;
00126 int * _size;
00127 char _buf[512];
00128 };
00129
00130
00131
00132
00133 class CurOp : boost::noncopyable {
00134 public:
00135 CurOp( Client * client , CurOp * wrapped = 0 );
00136 ~CurOp();
00137
00138 bool haveQuery() const { return _query.have(); }
00139 BSONObj query() { return _query.get(); }
00140
00141 void ensureStarted() {
00142 if ( _start == 0 )
00143 _start = _checkpoint = curTimeMicros64();
00144 }
00145 void enter( Client::Context * context ) {
00146 ensureStarted();
00147 setNS( context->ns() );
00148 if ( context->_db && context->_db->profile > _dbprofile )
00149 _dbprofile = context->_db->profile;
00150 }
00151
00152 void leave( Client::Context * context ) {
00153 unsigned long long now = curTimeMicros64();
00154 Top::global.record( _ns , _op , _lockType , now - _checkpoint , _command );
00155 _checkpoint = now;
00156 }
00157
00158 void reset() {
00159 _reset();
00160 _start = _checkpoint = 0;
00161 _active = true;
00162 _opNum = _nextOpNum++;
00163 _ns[0] = '?';
00164 _debug.reset();
00165 _query.reset();
00166 }
00167
00168 void reset( const SockAddr & remote, int op ) {
00169 reset();
00170 _remote = remote;
00171 _op = op;
00172 }
00173
00174 void markCommand() { _command = true; }
00175
00176 void waitingForLock( int type ) {
00177 _waitingForLock = true;
00178 if ( type > 0 )
00179 _lockType = 1;
00180 else
00181 _lockType = -1;
00182 }
00183 void gotLock() { _waitingForLock = false; }
00184 OpDebug& debug() { return _debug; }
00185 int profileLevel() const { return _dbprofile; }
00186 const char * getNS() const { return _ns; }
00187
00188 bool shouldDBProfile( int ms ) const {
00189 if ( _dbprofile <= 0 )
00190 return false;
00191
00192 return _dbprofile >= 2 || ms >= cmdLine.slowMS;
00193 }
00194
00195 AtomicUInt opNum() const { return _opNum; }
00196
00198 bool active() const { return _active; }
00199
00200 int getLockType() const { return _lockType; }
00201 bool isWaitingForLock() const { return _waitingForLock; }
00202 int getOp() const { return _op; }
00203
00205 unsigned long long startTime() {
00206 ensureStarted();
00207 return _start;
00208 }
00209
00210 void done() {
00211 _active = false;
00212 _end = curTimeMicros64();
00213 }
00214
00215 unsigned long long totalTimeMicros() {
00216 massert( 12601 , "CurOp not marked done yet" , ! _active );
00217 return _end - startTime();
00218 }
00219
00220 int totalTimeMillis() { return (int) (totalTimeMicros() / 1000); }
00221
00222 int elapsedMillis() {
00223 unsigned long long total = curTimeMicros64() - startTime();
00224 return (int) (total / 1000);
00225 }
00226
00227 int elapsedSeconds() { return elapsedMillis() / 1000; }
00228
00229 void setQuery(const BSONObj& query) { _query.set( query ); }
00230
00231 Client * getClient() const { return _client; }
00232
00233 BSONObj info() {
00234 if( ! cc().getAuthenticationInfo()->isAuthorized("admin") ) {
00235 BSONObjBuilder b;
00236 b.append("err", "unauthorized");
00237 return b.obj();
00238 }
00239 return infoNoauth();
00240 }
00241
00242 BSONObj infoNoauth();
00243
00244 string getRemoteString( bool includePort = true ) { return _remote.toString(includePort); }
00245
00246 ProgressMeter& setMessage( const char * msg , unsigned long long progressMeterTotal = 0 , int secondsBetween = 3 ) {
00247 if ( progressMeterTotal ) {
00248 if ( _progressMeter.isActive() ) {
00249 cout << "about to assert, old _message: " << _message << " new message:" << msg << endl;
00250 assert( ! _progressMeter.isActive() );
00251 }
00252 _progressMeter.reset( progressMeterTotal , secondsBetween );
00253 }
00254 else {
00255 _progressMeter.finished();
00256 }
00257
00258 _message = msg;
00259
00260 return _progressMeter;
00261 }
00262
00263 string getMessage() const { return _message.toString(); }
00264 ProgressMeter& getProgressMeter() { return _progressMeter; }
00265 CurOp *parent() const { return _wrapped; }
00266 void kill() { _killed = true; }
00267 bool killed() const { return _killed; }
00268 void setNS(const char *ns) {
00269 strncpy(_ns, ns, Namespace::MaxNsLen);
00270 _ns[Namespace::MaxNsLen] = 0;
00271 }
00272 friend class Client;
00273
00274 private:
00275 static AtomicUInt _nextOpNum;
00276 Client * _client;
00277 CurOp * _wrapped;
00278 unsigned long long _start;
00279 unsigned long long _checkpoint;
00280 unsigned long long _end;
00281 bool _active;
00282 int _op;
00283 bool _command;
00284 int _lockType;
00285 bool _waitingForLock;
00286 int _dbprofile;
00287 AtomicUInt _opNum;
00288 char _ns[Namespace::MaxNsLen+2];
00289 struct SockAddr _remote;
00290 CachedBSONObj _query;
00291 OpDebug _debug;
00292 ThreadSafeString _message;
00293 ProgressMeter _progressMeter;
00294 volatile bool _killed;
00295
00296 void _reset() {
00297 _command = false;
00298 _lockType = 0;
00299 _dbprofile = 0;
00300 _end = 0;
00301 _waitingForLock = false;
00302 _message = "";
00303 _progressMeter.finished();
00304 _killed = false;
00305 }
00306 };
00307
00308
00309
00310
00311
00312
00313 extern class KillCurrentOp {
00314 public:
00315 void killAll();
00316 void kill(AtomicUInt i);
00317
00319 bool globalInterruptCheck() const { return _globalKill; }
00320
00321 void checkForInterrupt( bool heedMutex = true ) {
00322 if ( heedMutex && dbMutex.isWriteLocked() )
00323 return;
00324 if( _globalKill )
00325 uasserted(11600,"interrupted at shutdown");
00326 if( cc().curop()->killed() )
00327 uasserted(11601,"interrupted");
00328 }
00329
00331 const char *checkForInterruptNoAssert( bool heedMutex = true ) {
00332 if ( heedMutex && dbMutex.isWriteLocked() )
00333 return "";
00334 if( _globalKill )
00335 return "interrupted at shutdown";
00336 if( cc().curop()->killed() )
00337 return "interrupted";
00338 return "";
00339 }
00340
00341 private:
00342 void interruptJs( AtomicUInt *op );
00343 volatile bool _globalKill;
00344 } killCurrentOp;
00345
00346 }