00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00022 #include "../pch.h"
00023 #include "dbclient.h"
00024 #include "redef_macros.h"
00025 #include "../db/dbmessage.h"
00026 #include "../db/matcher.h"
00027 #include "../util/concurrency/mvar.h"
00028
00029 namespace mongo {
00030
00034 class ServerAndQuery {
00035 public:
00036 ServerAndQuery( const string& server , BSONObj extra = BSONObj() , BSONObj orderObject = BSONObj() ) :
00037 _server( server ) , _extra( extra.getOwned() ) , _orderObject( orderObject.getOwned() ) {
00038 }
00039
00040 bool operator<( const ServerAndQuery& other ) const {
00041 if ( ! _orderObject.isEmpty() )
00042 return _orderObject.woCompare( other._orderObject ) < 0;
00043
00044 if ( _server < other._server )
00045 return true;
00046 if ( other._server > _server )
00047 return false;
00048 return _extra.woCompare( other._extra ) < 0;
00049 }
00050
00051 string toString() const {
00052 StringBuilder ss;
00053 ss << "server:" << _server << " _extra:" << _extra.toString() << " _orderObject:" << _orderObject.toString();
00054 return ss.str();
00055 }
00056
00057 operator string() const {
00058 return toString();
00059 }
00060
00061 string _server;
00062 BSONObj _extra;
00063 BSONObj _orderObject;
00064 };
00065
00070 class ClusteredCursor {
00071 public:
00072 ClusteredCursor( QueryMessage& q );
00073 ClusteredCursor( const string& ns , const BSONObj& q , int options=0 , const BSONObj& fields=BSONObj() );
00074 virtual ~ClusteredCursor();
00075
00077 void init();
00078
00079 virtual bool more() = 0;
00080 virtual BSONObj next() = 0;
00081
00082 static BSONObj concatQuery( const BSONObj& query , const BSONObj& extraFilter );
00083
00084 virtual string type() const = 0;
00085
00086 virtual BSONObj explain();
00087
00088 protected:
00089
00090 virtual void _init() = 0;
00091
00092 auto_ptr<DBClientCursor> query( const string& server , int num = 0 , BSONObj extraFilter = BSONObj() , int skipLeft = 0 );
00093 BSONObj explain( const string& server , BSONObj extraFilter = BSONObj() );
00094
00095 static BSONObj _concatFilter( const BSONObj& filter , const BSONObj& extraFilter );
00096
00097 virtual void _explain( map< string,list<BSONObj> >& out ) = 0;
00098
00099 string _ns;
00100 BSONObj _query;
00101 int _options;
00102 BSONObj _fields;
00103 int _batchSize;
00104
00105 bool _didInit;
00106
00107 bool _done;
00108 };
00109
00110
00111 class FilteringClientCursor {
00112 public:
00113 FilteringClientCursor( const BSONObj filter = BSONObj() );
00114 FilteringClientCursor( auto_ptr<DBClientCursor> cursor , const BSONObj filter = BSONObj() );
00115 ~FilteringClientCursor();
00116
00117 void reset( auto_ptr<DBClientCursor> cursor );
00118
00119 bool more();
00120 BSONObj next();
00121
00122 BSONObj peek();
00123 private:
00124 void _advance();
00125
00126 Matcher _matcher;
00127 auto_ptr<DBClientCursor> _cursor;
00128
00129 BSONObj _next;
00130 bool _done;
00131 };
00132
00133
00134 class Servers {
00135 public:
00136 Servers() {
00137 }
00138
00139 void add( const ServerAndQuery& s ) {
00140 add( s._server , s._extra );
00141 }
00142
00143 void add( const string& server , const BSONObj& filter ) {
00144 vector<BSONObj>& mine = _filters[server];
00145 mine.push_back( filter.getOwned() );
00146 }
00147
00148
00149 class View {
00150 View( const Servers* s ) {
00151 for ( map<string, vector<BSONObj> >::const_iterator i=s->_filters.begin(); i!=s->_filters.end(); ++i ) {
00152 _servers.push_back( i->first );
00153 _filters.push_back( i->second );
00154 }
00155 }
00156 public:
00157 int size() const {
00158 return _servers.size();
00159 }
00160
00161 string getServer( int n ) const {
00162 return _servers[n];
00163 }
00164
00165 vector<BSONObj> getFilter( int n ) const {
00166 return _filters[ n ];
00167 }
00168
00169 private:
00170 vector<string> _servers;
00171 vector< vector<BSONObj> > _filters;
00172
00173 friend class Servers;
00174 };
00175
00176 View view() const {
00177 return View( this );
00178 }
00179
00180
00181 private:
00182 map<string, vector<BSONObj> > _filters;
00183
00184 friend class View;
00185 };
00186
00187
00192 class SerialServerClusteredCursor : public ClusteredCursor {
00193 public:
00194 SerialServerClusteredCursor( const set<ServerAndQuery>& servers , QueryMessage& q , int sortOrder=0);
00195 virtual bool more();
00196 virtual BSONObj next();
00197 virtual string type() const { return "SerialServer"; }
00198
00199 protected:
00200 virtual void _explain( map< string,list<BSONObj> >& out );
00201
00202 void _init() {}
00203
00204 vector<ServerAndQuery> _servers;
00205 unsigned _serverIndex;
00206
00207 FilteringClientCursor _current;
00208
00209 int _needToSkip;
00210 };
00211
00212
00217 class ParallelSortClusteredCursor : public ClusteredCursor {
00218 public:
00219 ParallelSortClusteredCursor( const set<ServerAndQuery>& servers , QueryMessage& q , const BSONObj& sortKey );
00220 ParallelSortClusteredCursor( const set<ServerAndQuery>& servers , const string& ns ,
00221 const Query& q , int options=0, const BSONObj& fields=BSONObj() );
00222 virtual ~ParallelSortClusteredCursor();
00223 virtual bool more();
00224 virtual BSONObj next();
00225 virtual string type() const { return "ParallelSort"; }
00226 protected:
00227 void _finishCons();
00228 void _init();
00229
00230 virtual void _explain( map< string,list<BSONObj> >& out );
00231
00232 int _numServers;
00233 set<ServerAndQuery> _servers;
00234 BSONObj _sortKey;
00235
00236 FilteringClientCursor * _cursors;
00237 int _needToSkip;
00238 };
00239
00245 class Future {
00246 public:
00247 class CommandResult {
00248 public:
00249
00250 string getServer() const { return _server; }
00251
00252 bool isDone() const { return _done; }
00253
00254 bool ok() const {
00255 assert( _done );
00256 return _ok;
00257 }
00258
00259 BSONObj result() const {
00260 assert( _done );
00261 return _res;
00262 }
00263
00268 bool join();
00269
00270 private:
00271
00272 CommandResult( const string& server , const string& db , const BSONObj& cmd , DBClientBase * conn );
00273
00274 string _server;
00275 string _db;
00276 BSONObj _cmd;
00277 DBClientBase * _conn;
00278
00279 scoped_ptr<boost::thread> _thr;
00280
00281 BSONObj _res;
00282 bool _ok;
00283 bool _done;
00284
00285 friend class Future;
00286 };
00287
00288 static void commandThread(shared_ptr<CommandResult> res);
00289
00296 static shared_ptr<CommandResult> spawnCommand( const string& server , const string& db , const BSONObj& cmd , DBClientBase * conn = 0 );
00297 };
00298
00299
00300 }
00301
00302 #include "undef_macros.h"