00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018 #pragma once
00019
00020 #include "../util/sock.h"
00021 #include "../bson/util/atomic_int.h"
00022 #include "hostandport.h"
00023
00024 namespace mongo {
00025
00026 extern bool noUnixSocket;
00027
00028 class Message;
00029 class MessagingPort;
00030 class PiggyBackData;
00031 typedef AtomicUInt MSGID;
00032
00033 class Listener : boost::noncopyable {
00034 public:
00035 Listener(const string &ip, int p, bool logConnect=true ) : _port(p), _ip(ip), _logConnect(logConnect), _elapsedTime(0) { }
00036 virtual ~Listener() {
00037 if ( _timeTracker == this )
00038 _timeTracker = 0;
00039 }
00040 void initAndListen();
00041
00042
00043 virtual void accepted(int sock, const SockAddr& from);
00044 virtual void accepted(MessagingPort *mp) {
00045 assert(!"You must overwrite one of the accepted methods");
00046 }
00047
00048 const int _port;
00049
00053 long long getMyElapsedTimeMillis() const { return _elapsedTime; }
00054
00055 void setAsTimeTracker() {
00056 _timeTracker = this;
00057 }
00058
00059 static const Listener* getTimeTracker() {
00060 return _timeTracker;
00061 }
00062
00063 static long long getElapsedTimeMillis() {
00064 if ( _timeTracker )
00065 return _timeTracker->getMyElapsedTimeMillis();
00066
00067
00068 return 0;
00069 }
00070
00071 private:
00072 string _ip;
00073 bool _logConnect;
00074 long long _elapsedTime;
00075
00076 static const Listener* _timeTracker;
00077 };
00078
00079 class AbstractMessagingPort : boost::noncopyable {
00080 public:
00081 virtual ~AbstractMessagingPort() { }
00082 virtual void reply(Message& received, Message& response, MSGID responseTo) = 0;
00083 virtual void reply(Message& received, Message& response) = 0;
00084
00085 virtual HostAndPort remote() const = 0;
00086 virtual unsigned remotePort() const = 0;
00087
00088 private:
00089 int _clientId;
00090 };
00091
00092 class MessagingPort : public AbstractMessagingPort {
00093 public:
00094 MessagingPort(int sock, const SockAddr& farEnd);
00095
00096
00097
00098
00099 MessagingPort(double so_timeout = 0, int logLevel = 0 );
00100
00101 virtual ~MessagingPort();
00102
00103 void shutdown();
00104
00105 bool connect(SockAddr& farEnd);
00106
00107
00108
00109
00110 bool recv(Message& m);
00111 void reply(Message& received, Message& response, MSGID responseTo);
00112 void reply(Message& received, Message& response);
00113 bool call(Message& toSend, Message& response);
00114
00115 void say(Message& toSend, int responseTo = -1);
00116
00126 bool recv( const Message& sent , Message& response );
00127
00128 void piggyBack( Message& toSend , int responseTo = -1 );
00129
00130 virtual unsigned remotePort() const;
00131 virtual HostAndPort remote() const;
00132
00133
00134 void send( const char * data , int len, const char *context );
00135 void send( const vector< pair< char *, int > > &data, const char *context );
00136
00137
00138 void recv( char * data , int len );
00139
00140 int unsafe_recv( char *buf, int max );
00141
00142 void clearCounters() { _bytesIn = 0; _bytesOut = 0; }
00143 long long getBytesIn() const { return _bytesIn; }
00144 long long getBytesOut() const { return _bytesOut; }
00145 private:
00146 int sock;
00147 PiggyBackData * piggyBackData;
00148
00149 long long _bytesIn;
00150 long long _bytesOut;
00151
00152
00153
00154 mutable HostAndPort _farEndParsed;
00155
00156 public:
00157 SockAddr farEnd;
00158 double _timeout;
00159 int _logLevel;
00160
00161 static void closeAllSockets(unsigned tagMask = 0xffffffff);
00162
00163
00164 unsigned tag;
00165
00166 friend class PiggyBackData;
00167 };
00168
/**
 * Operation codes understood by opToString() and opIsWrite().
 * The numeric values are fixed; note that 2003 is not assigned here.
 */
enum Operations {
    opReply       = 1,     // "reply"
    dbMsg         = 1000,  // "msg"
    dbUpdate      = 2001,  // "update"
    dbInsert      = 2002,  // "insert"
    // 2003 intentionally left unassigned in this enum
    dbQuery       = 2004,  // "query"
    dbGetMore     = 2005,  // "getmore"
    dbDelete      = 2006,  // "remove"
    dbKillCursors = 2007   // "killcursors"
};
00180
00181 bool doesOpGetAResponse( int op );
00182
00183 inline const char * opToString( int op ) {
00184 switch ( op ) {
00185 case 0: return "none";
00186 case opReply: return "reply";
00187 case dbMsg: return "msg";
00188 case dbUpdate: return "update";
00189 case dbInsert: return "insert";
00190 case dbQuery: return "query";
00191 case dbGetMore: return "getmore";
00192 case dbDelete: return "remove";
00193 case dbKillCursors: return "killcursors";
00194 default:
00195 PRINT(op);
00196 assert(0);
00197 return "";
00198 }
00199 }
00200
00201 inline bool opIsWrite( int op ) {
00202 switch ( op ) {
00203
00204 case 0:
00205 case opReply:
00206 case dbMsg:
00207 case dbQuery:
00208 case dbGetMore:
00209 case dbKillCursors:
00210 return false;
00211
00212 case dbUpdate:
00213 case dbInsert:
00214 case dbDelete:
00215 return false;
00216
00217 default:
00218 PRINT(op);
00219 assert(0);
00220 return "";
00221 }
00222
00223 }
00224
#pragma pack(1)

/** Four consecutive int fields, packed with 1-byte alignment. */
struct MSGHEADER {
    int messageLength;
    int requestID;
    int responseTo;

    int opCode;
};

/**
 * MSGHEADER followed by a second MSGHEADER member and one int.
 *
 * NOTE(review): the header appears twice (as base class and as the `header`
 * member); this looks unintended but is kept so the layout stays unchanged -
 * confirm against users of this struct.
 */
struct OP_GETMORE : MSGHEADER {
    MSGHEADER header;
    int ZERO_or_flags;
};

#pragma pack()
00243
00244 #pragma pack(1)
00245
00246 struct MsgData {
00247 int len;
00248 MSGID id;
00249 MSGID responseTo;
00250 short _operation;
00251 char _flags;
00252 char _version;
00253 int operation() const {
00254 return _operation;
00255 }
00256 void setOperation(int o) {
00257 _flags = 0;
00258 _version = 0;
00259 _operation = o;
00260 }
00261 char _data[4];
00262
00263 int& dataAsInt() {
00264 return *((int *) _data);
00265 }
00266
00267 bool valid() {
00268 if ( len <= 0 || len > ( 4 * BSONObjMaxInternalSize ) )
00269 return false;
00270 if ( _operation < 0 || _operation > 30000 )
00271 return false;
00272 return true;
00273 }
00274
00275 long long getCursor() {
00276 assert( responseTo > 0 );
00277 assert( _operation == opReply );
00278 long long * l = (long long *)(_data + 4);
00279 return l[0];
00280 }
00281
00282 int dataLen();
00283 };
00284 const int MsgDataHeaderSize = sizeof(MsgData) - 4;
00285 inline int MsgData::dataLen() {
00286 return len - MsgDataHeaderSize;
00287 }
00288 #pragma pack()
00289
00290 class Message {
00291 public:
00292
00293 Message() : _buf( 0 ), _data( 0 ), _freeIt( false ) {}
00294 Message( void * data , bool freeIt ) :
00295 _buf( 0 ), _data( 0 ), _freeIt( false ) {
00296 _setData( reinterpret_cast< MsgData* >( data ), freeIt );
00297 };
00298 Message(Message& r) : _buf( 0 ), _data( 0 ), _freeIt( false ) {
00299 *this = r;
00300 }
00301 ~Message() {
00302 reset();
00303 }
00304
00305 SockAddr _from;
00306
00307 MsgData *header() const {
00308 assert( !empty() );
00309 return _buf ? _buf : reinterpret_cast< MsgData* > ( _data[ 0 ].first );
00310 }
00311 int operation() const { return header()->operation(); }
00312
00313 MsgData *singleData() const {
00314 massert( 13273, "single data buffer expected", _buf );
00315 return header();
00316 }
00317
00318 bool empty() const { return !_buf && _data.empty(); }
00319
00320 int size() const {
00321 int res = 0;
00322 if ( _buf ) {
00323 res = _buf->len;
00324 }
00325 else {
00326 for (MsgVec::const_iterator it = _data.begin(); it != _data.end(); ++it) {
00327 res += it->second;
00328 }
00329 }
00330 return res;
00331 }
00332
00333 int dataSize() const { return size() - sizeof(MSGHEADER); }
00334
00335
00336
00337 void concat() {
00338 if ( _buf || empty() ) {
00339 return;
00340 }
00341
00342 assert( _freeIt );
00343 int totalSize = 0;
00344 for( vector< pair< char *, int > >::const_iterator i = _data.begin(); i != _data.end(); ++i ) {
00345 totalSize += i->second;
00346 }
00347 char *buf = (char*)malloc( totalSize );
00348 char *p = buf;
00349 for( vector< pair< char *, int > >::const_iterator i = _data.begin(); i != _data.end(); ++i ) {
00350 memcpy( p, i->first, i->second );
00351 p += i->second;
00352 }
00353 reset();
00354 _setData( (MsgData*)buf, true );
00355 }
00356
00357
00358 Message& operator=(Message& r) {
00359 assert( empty() );
00360 assert( r._freeIt );
00361 _buf = r._buf;
00362 r._buf = 0;
00363 if ( r._data.size() > 0 ) {
00364 _data.swap( r._data );
00365 }
00366 r._freeIt = false;
00367 _freeIt = true;
00368 return *this;
00369 }
00370
00371 void reset() {
00372 if ( _freeIt ) {
00373 if ( _buf ) {
00374 free( _buf );
00375 }
00376 for( vector< pair< char *, int > >::const_iterator i = _data.begin(); i != _data.end(); ++i ) {
00377 free(i->first);
00378 }
00379 }
00380 _buf = 0;
00381 _data.clear();
00382 _freeIt = false;
00383 }
00384
00385
00386
00387 void appendData(char *d, int size) {
00388 if ( size <= 0 ) {
00389 return;
00390 }
00391 if ( empty() ) {
00392 MsgData *md = (MsgData*)d;
00393 md->len = size;
00394 _setData( md, true );
00395 return;
00396 }
00397 assert( _freeIt );
00398 if ( _buf ) {
00399 _data.push_back( make_pair( (char*)_buf, _buf->len ) );
00400 _buf = 0;
00401 }
00402 _data.push_back( make_pair( d, size ) );
00403 header()->len += size;
00404 }
00405
00406
00407 void setData(MsgData *d, bool freeIt) {
00408 assert( empty() );
00409 _setData( d, freeIt );
00410 }
00411 void setData(int operation, const char *msgtxt) {
00412 setData(operation, msgtxt, strlen(msgtxt)+1);
00413 }
00414 void setData(int operation, const char *msgdata, size_t len) {
00415 assert( empty() );
00416 size_t dataLen = len + sizeof(MsgData) - 4;
00417 MsgData *d = (MsgData *) malloc(dataLen);
00418 memcpy(d->_data, msgdata, len);
00419 d->len = fixEndian(dataLen);
00420 d->setOperation(operation);
00421 _setData( d, true );
00422 }
00423
00424 bool doIFreeIt() {
00425 return _freeIt;
00426 }
00427
00428 void send( MessagingPort &p, const char *context ) {
00429 if ( empty() ) {
00430 return;
00431 }
00432 if ( _buf != 0 ) {
00433 p.send( (char*)_buf, _buf->len, context );
00434 }
00435 else {
00436 p.send( _data, context );
00437 }
00438 }
00439
00440 private:
00441 void _setData( MsgData *d, bool freeIt ) {
00442 _freeIt = freeIt;
00443 _buf = d;
00444 }
00445
00446 MsgData * _buf;
00447
00448 typedef vector< pair< char*, int > > MsgVec;
00449 MsgVec _data;
00450 bool _freeIt;
00451 };
00452
00453 class SocketException : public DBException {
00454 public:
00455 const enum Type { CLOSED , RECV_ERROR , SEND_ERROR, RECV_TIMEOUT, SEND_TIMEOUT, FAILED_STATE, CONNECT_ERROR } _type;
00456
00457 SocketException( Type t , string server="" , int code = 9001 , string extra="" ) : DBException( "socket exception" , code ) , _type(t) , _server(server), _extra(extra){ }
00458 virtual ~SocketException() throw() {}
00459
00460 bool shouldPrint() const { return _type != CLOSED; }
00461 virtual string toString() const;
00462
00463 private:
00464 string _server;
00465 string _extra;
00466 };
00467
00468 MSGID nextMessageId();
00469
00470 extern TicketHolder connTicketHolder;
00471
00472 class ElapsedTracker {
00473 public:
00474 ElapsedTracker( int hitsBetweenMarks , int msBetweenMarks )
00475 : _h( hitsBetweenMarks ) , _ms( msBetweenMarks ) , _pings(0) {
00476 _last = Listener::getElapsedTimeMillis();
00477 }
00478
00483 bool ping() {
00484 if ( ( ++_pings % _h ) == 0 ) {
00485 _last = Listener::getElapsedTimeMillis();
00486 return true;
00487 }
00488
00489 long long now = Listener::getElapsedTimeMillis();
00490 if ( now - _last > _ms ) {
00491 _last = now;
00492 return true;
00493 }
00494
00495 return false;
00496 }
00497
00498 private:
00499 int _h;
00500 int _ms;
00501
00502 unsigned long long _pings;
00503
00504 long long _last;
00505
00506 };
00507
00508 }