00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015 """Cursor class to iterate over Mongo query results."""
00016
00017 from bson.code import Code
00018 from bson.son import SON
00019 from pymongo import (helpers,
00020 message)
00021 from pymongo.errors import (InvalidOperation,
00022 AutoReconnect)
00023
# Bit values for the flags field of a wire-protocol query message, keyed
# by the option names used in this module. Cursor.__query_options ORs the
# relevant ones together.
_QUERY_OPTIONS = dict(
    tailable_cursor=2,  # keep the cursor open after the last result
    slave_okay=4,       # allow the query to be answered by a slave
    oplog_replay=8,     # not set anywhere by Cursor
    no_timeout=16,      # ask the server not to time out an idle cursor
)
00029
00030
00031
00032
00033
00034 class Cursor(object):
00035 """A cursor / iterator over Mongo query results.
00036 """
00037
00038 def __init__(self, collection, spec=None, fields=None, skip=0, limit=0,
00039 timeout=True, snapshot=False, tailable=False, sort=None,
00040 max_scan=None, as_class=None,
00041 _must_use_master=False, _is_command=False,
00042 **kwargs):
00043 """Create a new cursor.
00044
00045 Should not be called directly by application developers - see
00046 :meth:`~pymongo.collection.Collection.find` instead.
00047
00048 .. mongodoc:: cursors
00049 """
00050 self.__id = None
00051
00052 if spec is None:
00053 spec = {}
00054
00055 if not isinstance(spec, dict):
00056 raise TypeError("spec must be an instance of dict")
00057 if not isinstance(skip, int):
00058 raise TypeError("skip must be an instance of int")
00059 if not isinstance(limit, int):
00060 raise TypeError("limit must be an instance of int")
00061 if not isinstance(timeout, bool):
00062 raise TypeError("timeout must be an instance of bool")
00063 if not isinstance(snapshot, bool):
00064 raise TypeError("snapshot must be an instance of bool")
00065 if not isinstance(tailable, bool):
00066 raise TypeError("tailable must be an instance of bool")
00067
00068 if fields is not None:
00069 if not fields:
00070 fields = {"_id": 1}
00071 if not isinstance(fields, dict):
00072 fields = helpers._fields_list_to_dict(fields)
00073
00074 if as_class is None:
00075 as_class = collection.database.connection.document_class
00076
00077 self.__collection = collection
00078 self.__spec = spec
00079 self.__fields = fields
00080 self.__skip = skip
00081 self.__limit = limit
00082 self.__batch_size = 0
00083
00084
00085
00086
00087
00088
00089
00090 self.__empty = False
00091
00092 self.__timeout = timeout
00093 self.__tailable = tailable
00094 self.__snapshot = snapshot
00095 self.__ordering = sort and helpers._index_document(sort) or None
00096 self.__max_scan = max_scan
00097 self.__explain = False
00098 self.__hint = None
00099 self.__as_class = as_class
00100 self.__tz_aware = collection.database.connection.tz_aware
00101 self.__must_use_master = _must_use_master
00102 self.__is_command = _is_command
00103
00104 self.__data = []
00105 self.__connection_id = None
00106 self.__retrieved = 0
00107 self.__killed = False
00108
00109
00110
00111 self.__kwargs = kwargs
00112
00113 @property
00114 def collection(self):
00115 """The :class:`~pymongo.collection.Collection` that this
00116 :class:`Cursor` is iterating.
00117
00118 .. versionadded:: 1.1
00119 """
00120 return self.__collection
00121
00122 def __del__(self):
00123 if self.__id and not self.__killed:
00124 self.__die()
00125
00126 def rewind(self):
00127 """Rewind this cursor to it's unevaluated state.
00128
00129 Reset this cursor if it has been partially or completely evaluated.
00130 Any options that are present on the cursor will remain in effect.
00131 Future iterating performed on this cursor will cause new queries to
00132 be sent to the server, even if the resultant data has already been
00133 retrieved by this cursor.
00134 """
00135 self.__data = []
00136 self.__id = None
00137 self.__connection_id = None
00138 self.__retrieved = 0
00139 self.__killed = False
00140
00141 return self
00142
00143 def clone(self):
00144 """Get a clone of this cursor.
00145
00146 Returns a new Cursor instance with options matching those that have
00147 been set on the current instance. The clone will be completely
00148 unevaluated, even if the current instance has been partially or
00149 completely evaluated.
00150 """
00151 copy = Cursor(self.__collection, self.__spec, self.__fields,
00152 self.__skip, self.__limit, self.__timeout,
00153 self.__tailable, self.__snapshot)
00154 copy.__ordering = self.__ordering
00155 copy.__explain = self.__explain
00156 copy.__hint = self.__hint
00157 copy.__batch_size = self.__batch_size
00158 return copy
00159
00160 def __die(self):
00161 """Closes this cursor.
00162 """
00163 if self.__id and not self.__killed:
00164 connection = self.__collection.database.connection
00165 if self.__connection_id is not None:
00166 connection.close_cursor(self.__id, self.__connection_id)
00167 else:
00168 connection.close_cursor(self.__id)
00169 self.__killed = True
00170
00171 def __query_spec(self):
00172 """Get the spec to use for a query.
00173 """
00174 spec = self.__spec
00175 if not self.__is_command and "$query" not in self.__spec:
00176 spec = SON({"$query": self.__spec})
00177 if self.__ordering:
00178 spec["$orderby"] = self.__ordering
00179 if self.__explain:
00180 spec["$explain"] = True
00181 if self.__hint:
00182 spec["$hint"] = self.__hint
00183 if self.__snapshot:
00184 spec["$snapshot"] = True
00185 if self.__max_scan:
00186 spec["$maxScan"] = self.__max_scan
00187 return spec
00188
def __query_options(self):
    """Get the integer query flags to send with this query.

    ORs together the :data:`_QUERY_OPTIONS` bits for a tailable cursor,
    a ``slave_okay`` connection and a cursor created with
    ``timeout=False``.
    """
    connection = self.__collection.database.connection
    wanted = (
        ("tailable_cursor", self.__tailable),
        ("slave_okay", connection.slave_okay),
        ("no_timeout", not self.__timeout),
    )
    flags = 0
    for name, enabled in wanted:
        if enabled:
            flags |= _QUERY_OPTIONS[name]
    return flags
00200
00201 def __check_okay_to_chain(self):
00202 """Check if it is okay to chain more options onto this cursor.
00203 """
00204 if self.__retrieved or self.__id is not None:
00205 raise InvalidOperation("cannot set options after executing query")
00206
00207 def limit(self, limit):
00208 """Limits the number of results to be returned by this cursor.
00209
00210 Raises TypeError if limit is not an instance of int. Raises
00211 InvalidOperation if this cursor has already been used. The
00212 last `limit` applied to this cursor takes precedence. A limit
00213 of ``0`` is equivalent to no limit.
00214
00215 :Parameters:
00216 - `limit`: the number of results to return
00217
00218 .. mongodoc:: limit
00219 """
00220 if not isinstance(limit, int):
00221 raise TypeError("limit must be an int")
00222 self.__check_okay_to_chain()
00223
00224 self.__empty = False
00225 self.__limit = limit
00226 return self
00227
00228 def batch_size(self, batch_size):
00229 """Set the size for batches of results returned by this cursor.
00230
00231 Raises :class:`TypeError` if `batch_size` is not an instance
00232 of :class:`int`. Raises :class:`ValueError` if `batch_size` is
00233 less than ``0``. Raises
00234 :class:`~pymongo.errors.InvalidOperation` if this
00235 :class:`Cursor` has already been used. The last `batch_size`
00236 applied to this cursor takes precedence.
00237
00238 :Parameters:
00239 - `batch_size`: The size of each batch of results requested.
00240
00241 .. versionadded:: 1.9
00242 """
00243 if not isinstance(batch_size, int):
00244 raise TypeError("batch_size must be an int")
00245 if batch_size < 0:
00246 raise ValueError("batch_size must be >= 0")
00247 self.__check_okay_to_chain()
00248
00249 self.__batch_size = batch_size == 1 and 2 or batch_size
00250 return self
00251
00252 def skip(self, skip):
00253 """Skips the first `skip` results of this cursor.
00254
00255 Raises TypeError if skip is not an instance of int. Raises
00256 InvalidOperation if this cursor has already been used. The last `skip`
00257 applied to this cursor takes precedence.
00258
00259 :Parameters:
00260 - `skip`: the number of results to skip
00261 """
00262 if not isinstance(skip, (int, long)):
00263 raise TypeError("skip must be an int")
00264 self.__check_okay_to_chain()
00265
00266 self.__skip = skip
00267 return self
00268
00269 def __getitem__(self, index):
00270 """Get a single document or a slice of documents from this cursor.
00271
00272 Raises :class:`~pymongo.errors.InvalidOperation` if this
00273 cursor has already been used.
00274
00275 To get a single document use an integral index, e.g.::
00276
00277 >>> db.test.find()[50]
00278
00279 An :class:`IndexError` will be raised if the index is negative
00280 or greater than the amount of documents in this cursor. Any
00281 limit applied to this cursor will be ignored.
00282
00283 To get a slice of documents use a slice index, e.g.::
00284
00285 >>> db.test.find()[20:25]
00286
00287 This will return this cursor with a limit of ``5`` and skip of
00288 ``20`` applied. Using a slice index will override any prior
00289 limits or skips applied to this cursor (including those
00290 applied through previous calls to this method). Raises
00291 :class:`IndexError` when the slice has a step, a negative
00292 start value, or a stop value less than or equal to the start
00293 value.
00294
00295 :Parameters:
00296 - `index`: An integer or slice index to be applied to this cursor
00297 """
00298 self.__check_okay_to_chain()
00299 self.__empty = False
00300 if isinstance(index, slice):
00301 if index.step is not None:
00302 raise IndexError("Cursor instances do not support slice steps")
00303
00304 skip = 0
00305 if index.start is not None:
00306 if index.start < 0:
00307 raise IndexError("Cursor instances do not support"
00308 "negative indices")
00309 skip = index.start
00310
00311 if index.stop is not None:
00312 limit = index.stop - skip
00313 if limit < 0:
00314 raise IndexError("stop index must be greater than start"
00315 "index for slice %r" % index)
00316 if limit == 0:
00317 self.__empty = True
00318 else:
00319 limit = 0
00320
00321 self.__skip = skip
00322 self.__limit = limit
00323 return self
00324
00325 if isinstance(index, (int, long)):
00326 if index < 0:
00327 raise IndexError("Cursor instances do not support negative"
00328 "indices")
00329 clone = self.clone()
00330 clone.skip(index + self.__skip)
00331 clone.limit(-1)
00332 for doc in clone:
00333 return doc
00334 raise IndexError("no such item for Cursor instance")
00335 raise TypeError("index %r cannot be applied to Cursor "
00336 "instances" % index)
00337
00338 def max_scan(self, max_scan):
00339 """Limit the number of documents to scan when performing the query.
00340
00341 Raises :class:`~pymongo.errors.InvalidOperation` if this
00342 cursor has already been used. Only the last :meth:`max_scan`
00343 applied to this cursor has any effect.
00344
00345 :Parameters:
00346 - `max_scan`: the maximum number of documents to scan
00347
00348 .. note:: Requires server version **>= 1.5.1**
00349
00350 .. versionadded:: 1.7
00351 """
00352 self.__check_okay_to_chain()
00353 self.__max_scan = max_scan
00354 return self
00355
def sort(self, key_or_list, direction=None):
    """Sorts this cursor's results.

    Takes either a single key and a direction, or a list of (key,
    direction) pairs. The key(s) must be an instance of ``(str,
    unicode)``, and the direction(s) must be one of
    (:data:`~pymongo.ASCENDING`,
    :data:`~pymongo.DESCENDING`). Raises
    :class:`~pymongo.errors.InvalidOperation` if this cursor has
    already been used. Only the last :meth:`sort` applied to this
    cursor has any effect.

    :Parameters:
      - `key_or_list`: a single key or a list of (key, direction)
        pairs specifying the keys to sort on
      - `direction` (optional): only used if `key_or_list` is a single
        key, if not given :data:`~pymongo.ASCENDING` is assumed
    """
    self.__check_okay_to_chain()
    # Normalize both calling forms, then build the $orderby document.
    self.__ordering = helpers._index_document(
        helpers._index_list(key_or_list, direction))
    return self
00378
00379 def count(self, with_limit_and_skip=False):
00380 """Get the size of the results set for this query.
00381
00382 Returns the number of documents in the results set for this query. Does
00383 not take :meth:`limit` and :meth:`skip` into account by default - set
00384 `with_limit_and_skip` to ``True`` if that is the desired behavior.
00385 Raises :class:`~pymongo.errors.OperationFailure` on a database error.
00386
00387 :Parameters:
00388 - `with_limit_and_skip` (optional): take any :meth:`limit` or
00389 :meth:`skip` that has been applied to this cursor into account when
00390 getting the count
00391
00392 .. note:: The `with_limit_and_skip` parameter requires server
00393 version **>= 1.1.4-**
00394
00395 .. versionadded:: 1.1.1
00396 The `with_limit_and_skip` parameter.
00397 :meth:`~pymongo.cursor.Cursor.__len__` was deprecated in favor of
00398 calling :meth:`count` with `with_limit_and_skip` set to ``True``.
00399 """
00400 command = {"query": self.__spec, "fields": self.__fields}
00401
00402 if with_limit_and_skip:
00403 if self.__limit:
00404 command["limit"] = self.__limit
00405 if self.__skip:
00406 command["skip"] = self.__skip
00407
00408 r = self.__collection.database.command("count", self.__collection.name,
00409 allowable_errors=["ns missing"],
00410 **command)
00411 if r.get("errmsg", "") == "ns missing":
00412 return 0
00413 return int(r["n"])
00414
00415 def distinct(self, key):
00416 """Get a list of distinct values for `key` among all documents
00417 in the result set of this query.
00418
00419 Raises :class:`TypeError` if `key` is not an instance of
00420 :class:`basestring`.
00421
00422 :Parameters:
00423 - `key`: name of key for which we want to get the distinct values
00424
00425 .. note:: Requires server version **>= 1.1.3+**
00426
00427 .. seealso:: :meth:`pymongo.collection.Collection.distinct`
00428
00429 .. versionadded:: 1.2
00430 """
00431 if not isinstance(key, basestring):
00432 raise TypeError("key must be an instance of basestring")
00433
00434 options = {"key": key}
00435 if self.__spec:
00436 options["query"] = self.__spec
00437
00438 return self.__collection.database.command("distinct",
00439 self.__collection.name,
00440 **options)["values"]
00441
def explain(self):
    """Returns an explain plan record for this cursor.

    The plan is fetched through a clone of this cursor, with
    ``$explain`` enabled.

    .. mongodoc:: explain
    """
    plan_cursor = self.clone()
    plan_cursor.__explain = True
    # Always use a hard (negative) limit for explains.
    current = plan_cursor.__limit
    if current:
        plan_cursor.__limit = -abs(current)
    return plan_cursor.next()
00454
00455 def hint(self, index):
00456 """Adds a 'hint', telling Mongo the proper index to use for the query.
00457
00458 Judicious use of hints can greatly improve query
00459 performance. When doing a query on multiple fields (at least
00460 one of which is indexed) pass the indexed field as a hint to
00461 the query. Hinting will not do anything if the corresponding
00462 index does not exist. Raises
00463 :class:`~pymongo.errors.InvalidOperation` if this cursor has
00464 already been used.
00465
00466 `index` should be an index as passed to
00467 :meth:`~pymongo.collection.Collection.create_index`
00468 (e.g. ``[('field', ASCENDING)]``). If `index`
00469 is ``None`` any existing hints for this query are cleared. The
00470 last hint applied to this cursor takes precedence over all
00471 others.
00472
00473 :Parameters:
00474 - `index`: index to hint on (as an index specifier)
00475 """
00476 self.__check_okay_to_chain()
00477 if index is None:
00478 self.__hint = None
00479 return self
00480
00481 self.__hint = helpers._index_document(index)
00482 return self
00483
def where(self, code):
    """Adds a $where clause to this query.

    The `code` argument must be an instance of :class:`basestring`
    or :class:`~bson.code.Code` containing a JavaScript
    expression. This expression will be evaluated for each
    document scanned. Only those documents for which the
    expression evaluates to *true* will be returned as
    results. The keyword *this* refers to the object currently
    being scanned.

    Raises :class:`TypeError` if `code` is not an instance of
    :class:`basestring`. Raises
    :class:`~pymongo.errors.InvalidOperation` if this
    :class:`Cursor` has already been used. Only the last call to
    :meth:`where` applied to a :class:`Cursor` has any effect.

    :Parameters:
      - `code`: JavaScript expression to use as a filter
    """
    self.__check_okay_to_chain()
    # NOTE(review): this writes into the spec dict in place. That dict is
    # the one given to the constructor and is also shared with clones.
    self.__spec["$where"] = code if isinstance(code, Code) else Code(code)
    return self
00510
def __send_message(self, message):
    """Send a query or getmore message and handle the response.

    Sends `message` through the database's connection and records the
    connection id (when the connection reports one). It then unpacks the
    reply and updates the cursor id, the retrieved count and the buffered
    documents. It closes the server-side cursor once the limit has been
    reached.

    :Parameters:
      - `message`: an encoded query or get-more message, as built by the
        ``message`` module in :meth:`_refresh` (the parameter name shadows
        that module inside this method).

    If ``helpers._unpack_response`` raises
    :class:`~pymongo.errors.AutoReconnect`, the connection is
    disconnected and the exception is re-raised.
    """
    db = self.__collection.database
    kwargs = {"_must_use_master": self.__must_use_master}
    # Reuse the connection recorded from an earlier reply (presumably so
    # get-mores reach the server that holds the cursor).
    if self.__connection_id is not None:
        kwargs["_connection_to_use"] = self.__connection_id
    # Extra constructor keyword arguments are forwarded as-is.
    kwargs.update(self.__kwargs)

    response = db.connection._send_message_with_response(message,
                                                         **kwargs)

    # The connection may return either (connection_id, raw_response) or
    # just the raw response.
    if isinstance(response, tuple):
        (connection_id, response) = response
    else:
        connection_id = None

    self.__connection_id = connection_id

    try:
        response = helpers._unpack_response(response, self.__id,
                                            self.__as_class,
                                            self.__tz_aware)
    except AutoReconnect:
        # Disconnect before re-raising (presumably so the next operation
        # reconnects).
        db.connection.disconnect()
        raise
    self.__id = response["cursor_id"]

    # Sanity check that the reply continues where the last one stopped.
    # Skipped for tailable cursors; note that assert is stripped under -O.
    if not self.__tailable:
        assert response["starting_from"] == self.__retrieved

    self.__retrieved += response["number_returned"]
    self.__data = response["data"]

    # Close the server-side cursor once the limit is reached (a negative,
    # hard limit always satisfies this test).
    if self.__limit and self.__id and self.__limit <= self.__retrieved:
        self.__die()
00548
00549 def _refresh(self):
00550 """Refreshes the cursor with more data from Mongo.
00551
00552 Returns the length of self.__data after refresh. Will exit early if
00553 self.__data is already non-empty. Raises OperationFailure when the
00554 cursor cannot be refreshed due to an error on the query.
00555 """
00556 if len(self.__data) or self.__killed:
00557 return len(self.__data)
00558
00559 if self.__id is None:
00560 self.__send_message(
00561 message.query(self.__query_options(),
00562 self.__collection.full_name,
00563 self.__skip, self.__limit,
00564 self.__query_spec(), self.__fields))
00565 if not self.__id:
00566 self.__killed = True
00567 elif self.__id:
00568 if self.__limit:
00569 limit = self.__limit - self.__retrieved
00570 if self.__batch_size:
00571 limit = min(limit, self.__batch_size)
00572 else:
00573 limit = self.__batch_size
00574
00575 self.__send_message(
00576 message.get_more(self.__collection.full_name,
00577 limit, self.__id))
00578
00579 return len(self.__data)
00580
00581 @property
00582 def alive(self):
00583 """Does this cursor have the potential to return more data?
00584
00585 This is mostly useful with `tailable cursors
00586 <http://www.mongodb.org/display/DOCS/Tailable+Cursors>`_
00587 since they will stop iterating even though they *may* return more
00588 results in the future.
00589
00590 .. versionadded:: 1.5
00591 """
00592 return bool(len(self.__data) or (not self.__killed))
00593
00594 def __iter__(self):
00595 return self
00596
00597 def next(self):
00598 if self.__empty:
00599 raise StopIteration
00600 db = self.__collection.database
00601 if len(self.__data) or self._refresh():
00602 next = db._fix_outgoing(self.__data.pop(0), self.__collection)
00603 else:
00604 raise StopIteration
00605 return next