00001
00002
00019 #pragma once
00020
00021 #include "../../util/concurrency/list.h"
00022 #include "../../util/concurrency/value.h"
00023 #include "../../util/concurrency/msg.h"
00024 #include "../../util/hostandport.h"
00025 #include "../commands.h"
00026 #include "rs_exception.h"
00027 #include "rs_optime.h"
00028 #include "rs_member.h"
00029 #include "rs_config.h"
00030
00031 namespace mongo {
00032
00033 struct HowToFixUp;
00034 struct Target;
00035 class DBClientConnection;
00036 class ReplSetImpl;
00037 class OplogReader;
00038 extern bool replSet;
00039 extern class ReplSet *theReplSet;
00040 extern Tee *rsLog;
00041
00042
00043 class Member : public List1<Member>::Base {
00044 public:
00045 Member(HostAndPort h, unsigned ord, const ReplSetConfig::MemberCfg *c, bool self);
00046
00047 string fullName() const { return h().toString(); }
00048 const ReplSetConfig::MemberCfg& config() const { return _config; }
00049 const HeartbeatInfo& hbinfo() const { return _hbinfo; }
00050 HeartbeatInfo& get_hbinfo() { return _hbinfo; }
00051 string lhb() const { return _hbinfo.lastHeartbeatMsg; }
00052 MemberState state() const { return _hbinfo.hbstate; }
00053 const HostAndPort& h() const { return _h; }
00054 unsigned id() const { return _hbinfo.id(); }
00055
00056 bool potentiallyHot() const { return _config.potentiallyHot(); }
00057 void summarizeMember(stringstream& s) const;
00058
00059 private:
00060 friend class ReplSetImpl;
00061 const ReplSetConfig::MemberCfg _config;
00062 const HostAndPort _h;
00063 HeartbeatInfo _hbinfo;
00064 };
00065
00066 class Manager : public task::Server {
00067 ReplSetImpl *rs;
00068 bool busyWithElectSelf;
00069 int _primary;
00070
00075 const Member* findOtherPrimary(bool& two);
00076
00077 void noteARemoteIsPrimary(const Member *);
00078 virtual void starting();
00079 public:
00080 Manager(ReplSetImpl *rs);
00081 virtual ~Manager();
00082 void msgReceivedNewConfig(BSONObj);
00083 void msgCheckNewState();
00084 };
00085
00086 struct Target;
00087
00088 class Consensus {
00089 ReplSetImpl &rs;
00090 struct LastYea {
00091 LastYea() : when(0), who(0xffffffff) { }
00092 time_t when;
00093 unsigned who;
00094 };
00095 Atomic<LastYea> ly;
00096 unsigned yea(unsigned memberId);
00097 void electionFailed(unsigned meid);
00098 void _electSelf();
00099 bool weAreFreshest(bool& allUp, int& nTies);
00100 bool sleptLast;
00101 public:
00102 Consensus(ReplSetImpl *t) : rs(*t) {
00103 sleptLast = false;
00104 steppedDown = 0;
00105 }
00106
00107
00108
00109
00110 time_t steppedDown;
00111
00112 int totalVotes() const;
00113 bool aMajoritySeemsToBeUp() const;
00114 bool shouldRelinquish() const;
00115 void electSelf();
00116 void electCmdReceived(BSONObj, BSONObjBuilder*);
00117 void multiCommand(BSONObj cmd, list<Target>& L);
00118 };
00119
00121 class RSBase : boost::noncopyable {
00122 public:
00123 const unsigned magic;
00124 void assertValid() { assert( magic == 0x12345677 ); }
00125 private:
00126 mongo::mutex m;
00127 int _locked;
00128 ThreadLocalValue<bool> _lockedByMe;
00129 protected:
00130 RSBase() : magic(0x12345677), m("RSBase"), _locked(0) { }
00131 ~RSBase() {
00132
00133 log() << "replSet ~RSBase called" << rsLog;
00134 }
00135
00136 class lock {
00137 RSBase& rsbase;
00138 auto_ptr<scoped_lock> sl;
00139 public:
00140 lock(RSBase* b) : rsbase(*b) {
00141 if( rsbase._lockedByMe.get() )
00142 return;
00143
00144 sl.reset( new scoped_lock(rsbase.m) );
00145 DEV assert(rsbase._locked == 0);
00146 rsbase._locked++;
00147 rsbase._lockedByMe.set(true);
00148 }
00149 ~lock() {
00150 if( sl.get() ) {
00151 assert( rsbase._lockedByMe.get() );
00152 DEV assert(rsbase._locked == 1);
00153 rsbase._lockedByMe.set(false);
00154 rsbase._locked--;
00155 }
00156 }
00157 };
00158
00159 public:
00160
00161 bool locked() const { return _locked != 0; }
00162
00163
00164
00165
00166
00167 bool lockedByMe() { return _lockedByMe.get(); }
00168 };
00169
00170 class ReplSetHealthPollTask;
00171
00172
00173 class StateBox : boost::noncopyable {
00174 public:
00175 struct SP {
00176 SP() : state(MemberState::RS_STARTUP), primary(0) { }
00177 MemberState state;
00178 const Member *primary;
00179 };
00180 const SP get() {
00181 scoped_lock lk(m);
00182 return sp;
00183 }
00184 MemberState getState() const { return sp.state; }
00185 const Member* getPrimary() const { return sp.primary; }
00186 void change(MemberState s, const Member *self) {
00187 scoped_lock lk(m);
00188 if( sp.state != s ) {
00189 log() << "replSet " << s.toString() << rsLog;
00190 }
00191 sp.state = s;
00192 if( s.primary() ) {
00193 sp.primary = self;
00194 }
00195 else {
00196 if( self == sp.primary )
00197 sp.primary = 0;
00198 }
00199 }
00200 void set(MemberState s, const Member *p) {
00201 scoped_lock lk(m);
00202 sp.state = s; sp.primary = p;
00203 }
00204 void setSelfPrimary(const Member *self) { change(MemberState::RS_PRIMARY, self); }
00205 void setOtherPrimary(const Member *mem) {
00206 scoped_lock lk(m);
00207 assert( !sp.state.primary() );
00208 sp.primary = mem;
00209 }
00210 void noteRemoteIsPrimary(const Member *remote) {
00211 scoped_lock lk(m);
00212 if( !sp.state.secondary() && !sp.state.fatal() )
00213 sp.state = MemberState::RS_RECOVERING;
00214 sp.primary = remote;
00215 }
00216 StateBox() : m("StateBox") { }
00217 private:
00218 mongo::mutex m;
00219 SP sp;
00220 };
00221
00222 void parseReplsetCmdLine(string cfgString, string& setname, vector<HostAndPort>& seeds, set<HostAndPort>& seedSet );
00223
00227 class ReplSetCmdline {
00228 public:
00229 ReplSetCmdline(string cfgString) { parseReplsetCmdLine(cfgString, setname, seeds, seedSet); }
00230 string setname;
00231 vector<HostAndPort> seeds;
00232 set<HostAndPort> seedSet;
00233 };
00234
00235
00236
00237
00238
00239 class ReplSetImpl : protected RSBase {
00240 public:
00242 enum StartupStatus {
00243 PRESTART=0, LOADINGCONFIG=1, BADCONFIG=2, EMPTYCONFIG=3,
00244 EMPTYUNREACHABLE=4, STARTED=5, SOON=6
00245 };
00246 static StartupStatus startupStatus;
00247 static string startupStatusMsg;
00248 static string stateAsHtml(MemberState state);
00249
00250
00251 void msgUpdateHBInfo(HeartbeatInfo);
00252
00253 StateBox box;
00254
00255 OpTime lastOpTimeWritten;
00256 long long lastH;
00257 private:
00258 set<ReplSetHealthPollTask*> healthTasks;
00259 void endOldHealthTasks();
00260 void startHealthTaskFor(Member *m);
00261
00262 Consensus elect;
00263 void relinquish();
00264 void forgetPrimary();
00265 protected:
00266 bool _stepDown(int secs);
00267 bool _freeze(int secs);
00268 private:
00269 void assumePrimary();
00270 void loadLastOpTimeWritten();
00271 void changeState(MemberState s);
00272 const Member* getMemberToSyncTo();
00273 void _changeArbiterState();
00274 protected:
00275
00276
00277 char _hbmsg[256];
00278 time_t _hbmsgTime;
00279 public:
00280 void sethbmsg(string s, int logLevel = 0);
00281 protected:
00282 bool initFromConfig(ReplSetConfig& c, bool reconf=false);
00283 void _fillIsMaster(BSONObjBuilder&);
00284 void _fillIsMasterHost(const Member*, vector<string>&, vector<string>&, vector<string>&);
00285 const ReplSetConfig& config() { return *_cfg; }
00286 string name() const { return _name; }
00287 MemberState state() const { return box.getState(); }
00288 void _fatal();
00289 void _getOplogDiagsAsHtml(unsigned server_id, stringstream& ss) const;
00290 void _summarizeAsHtml(stringstream&) const;
00291 void _summarizeStatus(BSONObjBuilder&) const;
00292
00293
00294 ReplSetImpl(ReplSetCmdline&);
00295
00296
00297 void _go();
00298
00299 private:
00300 string _name;
00301 const vector<HostAndPort> *_seeds;
00302 ReplSetConfig *_cfg;
00303
00307 bool _loadConfigFinish(vector<ReplSetConfig>& v);
00308 void loadConfig();
00309
00310 list<HostAndPort> memberHostnames() const;
00311 const ReplSetConfig::MemberCfg& myConfig() const { return _self->config(); }
00312 bool iAmArbiterOnly() const { return myConfig().arbiterOnly; }
00313 bool iAmPotentiallyHot() const { return myConfig().potentiallyHot(); }
00314 protected:
00315 Member *_self;
00316 bool _buildIndexes;
00317 void setSelfTo(Member *);
00318 private:
00319 List1<Member> _members;
00320
00321 public:
00322 unsigned selfId() const { return _self->id(); }
00323 Manager *mgr;
00324
00325 private:
00326 Member* head() const { return _members.head(); }
00327 public:
00328 const Member* findById(unsigned id) const;
00329 private:
00330 void _getTargets(list<Target>&, int &configVersion);
00331 void getTargets(list<Target>&, int &configVersion);
00332 void startThreads();
00333 friend class FeedbackThread;
00334 friend class CmdReplSetElect;
00335 friend class Member;
00336 friend class Manager;
00337 friend class Consensus;
00338
00339 private:
00340
00341 bool initialSyncOplogApplication(const Member *primary, OpTime applyGTE, OpTime minValid);
00342 void _syncDoInitialSync();
00343 void syncDoInitialSync();
00344 void _syncThread();
00345 bool tryToGoLiveAsASecondary(OpTime&);
00346 void syncTail();
00347 void syncApply(const BSONObj &o);
00348 unsigned _syncRollback(OplogReader& r);
00349 void syncRollback(OplogReader& r);
00350 void syncFixUp(HowToFixUp& h, OplogReader& r);
00351 bool _getOplogReader(OplogReader& r, string& hn);
00352 bool _isStale(OplogReader& r, const string& hn);
00353 public:
00354 void syncThread();
00355 };
00356
00357 class ReplSet : public ReplSetImpl {
00358 public:
00359 ReplSet(ReplSetCmdline& replSetCmdline) : ReplSetImpl(replSetCmdline) { }
00360
00361
00362 bool stepDown(int secs) { return _stepDown(secs); }
00363
00364
00365 bool freeze(int secs) { return _freeze(secs); }
00366
00367 string selfFullName() {
00368 lock lk(this);
00369 return _self->fullName();
00370 }
00371
00372 bool buildIndexes() const { return _buildIndexes; }
00373
00374
00375 void go() { _go(); }
00376
00377 void fatal() { _fatal(); }
00378 bool isPrimary() { return box.getState().primary(); }
00379 bool isSecondary() { return box.getState().secondary(); }
00380 MemberState state() const { return ReplSetImpl::state(); }
00381 string name() const { return ReplSetImpl::name(); }
00382 const ReplSetConfig& config() { return ReplSetImpl::config(); }
00383 void getOplogDiagsAsHtml(unsigned server_id, stringstream& ss) const { _getOplogDiagsAsHtml(server_id,ss); }
00384 void summarizeAsHtml(stringstream& ss) const { _summarizeAsHtml(ss); }
00385 void summarizeStatus(BSONObjBuilder& b) const { _summarizeStatus(b); }
00386 void fillIsMaster(BSONObjBuilder& b) { _fillIsMaster(b); }
00387
00388
00389
00390
00391 void haveNewConfig(ReplSetConfig& c, bool comment);
00392
00393
00394 const ReplSetConfig& getConfig() { return config(); }
00395
00396 bool lockedByMe() { return RSBase::lockedByMe(); }
00397
00398
00399 string hbmsg() const {
00400 if( time(0)-_hbmsgTime > 120 ) return "";
00401 return _hbmsg;
00402 }
00403 };
00404
00408 class ReplSetCommand : public Command {
00409 protected:
00410 ReplSetCommand(const char * s, bool show=false) : Command(s, show) { }
00411 virtual bool slaveOk() const { return true; }
00412 virtual bool adminOnly() const { return true; }
00413 virtual bool logTheOp() { return false; }
00414 virtual LockType locktype() const { return NONE; }
00415 virtual void help( stringstream &help ) const { help << "internal"; }
00416 bool check(string& errmsg, BSONObjBuilder& result) {
00417 if( !replSet ) {
00418 errmsg = "not running with --replSet";
00419 return false;
00420 }
00421 if( theReplSet == 0 ) {
00422 result.append("startupStatus", ReplSet::startupStatus);
00423 errmsg = ReplSet::startupStatusMsg.empty() ? "replset unknown error 2" : ReplSet::startupStatusMsg;
00424 if( ReplSet::startupStatus == 3 )
00425 result.append("info", "run rs.initiate(...) if not yet done for the set");
00426 return false;
00427 }
00428 return true;
00429 }
00430 };
00431
00434 inline Member::Member(HostAndPort h, unsigned ord, const ReplSetConfig::MemberCfg *c, bool self) :
00435 _config(*c), _h(h), _hbinfo(ord) {
00436 if( self )
00437 _hbinfo.health = 1.0;
00438 }
00439
00440 }