00001
00003 #pragma once
00004
00005 #include "../client/dbclient.h"
00006 #include "../client/constants.h"
00007 #include "dbhelpers.h"
00008
00009 namespace mongo {
00010
00011
00012
00013
00014 class OplogReader {
00015 auto_ptr<DBClientConnection> _conn;
00016 auto_ptr<DBClientCursor> cursor;
00017 public:
00018
00019 OplogReader() {
00020 }
00021 ~OplogReader() {
00022 }
00023
00024 void resetCursor() {
00025 cursor.reset();
00026 }
00027 void resetConnection() {
00028 cursor.reset();
00029 _conn.reset();
00030 }
00031 DBClientConnection* conn() { return _conn.get(); }
00032 BSONObj findOne(const char *ns, const Query& q) {
00033 return conn()->findOne(ns, q, 0, QueryOption_SlaveOk);
00034 }
00035
00036 BSONObj getLastOp(const char *ns) {
00037 return findOne(ns, Query().sort(reverseNaturalObj));
00038 }
00039
00040
00041 bool connect(string hostname);
00042
00043 void tailCheck() {
00044 if( cursor.get() && cursor->isDead() ) {
00045 log() << "repl: old cursor isDead, will initiate a new one" << endl;
00046 resetCursor();
00047 }
00048 }
00049
00050 bool haveCursor() { return cursor.get() != 0; }
00051
00052 void query(const char *ns, const BSONObj& query) {
00053 assert( !haveCursor() );
00054 cursor = _conn->query(ns, query, 0, 0, 0, QueryOption_SlaveOk);
00055 }
00056
00057 void tailingQuery(const char *ns, const BSONObj& query) {
00058 assert( !haveCursor() );
00059 log(2) << "repl: " << ns << ".find(" << query.toString() << ')' << endl;
00060 cursor = _conn->query( ns, query, 0, 0, 0,
00061 QueryOption_CursorTailable | QueryOption_SlaveOk | QueryOption_OplogReplay |
00062
00063 QueryOption_AwaitData
00064 );
00065 }
00066
00067 void tailingQueryGTE(const char *ns, OpTime t) {
00068 BSONObjBuilder q;
00069 q.appendDate("$gte", t.asDate());
00070 BSONObjBuilder query;
00071 query.append("ts", q.done());
00072 tailingQuery(ns, query.done());
00073 }
00074
00075 bool more() {
00076 assert( cursor.get() );
00077 return cursor->more();
00078 }
00079 bool moreInCurrentBatch() {
00080 assert( cursor.get() );
00081 return cursor->moreInCurrentBatch();
00082 }
00083
00084
00085 bool awaitCapable() {
00086 return cursor->hasResultFlag(ResultFlag_AwaitCapable);
00087 }
00088
00089 void peek(vector<BSONObj>& v, int n) {
00090 if( cursor.get() )
00091 cursor->peek(v,n);
00092 }
00093
00094 BSONObj nextSafe() { return cursor->nextSafe(); }
00095
00096 BSONObj next() {
00097 return cursor->next();
00098 }
00099
00100 void putBack(BSONObj op) {
00101 cursor->putBack(op);
00102 }
00103 };
00104
00105 }