00001
00002
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028 #pragma once
00029
00030 #include "pdfile.h"
00031 #include "db.h"
00032 #include "dbhelpers.h"
00033 #include "query.h"
00034 #include "queryoptimizer.h"
00035 #include "../client/dbclient.h"
00036 #include "../util/optime.h"
00037 #include "oplog.h"
00038 #include "../util/concurrency/thread_pool.h"
00039 #include "oplogreader.h"
00040
00041 namespace mongo {
00042
00043
00044
00045
/** Slave mode of this process. NotSlave (0) is the value ReplSettings starts with. */
enum SlaveTypes {
    NotSlave = 0,
    SimpleSlave = 1,
    ReplPairSlave = 2
};
00047
00048 class ReplSettings {
00049 public:
00050 SlaveTypes slave;
00051
00052
00053
00054
00055 bool master;
00056
00057 int opIdMem;
00058
00059 bool fastsync;
00060
00061 bool autoresync;
00062
00063 int slavedelay;
00064
00065 ReplSettings()
00066 : slave(NotSlave) , master(false) , opIdMem(100000000) , fastsync() , autoresync(false), slavedelay() {
00067 }
00068
00069 };
00070
00071 extern ReplSettings replSettings;
00072
00073 bool cloneFrom(const char *masterHost, string& errmsg, const string& fromdb, bool logForReplication,
00074 bool slaveOk, bool useReplAuth, bool snapshot);
00075
00076
00077 class SyncException : public DBException {
00078 public:
00079 SyncException() : DBException( "sync exception" , 10001 ) {}
00080 };
00081
00082
00083
00084
00085
00086
00087
00088
00089
00090
00091
00092 class ReplSource {
00093 auto_ptr<ThreadPool> tp;
00094
00095 bool resync(string db);
00096
00098 void sync_pullOpLog_applyOperation(BSONObj& op, OpTime *localLogTail, bool alreadyLocked);
00099
00100
00101
00102
00103 int sync_pullOpLog(int& nApplied);
00104
00105
00106
00107
00108
00109
00110 set<string> addDbNextPass;
00111
00112 set<string> incompleteCloneDbs;
00113
00114 ReplSource();
00115
00116
00117 string resyncDrop( const char *db, const char *requester );
00118
00119 static BSONObj idForOp( const BSONObj &op, bool &mod );
00120 static void updateSetsWithOp( const BSONObj &op, bool mayUpdateStorage );
00121
00122 void syncToTailOfRemoteLog();
00123
00124 OpTime nextLastSavedLocalTs() const;
00125 void setLastSavedLocalTs( const OpTime &nextLocalTs );
00126
00127 void resetSlave();
00128
00129
00130 bool updateSetsWithLocalOps( OpTime &localLogTail, bool mayUnlock );
00131 string ns() const { return string( "local.oplog.$" ) + sourceName(); }
00132 unsigned _sleepAdviceTime;
00133
00134 public:
00135 OplogReader oplogReader;
00136
00137 static void applyOperation(const BSONObj& op);
00138 bool replacing;
00139 bool paired;
00140 string hostName;
00141 string _sourceName;
00142 string sourceName() const { return _sourceName.empty() ? "main" : _sourceName; }
00143 string only;
00144
00145
00146 OpTime syncedTo;
00147
00148
00149
00150
00151
00152
00153
00154 OpTime _lastSavedLocalTs;
00155
00156 int nClonedThisPass;
00157
00158 typedef vector< shared_ptr< ReplSource > > SourceVector;
00159 static void loadAll(SourceVector&);
00160 explicit ReplSource(BSONObj);
00161
00162
00163 int sync(int& nApplied);
00164
00165 void save();
00166
00167
00168
00169 BSONObj jsobj();
00170
00171 bool operator==(const ReplSource&r) const {
00172 return hostName == r.hostName && sourceName() == r.sourceName();
00173 }
00174 string toString() const { return sourceName() + "@" + hostName; }
00175
00176 bool haveMoreDbsToSync() const { return !addDbNextPass.empty(); }
00177 int sleepAdvice() const {
00178 if ( !_sleepAdviceTime )
00179 return 0;
00180 int wait = _sleepAdviceTime - unsigned( time( 0 ) );
00181 return wait > 0 ? wait : 0;
00182 }
00183
00184 static bool throttledForceResyncDead( const char *requester );
00185 static void forceResyncDead( const char *requester );
00186 void forceResync( const char *requester );
00187 };
00188
00189
00190 class MemIds {
00191 public:
00192 MemIds() : size_() {}
00193 friend class IdTracker;
00194 void reset() {
00195 imp_.clear();
00196 size_ = 0;
00197 }
00198 bool get( const char *ns, const BSONObj &id ) { return imp_[ ns ].count( id ); }
00199 void set( const char *ns, const BSONObj &id, bool val ) {
00200 if ( val ) {
00201 if ( imp_[ ns ].insert( id.getOwned() ).second ) {
00202 size_ += id.objsize() + sizeof( BSONObj );
00203 }
00204 }
00205 else {
00206 if ( imp_[ ns ].erase( id ) == 1 ) {
00207 size_ -= id.objsize() + sizeof( BSONObj );
00208 }
00209 }
00210 }
00211 long long roughSize() const {
00212 return size_;
00213 }
00214 private:
00215 typedef map< string, BSONObjSetDefaultOrder > IdSets;
00216 IdSets imp_;
00217 long long size_;
00218 };
00219
00220
00221
00222 class DbIds {
00223 public:
00224 DbIds( const string & name ) : impl_( name, BSON( "ns" << 1 << "id" << 1 ) ) {}
00225 void reset() {
00226 impl_.reset();
00227 }
00228 bool get( const char *ns, const BSONObj &id ) {
00229 return impl_.get( key( ns, id ) );
00230 }
00231 void set( const char *ns, const BSONObj &id, bool val ) {
00232 impl_.set( key( ns, id ), val );
00233 }
00234 private:
00235 static BSONObj key( const char *ns, const BSONObj &id ) {
00236 BSONObjBuilder b;
00237 b << "ns" << ns;
00238
00239 b.appendAs( id.firstElement(), "id" );
00240 return b.obj();
00241 }
00242 DbSet impl_;
00243 };
00244
00245
00246
00247
00248
00249
00250 class IdTracker {
00251 public:
00252 IdTracker() :
00253 dbIds_( "local.temp.replIds" ),
00254 dbModIds_( "local.temp.replModIds" ),
00255 inMem_( true ),
00256 maxMem_( replSettings.opIdMem ) {
00257 }
00258 void reset( int maxMem = replSettings.opIdMem ) {
00259 memIds_.reset();
00260 memModIds_.reset();
00261 dbIds_.reset();
00262 dbModIds_.reset();
00263 maxMem_ = maxMem;
00264 inMem_ = true;
00265 }
00266 bool haveId( const char *ns, const BSONObj &id ) {
00267 if ( inMem_ )
00268 return get( memIds_, ns, id );
00269 else
00270 return get( dbIds_, ns, id );
00271 }
00272 bool haveModId( const char *ns, const BSONObj &id ) {
00273 if ( inMem_ )
00274 return get( memModIds_, ns, id );
00275 else
00276 return get( dbModIds_, ns, id );
00277 }
00278 void haveId( const char *ns, const BSONObj &id, bool val ) {
00279 if ( inMem_ )
00280 set( memIds_, ns, id, val );
00281 else
00282 set( dbIds_, ns, id, val );
00283 }
00284 void haveModId( const char *ns, const BSONObj &id, bool val ) {
00285 if ( inMem_ )
00286 set( memModIds_, ns, id, val );
00287 else
00288 set( dbModIds_, ns, id, val );
00289 }
00290
00291 void mayUpgradeStorage() {
00292 if ( !inMem_ || memIds_.roughSize() + memModIds_.roughSize() <= maxMem_ )
00293 return;
00294 log() << "saving master modified id information to collection" << endl;
00295 upgrade( memIds_, dbIds_ );
00296 upgrade( memModIds_, dbModIds_ );
00297 memIds_.reset();
00298 memModIds_.reset();
00299 inMem_ = false;
00300 }
00301 bool inMem() const { return inMem_; }
00302 private:
00303 template< class T >
00304 bool get( T &ids, const char *ns, const BSONObj &id ) {
00305 return ids.get( ns, id );
00306 }
00307 template< class T >
00308 void set( T &ids, const char *ns, const BSONObj &id, bool val ) {
00309 ids.set( ns, id, val );
00310 }
00311 void upgrade( MemIds &a, DbIds &b ) {
00312 for( MemIds::IdSets::const_iterator i = a.imp_.begin(); i != a.imp_.end(); ++i ) {
00313 for( BSONObjSetDefaultOrder::const_iterator j = i->second.begin(); j != i->second.end(); ++j ) {
00314 set( b, i->first.c_str(), *j, true );
00315 RARELY {
00316 dbtemprelease t;
00317 }
00318 }
00319 }
00320 }
00321 MemIds memIds_;
00322 MemIds memModIds_;
00323 DbIds dbIds_;
00324 DbIds dbModIds_;
00325 bool inMem_;
00326 int maxMem_;
00327 };
00328
00329 bool anyReplEnabled();
00330 void appendReplicationInfo( BSONObjBuilder& result , bool authed , int level = 0 );
00331
00332
00333 }