00001 from __future__ import with_statement
00002 from itertools import starmap
00003 import datetime
00004 import warnings
00005 import time as mod_time
00006 from redis._compat import (b, izip, imap, iteritems, dictkeys, dictvalues,
00007 basestring, long, nativestr, urlparse)
00008 from redis.connection import ConnectionPool, UnixDomainSocketConnection
00009 from redis.exceptions import (
00010 ConnectionError,
00011 DataError,
00012 RedisError,
00013 ResponseError,
00014 WatchError,
00015 NoScriptError
00016 )
00017
# Empty-string constant, passed through the ``b`` helper imported from
# redis._compat.
SYM_EMPTY = b('')
00019
00020
def list_or_args(keys, args):
    """
    Return a new list combining ``keys`` and ``args``.

    ``keys`` may be a single key (a string or any non-iterable value) or an
    iterable of keys; ``args`` holds any additional keys to append. The
    object passed as ``keys`` is never modified.
    """
    try:
        iter(keys)
    except TypeError:
        # not iterable at all: treat it as one key
        keys = [keys]
    else:
        if isinstance(keys, basestring):
            # a string is iterable but still represents a single key
            keys = [keys]
        else:
            # copy, so extending below cannot mutate the caller's list and
            # tuples, sets or generators are accepted as well
            keys = list(keys)
    if args:
        keys.extend(args)
    return keys
00034
00035
def timestamp_to_datetime(response):
    """
    Turn a unix timestamp into a ``datetime.datetime`` object.

    Returns None when ``response`` is falsy or when ``int()`` rejects it
    with a ValueError.
    """
    if response:
        try:
            seconds = int(response)
        except ValueError:
            pass
        else:
            return datetime.datetime.fromtimestamp(seconds)
    return None
00045
00046
def string_keys_to_dict(key_string, callback):
    """
    Build a dict that maps every whitespace-separated word of
    ``key_string`` to the same ``callback``.
    """
    return dict((word, callback) for word in key_string.split())
00049
00050
def dict_merge(*dicts):
    """
    Merge any number of dicts into one new dict.

    When a key appears in several inputs the value from the later dict
    wins. The input dicts are not modified.
    """
    merged = {}
    # a plain loop: the former list comprehension only built a throwaway
    # list of None values
    for d in dicts:
        merged.update(d)
    return merged
00055
00056
def parse_debug_object(response):
    """
    Convert the reply of Redis's DEBUG OBJECT command into a dict.

    The reply is prefixed with ``type:`` so that its first token splits
    into a key/value pair like all the others. Known numeric fields are
    converted to int.
    """
    text = 'type:' + nativestr(response)
    parsed = {}
    for token in text.split():
        name, value = token.split(':')
        parsed[name] = value

    for name in ('refcount', 'serializedlength', 'lru', 'lru_seconds_idle'):
        if name in parsed:
            parsed[name] = int(parsed[name])
    return parsed
00073
00074
def parse_object(response, infotype):
    """
    Post-process the reply of an OBJECT command: ``idletime`` and
    ``refcount`` replies become ints, anything else is returned as-is.
    """
    numeric = infotype in ('idletime', 'refcount')
    return int(response) if numeric else response
00080
00081
def parse_info(response):
    """
    Parse the reply of Redis's INFO command into a Python dict.

    Blank lines and lines starting with ``#`` are skipped. Every other line
    is split on its first ``:`` into key and value. A value is converted to
    float when it contains a ``.``, otherwise to int; when that fails, a
    value of the form ``a=1,b=2`` becomes a nested dict and anything else
    is kept as a string.
    """
    info = {}
    response = nativestr(response)

    def get_value(value):
        # only "k=v,k=v" style values are expanded into a sub-dict
        if ',' not in value or '=' not in value:
            return value

        sub_dict = {}
        for item in value.split(','):
            k, v = item.rsplit('=', 1)
            try:
                sub_dict[k] = int(v)
            except ValueError:
                sub_dict[k] = v
        return sub_dict

    for line in response.splitlines():
        if line and not line.startswith('#'):
            # split on the first colon only: the value itself may contain
            # colons (for example an IPv6 address)
            key, value = line.split(':', 1)
            try:
                if '.' in value:
                    info[key] = float(value)
                else:
                    info[key] = int(value)
            except ValueError:
                info[key] = get_value(value)
    return info
00111
00112
def pairs_to_dict(response):
    """
    Build a dict from a flat sequence of alternating keys and values.
    """
    stream = iter(response)
    # zipping one iterator with itself consumes two items per pair
    return dict(izip(*[stream] * 2))
00117
00118
def zset_score_pairs(response, **options):
    """
    If ``withscores`` is specified (and true) in the options, return the
    response as a list of (value, score) pairs; otherwise return the
    response unchanged.

    ``score_cast_func`` in the options is applied to every score and
    defaults to ``float``.
    """
    # .get(): the callback may be invoked without a ``withscores`` option,
    # which must behave like withscores=False rather than raise KeyError
    if not response or not options.get('withscores'):
        return response
    score_cast_func = options.get('score_cast_func', float)
    it = iter(response)
    return list(izip(it, imap(score_cast_func, it)))
00129
00130
def int_or_none(response):
    """Return ``int(response)``, or None when ``response`` is None."""
    return None if response is None else int(response)
00135
00136
def float_or_none(response):
    """Return ``float(response)``, or None when ``response`` is None."""
    return None if response is None else float(response)
00141
00142
def parse_config(response, **options):
    """
    Post-process a CONFIG reply.

    With ``parse='GET'`` the flat reply is decoded item by item (None items
    are kept) and returned as a dict; an empty reply gives ``{}``. For any
    other ``parse`` value the result is True when the reply is ``OK``.
    """
    if options['parse'] != 'GET':
        return nativestr(response) == 'OK'
    decoded = [None if item is None else nativestr(item)
               for item in response]
    if decoded:
        return pairs_to_dict(decoded) or {}
    return {}
00148
00149
def parse_script(response, **options):
    """
    Post-process a SCRIPT reply according to the ``parse`` option.

    ``FLUSH`` and ``KILL`` give True when the reply is ``OK``; ``EXISTS``
    gives a list of booleans; every other reply is returned unchanged.
    """
    parse = options['parse']
    if parse in ('FLUSH', 'KILL'):
        # go through nativestr like the other status callbacks do: a raw
        # bytes reply never compares equal to the str 'OK'
        return nativestr(response) == 'OK'
    if parse == 'EXISTS':
        return list(imap(bool, response))
    return response
00157
00158
00159 class StrictRedis(object):
00160 """
00161 Implementation of the Redis protocol.
00162
00163 This abstract class provides a Python interface to all Redis commands
00164 and an implementation of the Redis protocol.
00165
00166 Connection and Pipeline derive from this, implementing how
00167 the commands are sent and received to the Redis server
00168 """
# Default response callbacks, keyed by command name. __init__ copies this
# mapping onto each instance and parse_response() applies the matching
# callback to the raw reply.
RESPONSE_CALLBACKS = dict_merge(
    string_keys_to_dict(
        'AUTH DEL EXISTS EXPIRE EXPIREAT HDEL HEXISTS HMSET MOVE MSETNX '
        'PERSIST RENAMENX SISMEMBER SMOVE SETEX SETNX SREM ZREM',
        bool
    ),
    string_keys_to_dict(
        'BITCOUNT DECRBY GETBIT HLEN INCRBY LINSERT LLEN LPUSHX RPUSHX '
        'SADD SCARD SDIFFSTORE SETBIT SETRANGE SINTERSTORE STRLEN '
        'SUNIONSTORE ZADD ZCARD ZREMRANGEBYRANK ZREMRANGEBYSCORE',
        int
    ),
    string_keys_to_dict(
        # the reply is either an integer, returned as-is, or a status
        # string that is compared with 'OK'
        'LPUSH RPUSH',
        lambda r: isinstance(r, long) and r or nativestr(r) == 'OK'
    ),
    string_keys_to_dict('ZSCORE ZINCRBY', float_or_none),
    string_keys_to_dict(
        'FLUSHALL FLUSHDB LSET LTRIM MSET RENAME '
        'SAVE SELECT SET SHUTDOWN SLAVEOF WATCH UNWATCH',
        lambda r: nativestr(r) == 'OK'
    ),
    string_keys_to_dict('BLPOP BRPOP', lambda r: r and tuple(r) or None),
    string_keys_to_dict(
        'SDIFF SINTER SMEMBERS SUNION',
        lambda r: r and set(r) or set()
    ),
    string_keys_to_dict(
        'ZRANGE ZRANGEBYSCORE ZREVRANGE ZREVRANGEBYSCORE',
        zset_score_pairs
    ),
    string_keys_to_dict('ZRANK ZREVRANK', int_or_none),
    {
        # status replies go through nativestr before being compared, like
        # the other status callbacks above; a raw bytes reply would never
        # equal the str literal
        'BGREWRITEAOF': lambda r: (
            nativestr(r) == 'Background rewriting of AOF file started'
        ),
        'BGSAVE': lambda r: nativestr(r) == 'Background saving started',
        'BRPOPLPUSH': lambda r: r and r or None,
        'CONFIG': parse_config,
        'DEBUG': parse_debug_object,
        'HGETALL': lambda r: r and pairs_to_dict(r) or {},
        'INFO': parse_info,
        'LASTSAVE': timestamp_to_datetime,
        'OBJECT': parse_object,
        'PING': lambda r: nativestr(r) == 'PONG',
        'RANDOMKEY': lambda r: r and r or None,
        'SCRIPT': parse_script,
        'TIME': lambda x: (int(x[0]), int(x[1]))
    }
)
00220
@classmethod
def from_url(cls, url, db=None, **kwargs):
    """
    Return a Redis client object configured from the given URL.

    For example::

        redis://username:password@localhost:6379/0

    If ``db`` is None, this method will attempt to extract the database ID
    from the URL path component, falling back to 0. A URL without a host
    or port uses ``localhost`` and ``6379``, the defaults of the
    initializer.

    Any additional keyword arguments will be passed along to the Redis
    class's initializer.
    """
    url = urlparse(url)

    # only the redis:// scheme, or no scheme at all, is accepted
    assert url.scheme == 'redis' or not url.scheme

    # an explicit ``db`` wins; otherwise read it from the URL path
    if db is None:
        try:
            db = int(url.path.replace('/', ''))
        except (AttributeError, ValueError):
            db = 0

    # urlparse reports an omitted host or port as None; substitute the
    # initializer's defaults rather than passing None along
    return cls(host=url.hostname or 'localhost', port=url.port or 6379,
               db=db, password=url.password, **kwargs)
00250
def __init__(self, host='localhost', port=6379,
             db=0, password=None, socket_timeout=None,
             connection_pool=None, charset='utf-8',
             errors='strict', decode_responses=False,
             unix_socket_path=None):
    """
    Set up the client.

    When no ``connection_pool`` is given, a ConnectionPool is created from
    the other arguments: a unix domain socket connection when
    ``unix_socket_path`` is set, otherwise a ``host``/``port`` connection.
    The instance also receives its own copy of the class-level
    RESPONSE_CALLBACKS.
    """
    if not connection_pool:
        pool_args = dict(
            db=db,
            password=password,
            socket_timeout=socket_timeout,
            encoding=charset,
            encoding_errors=errors,
            decode_responses=decode_responses,
        )
        if unix_socket_path:
            pool_args['path'] = unix_socket_path
            pool_args['connection_class'] = UnixDomainSocketConnection
        else:
            pool_args['host'] = host
            pool_args['port'] = port
        connection_pool = ConnectionPool(**pool_args)
    self.connection_pool = connection_pool

    # per-instance copy, so set_response_callback() stays local
    self.response_callbacks = self.__class__.RESPONSE_CALLBACKS.copy()
00280
def set_response_callback(self, command, callback):
    """
    Register ``callback`` as this instance's response callback for
    ``command``.
    """
    callbacks = self.response_callbacks
    callbacks[command] = callback
00284
def pipeline(self, transaction=True, shard_hint=None):
    """
    Create a pipeline that queues multiple commands for later execution.

    ``transaction`` indicates whether the queued commands should be
    executed atomically. Besides atomicity, pipelines reduce the
    back-and-forth overhead between the client and server. The pipeline
    shares this client's connection pool and response callbacks.
    """
    pool = self.connection_pool
    callbacks = self.response_callbacks
    return StrictPipeline(pool, callbacks, transaction, shard_hint)
00298
def transaction(self, func, *watches, **kwargs):
    """
    Run the callable ``func`` as a transaction while watching every key
    listed in ``watches``. ``func`` receives a single argument, a Pipeline
    object. The whole attempt is repeated for as long as executing the
    pipeline raises WatchError.

    The only keyword argument read is ``shard_hint``.
    """
    shard_hint = kwargs.pop('shard_hint', None)
    with self.pipeline(True, shard_hint) as pipe:
        while True:
            try:
                if watches:
                    pipe.watch(*watches)
                func(pipe)
                outcome = pipe.execute()
            except WatchError:
                # a WatchError aborts this attempt: start over
                continue
            else:
                return outcome
00315
def lock(self, name, timeout=None, sleep=0.1):
    """
    Create a Lock object on key ``name`` that mimics the behavior of
    threading.Lock.

    ``timeout``, if given, is the maximum life of the lock; by default it
    stays locked until release() is called.

    ``sleep`` is the time to sleep per loop iteration while the lock is in
    blocking mode and held by another client.
    """
    options = {'timeout': timeout, 'sleep': sleep}
    return Lock(self, name, **options)
00329
def pubsub(self, shard_hint=None):
    """
    Create a Publish/Subscribe object on this client's connection pool. It
    is used to subscribe to channels and listen for the messages published
    to them.
    """
    pool = self.connection_pool
    return PubSub(pool, shard_hint)
00337
00338
def execute_command(self, *args, **options):
    """
    Send a command and return its parsed response.

    ``args[0]`` is the command name. A connection is taken from the pool,
    and when the first attempt raises ConnectionError the connection is
    disconnected and the command is sent once more. The connection is
    released back to the pool in every case.
    """
    pool = self.connection_pool
    command_name = args[0]
    connection = pool.get_connection(command_name, **options)

    def round_trip():
        connection.send_command(*args)
        return self.parse_response(connection, command_name, **options)

    try:
        try:
            return round_trip()
        except ConnectionError:
            # drop the connection and retry exactly once
            connection.disconnect()
            return round_trip()
    finally:
        pool.release(connection)
00353
def parse_response(self, connection, command_name, **options):
    """
    Read one response from ``connection`` and, when a response callback is
    registered for ``command_name``, return the callback's result for it
    (the callback also receives ``options``). Otherwise the raw response
    is returned.
    """
    response = connection.read_response()
    callbacks = self.response_callbacks
    if command_name not in callbacks:
        return response
    return callbacks[command_name](response, **options)
00360
00361
def bgrewriteaof(self):
    """
    Send ``BGREWRITEAOF``, telling the Redis server to rewrite the AOF
    file from data in memory.
    """
    reply = self.execute_command('BGREWRITEAOF')
    return reply
00365
def bgsave(self):
    """
    Send ``BGSAVE``, telling the Redis server to save its data to disk.
    Unlike save(), this is asynchronous and returns immediately.
    """
    reply = self.execute_command('BGSAVE')
    return reply
00372
def config_get(self, pattern="*"):
    """
    Send ``CONFIG GET`` for ``pattern`` and return a dictionary of the
    matching configuration (the reply is parsed with ``parse='GET'``).
    """
    reply = self.execute_command('CONFIG', 'GET', pattern, parse='GET')
    return reply
00376
def config_set(self, name, value):
    """
    Send ``CONFIG SET`` to give config item ``name`` the value ``value``
    (the reply is parsed with ``parse='SET'``).
    """
    reply = self.execute_command('CONFIG', 'SET', name, value, parse='SET')
    return reply
00380
def dbsize(self):
    """
    Send ``DBSIZE`` and return the number of keys in the current database.
    """
    reply = self.execute_command('DBSIZE')
    return reply
00384
def time(self):
    """
    Send ``TIME`` and return the server time as a 2-item tuple of ints:
    (seconds since epoch, microseconds into this second).
    """
    reply = self.execute_command('TIME')
    return reply
00391
def debug_object(self, key):
    """
    Send ``DEBUG OBJECT`` for ``key`` and return version specific
    meta-information about that key.
    """
    command = ('DEBUG', 'OBJECT', key)
    return self.execute_command(*command)
00395
def delete(self, *names):
    """
    Send ``DEL`` to delete one or more keys given by ``names``.
    Also available as ``__delitem__``.
    """
    command = ('DEL',) + names
    return self.execute_command(*command)
__delitem__ = delete
00400
def echo(self, value):
    """
    Send ``ECHO`` with ``value``; the server echoes the string back.
    """
    reply = self.execute_command('ECHO', value)
    return reply
00404
def flushall(self):
    """
    Send ``FLUSHALL``, deleting all keys in all databases on the current
    host.
    """
    reply = self.execute_command('FLUSHALL')
    return reply
00408
def flushdb(self):
    """
    Send ``FLUSHDB``, deleting all keys in the current database.
    """
    reply = self.execute_command('FLUSHDB')
    return reply
00412
def info(self):
    """
    Send ``INFO`` and return a dictionary containing information about
    the Redis server.
    """
    reply = self.execute_command('INFO')
    return reply
00416
def lastsave(self):
    """
    Send ``LASTSAVE`` and return a Python datetime object for the last
    time the Redis database was saved to disk.
    """
    reply = self.execute_command('LASTSAVE')
    return reply
00423
def object(self, infotype, key):
    """
    Send ``OBJECT`` to query the encoding, idletime, or refcount of
    ``key``. ``infotype`` is also passed along as an option for response
    parsing.
    """
    reply = self.execute_command('OBJECT', infotype, key, infotype=infotype)
    return reply
00427
def ping(self):
    """
    Send ``PING`` to the Redis server.
    """
    reply = self.execute_command('PING')
    return reply
00431
def save(self):
    """
    Send ``SAVE``, telling the Redis server to save its data to disk and
    blocking until the save is complete.
    """
    reply = self.execute_command('SAVE')
    return reply
00438
def shutdown(self):
    """
    Send ``SHUTDOWN`` to the server.

    A ConnectionError raised by the command is swallowed and the method
    returns None. If the command returns normally instead, RedisError is
    raised.
    """
    try:
        self.execute_command('SHUTDOWN')
    except ConnectionError:
        # the connection error is the path this method treats as success
        return None
    else:
        raise RedisError("SHUTDOWN seems to have failed.")
00447
def slaveof(self, host=None, port=None):
    """
    Make the server a replicated slave of the instance identified by
    ``host`` and ``port``. When both are None, ``SLAVEOF NO ONE`` is sent
    instead, promoting the instance to a master.
    """
    promote = host is None and port is None
    if promote:
        command = ("SLAVEOF", "NO", "ONE")
    else:
        command = ("SLAVEOF", host, port)
    return self.execute_command(*command)
00457
00458
def append(self, key, value):
    """
    Send ``APPEND`` to add the string ``value`` to the end of the value at
    ``key``; a missing ``key`` is created holding ``value``.
    Returns the new length of the value at ``key``.
    """
    command = ('APPEND', key, value)
    return self.execute_command(*command)
00466
def getrange(self, key, start, end):
    """
    Send ``GETRANGE`` and return the substring of the string stored at
    ``key`` between offsets ``start`` and ``end`` (both inclusive).
    """
    command = ('GETRANGE', key, start, end)
    return self.execute_command(*command)
00473
def bitcount(self, key, start=None, end=None):
    """
    Returns the count of set bits in the value of ``key``. Optional
    ``start`` and ``end`` parameters indicate which bytes to consider.

    Raises RedisError when only one of ``start`` and ``end`` is given.
    """
    params = [key]
    # compare against None: 0 is a legitimate offset and must not be
    # mistaken for "not given"
    if start is not None and end is not None:
        params.append(start)
        params.append(end)
    elif start is not None or end is not None:
        raise RedisError("Both start and end must be specified")
    return self.execute_command('BITCOUNT', *params)
00486
def bitop(self, operation, dest, *keys):
    """
    Send ``BITOP`` to perform the bitwise ``operation`` between ``keys``
    and store the result in ``dest``.
    """
    command = ('BITOP', operation, dest) + keys
    return self.execute_command(*command)
00493
def decr(self, name, amount=1):
    """
    Send ``DECRBY`` to decrement the value of ``name`` by ``amount``. If
    no key exists, the value will be initialized as 0 - ``amount``.
    """
    command = ('DECRBY', name, amount)
    return self.execute_command(*command)
00500
def exists(self, name):
    """
    Send ``EXISTS`` and return a boolean telling whether key ``name``
    exists. Also available as ``__contains__``.
    """
    reply = self.execute_command('EXISTS', name)
    return reply
__contains__ = exists
00505
def expire(self, name, time):
    """
    Send ``EXPIRE`` to set an expire flag on key ``name`` for ``time``
    seconds. ``time`` may be an integer or a Python timedelta object; a
    timedelta is converted to whole seconds first.
    """
    seconds = time
    if isinstance(seconds, datetime.timedelta):
        seconds = int(seconds.total_seconds())
    return self.execute_command('EXPIRE', name, seconds)
00514
def expireat(self, name, when):
    """
    Send ``EXPIREAT`` to set an expire flag on key ``name``. ``when`` may
    be an integer unix time or a Python datetime object; a datetime is
    converted with ``time.mktime`` on its timetuple.
    """
    deadline = when
    if isinstance(deadline, datetime.datetime):
        deadline = int(mod_time.mktime(deadline.timetuple()))
    return self.execute_command('EXPIREAT', name, deadline)
00523
def get(self, name):
    """
    Send ``GET`` and return the value at key ``name``, or None if the key
    doesn't exist.
    """
    reply = self.execute_command('GET', name)
    return reply
00529
def __getitem__(self, name):
    """
    Return the value at key ``name``; raises a KeyError if the key
    doesn't exist.
    """
    value = self.get(name)
    # only None signals a missing key; an existing key may hold an empty
    # (falsy) value, which must be returned rather than raise
    if value is not None:
        return value
    raise KeyError(name)
00539
def getbit(self, name, offset):
    """
    Send ``GETBIT`` and return the value of the bit at ``offset`` in
    ``name``.
    """
    command = ('GETBIT', name, offset)
    return self.execute_command(*command)
00543
def getset(self, name, value):
    """
    Send ``GETSET`` for key ``name`` with ``value``: atomically set the
    key to ``value`` and return the value it held before.
    """
    command = ('GETSET', name, value)
    return self.execute_command(*command)
00550
def incr(self, name, amount=1):
    """
    Send ``INCRBY`` to increment the value of ``name`` by ``amount``. If
    no key exists, the value will be initialized as ``amount``.
    """
    command = ('INCRBY', name, amount)
    return self.execute_command(*command)
00557
def keys(self, pattern='*'):
    """
    Send ``KEYS`` and return a list of keys matching ``pattern``.
    """
    reply = self.execute_command('KEYS', pattern)
    return reply
00561
def mget(self, keys, *args):
    """
    Send ``MGET`` and return a list of values ordered identically to
    ``keys``. Keys may be given as one iterable, as separate arguments,
    or both.
    """
    all_keys = list_or_args(keys, args)
    return self.execute_command('MGET', *all_keys)
00568
def mset(self, mapping):
    """
    Send ``MSET``, setting each key in the ``mapping`` dict to its
    corresponding value.
    """
    flat = []
    for key, value in iteritems(mapping):
        flat.append(key)
        flat.append(value)
    return self.execute_command('MSET', *flat)
00575
def msetnx(self, mapping):
    """
    Send ``MSETNX``, setting each key in the ``mapping`` dict to its
    corresponding value if none of the keys are already set.
    """
    flat = []
    for key, value in iteritems(mapping):
        flat.append(key)
        flat.append(value)
    return self.execute_command('MSETNX', *flat)
00585
def move(self, name, db):
    """
    Send ``MOVE`` to move the key ``name`` to a different Redis database
    ``db``.
    """
    command = ('MOVE', name, db)
    return self.execute_command(*command)
00589
def persist(self, name):
    """
    Send ``PERSIST`` to remove an expiration on ``name``.
    """
    reply = self.execute_command('PERSIST', name)
    return reply
00593
def randomkey(self):
    """
    Send ``RANDOMKEY`` and return the name of a random key.
    """
    reply = self.execute_command('RANDOMKEY')
    return reply
00597
def rename(self, src, dst):
    """
    Send ``RENAME`` to rename key ``src`` to ``dst``.
    """
    command = ('RENAME', src, dst)
    return self.execute_command(*command)
00603
def renamenx(self, src, dst):
    """
    Send ``RENAMENX`` to rename key ``src`` to ``dst`` if ``dst`` doesn't
    already exist.
    """
    command = ('RENAMENX', src, dst)
    return self.execute_command(*command)
00607
def set(self, name, value):
    """
    Send ``SET`` to store ``value`` at key ``name``.
    Also available as ``__setitem__``.
    """
    command = ('SET', name, value)
    return self.execute_command(*command)
__setitem__ = set
00612
def setbit(self, name, offset, value):
    """
    Send ``SETBIT`` to flag the ``offset`` in ``name`` as ``value``. Any
    truthy ``value`` is sent as 1, anything else as 0. Returns the
    previous value of ``offset``.
    """
    bit = 1 if value else 0
    return self.execute_command('SETBIT', name, offset, bit)
00620
def setex(self, name, time, value):
    """
    Send ``SETEX`` to set key ``name`` to ``value`` with an expiry of
    ``time`` seconds. ``time`` may be an integer or a Python timedelta
    object; a timedelta is converted to whole seconds first.
    """
    seconds = time
    if isinstance(seconds, datetime.timedelta):
        seconds = int(seconds.total_seconds())
    return self.execute_command('SETEX', name, seconds, value)
00630
def setnx(self, name, value):
    """
    Send ``SETNX`` to set key ``name`` to ``value`` if the key doesn't
    exist.
    """
    command = ('SETNX', name, value)
    return self.execute_command(*command)
00634
def setrange(self, name, offset, value):
    """
    Send ``SETRANGE`` to overwrite bytes in the value of ``name``,
    starting at ``offset``, with ``value``.

    If ``offset`` plus the length of ``value`` exceeds the length of the
    original value, the new value will be larger than before. If
    ``offset`` exceeds the length of the original value, null bytes pad
    the gap between the end of the previous value and the start of what
    is being written.

    Returns the length of the new string.
    """
    command = ('SETRANGE', name, offset, value)
    return self.execute_command(*command)
00647
def strlen(self, name):
    """
    Send ``STRLEN`` and return the number of bytes stored in the value of
    ``name``.
    """
    reply = self.execute_command('STRLEN', name)
    return reply
00651
def substr(self, name, start, end=-1):
    """
    Send ``SUBSTR`` and return a substring of the string at key ``name``.
    ``start`` and ``end`` are 0-based integers specifying the portion of
    the string to return.
    """
    command = ('SUBSTR', name, start, end)
    return self.execute_command(*command)
00658
def ttl(self, name):
    """
    Send ``TTL`` and return the number of seconds until the key ``name``
    will expire.
    """
    reply = self.execute_command('TTL', name)
    return reply
00662
def type(self, name):
    """
    Send ``TYPE`` and return the type of key ``name``.
    """
    reply = self.execute_command('TYPE', name)
    return reply
00666
def watch(self, *names):
    """
    Deprecated on the client: no command is sent. Only a
    DeprecationWarning is emitted, directing callers to use WATCH from a
    Pipeline object.
    """
    notice = DeprecationWarning('Call WATCH from a Pipeline object')
    warnings.warn(notice)
00672
def unwatch(self):
    """
    Deprecated on the client: no command is sent. Only a
    DeprecationWarning is emitted, directing callers to use UNWATCH from
    a Pipeline object.
    """
    notice = DeprecationWarning('Call UNWATCH from a Pipeline object')
    warnings.warn(notice)
00679
00680
def blpop(self, keys, timeout=0):
    """
    LPOP a value off of the first non-empty list named in the ``keys``
    list. ``keys`` may also be a single key given as a string.

    If none of the lists in ``keys`` has a value to LPOP, block for
    ``timeout`` seconds, or until a value gets pushed on to one of the
    lists.

    A ``timeout`` of 0 (or None) blocks indefinitely.
    """
    if timeout is None:
        timeout = 0
    if isinstance(keys, basestring):
        arguments = [keys, timeout]
    else:
        arguments = list(keys) + [timeout]
    return self.execute_command('BLPOP', *arguments)
00700
def brpop(self, keys, timeout=0):
    """
    RPOP a value off of the first non-empty list named in the ``keys``
    list. ``keys`` may also be a single key given as a string.

    If none of the lists in ``keys`` has a value to RPOP, block for
    ``timeout`` seconds, or until a value gets pushed on to one of the
    lists.

    A ``timeout`` of 0 (or None) blocks indefinitely.
    """
    if timeout is None:
        timeout = 0
    if isinstance(keys, basestring):
        arguments = [keys, timeout]
    else:
        arguments = list(keys) + [timeout]
    return self.execute_command('BRPOP', *arguments)
00720
def brpoplpush(self, src, dst, timeout=0):
    """
    Pop a value off the tail of ``src``, push it on the head of ``dst``
    and then return it.

    The command blocks until a value is in ``src`` or until ``timeout``
    seconds elapse, whichever is first. A ``timeout`` of 0 (or None)
    blocks forever.
    """
    wait = 0 if timeout is None else timeout
    return self.execute_command('BRPOPLPUSH', src, dst, wait)
00733
def lindex(self, name, index):
    """
    Send ``LINDEX`` and return the item from list ``name`` at position
    ``index``.

    Negative indexes are supported and will return an item at the end of
    the list.
    """
    command = ('LINDEX', name, index)
    return self.execute_command(*command)
00742
def linsert(self, name, where, refvalue, value):
    """
    Send ``LINSERT`` to insert ``value`` in list ``name`` either
    immediately before or after [``where``] ``refvalue``.

    Returns the new length of the list on success or -1 if ``refvalue``
    is not in the list.
    """
    command = ('LINSERT', name, where, refvalue, value)
    return self.execute_command(*command)
00752
def llen(self, name):
    """
    Send ``LLEN`` and return the length of the list ``name``.
    """
    reply = self.execute_command('LLEN', name)
    return reply
00756
def lpop(self, name):
    """
    Send ``LPOP`` to remove and return the first item of the list
    ``name``.
    """
    reply = self.execute_command('LPOP', name)
    return reply
00760
def lpush(self, name, *values):
    """
    Send ``LPUSH`` to push ``values`` onto the head of the list ``name``.
    """
    command = ('LPUSH', name) + values
    return self.execute_command(*command)
00764
def lpushx(self, name, value):
    """
    Send ``LPUSHX`` to push ``value`` onto the head of the list ``name``
    if ``name`` exists.
    """
    command = ('LPUSHX', name, value)
    return self.execute_command(*command)
00768
def lrange(self, name, start, end):
    """
    Send ``LRANGE`` and return a slice of the list ``name`` between
    position ``start`` and ``end``.

    ``start`` and ``end`` can be negative numbers just like Python
    slicing notation.
    """
    command = ('LRANGE', name, start, end)
    return self.execute_command(*command)
00778
def lrem(self, name, count, value):
    """
    Send ``LREM`` to remove the first ``count`` occurrences of elements
    equal to ``value`` from the list stored at ``name``.

    The ``count`` argument influences the operation as follows:
        positive: remove matching elements moving from head to tail.
        negative: remove matching elements moving from tail to head.
        zero: remove all matching elements.
    """
    command = ('LREM', name, count, value)
    return self.execute_command(*command)
00790
def lset(self, name, index, value):
    """
    Send ``LSET`` to set position ``index`` of list ``name`` to ``value``.
    """
    command = ('LSET', name, index, value)
    return self.execute_command(*command)
00794
def ltrim(self, name, start, end):
    """
    Send ``LTRIM`` to trim the list ``name``, removing all values not
    within the slice between ``start`` and ``end``.

    ``start`` and ``end`` can be negative numbers just like Python
    slicing notation.
    """
    command = ('LTRIM', name, start, end)
    return self.execute_command(*command)
00804
def rpop(self, name):
    """
    Send ``RPOP`` to remove and return the last item of the list ``name``.
    """
    reply = self.execute_command('RPOP', name)
    return reply
00808
def rpoplpush(self, src, dst):
    """
    Send ``RPOPLPUSH`` to RPOP a value off of the ``src`` list and
    atomically LPUSH it on to the ``dst`` list. Returns the value.
    """
    command = ('RPOPLPUSH', src, dst)
    return self.execute_command(*command)
00815
def rpush(self, name, *values):
    """
    Send ``RPUSH`` to push ``values`` onto the tail of the list ``name``.
    """
    command = ('RPUSH', name) + values
    return self.execute_command(*command)
00819
def rpushx(self, name, value):
    """
    Send ``RPUSHX`` to push ``value`` onto the tail of the list ``name``
    if ``name`` exists.
    """
    command = ('RPUSHX', name, value)
    return self.execute_command(*command)
00823
def sort(self, name, start=None, num=None, by=None, get=None,
         desc=False, alpha=False, store=None):
    """
    Sort and return the list, set or sorted set at ``name``.

    ``start`` and ``num`` page through the sorted data; they must be
    given together, otherwise RedisError is raised.

    ``by`` names an external key pattern used to weight and sort the
    items. Use an "*" to indicate where in the key the item value is
    located.

    ``get`` returns items from external keys rather than the sorted data
    itself; it is either one pattern string or an iterable of patterns.
    Use an "*" to indicate where in the key the item value is located.

    ``desc`` reverses the sort.

    ``alpha`` sorts lexicographically rather than numerically.

    ``store`` stores the result of the sort into the key ``store``.
    """
    # exactly one of the two paging arguments being set is an error
    if (start is None) != (num is None):
        raise RedisError("``start`` and ``num`` must both be specified")

    pieces = [name]
    if by is not None:
        pieces.extend(['BY', by])
    if start is not None:
        # num is known to be set as well once the check above has passed
        pieces.extend(['LIMIT', start, num])
    if get is not None:
        # a single string is one GET pattern; anything else is iterated
        patterns = [get] if isinstance(get, basestring) else get
        for pattern in patterns:
            pieces.extend(['GET', pattern])
    if desc:
        pieces.append('DESC')
    if alpha:
        pieces.append('ALPHA')
    if store is not None:
        pieces.extend(['STORE', store])
    return self.execute_command('SORT', *pieces)
00877
00878
def sadd(self, name, *values):
    """
    Send ``SADD`` to add ``value(s)`` to set ``name``.
    """
    command = ('SADD', name) + values
    return self.execute_command(*command)
00882
def scard(self, name):
    """
    Send ``SCARD`` and return the number of elements in set ``name``.
    """
    reply = self.execute_command('SCARD', name)
    return reply
00886
def sdiff(self, keys, *args):
    """
    Send ``SDIFF`` and return the difference of sets specified by
    ``keys``.
    """
    set_names = list_or_args(keys, args)
    return self.execute_command('SDIFF', *set_names)
00891
def sdiffstore(self, dest, keys, *args):
    """
    Send ``SDIFFSTORE`` to store the difference of sets specified by
    ``keys`` into a new set named ``dest``. Returns the number of keys in
    the new set.
    """
    set_names = list_or_args(keys, args)
    return self.execute_command('SDIFFSTORE', dest, *set_names)
00899
def sinter(self, keys, *args):
    """
    Send ``SINTER`` and return the intersection of sets specified by
    ``keys``.
    """
    set_names = list_or_args(keys, args)
    return self.execute_command('SINTER', *set_names)
00904
def sinterstore(self, dest, keys, *args):
    """
    Send ``SINTERSTORE`` to store the intersection of sets specified by
    ``keys`` into a new set named ``dest``. Returns the number of keys in
    the new set.
    """
    set_names = list_or_args(keys, args)
    return self.execute_command('SINTERSTORE', dest, *set_names)
00912
def sismember(self, name, value):
    """
    Send ``SISMEMBER`` and return a boolean indicating if ``value`` is a
    member of set ``name``.
    """
    command = ('SISMEMBER', name, value)
    return self.execute_command(*command)
00916
def smembers(self, name):
    """
    Send ``SMEMBERS`` and return all members of the set ``name``.
    """
    reply = self.execute_command('SMEMBERS', name)
    return reply
00920
def smove(self, src, dst, value):
    """
    Send ``SMOVE`` to move ``value`` from set ``src`` to set ``dst``
    atomically.
    """
    command = ('SMOVE', src, dst, value)
    return self.execute_command(*command)
00924
def spop(self, name):
    """
    Send ``SPOP`` to remove and return a random member of set ``name``.
    """
    reply = self.execute_command('SPOP', name)
    return reply
00928
def srandmember(self, name):
    """
    Send ``SRANDMEMBER`` and return a random member of set ``name``.
    """
    reply = self.execute_command('SRANDMEMBER', name)
    return reply
00932
def srem(self, name, *values):
    """
    Send ``SREM`` to remove ``values`` from set ``name``.
    """
    command = ('SREM', name) + values
    return self.execute_command(*command)
00936
def sunion(self, keys, *args):
    """
    Send ``SUNION`` and return the union of sets specified by ``keys``.
    """
    set_names = list_or_args(keys, args)
    return self.execute_command('SUNION', *set_names)
00941
def sunionstore(self, dest, keys, *args):
    """
    Send ``SUNIONSTORE`` to store the union of sets specified by ``keys``
    into a new set named ``dest``. Returns the number of keys in the new
    set.
    """
    set_names = list_or_args(keys, args)
    return self.execute_command('SUNIONSTORE', dest, *set_names)
00949
00950
def zadd(self, name, *args, **kwargs):
    """
    Send ``ZADD`` to set any number of score, element-name pairs on the
    key ``name``. Pairs can be specified in two ways:

    As *args, in the form of: score1, name1, score2, name2, ...
    or as **kwargs, in the form of: name1=score1, name2=score2, ...

    The following example would add four values to the 'my-key' key:
    redis.zadd('my-key', 1.1, 'name1', 2.2, 'name2', name3=3.3, name4=4.4)

    Raises RedisError when an odd number of positional values is given.
    """
    if args and len(args) % 2 != 0:
        raise RedisError("ZADD requires an equal number of "
                         "values and scores")
    pieces = list(args)
    # keyword pairs arrive as name=score but are sent as score, name
    for member, score in iteritems(kwargs):
        pieces.extend((score, member))
    return self.execute_command('ZADD', name, *pieces)
00972
def zcard(self, name):
    """Return the number of elements in the sorted set ``name`` (ZCARD)."""
    count = self.execute_command('ZCARD', name)
    return count
00976
def zcount(self, name, min, max):
    """
    Issue ZCOUNT for the sorted set ``name`` with the score bounds
    ``min`` and ``max`` and return the command's reply.
    """
    command = ('ZCOUNT', name, min, max)
    return self.execute_command(*command)
00979
def zincrby(self, name, value, amount=1):
    """
    Increment the score of member ``value`` in sorted set ``name`` by
    ``amount``. Note that ZINCRBY takes the amount before the member.
    """
    command = ('ZINCRBY', name, amount, value)
    return self.execute_command(*command)
00983
def zinterstore(self, dest, keys, aggregate=None):
    """
    Intersect the sorted sets given by ``keys`` and store the result in
    the sorted set ``dest``. ``aggregate``, when given, is forwarded as the
    AGGREGATE option; the original documentation states SUM is used when
    it is omitted.
    """
    command = 'ZINTERSTORE'
    return self._zaggregate(command, dest, keys, aggregate)
00991
def zrange(self, name, start, end, desc=False, withscores=False,
           score_cast_func=float):
    """
    Return the members of sorted set ``name`` between the indexes
    ``start`` and ``end`` in ascending order.

    ``start`` and ``end`` can be negative, indicating the end of the range.

    ``desc`` when true, the call is delegated to ``zrevrange`` so results
    come back in descending order.

    ``withscores`` indicates to return the scores along with the values.
    The return type is a list of (value, score) pairs

    ``score_cast_func`` a callable used to cast the score return value
    """
    if desc:
        return self.zrevrange(name, start, end, withscores,
                              score_cast_func)
    # Both options are handed to execute_command as keyword arguments.
    options = dict(withscores=withscores, score_cast_func=score_cast_func)
    command = ['ZRANGE', name, start, end]
    if withscores:
        command += ['withscores']
    return self.execute_command(*command, **options)
01016
def zrangebyscore(self, name, min, max, start=None, num=None,
                  withscores=False, score_cast_func=float):
    """
    Return the members of sorted set ``name`` whose scores lie between
    ``min`` and ``max``.

    ``start`` and ``num``, when both given, add a ``LIMIT start num``
    clause so only a slice of the range is returned. Supplying just one of
    them raises RedisError.

    ``withscores`` indicates to return the scores along with the values.
    The return type is a list of (value, score) pairs

    ``score_cast_func`` a callable used to cast the score return value
    """
    # Exactly one of the two being None means the pair is incomplete.
    if (start is None) != (num is None):
        raise RedisError("``start`` and ``num`` must both be specified")
    options = dict(withscores=withscores, score_cast_func=score_cast_func)
    command = ['ZRANGEBYSCORE', name, min, max]
    if start is not None:
        command += ['LIMIT', start, num]
    if withscores:
        command += ['withscores']
    return self.execute_command(*command, **options)
01042
def zrank(self, name, value):
    """
    Return the 0-based rank of ``value`` within sorted set ``name``,
    as reported by ZRANK.
    """
    command = ('ZRANK', name, value)
    return self.execute_command(*command)
01049
def zrem(self, name, *values):
    """Remove the members listed in ``values`` from sorted set ``name``."""
    command = ['ZREM', name]
    command.extend(values)
    return self.execute_command(*command)
01053
def zremrangebyrank(self, name, min, max):
    """
    Remove every element of sorted set ``name`` whose rank lies between
    ``min`` and ``max``. Ranks are 0-based and ordered from the smallest
    score to the largest; negative values count from the highest scores.
    Returns the number of elements removed.
    """
    command = ('ZREMRANGEBYRANK', name, min, max)
    return self.execute_command(*command)
01062
def zremrangebyscore(self, name, min, max):
    """
    Remove every element of sorted set ``name`` whose score lies between
    ``min`` and ``max``. Returns the number of elements removed.
    """
    command = ('ZREMRANGEBYSCORE', name, min, max)
    return self.execute_command(*command)
01069
def zrevrange(self, name, start, num, withscores=False,
              score_cast_func=float):
    """
    Return the members of sorted set ``name`` between ``start`` and
    ``num`` in descending order.

    ``start`` and ``num`` can be negative, indicating the end of the range.

    ``withscores`` indicates to return the scores along with the values
    The return type is a list of (value, score) pairs

    ``score_cast_func`` a callable used to cast the score return value
    """
    # Both options are handed to execute_command as keyword arguments.
    options = dict(withscores=withscores, score_cast_func=score_cast_func)
    command = ['ZREVRANGE', name, start, num]
    if withscores:
        command += ['withscores']
    return self.execute_command(*command, **options)
01089
def zrevrangebyscore(self, name, max, min, start=None, num=None,
                     withscores=False, score_cast_func=float):
    """
    Return the members of sorted set ``name`` whose scores lie between
    ``min`` and ``max``, in descending order. Note that ``max`` comes
    before ``min`` in both the signature and the command sent.

    ``start`` and ``num``, when both given, add a ``LIMIT start num``
    clause so only a slice of the range is returned. Supplying just one of
    them raises RedisError.

    ``withscores`` indicates to return the scores along with the values.
    The return type is a list of (value, score) pairs

    ``score_cast_func`` a callable used to cast the score return value
    """
    # Exactly one of the two being None means the pair is incomplete.
    if (start is None) != (num is None):
        raise RedisError("``start`` and ``num`` must both be specified")
    options = dict(withscores=withscores, score_cast_func=score_cast_func)
    command = ['ZREVRANGEBYSCORE', name, max, min]
    if start is not None:
        command += ['LIMIT', start, num]
    if withscores:
        command += ['withscores']
    return self.execute_command(*command, **options)
01115
def zrevrank(self, name, value):
    """
    Return the 0-based descending rank of ``value`` within sorted set
    ``name``, as reported by ZREVRANK.
    """
    command = ('ZREVRANK', name, value)
    return self.execute_command(*command)
01122
def zscore(self, name, value):
    """Return the score of member ``value`` in sorted set ``name``."""
    command = ('ZSCORE', name, value)
    return self.execute_command(*command)
01126
def zunionstore(self, dest, keys, aggregate=None):
    """
    Union the sorted sets given by ``keys`` and store the result in the
    sorted set ``dest``. ``aggregate``, when given, is forwarded as the
    AGGREGATE option; the original documentation states SUM is used when
    it is omitted.
    """
    command = 'ZUNIONSTORE'
    return self._zaggregate(command, dest, keys, aggregate)
01134
01135 def _zaggregate(self, command, dest, keys, aggregate=None):
01136 pieces = [command, dest, len(keys)]
01137 if isinstance(keys, dict):
01138 keys, weights = dictkeys(keys), dictvalues(keys)
01139 else:
01140 weights = None
01141 pieces.extend(keys)
01142 if weights:
01143 pieces.append('WEIGHTS')
01144 pieces.extend(weights)
01145 if aggregate:
01146 pieces.append('AGGREGATE')
01147 pieces.append(aggregate)
01148 return self.execute_command(*pieces)
01149
01150
def hdel(self, name, *keys):
    """Delete the fields listed in ``keys`` from the hash ``name``."""
    command = ['HDEL', name]
    command.extend(keys)
    return self.execute_command(*command)
01154
def hexists(self, name, key):
    """Ask the server (HEXISTS) whether ``key`` exists in hash ``name``."""
    command = ('HEXISTS', name, key)
    return self.execute_command(*command)
01158
def hget(self, name, key):
    """Return the value stored under ``key`` in the hash ``name``."""
    command = ('HGET', name, key)
    return self.execute_command(*command)
01162
def hgetall(self, name):
    """Return the name/value pairs of the hash ``name`` (HGETALL)."""
    reply = self.execute_command('HGETALL', name)
    return reply
01166
def hincrby(self, name, key, amount=1):
    """Increment the value of ``key`` in hash ``name`` by ``amount``."""
    command = ('HINCRBY', name, key, amount)
    return self.execute_command(*command)
01170
def hkeys(self, name):
    """Return the list of field names in the hash ``name`` (HKEYS)."""
    reply = self.execute_command('HKEYS', name)
    return reply
01174
def hlen(self, name):
    """Return the number of fields in the hash ``name`` (HLEN)."""
    count = self.execute_command('HLEN', name)
    return count
01178
def hset(self, name, key, value):
    """
    Store ``value`` under ``key`` in the hash ``name``.
    Returns 1 if HSET created a new field, otherwise 0
    """
    command = ('HSET', name, key, value)
    return self.execute_command(*command)
01185
def hsetnx(self, name, key, value):
    """
    Store ``value`` under ``key`` in the hash ``name`` only when ``key``
    does not exist yet. Returns 1 if HSETNX created a field, otherwise 0.
    """
    command = ("HSETNX", name, key, value)
    return self.execute_command(*command)
01192
def hmset(self, name, mapping):
    """
    Set every key of the ``mapping`` dict to its corresponding value in
    the hash ``name`` with a single HMSET.

    Raises DataError when ``mapping`` is empty.
    """
    if not mapping:
        raise DataError("'hmset' with 'mapping' of length 0")
    # Flatten the mapping into field1, value1, field2, value2, ...
    flattened = []
    for field, value in iteritems(mapping):
        flattened.append(field)
        flattened.append(value)
    return self.execute_command('HMSET', name, *flattened)
01204
def hmget(self, name, keys, *args):
    """
    Return the values of hash ``name`` for the requested fields, ordered
    identically to ``keys`` (followed by any extra ``args``).
    """
    fields = list_or_args(keys, args)
    return self.execute_command('HMGET', name, *fields)
01209
def hvals(self, name):
    """Return the list of values stored in the hash ``name`` (HVALS)."""
    reply = self.execute_command('HVALS', name)
    return reply
01213
def publish(self, channel, message):
    """
    Send ``message`` to ``channel`` with PUBLISH.
    Returns the number of subscribers the message was delivered to.
    """
    command = ('PUBLISH', channel, message)
    return self.execute_command(*command)
01220
def eval(self, script, numkeys, *keys_and_args):
    """
    Run the LUA ``script`` through EVAL. ``numkeys`` is the number of key
    names at the front of ``keys_and_args``; the remaining entries are
    plain argument values. Returns the result of the script.

    In practice, use the object returned by ``register_script``. This
    function exists purely for Redis API completion.
    """
    command = ['EVAL', script, numkeys]
    command.extend(keys_and_args)
    return self.execute_command(*command)
01231
def evalsha(self, sha, numkeys, *keys_and_args):
    """
    Run a LUA script already registered via EVAL or SCRIPT LOAD,
    identified by its ``sha``. ``numkeys`` is the number of key names at
    the front of ``keys_and_args``; the remaining entries are plain
    argument values. Returns the result of the script.

    In practice, use the object returned by ``register_script``. This
    function exists purely for Redis API completion.
    """
    command = ['EVALSHA', sha, numkeys]
    command.extend(keys_and_args)
    return self.execute_command(*command)
01243
def script_exists(self, *args):
    """
    Check whether scripts exist in the script cache by passing the SHA of
    each script as ``args``. Returns a list of boolean values indicating
    whether each script already exists in the cache.
    """
    command = ('SCRIPT', 'EXISTS') + tuple(args)
    # 'parse' is passed through to execute_command as a keyword option.
    return self.execute_command(*command, **{'parse': 'EXISTS'})
01252
def script_flush(self):
    """Flush all scripts from the script cache (SCRIPT FLUSH)."""
    # 'parse' is passed through to execute_command as a keyword option.
    return self.execute_command('SCRIPT', 'FLUSH', parse='FLUSH')
01257
def script_kill(self):
    """Kill the currently executing LUA script (SCRIPT KILL)."""
    # 'parse' is passed through to execute_command as a keyword option.
    return self.execute_command('SCRIPT', 'KILL', parse='KILL')
01262
def script_load(self, script):
    """Load a LUA ``script`` into the script cache. Returns the SHA."""
    # 'parse' is passed through to execute_command as a keyword option.
    return self.execute_command('SCRIPT', 'LOAD', script, parse='LOAD')
01267
def register_script(self, script):
    """
    Wrap the LUA ``script`` in a ``Script`` object bound to this client
    and return it. The returned object is callable and hides the
    complexity of dealing with scripts, keys, and shas. This is the
    preferred way to work with LUA scripts.
    """
    registered = Script(self, script)
    return registered
01276
class Redis(StrictRedis):
    """
    Provides backwards compatibility with older versions of redis-py that
    changed arguments to some commands to be more Pythonic, sane, or by
    accident.
    """

    # Same callbacks as StrictRedis, except that a TTL reply of -1 is
    # reported as None. Because of the ``and/or`` idiom a reply of 0 is
    # reported as None as well.
    RESPONSE_CALLBACKS = dict_merge(
        StrictRedis.RESPONSE_CALLBACKS,
        {
            'TTL': lambda r: r != -1 and r or None,
        }
    )

    def pipeline(self, transaction=True, shard_hint=None):
        """
        Return a new pipeline object that can queue multiple commands for
        later execution. ``transaction`` indicates whether all commands
        should be executed atomically. Apart from making a group of operations
        atomic, pipelines are useful for reducing the back-and-forth overhead
        between the client and server.
        """
        return Pipeline(
            self.connection_pool,
            self.response_callbacks,
            transaction,
            shard_hint)

    def setex(self, name, value, time):
        """
        Set the value of key ``name`` to ``value`` that expires in ``time``
        seconds. ``time`` can be represented by an integer or a Python
        timedelta object.

        Note the legacy argument order: ``value`` comes before ``time``
        here, while the SETEX command itself is sent as name, time, value.
        """
        if isinstance(time, datetime.timedelta):
            time = int(time.total_seconds())
        return self.execute_command('SETEX', name, time, value)

    def lrem(self, name, value, num=0):
        """
        Remove the first ``num`` occurrences of elements equal to ``value``
        from the list stored at ``name``.

        The ``num`` argument influences the operation in the following ways:
            num > 0: Remove elements equal to value moving from head to tail.
            num < 0: Remove elements equal to value moving from tail to head.
            num = 0: Remove all elements equal to value.
        """
        return self.execute_command('LREM', name, num, value)

    def zadd(self, name, *args, **kwargs):
        """
        NOTE: The order of arguments differs from that of the official ZADD
        command. For backwards compatibility, this method accepts arguments
        in the form of name1, score1, name2, score2, while the official Redis
        documents expects score1, name1, score2, name2.

        If you're looking to use the standard syntax, consider using the
        StrictRedis class. See the API Reference section of the docs for more
        information.

        Set any number of element-name, score pairs to the key ``name``. Pairs
        can be specified in two ways:

        As *args, in the form of: name1, score1, name2, score2, ...
        or as **kwargs, in the form of: name1=score1, name2=score2, ...

        The following example would add four values to the 'my-key' key:
        redis.zadd('my-key', 'name1', 1.1, 'name2', 2.2, name3=3.3, name4=4.4)

        Raises RedisError when an odd number of positional values is given.
        """
        pieces = []
        if args:
            if len(args) % 2 != 0:
                raise RedisError("ZADD requires an equal number of "
                                 "values and scores")
            # Swap each (name, score) pair into the (score, name) order ZADD
            # expects while keeping the pairs in the caller's order, so that
            # a member given more than once ends up with its last score.
            for index in range(0, len(args), 2):
                pieces.append(args[index + 1])
                pieces.append(args[index])
        for pair in iteritems(kwargs):
            pieces.append(pair[1])
            pieces.append(pair[0])
        return self.execute_command('ZADD', name, *pieces)
01357
01358
01359 class PubSub(object):
01360 """
01361 PubSub provides publish, subscribe and listen support to Redis channels.
01362
01363 After subscribing to one or more channels, the listen() method will block
01364 until a message arrives on one of the subscribed channels. That message
01365 will be returned and it's safe to start listening again.
01366 """
01367 def __init__(self, connection_pool, shard_hint=None):
01368 self.connection_pool = connection_pool
01369 self.shard_hint = shard_hint
01370 self.connection = None
01371 self.channels = set()
01372 self.patterns = set()
01373 self.subscription_count = 0
01374 self.subscribe_commands = set(
01375 ('subscribe', 'psubscribe', 'unsubscribe', 'punsubscribe')
01376 )
01377
01378 def __del__(self):
01379 try:
01380
01381
01382
01383 if self.connection and (self.channels or self.patterns):
01384 self.connection.disconnect()
01385 self.reset()
01386 except:
01387 pass
01388
01389 def reset(self):
01390 if self.connection:
01391 self.connection.disconnect()
01392 self.connection_pool.release(self.connection)
01393 self.connection = None
01394
01395 def close(self):
01396 self.reset()
01397
01398 def execute_command(self, *args, **kwargs):
01399 "Execute a publish/subscribe command"
01400
01401
01402
01403
01404
01405 if self.connection is None:
01406 self.connection = self.connection_pool.get_connection(
01407 'pubsub',
01408 self.shard_hint
01409 )
01410 connection = self.connection
01411 try:
01412 connection.send_command(*args)
01413 except ConnectionError:
01414 connection.disconnect()
01415
01416
01417 connection.connect()
01418
01419
01420 for channel in self.channels:
01421 self.subscribe(channel)
01422 for pattern in self.patterns:
01423 self.psubscribe(pattern)
01424 connection.send_command(*args)
01425
01426 def parse_response(self):
01427 "Parse the response from a publish/subscribe command"
01428 response = self.connection.read_response()
01429 if nativestr(response[0]) in self.subscribe_commands:
01430 self.subscription_count = response[2]
01431
01432
01433 if not self.subscription_count:
01434 self.reset()
01435 return response
01436
01437 def psubscribe(self, patterns):
01438 "Subscribe to all channels matching any pattern in ``patterns``"
01439 if isinstance(patterns, basestring):
01440 patterns = [patterns]
01441 for pattern in patterns:
01442 self.patterns.add(pattern)
01443 return self.execute_command('PSUBSCRIBE', *patterns)
01444
01445 def punsubscribe(self, patterns=[]):
01446 """
01447 Unsubscribe from any channel matching any pattern in ``patterns``.
01448 If empty, unsubscribe from all channels.
01449 """
01450 if isinstance(patterns, basestring):
01451 patterns = [patterns]
01452 for pattern in patterns:
01453 try:
01454 self.patterns.remove(pattern)
01455 except KeyError:
01456 pass
01457 return self.execute_command('PUNSUBSCRIBE', *patterns)
01458
01459 def subscribe(self, channels):
01460 "Subscribe to ``channels``, waiting for messages to be published"
01461 if isinstance(channels, basestring):
01462 channels = [channels]
01463 for channel in channels:
01464 self.channels.add(channel)
01465 return self.execute_command('SUBSCRIBE', *channels)
01466
01467 def unsubscribe(self, channels=[]):
01468 """
01469 Unsubscribe from ``channels``. If empty, unsubscribe
01470 from all channels
01471 """
01472 if isinstance(channels, basestring):
01473 channels = [channels]
01474 for channel in channels:
01475 try:
01476 self.channels.remove(channel)
01477 except KeyError:
01478 pass
01479 return self.execute_command('UNSUBSCRIBE', *channels)
01480
01481 def listen(self):
01482 "Listen for messages on channels this client has been subscribed to"
01483 while self.subscription_count or self.channels or self.patterns:
01484 if self.connection == None:
01485 break
01486
01487 r = self.parse_response()
01488 msg_type = nativestr(r[0])
01489 if msg_type == 'pmessage':
01490 msg = {
01491 'type': msg_type,
01492 'pattern': nativestr(r[1]),
01493 'channel': nativestr(r[2]),
01494 'data': r[3]
01495 }
01496 else:
01497 msg = {
01498 'type': msg_type,
01499 'pattern': None,
01500 'channel': nativestr(r[1]),
01501 'data': r[2]
01502 }
01503 yield msg
01504
01505
class BasePipeline(object):
    """
    Pipelines provide a way to transmit multiple commands to the Redis server
    in one transmission. This is convenient for batch processing, such as
    saving all the values in a list to Redis.

    When the pipeline is created with a true ``transaction`` flag, or after
    ``multi()`` has been called, the queued commands are wrapped with MULTI
    and EXEC calls so that they are executed atomically.

    Any command raising an exception does *not* halt the execution of
    subsequent commands in the pipeline. Instead, the exception is caught
    and its instance is placed into the response list returned by execute().
    Code iterating over the response list should be able to deal with an
    instance of an exception as a potential value. In general, these will be
    ResponseError exceptions, such as those raised when issuing a command
    on a key of a different datatype.
    """

    # Command names whose reply clears the ``watching`` flag
    # (see parse_response).
    UNWATCH_COMMANDS = set(('DISCARD', 'EXEC', 'UNWATCH'))

    def __init__(self, connection_pool, response_callbacks, transaction,
                 shard_hint):
        self.connection_pool = connection_pool
        self.connection = None
        self.response_callbacks = response_callbacks
        self.transaction = transaction
        self.shard_hint = shard_hint

        # ``watching`` must exist before reset() runs because reset()
        # reads it; reset() initialises the remaining state.
        self.watching = False
        self.reset()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        # Leaving the ``with`` block always discards queued state and
        # releases the connection.
        self.reset()

    def __del__(self):
        try:
            self.reset()
        except:
            # Best effort only: errors while cleaning up are ignored.
            pass

    def reset(self):
        "Clear queued commands and scripts and release the connection"
        self.command_stack = []
        self.scripts = set()
        # If a WATCH is outstanding, clear it on the server before the
        # connection is handed back to the pool.
        if self.watching and self.connection:
            try:
                # Sent directly on the connection rather than through
                # execute_command().
                self.connection.send_command('UNWATCH')
                self.connection.read_response()
            except ConnectionError:
                # The connection is dropped instead of being unwatched.
                self.connection.disconnect()
        # Clean up the other instance attributes.
        self.watching = False
        self.explicit_transaction = False
        # Return the connection to the pool now that no WATCH is tracked.
        if self.connection:
            self.connection_pool.release(self.connection)
            self.connection = None

    def multi(self):
        """
        Start a transactional block of the pipeline after WATCH commands
        are issued. End the transactional block with `execute`.

        Raises RedisError if called twice, or if commands were already
        queued before this call.
        """
        if self.explicit_transaction:
            raise RedisError('Cannot issue nested calls to MULTI')
        if self.command_stack:
            raise RedisError('Commands without an initial WATCH have already '
                             'been issued')
        self.explicit_transaction = True

    def execute_command(self, *args, **kwargs):
        """
        Run the command immediately when it is a WATCH, or when a WATCH is
        active and ``multi()`` has not been called yet; otherwise queue it.
        """
        if (self.watching or args[0] == 'WATCH') and \
                not self.explicit_transaction:
            return self.immediate_execute_command(*args, **kwargs)
        return self.pipeline_execute_command(*args, **kwargs)

    def immediate_execute_command(self, *args, **options):
        """
        Execute a command immediately, but don't auto-retry on a
        ConnectionError if we're already WATCHing a variable. Used when
        issuing WATCH or subsequent commands retrieving their values but before
        MULTI is called.
        """
        command_name = args[0]
        conn = self.connection
        # On the first call no connection is held yet; take one from the
        # pool and keep it on the instance.
        if not conn:
            conn = self.connection_pool.get_connection(command_name,
                                                       self.shard_hint)
            self.connection = conn
        try:
            conn.send_command(*args)
            return self.parse_response(conn, command_name, **options)
        except ConnectionError:
            conn.disconnect()
            # When nothing is being watched the command is simply sent
            # again; otherwise the pipeline is reset and the error
            # propagates to the caller.
            if not self.watching:
                conn.send_command(*args)
                return self.parse_response(conn, command_name, **options)
            self.reset()
            raise

    def pipeline_execute_command(self, *args, **options):
        """
        Stage a command to be executed when execute() is next called

        Returns the current Pipeline object back so commands can be
        chained together, such as:

        pipe = pipe.set('foo', 'bar').incr('baz').decr('bang')

        At some other point, you can then run: pipe.execute(),
        which will execute all commands queued in the pipe.
        """
        self.command_stack.append((args, options))
        return self

    def _execute_transaction(self, connection, commands):
        """
        Send ``commands`` (already wrapped in MULTI ... EXEC) as one packed
        payload and return the post-processed EXEC reply.

        Raises WatchError when the EXEC reply is None, and ResponseError
        when the reply length differs from the number of queued commands.
        """
        all_cmds = SYM_EMPTY.join(
            starmap(connection.pack_command,
                    [args for args, options in commands]))
        connection.send_packed_command(all_cmds)
        # Drop the surrounding MULTI and EXEC entries; only the user's
        # commands are needed from here on.
        commands = commands[1:-1]
        # Read and discard one reply for MULTI plus one per queued command.
        # Only the EXEC reply, read next, carries the results.
        for i in range(len(commands) + 1):
            self.parse_response(connection, '_')
        # Read the EXEC reply.
        response = self.parse_response(connection, '_')

        if response is None:
            raise WatchError("Watched variable changed.")

        if len(response) != len(commands):
            raise ResponseError("Wrong number of response items from "
                                "pipeline execution")
        # The replies were read under the placeholder command name '_', so
        # the per-command response callbacks are applied here by hand.
        data = []
        for r, cmd in izip(response, commands):
            # Exception instances are passed through untouched.
            if not isinstance(r, Exception):
                args, options = cmd
                command_name = args[0]
                if command_name in self.response_callbacks:
                    r = self.response_callbacks[command_name](r, **options)
            data.append(r)
        return data

    def _execute_pipeline(self, connection, commands):
        """
        Send all ``commands`` as one packed payload and parse one reply per
        command, in order.
        """
        all_cmds = SYM_EMPTY.join(
            starmap(connection.pack_command,
                    [args for args, options in commands]))
        connection.send_packed_command(all_cmds)
        return [self.parse_response(connection, args[0], **options)
                for args, options in commands]

    def parse_response(self, connection, command_name, **options):
        """
        Parse a reply via StrictRedis.parse_response and keep the
        ``watching`` flag in sync with the command that was answered.
        """
        result = StrictRedis.parse_response(
            self, connection, command_name, **options)
        if command_name in self.UNWATCH_COMMANDS:
            self.watching = False
        elif command_name == 'WATCH':
            self.watching = True
        return result

    def load_scripts(self):
        "Load any registered script the server reports as missing"
        scripts = list(self.scripts)
        immediate = self.immediate_execute_command
        shas = [s.sha for s in scripts]
        exists = immediate('SCRIPT', 'EXISTS', *shas, **{'parse': 'EXISTS'})
        if not all(exists):
            # ``exists`` is paired positionally with ``scripts``.
            for s, exist in izip(scripts, exists):
                if not exist:
                    immediate('SCRIPT', 'LOAD', s.script, **{'parse': 'LOAD'})

    def execute(self):
        "Execute all the commands in the current pipeline"
        if self.scripts:
            self.load_scripts()
        stack = self.command_stack
        if self.transaction or self.explicit_transaction:
            stack = [(('MULTI', ), {})] + stack + [(('EXEC', ), {})]
            execute = self._execute_transaction
        else:
            execute = self._execute_pipeline

        conn = self.connection
        if not conn:
            conn = self.connection_pool.get_connection('MULTI',
                                                       self.shard_hint)
            # Stored on the instance so that reset() releases it back to
            # the pool when execution finishes.
            self.connection = conn

        try:
            return execute(conn, stack)
        except ConnectionError:
            conn.disconnect()
            # If a WATCH was active, a WatchError is raised instead of
            # retrying on this connection.
            if self.watching:
                raise WatchError("A ConnectionError occured on while watching "
                                 "one or more keys")
            # Otherwise the whole stack is sent once more.
            return execute(conn, stack)
        finally:
            self.reset()

    def watch(self, *names):
        "Watches the values at keys ``names``"
        if self.explicit_transaction:
            raise RedisError('Cannot issue a WATCH after a MULTI')
        return self.execute_command('WATCH', *names)

    def unwatch(self):
        "Unwatches all previously specified keys"
        # UNWATCH is only sent while watching; a falsy result (including
        # the not-watching case) is replaced by True.
        return self.watching and self.execute_command('UNWATCH') or True

    def script_load_for_pipeline(self, script):
        "Make sure scripts are loaded prior to pipeline execution"
        self.scripts.add(script)
01744
01745
class StrictPipeline(BasePipeline, StrictRedis):
    """Pipeline for the StrictRedis class."""
01749
01750
class Pipeline(BasePipeline, Redis):
    """Pipeline for the Redis class."""
01754
01755
class Script(object):
    """An executable LUA script object returned by ``register_script``."""

    def __init__(self, registered_client, script):
        # The script is loaded right away so its SHA is known up front.
        self.sha = None
        self.script = script
        self.registered_client = registered_client
        self.sha = registered_client.script_load(script)

    def __call__(self, keys=[], args=[], client=None):
        """
        Execute the script with ``keys`` and ``args`` on ``client``, or on
        the client the script was registered with when none is given.
        """
        target = client or self.registered_client
        script_args = tuple(keys) + tuple(args)
        if isinstance(target, BasePipeline):
            # Register with the pipeline so it can load the script before
            # it executes.
            target.script_load_for_pipeline(self)
        try:
            return target.evalsha(self.sha, len(keys), *script_args)
        except NoScriptError:
            # The server does not know the script: load it, remember the
            # SHA that comes back, and run it again.
            self.sha = target.script_load(self.script)
            return target.evalsha(self.sha, len(keys), *script_args)
01779
01780
class LockError(RedisError):
    """Errors thrown from the Lock."""
01784
01785
class Lock(object):
    """
    A shared, distributed Lock. Using Redis for locking allows the Lock
    to be shared across processes and/or machines.

    It's left to the user to resolve deadlock issues and make sure
    multiple clients play nicely together.
    """

    # Expiry value stored when the lock has no timeout.
    LOCK_FOREVER = float(2 ** 31 + 1)

    def __init__(self, redis, name, timeout=None, sleep=0.1):
        """
        Create a new Lock instance named ``name`` using the Redis client
        supplied by ``redis``.

        ``timeout`` indicates a maximum life for the lock.
        By default, it will remain locked until release() is called.

        ``sleep`` indicates the amount of time to sleep per loop iteration
        when the lock is in blocking mode and another client is currently
        holding the lock.

        Raises LockError when ``sleep`` is greater than a truthy ``timeout``.

        Note: If using ``timeout``, you should make sure all the hosts
        that are running clients have their time synchronized with a network
        time service like ntp.
        """
        self.redis = redis
        self.name = name
        self.timeout = timeout
        self.sleep = sleep
        # Expiry value written at acquisition; None while not held.
        self.acquired_until = None
        if self.timeout and self.sleep > self.timeout:
            raise LockError("'sleep' must be less than 'timeout'")

    def __enter__(self):
        return self.acquire()

    def __exit__(self, exc_type, exc_value, traceback):
        self.release()

    def acquire(self, blocking=True):
        """
        Use Redis to hold a shared, distributed lock named ``name``.
        Returns True once the lock is acquired.

        If ``blocking`` is False, always return immediately. If the lock
        was acquired, return True, otherwise return False.
        """
        pause = self.sleep
        lifetime = self.timeout
        while True:
            now = int(mod_time.time())
            if lifetime:
                expires_at = float(now + lifetime)
            else:
                expires_at = float(Lock.LOCK_FOREVER)
            if self.redis.setnx(self.name, expires_at):
                self.acquired_until = expires_at
                return True
            # The key exists; a missing or empty value counts as 1, which
            # is treated as long expired.
            current = float(self.redis.get(self.name) or 1)
            if current < now:
                # The stored expiry has passed. Swap in our own expiry and
                # check the value that was replaced.
                previous = float(
                    self.redis.getset(self.name, expires_at) or 1)
                if previous < now:
                    # The replaced value was expired too: the lock is ours.
                    self.acquired_until = expires_at
                    return True
            if not blocking:
                return False
            mod_time.sleep(pause)

    def release(self):
        """
        Releases the already acquired lock.

        Raises ValueError when the lock is not currently held by this object.
        """
        held_until = self.acquired_until
        if held_until is None:
            raise ValueError("Cannot release an unlocked lock")
        stored = float(self.redis.get(self.name) or 1)
        # The key is deleted only while the stored expiry is not older
        # than the one written at acquisition.
        if stored >= held_until:
            self.redis.delete(self.name)
            self.acquired_until = None