client.py
Go to the documentation of this file.
00001 from __future__ import with_statement
00002 from itertools import starmap
00003 import datetime
00004 import warnings
00005 import time as mod_time
00006 from redis._compat import (b, izip, imap, iteritems, dictkeys, dictvalues,
00007                            basestring, long, nativestr, urlparse)
00008 from redis.connection import ConnectionPool, UnixDomainSocketConnection
00009 from redis.exceptions import (
00010     ConnectionError,
00011     DataError,
00012     RedisError,
00013     ResponseError,
00014     WatchError,
00015     NoScriptError
00016 )
00017 
00018 SYM_EMPTY = b('')
00019 
00020 
00021 def list_or_args(keys, args):
00022     # returns a single list combining keys and args
00023     try:
00024         iter(keys)
00025         # a string can be iterated, but indicates
00026         # keys wasn't passed as a list
00027         if isinstance(keys, basestring):
00028             keys = [keys]
00029     except TypeError:
00030         keys = [keys]
00031     if args:
00032         keys.extend(args)
00033     return keys
00034 
00035 
00036 def timestamp_to_datetime(response):
00037     "Converts a unix timestamp to a Python datetime object"
00038     if not response:
00039         return None
00040     try:
00041         response = int(response)
00042     except ValueError:
00043         return None
00044     return datetime.datetime.fromtimestamp(response)
00045 
00046 
00047 def string_keys_to_dict(key_string, callback):
00048     return dict.fromkeys(key_string.split(), callback)
00049 
00050 
00051 def dict_merge(*dicts):
00052     merged = {}
00053     [merged.update(d) for d in dicts]
00054     return merged
00055 
00056 
00057 def parse_debug_object(response):
00058     "Parse the results of Redis's DEBUG OBJECT command into a Python dict"
00059     # The 'type' of the object is the first item in the response, but isn't
00060     # prefixed with a name
00061     response = nativestr(response)
00062     response = 'type:' + response
00063     response = dict([kv.split(':') for kv in response.split()])
00064 
00065     # parse some expected int values from the string response
00066     # note: this cmd isn't spec'd so these may not appear in all redis versions
00067     int_fields = ('refcount', 'serializedlength', 'lru', 'lru_seconds_idle')
00068     for field in int_fields:
00069         if field in response:
00070             response[field] = int(response[field])
00071 
00072     return response
00073 
00074 
00075 def parse_object(response, infotype):
00076     "Parse the results of an OBJECT command"
00077     if infotype in ('idletime', 'refcount'):
00078         return int(response)
00079     return response
00080 
00081 
00082 def parse_info(response):
00083     "Parse the result of Redis's INFO command into a Python dict"
00084     info = {}
00085     response = nativestr(response)
00086 
00087     def get_value(value):
00088         if ',' not in value or '=' not in value:
00089             return value
00090 
00091         sub_dict = {}
00092         for item in value.split(','):
00093             k, v = item.rsplit('=', 1)
00094             try:
00095                 sub_dict[k] = int(v)
00096             except ValueError:
00097                 sub_dict[k] = v
00098         return sub_dict
00099 
00100     for line in response.splitlines():
00101         if line and not line.startswith('#'):
00102             key, value = line.split(':')
00103             try:
00104                 if '.' in value:
00105                     info[key] = float(value)
00106                 else:
00107                     info[key] = int(value)
00108             except ValueError:
00109                 info[key] = get_value(value)
00110     return info
00111 
00112 
00113 def pairs_to_dict(response):
00114     "Create a dict given a list of key/value pairs"
00115     it = iter(response)
00116     return dict(izip(it, it))
00117 
00118 
00119 def zset_score_pairs(response, **options):
00120     """
00121     If ``withscores`` is specified in the options, return the response as
00122     a list of (value, score) pairs
00123     """
00124     if not response or not options['withscores']:
00125         return response
00126     score_cast_func = options.get('score_cast_func', float)
00127     it = iter(response)
00128     return list(izip(it, imap(score_cast_func, it)))
00129 
00130 
00131 def int_or_none(response):
00132     if response is None:
00133         return None
00134     return int(response)
00135 
00136 
00137 def float_or_none(response):
00138     if response is None:
00139         return None
00140     return float(response)
00141 
00142 
00143 def parse_config(response, **options):
00144     if options['parse'] == 'GET':
00145         response = [nativestr(i) if i is not None else None for i in response]
00146         return response and pairs_to_dict(response) or {}
00147     return nativestr(response) == 'OK'
00148 
00149 
00150 def parse_script(response, **options):
00151     parse = options['parse']
00152     if parse in ('FLUSH', 'KILL'):
00153         return response == 'OK'
00154     if parse == 'EXISTS':
00155         return list(imap(bool, response))
00156     return response
00157 
00158 
00159 class StrictRedis(object):
00160     """
00161     Implementation of the Redis protocol.
00162 
00163     This abstract class provides a Python interface to all Redis commands
00164     and an implementation of the Redis protocol.
00165 
00166     Connection and Pipeline derive from this, implementing how
00167     the commands are sent and received to the Redis server
00168     """
00169     RESPONSE_CALLBACKS = dict_merge(
00170         string_keys_to_dict(
00171             'AUTH DEL EXISTS EXPIRE EXPIREAT HDEL HEXISTS HMSET MOVE MSETNX '
00172             'PERSIST RENAMENX SISMEMBER SMOVE SETEX SETNX SREM ZREM',
00173             bool
00174         ),
00175         string_keys_to_dict(
00176             'BITCOUNT DECRBY GETBIT HLEN INCRBY LINSERT LLEN LPUSHX RPUSHX '
00177             'SADD SCARD SDIFFSTORE SETBIT SETRANGE SINTERSTORE STRLEN '
00178             'SUNIONSTORE ZADD ZCARD ZREMRANGEBYRANK ZREMRANGEBYSCORE',
00179             int
00180         ),
00181         string_keys_to_dict(
00182             # these return OK, or int if redis-server is >=1.3.4
00183             'LPUSH RPUSH',
00184             lambda r: isinstance(r, long) and r or nativestr(r) == 'OK'
00185         ),
00186         string_keys_to_dict('ZSCORE ZINCRBY', float_or_none),
00187         string_keys_to_dict(
00188             'FLUSHALL FLUSHDB LSET LTRIM MSET RENAME '
00189             'SAVE SELECT SET SHUTDOWN SLAVEOF WATCH UNWATCH',
00190             lambda r: nativestr(r) == 'OK'
00191         ),
00192         string_keys_to_dict('BLPOP BRPOP', lambda r: r and tuple(r) or None),
00193         string_keys_to_dict(
00194             'SDIFF SINTER SMEMBERS SUNION',
00195             lambda r: r and set(r) or set()
00196         ),
00197         string_keys_to_dict(
00198             'ZRANGE ZRANGEBYSCORE ZREVRANGE ZREVRANGEBYSCORE',
00199             zset_score_pairs
00200         ),
00201         string_keys_to_dict('ZRANK ZREVRANK', int_or_none),
00202         {
00203             'BGREWRITEAOF': (
00204                 lambda r: r == 'Background rewriting of AOF file started'
00205             ),
00206             'BGSAVE': lambda r: r == 'Background saving started',
00207             'BRPOPLPUSH': lambda r: r and r or None,
00208             'CONFIG': parse_config,
00209             'DEBUG': parse_debug_object,
00210             'HGETALL': lambda r: r and pairs_to_dict(r) or {},
00211             'INFO': parse_info,
00212             'LASTSAVE': timestamp_to_datetime,
00213             'OBJECT': parse_object,
00214             'PING': lambda r: nativestr(r) == 'PONG',
00215             'RANDOMKEY': lambda r: r and r or None,
00216             'SCRIPT': parse_script,
00217             'TIME': lambda x: (int(x[0]), int(x[1]))
00218         }
00219     )
00220 
00221     @classmethod
00222     def from_url(cls, url, db=None, **kwargs):
00223         """
00224         Return a Redis client object configured from the given URL.
00225 
00226         For example::
00227 
00228             redis://username:password@localhost:6379/0
00229 
00230         If ``db`` is None, this method will attempt to extract the database ID
00231         from the URL path component.
00232 
00233         Any additional keyword arguments will be passed along to the Redis
00234         class's initializer.
00235         """
00236         url = urlparse(url)
00237 
00238         # We only support redis:// schemes.
00239         assert url.scheme == 'redis' or not url.scheme
00240 
00241         # Extract the database ID from the path component if hasn't been given.
00242         if db is None:
00243             try:
00244                 db = int(url.path.replace('/', ''))
00245             except (AttributeError, ValueError):
00246                 db = 0
00247 
00248         return cls(host=url.hostname, port=url.port, db=db,
00249                    password=url.password, **kwargs)
00250 
00251     def __init__(self, host='localhost', port=6379,
00252                  db=0, password=None, socket_timeout=None,
00253                  connection_pool=None, charset='utf-8',
00254                  errors='strict', decode_responses=False,
00255                  unix_socket_path=None):
00256         if not connection_pool:
00257             kwargs = {
00258                 'db': db,
00259                 'password': password,
00260                 'socket_timeout': socket_timeout,
00261                 'encoding': charset,
00262                 'encoding_errors': errors,
00263                 'decode_responses': decode_responses,
00264             }
00265             # based on input, setup appropriate connection args
00266             if unix_socket_path:
00267                 kwargs.update({
00268                     'path': unix_socket_path,
00269                     'connection_class': UnixDomainSocketConnection
00270                 })
00271             else:
00272                 kwargs.update({
00273                     'host': host,
00274                     'port': port
00275                 })
00276             connection_pool = ConnectionPool(**kwargs)
00277         self.connection_pool = connection_pool
00278 
00279         self.response_callbacks = self.__class__.RESPONSE_CALLBACKS.copy()
00280 
00281     def set_response_callback(self, command, callback):
00282         "Set a custom Response Callback"
00283         self.response_callbacks[command] = callback
00284 
00285     def pipeline(self, transaction=True, shard_hint=None):
00286         """
00287         Return a new pipeline object that can queue multiple commands for
00288         later execution. ``transaction`` indicates whether all commands
00289         should be executed atomically. Apart from making a group of operations
00290         atomic, pipelines are useful for reducing the back-and-forth overhead
00291         between the client and server.
00292         """
00293         return StrictPipeline(
00294             self.connection_pool,
00295             self.response_callbacks,
00296             transaction,
00297             shard_hint)
00298 
00299     def transaction(self, func, *watches, **kwargs):
00300         """
00301         Convenience method for executing the callable `func` as a transaction
00302         while watching all keys specified in `watches`. The 'func' callable
00303         should expect a single arguement which is a Pipeline object.
00304         """
00305         shard_hint = kwargs.pop('shard_hint', None)
00306         with self.pipeline(True, shard_hint) as pipe:
00307             while 1:
00308                 try:
00309                     if watches:
00310                         pipe.watch(*watches)
00311                     func(pipe)
00312                     return pipe.execute()
00313                 except WatchError:
00314                     continue
00315 
00316     def lock(self, name, timeout=None, sleep=0.1):
00317         """
00318         Return a new Lock object using key ``name`` that mimics
00319         the behavior of threading.Lock.
00320 
00321         If specified, ``timeout`` indicates a maximum life for the lock.
00322         By default, it will remain locked until release() is called.
00323 
00324         ``sleep`` indicates the amount of time to sleep per loop iteration
00325         when the lock is in blocking mode and another client is currently
00326         holding the lock.
00327         """
00328         return Lock(self, name, timeout=timeout, sleep=sleep)
00329 
00330     def pubsub(self, shard_hint=None):
00331         """
00332         Return a Publish/Subscribe object. With this object, you can
00333         subscribe to channels and listen for messages that get published to
00334         them.
00335         """
00336         return PubSub(self.connection_pool, shard_hint)
00337 
00338     #### COMMAND EXECUTION AND PROTOCOL PARSING ####
00339     def execute_command(self, *args, **options):
00340         "Execute a command and return a parsed response"
00341         pool = self.connection_pool
00342         command_name = args[0]
00343         connection = pool.get_connection(command_name, **options)
00344         try:
00345             connection.send_command(*args)
00346             return self.parse_response(connection, command_name, **options)
00347         except ConnectionError:
00348             connection.disconnect()
00349             connection.send_command(*args)
00350             return self.parse_response(connection, command_name, **options)
00351         finally:
00352             pool.release(connection)
00353 
00354     def parse_response(self, connection, command_name, **options):
00355         "Parses a response from the Redis server"
00356         response = connection.read_response()
00357         if command_name in self.response_callbacks:
00358             return self.response_callbacks[command_name](response, **options)
00359         return response
00360 
00361     #### SERVER INFORMATION ####
00362     def bgrewriteaof(self):
00363         "Tell the Redis server to rewrite the AOF file from data in memory."
00364         return self.execute_command('BGREWRITEAOF')
00365 
00366     def bgsave(self):
00367         """
00368         Tell the Redis server to save its data to disk.  Unlike save(),
00369         this method is asynchronous and returns immediately.
00370         """
00371         return self.execute_command('BGSAVE')
00372 
00373     def config_get(self, pattern="*"):
00374         "Return a dictionary of configuration based on the ``pattern``"
00375         return self.execute_command('CONFIG', 'GET', pattern, parse='GET')
00376 
00377     def config_set(self, name, value):
00378         "Set config item ``name`` with ``value``"
00379         return self.execute_command('CONFIG', 'SET', name, value, parse='SET')
00380 
00381     def dbsize(self):
00382         "Returns the number of keys in the current database"
00383         return self.execute_command('DBSIZE')
00384 
00385     def time(self):
00386         """
00387         Returns the server time as a 2-item tuple of ints:
00388         (seconds since epoch, microseconds into this second).
00389         """
00390         return self.execute_command('TIME')
00391 
00392     def debug_object(self, key):
00393         "Returns version specific metainformation about a give key"
00394         return self.execute_command('DEBUG', 'OBJECT', key)
00395 
00396     def delete(self, *names):
00397         "Delete one or more keys specified by ``names``"
00398         return self.execute_command('DEL', *names)
00399     __delitem__ = delete
00400 
00401     def echo(self, value):
00402         "Echo the string back from the server"
00403         return self.execute_command('ECHO', value)
00404 
00405     def flushall(self):
00406         "Delete all keys in all databases on the current host"
00407         return self.execute_command('FLUSHALL')
00408 
00409     def flushdb(self):
00410         "Delete all keys in the current database"
00411         return self.execute_command('FLUSHDB')
00412 
00413     def info(self):
00414         "Returns a dictionary containing information about the Redis server"
00415         return self.execute_command('INFO')
00416 
00417     def lastsave(self):
00418         """
00419         Return a Python datetime object representing the last time the
00420         Redis database was saved to disk
00421         """
00422         return self.execute_command('LASTSAVE')
00423 
00424     def object(self, infotype, key):
00425         "Return the encoding, idletime, or refcount about the key"
00426         return self.execute_command('OBJECT', infotype, key, infotype=infotype)
00427 
00428     def ping(self):
00429         "Ping the Redis server"
00430         return self.execute_command('PING')
00431 
00432     def save(self):
00433         """
00434         Tell the Redis server to save its data to disk,
00435         blocking until the save is complete
00436         """
00437         return self.execute_command('SAVE')
00438 
00439     def shutdown(self):
00440         "Shutdown the server"
00441         try:
00442             self.execute_command('SHUTDOWN')
00443         except ConnectionError:
00444             # a ConnectionError here is expected
00445             return
00446         raise RedisError("SHUTDOWN seems to have failed.")
00447 
00448     def slaveof(self, host=None, port=None):
00449         """
00450         Set the server to be a replicated slave of the instance identified
00451         by the ``host`` and ``port``. If called without arguements, the
00452         instance is promoted to a master instead.
00453         """
00454         if host is None and port is None:
00455             return self.execute_command("SLAVEOF", "NO", "ONE")
00456         return self.execute_command("SLAVEOF", host, port)
00457 
00458     #### BASIC KEY COMMANDS ####
00459     def append(self, key, value):
00460         """
00461         Appends the string ``value`` to the value at ``key``. If ``key``
00462         doesn't already exist, create it with a value of ``value``.
00463         Returns the new length of the value at ``key``.
00464         """
00465         return self.execute_command('APPEND', key, value)
00466 
00467     def getrange(self, key, start, end):
00468         """
00469         Returns the substring of the string value stored at ``key``,
00470         determined by the offsets ``start`` and ``end`` (both are inclusive)
00471         """
00472         return self.execute_command('GETRANGE', key, start, end)
00473 
00474     def bitcount(self, key, start=None, end=None):
00475         """
00476         Returns the count of set bits in the value of ``key``.  Optional
00477         ``start`` and ``end`` paramaters indicate which bytes to consider
00478         """
00479         params = [key]
00480         if start and end:
00481             params.append(start)
00482             params.append(end)
00483         elif (start and not end) or (end and not start):
00484             raise RedisError("Both start and end must be specified")
00485         return self.execute_command('BITCOUNT', *params)
00486 
00487     def bitop(self, operation, dest, *keys):
00488         """
00489         Perform a bitwise operation using ``operation`` between ``keys`` and
00490         store the result in ``dest``.
00491         """
00492         return self.execute_command('BITOP', operation, dest, *keys)
00493 
00494     def decr(self, name, amount=1):
00495         """
00496         Decrements the value of ``key`` by ``amount``.  If no key exists,
00497         the value will be initialized as 0 - ``amount``
00498         """
00499         return self.execute_command('DECRBY', name, amount)
00500 
00501     def exists(self, name):
00502         "Returns a boolean indicating whether key ``name`` exists"
00503         return self.execute_command('EXISTS', name)
00504     __contains__ = exists
00505 
00506     def expire(self, name, time):
00507         """
00508         Set an expire flag on key ``name`` for ``time`` seconds. ``time``
00509         can be represented by an integer or a Python timedelta object.
00510         """
00511         if isinstance(time, datetime.timedelta):
00512             time = int(time.total_seconds())
00513         return self.execute_command('EXPIRE', name, time)
00514 
00515     def expireat(self, name, when):
00516         """
00517         Set an expire flag on key ``name``. ``when`` can be represented
00518         as an integer indicating unix time or a Python datetime object.
00519         """
00520         if isinstance(when, datetime.datetime):
00521             when = int(mod_time.mktime(when.timetuple()))
00522         return self.execute_command('EXPIREAT', name, when)
00523 
00524     def get(self, name):
00525         """
00526         Return the value at key ``name``, or None if the key doesn't exist
00527         """
00528         return self.execute_command('GET', name)
00529 
00530     def __getitem__(self, name):
00531         """
00532         Return the value at key ``name``, raises a KeyError if the key
00533         doesn't exist.
00534         """
00535         value = self.get(name)
00536         if value:
00537             return value
00538         raise KeyError(name)
00539 
00540     def getbit(self, name, offset):
00541         "Returns a boolean indicating the value of ``offset`` in ``name``"
00542         return self.execute_command('GETBIT', name, offset)
00543 
00544     def getset(self, name, value):
00545         """
00546         Set the value at key ``name`` to ``value`` if key doesn't exist
00547         Return the value at key ``name`` atomically
00548         """
00549         return self.execute_command('GETSET', name, value)
00550 
00551     def incr(self, name, amount=1):
00552         """
00553         Increments the value of ``key`` by ``amount``.  If no key exists,
00554         the value will be initialized as ``amount``
00555         """
00556         return self.execute_command('INCRBY', name, amount)
00557 
00558     def keys(self, pattern='*'):
00559         "Returns a list of keys matching ``pattern``"
00560         return self.execute_command('KEYS', pattern)
00561 
00562     def mget(self, keys, *args):
00563         """
00564         Returns a list of values ordered identically to ``keys``
00565         """
00566         args = list_or_args(keys, args)
00567         return self.execute_command('MGET', *args)
00568 
00569     def mset(self, mapping):
00570         "Sets each key in the ``mapping`` dict to its corresponding value"
00571         items = []
00572         for pair in iteritems(mapping):
00573             items.extend(pair)
00574         return self.execute_command('MSET', *items)
00575 
00576     def msetnx(self, mapping):
00577         """
00578         Sets each key in the ``mapping`` dict to its corresponding value if
00579         none of the keys are already set
00580         """
00581         items = []
00582         for pair in iteritems(mapping):
00583             items.extend(pair)
00584         return self.execute_command('MSETNX', *items)
00585 
00586     def move(self, name, db):
00587         "Moves the key ``name`` to a different Redis database ``db``"
00588         return self.execute_command('MOVE', name, db)
00589 
00590     def persist(self, name):
00591         "Removes an expiration on ``name``"
00592         return self.execute_command('PERSIST', name)
00593 
00594     def randomkey(self):
00595         "Returns the name of a random key"
00596         return self.execute_command('RANDOMKEY')
00597 
00598     def rename(self, src, dst):
00599         """
00600         Rename key ``src`` to ``dst``
00601         """
00602         return self.execute_command('RENAME', src, dst)
00603 
00604     def renamenx(self, src, dst):
00605         "Rename key ``src`` to ``dst`` if ``dst`` doesn't already exist"
00606         return self.execute_command('RENAMENX', src, dst)
00607 
00608     def set(self, name, value):
00609         "Set the value at key ``name`` to ``value``"
00610         return self.execute_command('SET', name, value)
00611     __setitem__ = set
00612 
00613     def setbit(self, name, offset, value):
00614         """
00615         Flag the ``offset`` in ``name`` as ``value``. Returns a boolean
00616         indicating the previous value of ``offset``.
00617         """
00618         value = value and 1 or 0
00619         return self.execute_command('SETBIT', name, offset, value)
00620 
00621     def setex(self, name, time, value):
00622         """
00623         Set the value of key ``name`` to ``value`` that expires in ``time``
00624         seconds. ``time`` can be represented by an integer or a Python
00625         timedelta object.
00626         """
00627         if isinstance(time, datetime.timedelta):
00628             time = int(time.total_seconds())
00629         return self.execute_command('SETEX', name, time, value)
00630 
00631     def setnx(self, name, value):
00632         "Set the value of key ``name`` to ``value`` if key doesn't exist"
00633         return self.execute_command('SETNX', name, value)
00634 
00635     def setrange(self, name, offset, value):
00636         """
00637         Overwrite bytes in the value of ``name`` starting at ``offset`` with
00638         ``value``. If ``offset`` plus the length of ``value`` exceeds the
00639         length of the original value, the new value will be larger than before.
00640         If ``offset`` exceeds the length of the original value, null bytes
00641         will be used to pad between the end of the previous value and the start
00642         of what's being injected.
00643 
00644         Returns the length of the new string.
00645         """
00646         return self.execute_command('SETRANGE', name, offset, value)
00647 
00648     def strlen(self, name):
00649         "Return the number of bytes stored in the value of ``name``"
00650         return self.execute_command('STRLEN', name)
00651 
00652     def substr(self, name, start, end=-1):
00653         """
00654         Return a substring of the string at key ``name``. ``start`` and ``end``
00655         are 0-based integers specifying the portion of the string to return.
00656         """
00657         return self.execute_command('SUBSTR', name, start, end)
00658 
00659     def ttl(self, name):
00660         "Returns the number of seconds until the key ``name`` will expire"
00661         return self.execute_command('TTL', name)
00662 
00663     def type(self, name):
00664         "Returns the type of key ``name``"
00665         return self.execute_command('TYPE', name)
00666 
00667     def watch(self, *names):
00668         """
00669         Watches the values at keys ``names``, or None if the key doesn't exist
00670         """
00671         warnings.warn(DeprecationWarning('Call WATCH from a Pipeline object'))
00672 
00673     def unwatch(self):
00674         """
00675         Unwatches the value at key ``name``, or None of the key doesn't exist
00676         """
00677         warnings.warn(
00678             DeprecationWarning('Call UNWATCH from a Pipeline object'))
00679 
00680     #### LIST COMMANDS ####
00681     def blpop(self, keys, timeout=0):
00682         """
00683         LPOP a value off of the first non-empty list
00684         named in the ``keys`` list.
00685 
00686         If none of the lists in ``keys`` has a value to LPOP, then block
00687         for ``timeout`` seconds, or until a value gets pushed on to one
00688         of the lists.
00689 
00690         If timeout is 0, then block indefinitely.
00691         """
00692         if timeout is None:
00693             timeout = 0
00694         if isinstance(keys, basestring):
00695             keys = [keys]
00696         else:
00697             keys = list(keys)
00698         keys.append(timeout)
00699         return self.execute_command('BLPOP', *keys)
00700 
00701     def brpop(self, keys, timeout=0):
00702         """
00703         RPOP a value off of the first non-empty list
00704         named in the ``keys`` list.
00705 
00706         If none of the lists in ``keys`` has a value to LPOP, then block
00707         for ``timeout`` seconds, or until a value gets pushed on to one
00708         of the lists.
00709 
00710         If timeout is 0, then block indefinitely.
00711         """
00712         if timeout is None:
00713             timeout = 0
00714         if isinstance(keys, basestring):
00715             keys = [keys]
00716         else:
00717             keys = list(keys)
00718         keys.append(timeout)
00719         return self.execute_command('BRPOP', *keys)
00720 
00721     def brpoplpush(self, src, dst, timeout=0):
00722         """
00723         Pop a value off the tail of ``src``, push it on the head of ``dst``
00724         and then return it.
00725 
00726         This command blocks until a value is in ``src`` or until ``timeout``
00727         seconds elapse, whichever is first. A ``timeout`` value of 0 blocks
00728         forever.
00729         """
00730         if timeout is None:
00731             timeout = 0
00732         return self.execute_command('BRPOPLPUSH', src, dst, timeout)
00733 
00734     def lindex(self, name, index):
00735         """
00736         Return the item from list ``name`` at position ``index``
00737 
00738         Negative indexes are supported and will return an item at the
00739         end of the list
00740         """
00741         return self.execute_command('LINDEX', name, index)
00742 
00743     def linsert(self, name, where, refvalue, value):
00744         """
00745         Insert ``value`` in list ``name`` either immediately before or after
00746         [``where``] ``refvalue``
00747 
00748         Returns the new length of the list on success or -1 if ``refvalue``
00749         is not in the list.
00750         """
00751         return self.execute_command('LINSERT', name, where, refvalue, value)
00752 
00753     def llen(self, name):
00754         "Return the length of the list ``name``"
00755         return self.execute_command('LLEN', name)
00756 
00757     def lpop(self, name):
00758         "Remove and return the first item of the list ``name``"
00759         return self.execute_command('LPOP', name)
00760 
00761     def lpush(self, name, *values):
00762         "Push ``values`` onto the head of the list ``name``"
00763         return self.execute_command('LPUSH', name, *values)
00764 
00765     def lpushx(self, name, value):
00766         "Push ``value`` onto the head of the list ``name`` if ``name`` exists"
00767         return self.execute_command('LPUSHX', name, value)
00768 
00769     def lrange(self, name, start, end):
00770         """
00771         Return a slice of the list ``name`` between
00772         position ``start`` and ``end``
00773 
00774         ``start`` and ``end`` can be negative numbers just like
00775         Python slicing notation
00776         """
00777         return self.execute_command('LRANGE', name, start, end)
00778 
00779     def lrem(self, name, count, value):
00780         """
00781         Remove the first ``count`` occurrences of elements equal to ``value``
00782         from the list stored at ``name``.
00783 
00784         The count argument influences the operation in the following ways:
00785             count > 0: Remove elements equal to value moving from head to tail.
00786             count < 0: Remove elements equal to value moving from tail to head.
00787             count = 0: Remove all elements equal to value.
00788         """
00789         return self.execute_command('LREM', name, count, value)
00790 
00791     def lset(self, name, index, value):
00792         "Set ``position`` of list ``name`` to ``value``"
00793         return self.execute_command('LSET', name, index, value)
00794 
00795     def ltrim(self, name, start, end):
00796         """
00797         Trim the list ``name``, removing all values not within the slice
00798         between ``start`` and ``end``
00799 
00800         ``start`` and ``end`` can be negative numbers just like
00801         Python slicing notation
00802         """
00803         return self.execute_command('LTRIM', name, start, end)
00804 
00805     def rpop(self, name):
00806         "Remove and return the last item of the list ``name``"
00807         return self.execute_command('RPOP', name)
00808 
00809     def rpoplpush(self, src, dst):
00810         """
00811         RPOP a value off of the ``src`` list and atomically LPUSH it
00812         on to the ``dst`` list.  Returns the value.
00813         """
00814         return self.execute_command('RPOPLPUSH', src, dst)
00815 
00816     def rpush(self, name, *values):
00817         "Push ``values`` onto the tail of the list ``name``"
00818         return self.execute_command('RPUSH', name, *values)
00819 
00820     def rpushx(self, name, value):
00821         "Push ``value`` onto the tail of the list ``name`` if ``name`` exists"
00822         return self.execute_command('RPUSHX', name, value)
00823 
00824     def sort(self, name, start=None, num=None, by=None, get=None,
00825              desc=False, alpha=False, store=None):
00826         """
00827         Sort and return the list, set or sorted set at ``name``.
00828 
00829         ``start`` and ``num`` allow for paging through the sorted data
00830 
00831         ``by`` allows using an external key to weight and sort the items.
00832             Use an "*" to indicate where in the key the item value is located
00833 
00834         ``get`` allows for returning items from external keys rather than the
00835             sorted data itself.  Use an "*" to indicate where int he key
00836             the item value is located
00837 
00838         ``desc`` allows for reversing the sort
00839 
00840         ``alpha`` allows for sorting lexicographically rather than numerically
00841 
00842         ``store`` allows for storing the result of the sort into
00843             the key ``store``
00844         """
00845         if (start is not None and num is None) or \
00846                 (num is not None and start is None):
00847             raise RedisError("``start`` and ``num`` must both be specified")
00848 
00849         pieces = [name]
00850         if by is not None:
00851             pieces.append('BY')
00852             pieces.append(by)
00853         if start is not None and num is not None:
00854             pieces.append('LIMIT')
00855             pieces.append(start)
00856             pieces.append(num)
00857         if get is not None:
00858             # If get is a string assume we want to get a single value.
00859             # Otherwise assume it's an interable and we want to get multiple
00860             # values. We can't just iterate blindly because strings are
00861             # iterable.
00862             if isinstance(get, basestring):
00863                 pieces.append('GET')
00864                 pieces.append(get)
00865             else:
00866                 for g in get:
00867                     pieces.append('GET')
00868                     pieces.append(g)
00869         if desc:
00870             pieces.append('DESC')
00871         if alpha:
00872             pieces.append('ALPHA')
00873         if store is not None:
00874             pieces.append('STORE')
00875             pieces.append(store)
00876         return self.execute_command('SORT', *pieces)
00877 
00878     #### SET COMMANDS ####
00879     def sadd(self, name, *values):
00880         "Add ``value(s)`` to set ``name``"
00881         return self.execute_command('SADD', name, *values)
00882 
00883     def scard(self, name):
00884         "Return the number of elements in set ``name``"
00885         return self.execute_command('SCARD', name)
00886 
00887     def sdiff(self, keys, *args):
00888         "Return the difference of sets specified by ``keys``"
00889         args = list_or_args(keys, args)
00890         return self.execute_command('SDIFF', *args)
00891 
00892     def sdiffstore(self, dest, keys, *args):
00893         """
00894         Store the difference of sets specified by ``keys`` into a new
00895         set named ``dest``.  Returns the number of keys in the new set.
00896         """
00897         args = list_or_args(keys, args)
00898         return self.execute_command('SDIFFSTORE', dest, *args)
00899 
00900     def sinter(self, keys, *args):
00901         "Return the intersection of sets specified by ``keys``"
00902         args = list_or_args(keys, args)
00903         return self.execute_command('SINTER', *args)
00904 
00905     def sinterstore(self, dest, keys, *args):
00906         """
00907         Store the intersection of sets specified by ``keys`` into a new
00908         set named ``dest``.  Returns the number of keys in the new set.
00909         """
00910         args = list_or_args(keys, args)
00911         return self.execute_command('SINTERSTORE', dest, *args)
00912 
00913     def sismember(self, name, value):
00914         "Return a boolean indicating if ``value`` is a member of set ``name``"
00915         return self.execute_command('SISMEMBER', name, value)
00916 
00917     def smembers(self, name):
00918         "Return all members of the set ``name``"
00919         return self.execute_command('SMEMBERS', name)
00920 
00921     def smove(self, src, dst, value):
00922         "Move ``value`` from set ``src`` to set ``dst`` atomically"
00923         return self.execute_command('SMOVE', src, dst, value)
00924 
00925     def spop(self, name):
00926         "Remove and return a random member of set ``name``"
00927         return self.execute_command('SPOP', name)
00928 
00929     def srandmember(self, name):
00930         "Return a random member of set ``name``"
00931         return self.execute_command('SRANDMEMBER', name)
00932 
00933     def srem(self, name, *values):
00934         "Remove ``values`` from set ``name``"
00935         return self.execute_command('SREM', name, *values)
00936 
00937     def sunion(self, keys, *args):
00938         "Return the union of sets specifiued by ``keys``"
00939         args = list_or_args(keys, args)
00940         return self.execute_command('SUNION', *args)
00941 
00942     def sunionstore(self, dest, keys, *args):
00943         """
00944         Store the union of sets specified by ``keys`` into a new
00945         set named ``dest``.  Returns the number of keys in the new set.
00946         """
00947         args = list_or_args(keys, args)
00948         return self.execute_command('SUNIONSTORE', dest, *args)
00949 
00950     #### SORTED SET COMMANDS ####
00951     def zadd(self, name, *args, **kwargs):
00952         """
00953         Set any number of score, element-name pairs to the key ``name``. Pairs
00954         can be specified in two ways:
00955 
00956         As *args, in the form of: score1, name1, score2, name2, ...
00957         or as **kwargs, in the form of: name1=score1, name2=score2, ...
00958 
00959         The following example would add four values to the 'my-key' key:
00960         redis.zadd('my-key', 1.1, 'name1', 2.2, 'name2', name3=3.3, name4=4.4)
00961         """
00962         pieces = []
00963         if args:
00964             if len(args) % 2 != 0:
00965                 raise RedisError("ZADD requires an equal number of "
00966                                  "values and scores")
00967             pieces.extend(args)
00968         for pair in iteritems(kwargs):
00969             pieces.append(pair[1])
00970             pieces.append(pair[0])
00971         return self.execute_command('ZADD', name, *pieces)
00972 
00973     def zcard(self, name):
00974         "Return the number of elements in the sorted set ``name``"
00975         return self.execute_command('ZCARD', name)
00976 
00977     def zcount(self, name, min, max):
00978         return self.execute_command('ZCOUNT', name, min, max)
00979 
00980     def zincrby(self, name, value, amount=1):
00981         "Increment the score of ``value`` in sorted set ``name`` by ``amount``"
00982         return self.execute_command('ZINCRBY', name, amount, value)
00983 
00984     def zinterstore(self, dest, keys, aggregate=None):
00985         """
00986         Intersect multiple sorted sets specified by ``keys`` into
00987         a new sorted set, ``dest``. Scores in the destination will be
00988         aggregated based on the ``aggregate``, or SUM if none is provided.
00989         """
00990         return self._zaggregate('ZINTERSTORE', dest, keys, aggregate)
00991 
00992     def zrange(self, name, start, end, desc=False, withscores=False,
00993                score_cast_func=float):
00994         """
00995         Return a range of values from sorted set ``name`` between
00996         ``start`` and ``end`` sorted in ascending order.
00997 
00998         ``start`` and ``end`` can be negative, indicating the end of the range.
00999 
01000         ``desc`` a boolean indicating whether to sort the results descendingly
01001 
01002         ``withscores`` indicates to return the scores along with the values.
01003         The return type is a list of (value, score) pairs
01004 
01005         ``score_cast_func`` a callable used to cast the score return value
01006         """
01007         if desc:
01008             return self.zrevrange(name, start, end, withscores,
01009                                   score_cast_func)
01010         pieces = ['ZRANGE', name, start, end]
01011         if withscores:
01012             pieces.append('withscores')
01013         options = {
01014             'withscores': withscores, 'score_cast_func': score_cast_func}
01015         return self.execute_command(*pieces, **options)
01016 
01017     def zrangebyscore(self, name, min, max, start=None, num=None,
01018                       withscores=False, score_cast_func=float):
01019         """
01020         Return a range of values from the sorted set ``name`` with scores
01021         between ``min`` and ``max``.
01022 
01023         If ``start`` and ``num`` are specified, then return a slice
01024         of the range.
01025 
01026         ``withscores`` indicates to return the scores along with the values.
01027         The return type is a list of (value, score) pairs
01028 
01029         `score_cast_func`` a callable used to cast the score return value
01030         """
01031         if (start is not None and num is None) or \
01032                 (num is not None and start is None):
01033             raise RedisError("``start`` and ``num`` must both be specified")
01034         pieces = ['ZRANGEBYSCORE', name, min, max]
01035         if start is not None and num is not None:
01036             pieces.extend(['LIMIT', start, num])
01037         if withscores:
01038             pieces.append('withscores')
01039         options = {
01040             'withscores': withscores, 'score_cast_func': score_cast_func}
01041         return self.execute_command(*pieces, **options)
01042 
01043     def zrank(self, name, value):
01044         """
01045         Returns a 0-based value indicating the rank of ``value`` in sorted set
01046         ``name``
01047         """
01048         return self.execute_command('ZRANK', name, value)
01049 
01050     def zrem(self, name, *values):
01051         "Remove member ``values`` from sorted set ``name``"
01052         return self.execute_command('ZREM', name, *values)
01053 
01054     def zremrangebyrank(self, name, min, max):
01055         """
01056         Remove all elements in the sorted set ``name`` with ranks between
01057         ``min`` and ``max``. Values are 0-based, ordered from smallest score
01058         to largest. Values can be negative indicating the highest scores.
01059         Returns the number of elements removed
01060         """
01061         return self.execute_command('ZREMRANGEBYRANK', name, min, max)
01062 
01063     def zremrangebyscore(self, name, min, max):
01064         """
01065         Remove all elements in the sorted set ``name`` with scores
01066         between ``min`` and ``max``. Returns the number of elements removed.
01067         """
01068         return self.execute_command('ZREMRANGEBYSCORE', name, min, max)
01069 
01070     def zrevrange(self, name, start, num, withscores=False,
01071                   score_cast_func=float):
01072         """
01073         Return a range of values from sorted set ``name`` between
01074         ``start`` and ``num`` sorted in descending order.
01075 
01076         ``start`` and ``num`` can be negative, indicating the end of the range.
01077 
01078         ``withscores`` indicates to return the scores along with the values
01079         The return type is a list of (value, score) pairs
01080 
01081         ``score_cast_func`` a callable used to cast the score return value
01082         """
01083         pieces = ['ZREVRANGE', name, start, num]
01084         if withscores:
01085             pieces.append('withscores')
01086         options = {
01087             'withscores': withscores, 'score_cast_func': score_cast_func}
01088         return self.execute_command(*pieces, **options)
01089 
01090     def zrevrangebyscore(self, name, max, min, start=None, num=None,
01091                          withscores=False, score_cast_func=float):
01092         """
01093         Return a range of values from the sorted set ``name`` with scores
01094         between ``min`` and ``max`` in descending order.
01095 
01096         If ``start`` and ``num`` are specified, then return a slice
01097         of the range.
01098 
01099         ``withscores`` indicates to return the scores along with the values.
01100         The return type is a list of (value, score) pairs
01101 
01102         ``score_cast_func`` a callable used to cast the score return value
01103         """
01104         if (start is not None and num is None) or \
01105                 (num is not None and start is None):
01106             raise RedisError("``start`` and ``num`` must both be specified")
01107         pieces = ['ZREVRANGEBYSCORE', name, max, min]
01108         if start is not None and num is not None:
01109             pieces.extend(['LIMIT', start, num])
01110         if withscores:
01111             pieces.append('withscores')
01112         options = {
01113             'withscores': withscores, 'score_cast_func': score_cast_func}
01114         return self.execute_command(*pieces, **options)
01115 
01116     def zrevrank(self, name, value):
01117         """
01118         Returns a 0-based value indicating the descending rank of
01119         ``value`` in sorted set ``name``
01120         """
01121         return self.execute_command('ZREVRANK', name, value)
01122 
01123     def zscore(self, name, value):
01124         "Return the score of element ``value`` in sorted set ``name``"
01125         return self.execute_command('ZSCORE', name, value)
01126 
01127     def zunionstore(self, dest, keys, aggregate=None):
01128         """
01129         Union multiple sorted sets specified by ``keys`` into
01130         a new sorted set, ``dest``. Scores in the destination will be
01131         aggregated based on the ``aggregate``, or SUM if none is provided.
01132         """
01133         return self._zaggregate('ZUNIONSTORE', dest, keys, aggregate)
01134 
01135     def _zaggregate(self, command, dest, keys, aggregate=None):
01136         pieces = [command, dest, len(keys)]
01137         if isinstance(keys, dict):
01138             keys, weights = dictkeys(keys), dictvalues(keys)
01139         else:
01140             weights = None
01141         pieces.extend(keys)
01142         if weights:
01143             pieces.append('WEIGHTS')
01144             pieces.extend(weights)
01145         if aggregate:
01146             pieces.append('AGGREGATE')
01147             pieces.append(aggregate)
01148         return self.execute_command(*pieces)
01149 
01150     #### HASH COMMANDS ####
01151     def hdel(self, name, *keys):
01152         "Delete ``keys`` from hash ``name``"
01153         return self.execute_command('HDEL', name, *keys)
01154 
01155     def hexists(self, name, key):
01156         "Returns a boolean indicating if ``key`` exists within hash ``name``"
01157         return self.execute_command('HEXISTS', name, key)
01158 
01159     def hget(self, name, key):
01160         "Return the value of ``key`` within the hash ``name``"
01161         return self.execute_command('HGET', name, key)
01162 
01163     def hgetall(self, name):
01164         "Return a Python dict of the hash's name/value pairs"
01165         return self.execute_command('HGETALL', name)
01166 
01167     def hincrby(self, name, key, amount=1):
01168         "Increment the value of ``key`` in hash ``name`` by ``amount``"
01169         return self.execute_command('HINCRBY', name, key, amount)
01170 
01171     def hkeys(self, name):
01172         "Return the list of keys within hash ``name``"
01173         return self.execute_command('HKEYS', name)
01174 
01175     def hlen(self, name):
01176         "Return the number of elements in hash ``name``"
01177         return self.execute_command('HLEN', name)
01178 
01179     def hset(self, name, key, value):
01180         """
01181         Set ``key`` to ``value`` within hash ``name``
01182         Returns 1 if HSET created a new field, otherwise 0
01183         """
01184         return self.execute_command('HSET', name, key, value)
01185 
01186     def hsetnx(self, name, key, value):
01187         """
01188         Set ``key`` to ``value`` within hash ``name`` if ``key`` does not
01189         exist.  Returns 1 if HSETNX created a field, otherwise 0.
01190         """
01191         return self.execute_command("HSETNX", name, key, value)
01192 
01193     def hmset(self, name, mapping):
01194         """
01195         Sets each key in the ``mapping`` dict to its corresponding value
01196         in the hash ``name``
01197         """
01198         if not mapping:
01199             raise DataError("'hmset' with 'mapping' of length 0")
01200         items = []
01201         for pair in iteritems(mapping):
01202             items.extend(pair)
01203         return self.execute_command('HMSET', name, *items)
01204 
01205     def hmget(self, name, keys, *args):
01206         "Returns a list of values ordered identically to ``keys``"
01207         args = list_or_args(keys, args)
01208         return self.execute_command('HMGET', name, *args)
01209 
01210     def hvals(self, name):
01211         "Return the list of values within hash ``name``"
01212         return self.execute_command('HVALS', name)
01213 
01214     def publish(self, channel, message):
01215         """
01216         Publish ``message`` on ``channel``.
01217         Returns the number of subscribers the message was delivered to.
01218         """
01219         return self.execute_command('PUBLISH', channel, message)
01220 
01221     def eval(self, script, numkeys, *keys_and_args):
01222         """
01223         Execute the LUA ``script``, specifying the ``numkeys`` the script
01224         will touch and the key names and argument values in ``keys_and_args``.
01225         Returns the result of the script.
01226 
01227         In practice, use the object returned by ``register_script``. This
01228         function exists purely for Redis API completion.
01229         """
01230         return self.execute_command('EVAL', script, numkeys, *keys_and_args)
01231 
01232     def evalsha(self, sha, numkeys, *keys_and_args):
01233         """
01234         Use the ``sha`` to execute a LUA script already registered via EVAL
01235         or SCRIPT LOAD. Specify the ``numkeys`` the script will touch and the
01236         key names and argument values in ``keys_and_args``. Returns the result
01237         of the script.
01238 
01239         In practice, use the object returned by ``register_script``. This
01240         function exists purely for Redis API completion.
01241         """
01242         return self.execute_command('EVALSHA', sha, numkeys, *keys_and_args)
01243 
01244     def script_exists(self, *args):
01245         """
01246         Check if a script exists in the script cache by specifying the SHAs of
01247         each script as ``args``. Returns a list of boolean values indicating if
01248         if each already script exists in the cache.
01249         """
01250         options = {'parse': 'EXISTS'}
01251         return self.execute_command('SCRIPT', 'EXISTS', *args, **options)
01252 
01253     def script_flush(self):
01254         "Flush all scripts from the script cache"
01255         options = {'parse': 'FLUSH'}
01256         return self.execute_command('SCRIPT', 'FLUSH', **options)
01257 
01258     def script_kill(self):
01259         "Kill the currently executing LUA script"
01260         options = {'parse': 'KILL'}
01261         return self.execute_command('SCRIPT', 'KILL', **options)
01262 
01263     def script_load(self, script):
01264         "Load a LUA ``script`` into the script cache. Returns the SHA."
01265         options = {'parse': 'LOAD'}
01266         return self.execute_command('SCRIPT', 'LOAD', script, **options)
01267 
01268     def register_script(self, script):
01269         """
01270         Register a LUA ``script`` specifying the ``keys`` it will touch.
01271         Returns a Script object that is callable and hides the complexity of
01272         deal with scripts, keys, and shas. This is the preferred way to work
01273         with LUA scripts.
01274         """
01275         return Script(self, script)
01276 
01277 class Redis(StrictRedis):
01278     """
01279     Provides backwards compatibility with older versions of redis-py that
01280     changed arguments to some commands to be more Pythonic, sane, or by
01281     accident.
01282     """
01283     # Overridden callbacks
01284     RESPONSE_CALLBACKS = dict_merge(
01285         StrictRedis.RESPONSE_CALLBACKS,
01286         {
01287             'TTL': lambda r: r != -1 and r or None,
01288         }
01289     )
01290 
01291     def pipeline(self, transaction=True, shard_hint=None):
01292         """
01293         Return a new pipeline object that can queue multiple commands for
01294         later execution. ``transaction`` indicates whether all commands
01295         should be executed atomically. Apart from making a group of operations
01296         atomic, pipelines are useful for reducing the back-and-forth overhead
01297         between the client and server.
01298         """
01299         return Pipeline(
01300             self.connection_pool,
01301             self.response_callbacks,
01302             transaction,
01303             shard_hint)
01304 
01305     def setex(self, name, value, time):
01306         """
01307         Set the value of key ``name`` to ``value`` that expires in ``time``
01308         seconds. ``time`` can be represented by an integer or a Python
01309         timedelta object.
01310         """
01311         if isinstance(time, datetime.timedelta):
01312             time = int(time.total_seconds())
01313         return self.execute_command('SETEX', name, time, value)
01314 
01315     def lrem(self, name, value, num=0):
01316         """
01317         Remove the first ``num`` occurrences of elements equal to ``value``
01318         from the list stored at ``name``.
01319 
01320         The ``num`` argument influences the operation in the following ways:
01321             num > 0: Remove elements equal to value moving from head to tail.
01322             num < 0: Remove elements equal to value moving from tail to head.
01323             num = 0: Remove all elements equal to value.
01324         """
01325         return self.execute_command('LREM', name, num, value)
01326 
01327     def zadd(self, name, *args, **kwargs):
01328         """
01329         NOTE: The order of arguments differs from that of the official ZADD
01330         command. For backwards compatability, this method accepts arguments
01331         in the form of name1, score1, name2, score2, while the official Redis
01332         documents expects score1, name1, score2, name2.
01333 
01334         If you're looking to use the standard syntax, consider using the
01335         StrictRedis class. See the API Reference section of the docs for more
01336         information.
01337 
01338         Set any number of element-name, score pairs to the key ``name``. Pairs
01339         can be specified in two ways:
01340 
01341         As *args, in the form of: name1, score1, name2, score2, ...
01342         or as **kwargs, in the form of: name1=score1, name2=score2, ...
01343 
01344         The following example would add four values to the 'my-key' key:
01345         redis.zadd('my-key', 'name1', 1.1, 'name2', 2.2, name3=3.3, name4=4.4)
01346         """
01347         pieces = []
01348         if args:
01349             if len(args) % 2 != 0:
01350                 raise RedisError("ZADD requires an equal number of "
01351                                  "values and scores")
01352             pieces.extend(reversed(args))
01353         for pair in iteritems(kwargs):
01354             pieces.append(pair[1])
01355             pieces.append(pair[0])
01356         return self.execute_command('ZADD', name, *pieces)
01357 
01358 
01359 class PubSub(object):
01360     """
01361     PubSub provides publish, subscribe and listen support to Redis channels.
01362 
01363     After subscribing to one or more channels, the listen() method will block
01364     until a message arrives on one of the subscribed channels. That message
01365     will be returned and it's safe to start listening again.
01366     """
01367     def __init__(self, connection_pool, shard_hint=None):
01368         self.connection_pool = connection_pool
01369         self.shard_hint = shard_hint
01370         self.connection = None
01371         self.channels = set()
01372         self.patterns = set()
01373         self.subscription_count = 0
01374         self.subscribe_commands = set(
01375             ('subscribe', 'psubscribe', 'unsubscribe', 'punsubscribe')
01376         )
01377 
01378     def __del__(self):
01379         try:
01380             # if this object went out of scope prior to shutting down
01381             # subscriptions, close the connection manually before
01382             # returning it to the connection pool
01383             if self.connection and (self.channels or self.patterns):
01384                 self.connection.disconnect()
01385             self.reset()
01386         except:
01387             pass
01388 
01389     def reset(self):
01390         if self.connection:
01391             self.connection.disconnect()
01392             self.connection_pool.release(self.connection)
01393             self.connection = None
01394 
01395     def close(self):
01396         self.reset()
01397 
01398     def execute_command(self, *args, **kwargs):
01399         "Execute a publish/subscribe command"
01400 
01401         # NOTE: don't parse the response in this function. it could pull a
01402         # legitmate message off the stack if the connection is already
01403         # subscribed to one or more channels
01404 
01405         if self.connection is None:
01406             self.connection = self.connection_pool.get_connection(
01407                 'pubsub',
01408                 self.shard_hint
01409             )
01410         connection = self.connection
01411         try:
01412             connection.send_command(*args)
01413         except ConnectionError:
01414             connection.disconnect()
01415             # Connect manually here. If the Redis server is down, this will
01416             # fail and raise a ConnectionError as desired.
01417             connection.connect()
01418             # resubscribe to all channels and patterns before
01419             # resending the current command
01420             for channel in self.channels:
01421                 self.subscribe(channel)
01422             for pattern in self.patterns:
01423                 self.psubscribe(pattern)
01424             connection.send_command(*args)
01425 
01426     def parse_response(self):
01427         "Parse the response from a publish/subscribe command"
01428         response = self.connection.read_response()
01429         if nativestr(response[0]) in self.subscribe_commands:
01430             self.subscription_count = response[2]
01431             # if we've just unsubscribed from the remaining channels,
01432             # release the connection back to the pool
01433             if not self.subscription_count:
01434                 self.reset()
01435         return response
01436 
01437     def psubscribe(self, patterns):
01438         "Subscribe to all channels matching any pattern in ``patterns``"
01439         if isinstance(patterns, basestring):
01440             patterns = [patterns]
01441         for pattern in patterns:
01442             self.patterns.add(pattern)
01443         return self.execute_command('PSUBSCRIBE', *patterns)
01444 
01445     def punsubscribe(self, patterns=[]):
01446         """
01447         Unsubscribe from any channel matching any pattern in ``patterns``.
01448         If empty, unsubscribe from all channels.
01449         """
01450         if isinstance(patterns, basestring):
01451             patterns = [patterns]
01452         for pattern in patterns:
01453             try:
01454                 self.patterns.remove(pattern)
01455             except KeyError:
01456                 pass
01457         return self.execute_command('PUNSUBSCRIBE', *patterns)
01458 
01459     def subscribe(self, channels):
01460         "Subscribe to ``channels``, waiting for messages to be published"
01461         if isinstance(channels, basestring):
01462             channels = [channels]
01463         for channel in channels:
01464             self.channels.add(channel)
01465         return self.execute_command('SUBSCRIBE', *channels)
01466 
01467     def unsubscribe(self, channels=[]):
01468         """
01469         Unsubscribe from ``channels``. If empty, unsubscribe
01470         from all channels
01471         """
01472         if isinstance(channels, basestring):
01473             channels = [channels]
01474         for channel in channels:
01475             try:
01476                 self.channels.remove(channel)
01477             except KeyError:
01478                 pass
01479         return self.execute_command('UNSUBSCRIBE', *channels)
01480 
01481     def listen(self):
01482         "Listen for messages on channels this client has been subscribed to"
01483         while self.subscription_count or self.channels or self.patterns:
01484             if self.connection == None:
01485               break
01486           
01487             r = self.parse_response()
01488             msg_type = nativestr(r[0])
01489             if msg_type == 'pmessage':
01490                 msg = {
01491                     'type': msg_type,
01492                     'pattern': nativestr(r[1]),
01493                     'channel': nativestr(r[2]),
01494                     'data': r[3]
01495                 }
01496             else:
01497                 msg = {
01498                     'type': msg_type,
01499                     'pattern': None,
01500                     'channel': nativestr(r[1]),
01501                     'data': r[2]
01502                 }
01503             yield msg
01504 
01505 
01506 class BasePipeline(object):
01507     """
01508     Pipelines provide a way to transmit multiple commands to the Redis server
01509     in one transmission.  This is convenient for batch processing, such as
01510     saving all the values in a list to Redis.
01511 
01512     All commands executed within a pipeline are wrapped with MULTI and EXEC
01513     calls. This guarantees all commands executed in the pipeline will be
01514     executed atomically.
01515 
01516     Any command raising an exception does *not* halt the execution of
01517     subsequent commands in the pipeline. Instead, the exception is caught
01518     and its instance is placed into the response list returned by execute().
01519     Code iterating over the response list should be able to deal with an
01520     instance of an exception as a potential value. In general, these will be
01521     ResponseError exceptions, such as those raised when issuing a command
01522     on a key of a different datatype.
01523     """
01524 
01525     UNWATCH_COMMANDS = set(('DISCARD', 'EXEC', 'UNWATCH'))
01526 
01527     def __init__(self, connection_pool, response_callbacks, transaction,
01528                  shard_hint):
01529         self.connection_pool = connection_pool
01530         self.connection = None
01531         self.response_callbacks = response_callbacks
01532         self.transaction = transaction
01533         self.shard_hint = shard_hint
01534 
01535         self.watching = False
01536         self.reset()
01537 
01538     def __enter__(self):
01539         return self
01540 
01541     def __exit__(self, exc_type, exc_value, traceback):
01542         self.reset()
01543 
01544     def __del__(self):
01545         try:
01546             self.reset()
01547         except:
01548             pass
01549 
01550     def reset(self):
01551         self.command_stack = []
01552         self.scripts = set()
01553         # make sure to reset the connection state in the event that we were
01554         # watching something
01555         if self.watching and self.connection:
01556             try:
01557                 # call this manually since our unwatch or
01558                 # immediate_execute_command methods can call reset()
01559                 self.connection.send_command('UNWATCH')
01560                 self.connection.read_response()
01561             except ConnectionError:
01562                 # disconnect will also remove any previous WATCHes
01563                 self.connection.disconnect()
01564         # clean up the other instance attributes
01565         self.watching = False
01566         self.explicit_transaction = False
01567         # we can safely return the connection to the pool here since we're
01568         # sure we're no longer WATCHing anything
01569         if self.connection:
01570             self.connection_pool.release(self.connection)
01571             self.connection = None
01572 
01573     def multi(self):
01574         """
01575         Start a transactional block of the pipeline after WATCH commands
01576         are issued. End the transactional block with `execute`.
01577         """
01578         if self.explicit_transaction:
01579             raise RedisError('Cannot issue nested calls to MULTI')
01580         if self.command_stack:
01581             raise RedisError('Commands without an initial WATCH have already '
01582                              'been issued')
01583         self.explicit_transaction = True
01584 
01585     def execute_command(self, *args, **kwargs):
01586         if (self.watching or args[0] == 'WATCH') and \
01587                 not self.explicit_transaction:
01588             return self.immediate_execute_command(*args, **kwargs)
01589         return self.pipeline_execute_command(*args, **kwargs)
01590 
01591     def immediate_execute_command(self, *args, **options):
01592         """
01593         Execute a command immediately, but don't auto-retry on a
01594         ConnectionError if we're already WATCHing a variable. Used when
01595         issuing WATCH or subsequent commands retrieving their values but before
01596         MULTI is called.
01597         """
01598         command_name = args[0]
01599         conn = self.connection
01600         # if this is the first call, we need a connection
01601         if not conn:
01602             conn = self.connection_pool.get_connection(command_name,
01603                                                        self.shard_hint)
01604             self.connection = conn
01605         try:
01606             conn.send_command(*args)
01607             return self.parse_response(conn, command_name, **options)
01608         except ConnectionError:
01609             conn.disconnect()
01610             # if we're not already watching, we can safely retry the command
01611             # assuming it was a connection timeout
01612             if not self.watching:
01613                 conn.send_command(*args)
01614                 return self.parse_response(conn, command_name, **options)
01615             self.reset()
01616             raise
01617 
01618     def pipeline_execute_command(self, *args, **options):
01619         """
01620         Stage a command to be executed when execute() is next called
01621 
01622         Returns the current Pipeline object back so commands can be
01623         chained together, such as:
01624 
01625         pipe = pipe.set('foo', 'bar').incr('baz').decr('bang')
01626 
01627         At some other point, you can then run: pipe.execute(),
01628         which will execute all commands queued in the pipe.
01629         """
01630         self.command_stack.append((args, options))
01631         return self
01632 
01633     def _execute_transaction(self, connection, commands):
01634         all_cmds = SYM_EMPTY.join(
01635             starmap(connection.pack_command,
01636                     [args for args, options in commands]))
01637         connection.send_packed_command(all_cmds)
01638         # we don't care about the multi/exec any longer
01639         commands = commands[1:-1]
01640         # parse off the response for MULTI and all commands prior to EXEC.
01641         # the only data we care about is the response the EXEC
01642         # which is the last command
01643         for i in range(len(commands) + 1):
01644             self.parse_response(connection, '_')
01645         # parse the EXEC.
01646         response = self.parse_response(connection, '_')
01647 
01648         if response is None:
01649             raise WatchError("Watched variable changed.")
01650 
01651         if len(response) != len(commands):
01652             raise ResponseError("Wrong number of response items from "
01653                                 "pipeline execution")
01654         # We have to run response callbacks manually
01655         data = []
01656         for r, cmd in izip(response, commands):
01657             if not isinstance(r, Exception):
01658                 args, options = cmd
01659                 command_name = args[0]
01660                 if command_name in self.response_callbacks:
01661                     r = self.response_callbacks[command_name](r, **options)
01662             data.append(r)
01663         return data
01664 
01665     def _execute_pipeline(self, connection, commands):
01666         # build up all commands into a single request to increase network perf
01667         all_cmds = SYM_EMPTY.join(
01668             starmap(connection.pack_command,
01669                     [args for args, options in commands]))
01670         connection.send_packed_command(all_cmds)
01671         return [self.parse_response(connection, args[0], **options)
01672                 for args, options in commands]
01673 
01674     def parse_response(self, connection, command_name, **options):
01675         result = StrictRedis.parse_response(
01676             self, connection, command_name, **options)
01677         if command_name in self.UNWATCH_COMMANDS:
01678             self.watching = False
01679         elif command_name == 'WATCH':
01680             self.watching = True
01681         return result
01682 
01683     def load_scripts(self):
01684         # make sure all scripts that are about to be run on this pipeline exist
01685         scripts = list(self.scripts)
01686         immediate = self.immediate_execute_command
01687         shas = [s.sha for s in scripts]
01688         exists = immediate('SCRIPT', 'EXISTS', *shas, **{'parse': 'EXISTS'})
01689         if not all(exists):
01690             for s, exist in izip(scripts, exists):
01691                 if not exist:
01692                     immediate('SCRIPT', 'LOAD', s.script, **{'parse': 'LOAD'})
01693 
01694     def execute(self):
01695         "Execute all the commands in the current pipeline"
01696         if self.scripts:
01697             self.load_scripts()
01698         stack = self.command_stack
01699         if self.transaction or self.explicit_transaction:
01700             stack = [(('MULTI', ), {})] + stack + [(('EXEC', ), {})]
01701             execute = self._execute_transaction
01702         else:
01703             execute = self._execute_pipeline
01704 
01705         conn = self.connection
01706         if not conn:
01707             conn = self.connection_pool.get_connection('MULTI',
01708                                                        self.shard_hint)
01709             # assign to self.connection so reset() releases the connection
01710             # back to the pool after we're done
01711             self.connection = conn
01712 
01713         try:
01714             return execute(conn, stack)
01715         except ConnectionError:
01716             conn.disconnect()
01717             # if we were watching a variable, the watch is no longer valid
01718             # since this connection has died. raise a WatchError, which
01719             # indicates the user should retry his transaction. If this is more
01720             # than a temporary failure, the WATCH that the user next issue
01721             # will fail, propegating the real ConnectionError
01722             if self.watching:
01723                 raise WatchError("A ConnectionError occured on while watching "
01724                                  "one or more keys")
01725             # otherwise, it's safe to retry since the transaction isn't
01726             # predicated on any state
01727             return execute(conn, stack)
01728         finally:
01729             self.reset()
01730 
01731     def watch(self, *names):
01732         "Watches the values at keys ``names``"
01733         if self.explicit_transaction:
01734             raise RedisError('Cannot issue a WATCH after a MULTI')
01735         return self.execute_command('WATCH', *names)
01736 
01737     def unwatch(self):
01738         "Unwatches all previously specified keys"
01739         return self.watching and self.execute_command('UNWATCH') or True
01740 
01741     def script_load_for_pipeline(self, script):
01742         "Make sure scripts are loaded prior to pipeline execution"
01743         self.scripts.add(script)
01744 
01745 
01746 class StrictPipeline(BasePipeline, StrictRedis):
01747     "Pipeline for the StrictRedis class"
01748     pass
01749 
01750 
01751 class Pipeline(BasePipeline, Redis):
01752     "Pipeline for the Redis class"
01753     pass
01754 
01755 
01756 class Script(object):
01757     "An executable LUA script object returned by ``register_script``"
01758 
01759     def __init__(self, registered_client, script):
01760         self.registered_client = registered_client
01761         self.script = script
01762         self.sha = registered_client.script_load(script)
01763 
01764     def __call__(self, keys=[], args=[], client=None):
01765         "Execute the script, passing any required ``args``"
01766         client = client or self.registered_client
01767         args = tuple(keys) + tuple(args)
01768         # make sure the Redis server knows about the script
01769         if isinstance(client, BasePipeline):
01770             # make sure this script is good to go on pipeline
01771             client.script_load_for_pipeline(self)
01772         try:
01773             return client.evalsha(self.sha, len(keys), *args)
01774         except NoScriptError:
01775             # Maybe the client is pointed to a differnet server than the client
01776             # that created this instance?
01777             self.sha = client.script_load(self.script)
01778             return client.evalsha(self.sha, len(keys), *args)
01779 
01780 
01781 class LockError(RedisError):
01782     "Errors thrown from the Lock"
01783     pass
01784 
01785 
01786 class Lock(object):
01787     """
01788     A shared, distributed Lock. Using Redis for locking allows the Lock
01789     to be shared across processes and/or machines.
01790 
01791     It's left to the user to resolve deadlock issues and make sure
01792     multiple clients play nicely together.
01793     """
01794 
01795     LOCK_FOREVER = float(2 ** 31 + 1)  # 1 past max unix time
01796 
01797     def __init__(self, redis, name, timeout=None, sleep=0.1):
01798         """
01799         Create a new Lock instnace named ``name`` using the Redis client
01800         supplied by ``redis``.
01801 
01802         ``timeout`` indicates a maximum life for the lock.
01803         By default, it will remain locked until release() is called.
01804 
01805         ``sleep`` indicates the amount of time to sleep per loop iteration
01806         when the lock is in blocking mode and another client is currently
01807         holding the lock.
01808 
01809         Note: If using ``timeout``, you should make sure all the hosts
01810         that are running clients have their time synchronized with a network
01811         time service like ntp.
01812         """
01813         self.redis = redis
01814         self.name = name
01815         self.acquired_until = None
01816         self.timeout = timeout
01817         self.sleep = sleep
01818         if self.timeout and self.sleep > self.timeout:
01819             raise LockError("'sleep' must be less than 'timeout'")
01820 
01821     def __enter__(self):
01822         return self.acquire()
01823 
01824     def __exit__(self, exc_type, exc_value, traceback):
01825         self.release()
01826 
01827     def acquire(self, blocking=True):
01828         """
01829         Use Redis to hold a shared, distributed lock named ``name``.
01830         Returns True once the lock is acquired.
01831 
01832         If ``blocking`` is False, always return immediately. If the lock
01833         was acquired, return True, otherwise return False.
01834         """
01835         sleep = self.sleep
01836         timeout = self.timeout
01837         while 1:
01838             unixtime = int(mod_time.time())
01839             if timeout:
01840                 timeout_at = unixtime + timeout
01841             else:
01842                 timeout_at = Lock.LOCK_FOREVER
01843             timeout_at = float(timeout_at)
01844             if self.redis.setnx(self.name, timeout_at):
01845                 self.acquired_until = timeout_at
01846                 return True
01847             # We want blocking, but didn't acquire the lock
01848             # check to see if the current lock is expired
01849             existing = float(self.redis.get(self.name) or 1)
01850             if existing < unixtime:
01851                 # the previous lock is expired, attempt to overwrite it
01852                 existing = float(self.redis.getset(self.name, timeout_at) or 1)
01853                 if existing < unixtime:
01854                     # we successfully acquired the lock
01855                     self.acquired_until = timeout_at
01856                     return True
01857             if not blocking:
01858                 return False
01859             mod_time.sleep(sleep)
01860 
01861     def release(self):
01862         "Releases the already acquired lock"
01863         if self.acquired_until is None:
01864             raise ValueError("Cannot release an unlocked lock")
01865         existing = float(self.redis.get(self.name) or 1)
01866         # if the lock time is in the future, delete the lock
01867         if existing >= self.acquired_until:
01868             self.redis.delete(self.name)
01869         self.acquired_until = None
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Properties Friends


redis
Author(s): Daniel Stonier
autogenerated on Tue Jan 15 2013 17:43:13