00001
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018 #pragma once
00019
00020 #include "../pch.h"
00021 #include "../util/message.h"
00022 #include "../db/jsobj.h"
00023 #include "../db/json.h"
00024 #include <stack>
00025
00026 namespace mongo {
00027
00029 enum QueryOptions {
00038 QueryOption_CursorTailable = 1 << 1,
00039
00042 QueryOption_SlaveOk = 1 << 2,
00043
00044
00045
00046
00047
00048
00049
00050
00051
00052 QueryOption_OplogReplay = 1 << 3,
00053
00057 QueryOption_NoCursorTimeout = 1 << 4,
00058
00062 QueryOption_AwaitData = 1 << 5,
00063
00071 QueryOption_Exhaust = 1 << 6,
00072
00077 QueryOption_PartialResults = 1 << 7 ,
00078
00079 QueryOption_AllSupported = QueryOption_CursorTailable | QueryOption_SlaveOk | QueryOption_OplogReplay | QueryOption_NoCursorTimeout | QueryOption_AwaitData | QueryOption_Exhaust | QueryOption_PartialResults
00080
00081 };
00082
00083 enum UpdateOptions {
00085 UpdateOption_Upsert = 1 << 0,
00086
00089 UpdateOption_Multi = 1 << 1,
00090
00092 UpdateOption_Broadcast = 1 << 2
00093 };
00094
00095 enum RemoveOptions {
00097 RemoveOption_JustOne = 1 << 0,
00098
00100 RemoveOption_Broadcast = 1 << 1
00101 };
00102
00103 class DBClientBase;
00104
00119 class ConnectionString {
00120 public:
00121 enum ConnectionType { INVALID , MASTER , PAIR , SET , SYNC };
00122
00123 ConnectionString() {
00124 _type = INVALID;
00125 }
00126
00127 ConnectionString( const HostAndPort& server ) {
00128 _type = MASTER;
00129 _servers.push_back( server );
00130 _finishInit();
00131 }
00132
00133 ConnectionString( ConnectionType type , const string& s , const string& setName = "" ) {
00134 _type = type;
00135 _setName = setName;
00136 _fillServers( s );
00137
00138 switch ( _type ) {
00139 case MASTER:
00140 assert( _servers.size() == 1 );
00141 break;
00142 case SET:
00143 assert( _setName.size() );
00144 assert( _servers.size() >= 1 );
00145 break;
00146 case PAIR:
00147 assert( _servers.size() == 2 );
00148 break;
00149 default:
00150 assert( _servers.size() > 0 );
00151 }
00152
00153 _finishInit();
00154 }
00155
00156 ConnectionString( const string& s , ConnectionType favoredMultipleType ) {
00157 _type = INVALID;
00158
00159 _fillServers( s );
00160 if ( _type != INVALID ) {
00161
00162 }
00163 else if ( _servers.size() == 1 ) {
00164 _type = MASTER;
00165 }
00166 else {
00167 _type = favoredMultipleType;
00168 assert( _type == SET || _type == SYNC );
00169 }
00170 _finishInit();
00171 }
00172
00173 bool isValid() const { return _type != INVALID; }
00174
00175 string toString() const { return _string; }
00176
00177 DBClientBase* connect( string& errmsg ) const;
00178
00179 string getSetName() const { return _setName; }
00180
00181 vector<HostAndPort> getServers() const { return _servers; }
00182
00183 ConnectionType type() const { return _type; }
00184
00185 static ConnectionString parse( const string& url , string& errmsg );
00186
00187 static string typeToString( ConnectionType type );
00188
00189 private:
00190
00191 void _fillServers( string s );
00192 void _finishInit();
00193
00194 ConnectionType _type;
00195 vector<HostAndPort> _servers;
00196 string _string;
00197 string _setName;
00198 };
00199
00204 enum WriteConcern {
00205 W_NONE = 0 ,
00206 W_NORMAL = 1
00207
00208 };
00209
00210 class BSONObj;
00211 class ScopedDbConnection;
00212 class DBClientCursor;
00213 class DBClientCursorBatchIterator;
00214
00220 class Query {
00221 public:
00222 BSONObj obj;
00223 Query() : obj(BSONObj()) { }
00224 Query(const BSONObj& b) : obj(b) { }
00225 Query(const string &json) :
00226 obj(fromjson(json)) { }
00227 Query(const char * json) :
00228 obj(fromjson(json)) { }
00229
00238 Query& sort(const BSONObj& sortPattern);
00239
00245 Query& sort(const string &field, int asc = 1) { sort( BSON( field << asc ) ); return *this; }
00246
00252 Query& hint(BSONObj keyPattern);
00253 Query& hint(const string &jsonKeyPatt) { return hint(fromjson(jsonKeyPatt)); }
00254
00258 Query& minKey(const BSONObj &val);
00262 Query& maxKey(const BSONObj &val);
00263
00267 Query& explain();
00268
00277 Query& snapshot();
00278
00295 Query& where(const string &jscode, BSONObj scope);
00296 Query& where(const string &jscode) { return where(jscode, BSONObj()); }
00297
00301 bool isComplex( bool * hasDollar = 0 ) const;
00302
00303 BSONObj getFilter() const;
00304 BSONObj getSort() const;
00305 BSONObj getHint() const;
00306 bool isExplain() const;
00307
00308 string toString() const;
00309 operator string() const { return toString(); }
00310 private:
00311 void makeComplex();
00312 template< class T >
00313 void appendComplex( const char *fieldName, const T& val ) {
00314 makeComplex();
00315 BSONObjBuilder b;
00316 b.appendElements(obj);
00317 b.append(fieldName, val);
00318 obj = b.obj();
00319 }
00320 };
00321
00325 #define QUERY(x) mongo::Query( BSON(x) )
00326
00330 class DBConnector {
00331 public:
00332 virtual ~DBConnector() {}
00334 virtual bool call( Message &toSend, Message &response, bool assertOk=true , string * actualServer = 0 ) = 0;
00335 virtual void say( Message &toSend ) = 0;
00336 virtual void sayPiggyBack( Message &toSend ) = 0;
00337 virtual void checkResponse( const char* data, int nReturned ) {}
00338
00339
00340 virtual void recv( Message& m ) { assert(false); }
00341 };
00342
00346 class DBClientInterface : boost::noncopyable {
00347 public:
00348 virtual auto_ptr<DBClientCursor> query(const string &ns, Query query, int nToReturn = 0, int nToSkip = 0,
00349 const BSONObj *fieldsToReturn = 0, int queryOptions = 0 , int batchSize = 0 ) = 0;
00350
00352 virtual auto_ptr<DBClientCursor> getMore( const string &ns, long long cursorId, int nToReturn = 0, int options = 0 ) = 0;
00353
00354 virtual void insert( const string &ns, BSONObj obj ) = 0;
00355
00356 virtual void insert( const string &ns, const vector< BSONObj >& v ) = 0;
00357
00358 virtual void remove( const string &ns , Query query, bool justOne = 0 ) = 0;
00359
00360 virtual void update( const string &ns , Query query , BSONObj obj , bool upsert = 0 , bool multi = 0 ) = 0;
00361
00362 virtual ~DBClientInterface() { }
00363
00368 virtual BSONObj findOne(const string &ns, const Query& query, const BSONObj *fieldsToReturn = 0, int queryOptions = 0);
00369
00370 virtual string getServerAddress() const = 0;
00371
00372 };
00373
00378 class DBClientWithCommands : public DBClientInterface {
00379 set<string> _seenIndexes;
00380 public:
00382 int _logLevel;
00383
00384 DBClientWithCommands() : _logLevel(0), _cachedAvailableOptions( (enum QueryOptions)0 ), _haveCachedAvailableOptions(false) { }
00385
00392 bool simpleCommand(const string &dbname, BSONObj *info, const string &command);
00393
00405 virtual bool runCommand(const string &dbname, const BSONObj& cmd, BSONObj &info, int options=0);
00406
00415 virtual bool auth(const string &dbname, const string &username, const string &pwd, string& errmsg, bool digestPassword = true);
00416
00420 virtual unsigned long long count(const string &ns, const BSONObj& query = BSONObj(), int options=0, int limit=0, int skip=0 );
00421
00422 string createPasswordDigest( const string &username , const string &clearTextPassword );
00423
00432 virtual bool isMaster(bool& isMaster, BSONObj *info=0);
00433
00450 bool createCollection(const string &ns, long long size = 0, bool capped = false, int max = 0, BSONObj *info = 0);
00451
00455 string getLastError();
00459 virtual BSONObj getLastErrorDetailed();
00460
00461 static string getLastErrorString( const BSONObj& res );
00462
00469 BSONObj getPrevError();
00470
00475 bool resetError() { return simpleCommand("admin", 0, "reseterror"); }
00476
00478 virtual bool dropCollection( const string &ns ) {
00479 string db = nsGetDB( ns );
00480 string coll = nsGetCollection( ns );
00481 uassert( 10011 , "no collection name", coll.size() );
00482
00483 BSONObj info;
00484
00485 bool res = runCommand( db.c_str() , BSON( "drop" << coll ) , info );
00486 resetIndexCache();
00487 return res;
00488 }
00489
00493 bool repairDatabase(const string &dbname, BSONObj *info = 0) {
00494 return simpleCommand(dbname, info, "repairDatabase");
00495 }
00496
00516 bool copyDatabase(const string &fromdb, const string &todb, const string &fromhost = "", BSONObj *info = 0);
00517
00522 enum ProfilingLevel {
00523 ProfileOff = 0,
00524 ProfileSlow = 1,
00525 ProfileAll = 2
00526
00527 };
00528 bool setDbProfilingLevel(const string &dbname, ProfilingLevel level, BSONObj *info = 0);
00529 bool getDbProfilingLevel(const string &dbname, ProfilingLevel& level, BSONObj *info = 0);
00530
00554 BSONObj mapreduce(const string &ns, const string &jsmapf, const string &jsreducef, BSONObj query = BSONObj(), const string& output = "");
00555
00571 bool eval(const string &dbname, const string &jscode, BSONObj& info, BSONElement& retValue, BSONObj *args = 0);
00572
00576 bool validate( const string &ns , bool scandata=true ) {
00577 BSONObj cmd = BSON( "validate" << nsGetCollection( ns ) << "scandata" << scandata );
00578 BSONObj info;
00579 return runCommand( nsGetDB( ns ).c_str() , cmd , info );
00580 }
00581
00582
00583
00584
00585 bool eval(const string &dbname, const string &jscode);
00586 template< class T >
00587 bool eval(const string &dbname, const string &jscode, T parm1) {
00588 BSONObj info;
00589 BSONElement retValue;
00590 BSONObjBuilder b;
00591 b.append("0", parm1);
00592 BSONObj args = b.done();
00593 return eval(dbname, jscode, info, retValue, &args);
00594 }
00595
00597 template< class T, class NumType >
00598 bool eval(const string &dbname, const string &jscode, T parm1, NumType& ret) {
00599 BSONObj info;
00600 BSONElement retValue;
00601 BSONObjBuilder b;
00602 b.append("0", parm1);
00603 BSONObj args = b.done();
00604 if ( !eval(dbname, jscode, info, retValue, &args) )
00605 return false;
00606 ret = (NumType) retValue.number();
00607 return true;
00608 }
00609
00615 list<string> getDatabaseNames();
00616
00620 list<string> getCollectionNames( const string& db );
00621
00622 bool exists( const string& ns );
00623
00635 virtual bool ensureIndex( const string &ns , BSONObj keys , bool unique = false, const string &name = "",
00636 bool cache = true );
00637
00641 virtual void resetIndexCache();
00642
00643 virtual auto_ptr<DBClientCursor> getIndexes( const string &ns );
00644
00645 virtual void dropIndex( const string& ns , BSONObj keys );
00646 virtual void dropIndex( const string& ns , const string& indexName );
00647
00651 virtual void dropIndexes( const string& ns );
00652
00653 virtual void reIndex( const string& ns );
00654
00655 string genIndexName( const BSONObj& keys );
00656
00658 virtual bool dropDatabase(const string &dbname, BSONObj *info = 0) {
00659 bool ret = simpleCommand(dbname, info, "dropDatabase");
00660 resetIndexCache();
00661 return ret;
00662 }
00663
00664 virtual string toString() = 0;
00665
00667 string nsGetDB( const string &ns ) {
00668 string::size_type pos = ns.find( "." );
00669 if ( pos == string::npos )
00670 return ns;
00671
00672 return ns.substr( 0 , pos );
00673 }
00674
00676 string nsGetCollection( const string &ns ) {
00677 string::size_type pos = ns.find( "." );
00678 if ( pos == string::npos )
00679 return "";
00680
00681 return ns.substr( pos + 1 );
00682 }
00683
00684 protected:
00685 bool isOk(const BSONObj&);
00686
00687 BSONObj _countCmd(const string &ns, const BSONObj& query, int options, int limit, int skip );
00688
00689 enum QueryOptions availableOptions();
00690
00691 private:
00692 enum QueryOptions _cachedAvailableOptions;
00693 bool _haveCachedAvailableOptions;
00694 };
00695
00699 class DBClientBase : public DBClientWithCommands, public DBConnector {
00700 protected:
00701 WriteConcern _writeConcern;
00702
00703 public:
00704 DBClientBase() {
00705 _writeConcern = W_NORMAL;
00706 }
00707
00708 WriteConcern getWriteConcern() const { return _writeConcern; }
00709 void setWriteConcern( WriteConcern w ) { _writeConcern = w; }
00710
00725 virtual auto_ptr<DBClientCursor> query(const string &ns, Query query, int nToReturn = 0, int nToSkip = 0,
00726 const BSONObj *fieldsToReturn = 0, int queryOptions = 0 , int batchSize = 0 );
00727
00733 virtual auto_ptr<DBClientCursor> getMore( const string &ns, long long cursorId, int nToReturn = 0, int options = 0 );
00734
00738 virtual void insert( const string &ns , BSONObj obj );
00739
00743 virtual void insert( const string &ns, const vector< BSONObj >& v );
00744
00749 virtual void remove( const string &ns , Query q , bool justOne = 0 );
00750
00754 virtual void update( const string &ns , Query query , BSONObj obj , bool upsert = false , bool multi = false );
00755
00756 virtual bool isFailed() const = 0;
00757
00758 virtual void killCursor( long long cursorID ) = 0;
00759
00760 virtual bool callRead( Message& toSend , Message& response ) = 0;
00761
00762 virtual void say( Message& toSend ) = 0;
00763
00764 virtual ConnectionString::ConnectionType type() const = 0;
00765
00766 };
00767
00768 class DBClientReplicaSet;
00769
00770 class ConnectException : public UserException {
00771 public:
00772 ConnectException(string msg) : UserException(9000,msg) { }
00773 };
00774
00779 class DBClientConnection : public DBClientBase {
00780 public:
00787 DBClientConnection(bool _autoReconnect=false, DBClientReplicaSet* cp=0, double so_timeout=0) :
00788 clientSet(cp), failed(false), autoReconnect(_autoReconnect), lastReconnectTry(0), _so_timeout(so_timeout) {
00789 _numConnections++;
00790 }
00791
00792 virtual ~DBClientConnection() {
00793 _numConnections--;
00794 }
00795
00807 virtual bool connect(const char * hostname, string& errmsg) {
00808
00809 HostAndPort t( hostname );
00810 return connect( t , errmsg );
00811 }
00812
00822 virtual bool connect(const HostAndPort& server, string& errmsg);
00823
00832 void connect(const string& serverHostname) {
00833 string errmsg;
00834 if( !connect(HostAndPort(serverHostname), errmsg) )
00835 throw ConnectException(string("can't connect ") + errmsg);
00836 }
00837
00838 virtual bool auth(const string &dbname, const string &username, const string &pwd, string& errmsg, bool digestPassword = true);
00839
00840 virtual auto_ptr<DBClientCursor> query(const string &ns, Query query=Query(), int nToReturn = 0, int nToSkip = 0,
00841 const BSONObj *fieldsToReturn = 0, int queryOptions = 0 , int batchSize = 0 ) {
00842 checkConnection();
00843 return DBClientBase::query( ns, query, nToReturn, nToSkip, fieldsToReturn, queryOptions , batchSize );
00844 }
00845
00852 unsigned long long query( boost::function<void(const BSONObj&)> f, const string& ns, Query query, const BSONObj *fieldsToReturn = 0, int queryOptions = 0);
00853 unsigned long long query( boost::function<void(DBClientCursorBatchIterator&)> f, const string& ns, Query query, const BSONObj *fieldsToReturn = 0, int queryOptions = 0);
00854
00859 bool isFailed() const { return failed; }
00860
00861 MessagingPort& port() { return *p; }
00862
00863 string toStringLong() const {
00864 stringstream ss;
00865 ss << _serverString;
00866 if ( failed ) ss << " failed";
00867 return ss.str();
00868 }
00869
00871 string toString() { return _serverString; }
00872
00873 string getServerAddress() const { return _serverString; }
00874
00875 virtual void killCursor( long long cursorID );
00876 virtual bool callRead( Message& toSend , Message& response ) { return call( toSend , response ); }
00877 virtual void say( Message &toSend );
00878 virtual bool call( Message &toSend, Message &response, bool assertOk = true , string * actualServer = 0 );
00879 virtual ConnectionString::ConnectionType type() const { return ConnectionString::MASTER; }
00880 virtual void checkResponse( const char *data, int nReturned );
00881 void setSoTimeout(double to) { _so_timeout = to; }
00882
00883 static int getNumConnections() {
00884 return _numConnections;
00885 }
00886
00887 static void setLazyKillCursor( bool lazy ) { _lazyKillCursor = lazy; }
00888 static bool getLazyKillCursor() { return _lazyKillCursor; }
00889
00890 protected:
00891 friend class SyncClusterConnection;
00892 virtual void recv( Message& m );
00893 virtual void sayPiggyBack( Message &toSend );
00894
00895 DBClientReplicaSet *clientSet;
00896 boost::scoped_ptr<MessagingPort> p;
00897 boost::scoped_ptr<SockAddr> server;
00898 bool failed;
00899 const bool autoReconnect;
00900 time_t lastReconnectTry;
00901 HostAndPort _server;
00902 string _serverString;
00903 void _checkConnection();
00904
00905
00906 void checkConnection() { if( failed ) _checkConnection(); }
00907
00908 map< string, pair<string,string> > authCache;
00909 double _so_timeout;
00910 bool _connect( string& errmsg );
00911
00912 static AtomicUInt _numConnections;
00913 static bool _lazyKillCursor;
00914 };
00915
00918 bool serverAlive( const string &uri );
00919
00920 DBClientBase * createDirectClient();
00921
00922 }
00923
00924 #include "dbclientcursor.h"
00925 #include "dbclient_rs.h"
00926 #include "undef_macros.h"