00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
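"""Master-Slave connection to Mongo.

Performs all writes on the Master instance and distributes reads randomly
among the slave instances. Reads issued inside a request, and messages that
must use the master, are sent to the Master instance as well."""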
00019
00020 import random
00021
00022 from pymongo.connection import Connection
00023 from pymongo.database import Database
00024
00025
class MasterSlaveConnection(object):
    """A master-slave connection to Mongo.

    Wraps one master `Connection` and a list of slave `Connection`
    instances. Messages sent with `_send_message` default to the master;
    messages sent with `_send_message_with_response` go to a randomly chosen
    slave unless the master is explicitly required or a request is active.

    Individual connections are identified by an integer id: -1 designates
    the master, any other value is an index into the slaves list.
    """

    def __init__(self, master, slaves=None):
        """Create a new Master-Slave connection.

        The resultant connection should be interacted with using the same
        mechanisms as a regular `Connection`. The `Connection` instances used
        to create this `MasterSlaveConnection` can themselves make use of
        connection pooling, etc. `Connection` instances used as slaves should
        be created with the slave_okay option set to True.

        Raises TypeError if `master` is not an instance of `Connection` or
        `slaves` is not a list of at least one `Connection` instance (which
        includes omitting `slaves` altogether).

        :Parameters:
          - `master`: `Connection` instance for the writable Master
          - `slaves`: list of `Connection` instances for the read-only
            slaves; must contain at least one element
        """
        if not isinstance(master, Connection):
            raise TypeError("master must be a Connection instance")
        # None (the default) is not a list, so omitting `slaves` is rejected
        # here exactly like an empty list is.
        if not isinstance(slaves, list) or not slaves:
            raise TypeError("slaves must be a list of length >= 1")

        for slave in slaves:
            if not isinstance(slave, Connection):
                # Wrap in a 1-tuple so a tuple-valued `slave` is formatted
                # as a single value instead of being unpacked by `%`.
                raise TypeError("slave %r is not an instance of Connection" %
                                (slave,))

        self.__in_request = False
        self.__master = master
        # NOTE: the caller's list object is stored as-is (no copy).
        self.__slaves = slaves

    @property
    def master(self):
        """The master `Connection` instance."""
        return self.__master

    @property
    def slaves(self):
        """The list of slave `Connection` instances."""
        return self.__slaves

    @property
    def document_class(self):
        """Document class reported by this connection; always `dict`."""
        return dict

    @property
    def tz_aware(self):
        """Timezone-awareness flag reported by this connection; always True."""
        return True

    @property
    def slave_okay(self):
        """Is it okay for this connection to connect directly to a slave?

        This is always True for MasterSlaveConnection instances.
        """
        return True

    def set_cursor_manager(self, manager_class):
        """Set the cursor manager for this connection.

        Helper to set the cursor manager on each individual `Connection`
        instance (the master and every slave) that makes up this
        `MasterSlaveConnection`.

        :Parameters:
          - `manager_class`: passed unchanged to each connection's
            `set_cursor_manager`
        """
        self.__master.set_cursor_manager(manager_class)
        for slave in self.__slaves:
            slave.set_cursor_manager(manager_class)

    def _send_message(self, message, safe=False, _connection_to_use=None):
        """Say something to Mongo.

        Sends a message on the Master connection unless a specific slave is
        selected with `_connection_to_use`. Returns whatever the chosen
        connection's `_send_message` returns.

        :Parameters:
          - `message`: the message to send
          - `safe`: forwarded to the underlying connection's `_send_message`
          - `_connection_to_use`: None or -1 for the master, otherwise the
            index of the slave to send the message to
        """
        if _connection_to_use is None or _connection_to_use == -1:
            return self.__master._send_message(message, safe)
        return self.__slaves[_connection_to_use]._send_message(message, safe)

    def _send_message_with_response(self, message, _connection_to_use=None,
                                    _must_use_master=False, **kwargs):
        """Receive a message from Mongo.

        Sends the given message and returns a (connection_id, response) pair,
        where connection_id is -1 for the master or the index of the slave
        that handled the message.

        :Parameters:
          - `message`: the message to send
          - `_connection_to_use`: if not None, forces the message onto a
            specific connection (-1 for the master, otherwise a slave index)
          - `_must_use_master`: if True (and `_connection_to_use` is None),
            send the message to the master
          - `**kwargs`: forwarded to the underlying connection's
            `_send_message_with_response`
        """
        # An explicit connection id always wins, even over _must_use_master.
        if _connection_to_use is not None:
            if _connection_to_use == -1:
                target = self.__master
            else:
                target = self.__slaves[_connection_to_use]
            return (_connection_to_use,
                    target._send_message_with_response(message, **kwargs))

        # Decide on the master before touching the slave list, so that
        # master-only traffic never depends on picking a slave.
        if _must_use_master or self.__in_request:
            return (-1, self.__master._send_message_with_response(message,
                                                                  **kwargs))

        # Otherwise pick one of the slaves uniformly at random.
        connection_id = random.randrange(0, len(self.__slaves))
        slave = self.__slaves[connection_id]
        return (connection_id,
                slave._send_message_with_response(message, **kwargs))

    def start_request(self):
        """Start a "request".

        Start a sequence of operations in which order matters. Note
        that all operations performed within a request will be sent
        using the Master connection.
        """
        self.__in_request = True

    def end_request(self):
        """End the current "request".

        Clears the in-request flag and calls `end_request` on the master
        connection. See documentation for `Connection.end_request`.
        """
        self.__in_request = False
        self.__master.end_request()

    def __cmp__(self, other):
        """Compare by (master, slaves) against another MasterSlaveConnection.

        Returns NotImplemented for any other type.
        """
        if isinstance(other, MasterSlaveConnection):
            return cmp((self.__master, self.__slaves),
                       (other.__master, other.__slaves))
        return NotImplemented

    def __repr__(self):
        return "MasterSlaveConnection(%r, %r)" % (self.__master, self.__slaves)

    def __getattr__(self, name):
        """Get a database by name.

        Called for any attribute that is not found through normal lookup.

        Raises InvalidName if an invalid database name is used.

        :Parameters:
          - `name`: the name of the database to get
        """
        return Database(self, name)

    def __getitem__(self, name):
        """Get a database by name.

        Raises InvalidName if an invalid database name is used.

        :Parameters:
          - `name`: the name of the database to get
        """
        return self.__getattr__(name)

    def close_cursor(self, cursor_id, connection_id):
        """Close a single database cursor.

        Raises TypeError if cursor_id is not an instance of (int, long). What
        closing the cursor actually means depends on this connection's cursor
        manager.

        :Parameters:
          - `cursor_id`: cursor id to close
          - `connection_id`: id of the `Connection` instance where the cursor
            was opened (-1 for the master, otherwise a slave index)
        """
        if connection_id == -1:
            return self.__master.close_cursor(cursor_id)
        return self.__slaves[connection_id].close_cursor(cursor_id)

    def database_names(self):
        """Get a list of all database names, as reported by the master.
        """
        return self.__master.database_names()

    def drop_database(self, name_or_database):
        """Drop a database (the call is issued on the master).

        :Parameters:
          - `name_or_database`: the name of a database to drop or the object
            itself
        """
        return self.__master.drop_database(name_or_database)

    # __getitem__ is defined for database access, which would otherwise let
    # Python fall back to index-based iteration. Defining __iter__/next makes
    # any attempt to iterate fail with a TypeError instead.
    def __iter__(self):
        return self

    def next(self):
        raise TypeError("'MasterSlaveConnection' object is not iterable")

    def _cache_index(self, database_name, collection_name, index_name, ttl):
        """Delegate index caching to the master connection."""
        return self.__master._cache_index(database_name, collection_name,
                                          index_name, ttl)

    def _purge_index(self, database_name,
                     collection_name=None, index_name=None):
        """Delegate index-cache purging to the master connection."""
        return self.__master._purge_index(database_name,
                                          collection_name,
                                          index_name)