Package redis :: Module client

Source Code for Module redis.client

   1  from __future__ import with_statement 
   2  from itertools import starmap 
   3  import datetime 
   4  import warnings 
   5  import time as mod_time 
   6  from redis._compat import (b, izip, imap, iteritems, dictkeys, dictvalues, 
   7                             basestring, long, nativestr, urlparse) 
   8  from redis.connection import ConnectionPool, UnixDomainSocketConnection 
   9  from redis.exceptions import ( 
  10      ConnectionError, 
  11      DataError, 
  12      RedisError, 
  13      ResponseError, 
  14      WatchError, 
  15      NoScriptError 
  16  ) 
  17   
  18  SYM_EMPTY = b('') 
19 20 21 -def list_or_args(keys, args):
22 # returns a single list combining keys and args 23 try: 24 iter(keys) 25 # a string can be iterated, but indicates 26 # keys wasn't passed as a list 27 if isinstance(keys, basestring): 28 keys = [keys] 29 except TypeError: 30 keys = [keys] 31 if args: 32 keys.extend(args) 33 return keys
34
35 36 -def timestamp_to_datetime(response):
37 "Converts a unix timestamp to a Python datetime object" 38 if not response: 39 return None 40 try: 41 response = int(response) 42 except ValueError: 43 return None 44 return datetime.datetime.fromtimestamp(response)
45
46 47 -def string_keys_to_dict(key_string, callback):
48 return dict.fromkeys(key_string.split(), callback)
49
50 51 -def dict_merge(*dicts):
52 merged = {} 53 [merged.update(d) for d in dicts] 54 return merged
55
56 57 -def parse_debug_object(response):
58 "Parse the results of Redis's DEBUG OBJECT command into a Python dict" 59 # The 'type' of the object is the first item in the response, but isn't 60 # prefixed with a name 61 response = nativestr(response) 62 response = 'type:' + response 63 response = dict([kv.split(':') for kv in response.split()]) 64 65 # parse some expected int values from the string response 66 # note: this cmd isn't spec'd so these may not appear in all redis versions 67 int_fields = ('refcount', 'serializedlength', 'lru', 'lru_seconds_idle') 68 for field in int_fields: 69 if field in response: 70 response[field] = int(response[field]) 71 72 return response
73
74 75 -def parse_object(response, infotype):
76 "Parse the results of an OBJECT command" 77 if infotype in ('idletime', 'refcount'): 78 return int(response) 79 return response
80
81 82 -def parse_info(response):
83 "Parse the result of Redis's INFO command into a Python dict" 84 info = {} 85 response = nativestr(response) 86 87 def get_value(value): 88 if ',' not in value or '=' not in value: 89 return value 90 91 sub_dict = {} 92 for item in value.split(','): 93 k, v = item.rsplit('=', 1) 94 try: 95 sub_dict[k] = int(v) 96 except ValueError: 97 sub_dict[k] = v 98 return sub_dict
99 100 for line in response.splitlines(): 101 if line and not line.startswith('#'): 102 key, value = line.split(':') 103 try: 104 if '.' in value: 105 info[key] = float(value) 106 else: 107 info[key] = int(value) 108 except ValueError: 109 info[key] = get_value(value) 110 return info 111
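An illustrative sketch (not part of the module) of what parse_info returns. The sample reply below is made up but follows the INFO format the parser expects: '#' comment lines are skipped, plain values are cast to int or float where possible, and comma-separated k=v values become nested dicts:

    from redis.client import parse_info

    # hypothetical INFO reply, trimmed to a few representative lines
    sample = (b"# Server\r\n"
              b"redis_version:2.6.9\r\n"
              b"uptime_in_seconds:123\r\n"
              b"mem_fragmentation_ratio:1.03\r\n"
              b"db0:keys=4,expires=1\r\n")
    info = parse_info(sample)
    # info['redis_version'] == '2.6.9'           (left as a string)
    # info['uptime_in_seconds'] == 123           (cast to int)
    # info['mem_fragmentation_ratio'] == 1.03    (cast to float)
    # info['db0'] == {'keys': 4, 'expires': 1}   (built by get_value)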
112 113 -def pairs_to_dict(response):
114 "Create a dict given a list of key/value pairs" 115 it = iter(response) 116 return dict(izip(it, it))
117
118 119 -def zset_score_pairs(response, **options):
120 """ 121 If ``withscores`` is specified in the options, return the response as 122 a list of (value, score) pairs 123 """ 124 if not response or not options['withscores']: 125 return response 126 score_cast_func = options.get('score_cast_func', float) 127 it = iter(response) 128 return list(izip(it, imap(score_cast_func, it)))
129
130 131 -def int_or_none(response):
132 if response is None: 133 return None 134 return int(response)
135
136 137 -def float_or_none(response):
138 if response is None: 139 return None 140 return float(response)
141
142 143 -def parse_config(response, **options):
144 if options['parse'] == 'GET': 145 response = [nativestr(i) if i is not None else None for i in response] 146 return response and pairs_to_dict(response) or {} 147 return nativestr(response) == 'OK'
148
149 150 -def parse_script(response, **options):
151 parse = options['parse'] 152 if parse in ('FLUSH', 'KILL'): 153 return response == 'OK' 154 if parse == 'EXISTS': 155 return list(imap(bool, response)) 156 return response
157
158 159 -class StrictRedis(object):
160 """ 161 Implementation of the Redis protocol. 162 163 This abstract class provides a Python interface to all Redis commands 164 and an implementation of the Redis protocol. 165 166 Connection and Pipeline derive from this, implementing how 167 the commands are sent and received to the Redis server 168 """ 169 RESPONSE_CALLBACKS = dict_merge( 170 string_keys_to_dict( 171 'AUTH DEL EXISTS EXPIRE EXPIREAT HDEL HEXISTS HMSET MOVE MSETNX ' 172 'PERSIST RENAMENX SISMEMBER SMOVE SETEX SETNX SREM ZREM', 173 bool 174 ), 175 string_keys_to_dict( 176 'BITCOUNT DECRBY GETBIT HLEN INCRBY LINSERT LLEN LPUSHX RPUSHX ' 177 'SADD SCARD SDIFFSTORE SETBIT SETRANGE SINTERSTORE STRLEN ' 178 'SUNIONSTORE ZADD ZCARD ZREMRANGEBYRANK ZREMRANGEBYSCORE', 179 int 180 ), 181 string_keys_to_dict( 182 # these return OK, or int if redis-server is >=1.3.4 183 'LPUSH RPUSH', 184 lambda r: isinstance(r, long) and r or nativestr(r) == 'OK' 185 ), 186 string_keys_to_dict('ZSCORE ZINCRBY', float_or_none), 187 string_keys_to_dict( 188 'FLUSHALL FLUSHDB LSET LTRIM MSET RENAME ' 189 'SAVE SELECT SET SHUTDOWN SLAVEOF WATCH UNWATCH', 190 lambda r: nativestr(r) == 'OK' 191 ), 192 string_keys_to_dict('BLPOP BRPOP', lambda r: r and tuple(r) or None), 193 string_keys_to_dict( 194 'SDIFF SINTER SMEMBERS SUNION', 195 lambda r: r and set(r) or set() 196 ), 197 string_keys_to_dict( 198 'ZRANGE ZRANGEBYSCORE ZREVRANGE ZREVRANGEBYSCORE', 199 zset_score_pairs 200 ), 201 string_keys_to_dict('ZRANK ZREVRANK', int_or_none), 202 { 203 'BGREWRITEAOF': ( 204 lambda r: r == 'Background rewriting of AOF file started' 205 ), 206 'BGSAVE': lambda r: r == 'Background saving started', 207 'BRPOPLPUSH': lambda r: r and r or None, 208 'CONFIG': parse_config, 209 'DEBUG': parse_debug_object, 210 'HGETALL': lambda r: r and pairs_to_dict(r) or {}, 211 'INFO': parse_info, 212 'LASTSAVE': timestamp_to_datetime, 213 'OBJECT': parse_object, 214 'PING': lambda r: nativestr(r) == 'PONG', 215 'RANDOMKEY': lambda r: r and r or None, 216 'SCRIPT': parse_script, 217 'TIME': lambda x: (int(x[0]), int(x[1])) 218 } 219 ) 220 221 @classmethod
222 - def from_url(cls, url, db=None, **kwargs):
223 """ 224 Return a Redis client object configured from the given URL. 225 226 For example:: 227 228 redis://username:password@localhost:6379/0 229 230 If ``db`` is None, this method will attempt to extract the database ID 231 from the URL path component. 232 233 Any additional keyword arguments will be passed along to the Redis 234 class's initializer. 235 """ 236 url = urlparse(url) 237 238 # We only support redis:// schemes. 239 assert url.scheme == 'redis' or not url.scheme 240 241 # Extract the database ID from the path component if it hasn't been given. 242 if db is None: 243 try: 244 db = int(url.path.replace('/', '')) 245 except (AttributeError, ValueError): 246 db = 0 247 248 return cls(host=url.hostname, port=url.port, db=db, 249 password=url.password, **kwargs)
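A minimal usage sketch (illustrative, not part of the source); the URL, password, and running server are assumptions. The database is taken from the URL path because ``db`` is left as None, and extra keyword arguments pass straight through to the class initializer:

    from redis import StrictRedis

    client = StrictRedis.from_url('redis://:secret@localhost:6379/2',
                                  socket_timeout=5)
    client.ping()    # True if the assumed server on localhost:6379 is reachable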
250
251 - def __init__(self, host='localhost', port=6379, 252 db=0, password=None, socket_timeout=None, 253 connection_pool=None, charset='utf-8', 254 errors='strict', decode_responses=False, 255 unix_socket_path=None):
256 if not connection_pool: 257 kwargs = { 258 'db': db, 259 'password': password, 260 'socket_timeout': socket_timeout, 261 'encoding': charset, 262 'encoding_errors': errors, 263 'decode_responses': decode_responses, 264 } 265 # based on input, setup appropriate connection args 266 if unix_socket_path: 267 kwargs.update({ 268 'path': unix_socket_path, 269 'connection_class': UnixDomainSocketConnection 270 }) 271 else: 272 kwargs.update({ 273 'host': host, 274 'port': port 275 }) 276 connection_pool = ConnectionPool(**kwargs) 277 self.connection_pool = connection_pool 278 279 self.response_callbacks = self.__class__.RESPONSE_CALLBACKS.copy()
280
281 - def set_response_callback(self, command, callback):
282 "Set a custom Response Callback" 283 self.response_callbacks[command] = callback
284
285 - def pipeline(self, transaction=True, shard_hint=None):
286 """ 287 Return a new pipeline object that can queue multiple commands for 288 later execution. ``transaction`` indicates whether all commands 289 should be executed atomically. Apart from making a group of operations 290 atomic, pipelines are useful for reducing the back-and-forth overhead 291 between the client and server. 292 """ 293 return StrictPipeline( 294 self.connection_pool, 295 self.response_callbacks, 296 transaction, 297 shard_hint)
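A brief batching sketch (assumes a reachable local server; the key name is arbitrary). The queued commands travel to the server in a single round trip when execute() is called:

    client = StrictRedis()
    pipe = client.pipeline()    # transaction=True wraps the batch in MULTI/EXEC
    pipe.set('counter', 0)
    pipe.incr('counter')
    pipe.incr('counter')
    pipe.execute()              # -> [True, 1, 2]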
298
299 - def transaction(self, func, *watches, **kwargs):
300 """ 301 Convenience method for executing the callable `func` as a transaction 302 while watching all keys specified in `watches`. The 'func' callable 303 should expect a single argument which is a Pipeline object. 304 """ 305 shard_hint = kwargs.pop('shard_hint', None) 306 with self.pipeline(True, shard_hint) as pipe: 307 while 1: 308 try: 309 if watches: 310 pipe.watch(*watches) 311 func(pipe) 312 return pipe.execute() 313 except WatchError: 314 continue
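A sketch of the check-and-set pattern this helper wraps (the key name and local server are assumptions). The callable sees the pipeline after WATCH has been issued, reads whatever it needs, then calls multi() and queues its writes; a WatchError causes the callable to be retried:

    client = StrictRedis()

    def double_counter(pipe):
        current = int(pipe.get('counter') or 0)   # read while 'counter' is WATCHed
        pipe.multi()                              # switch to buffered, transactional mode
        pipe.set('counter', current * 2)

    client.transaction(double_counter, 'counter')  # -> [True] from the queued SET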
315
316 - def lock(self, name, timeout=None, sleep=0.1):
317 """ 318 Return a new Lock object using key ``name`` that mimics 319 the behavior of threading.Lock. 320 321 If specified, ``timeout`` indicates a maximum life for the lock. 322 By default, it will remain locked until release() is called. 323 324 ``sleep`` indicates the amount of time to sleep per loop iteration 325 when the lock is in blocking mode and another client is currently 326 holding the lock. 327 """ 328 return Lock(self, name, timeout=timeout, sleep=sleep)
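A small sketch of the Lock helper (illustrative; the acquire()/release() interface of the Lock class shipped with this package and a local server are assumptions). acquire() blocks by default, sleeping ``sleep`` seconds between attempts, and ``timeout`` caps how long the lock may live:

    client = StrictRedis()
    lock = client.lock('resource-lock', timeout=10, sleep=0.1)
    if lock.acquire():
        try:
            pass    # work on the shared resource while holding the lock
        finally:
            lock.release()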
329
330 - def pubsub(self, shard_hint=None):
331 """ 332 Return a Publish/Subscribe object. With this object, you can 333 subscribe to channels and listen for messages that get published to 334 them. 335 """ 336 return PubSub(self.connection_pool, shard_hint)
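A sketch of the subscribe/listen flow (assumes a local server and some other client publishing to the channel). listen() is a generator that yields dicts with 'type', 'pattern', 'channel' and 'data' keys:

    client = StrictRedis()
    p = client.pubsub()
    p.subscribe('notifications')
    for message in p.listen():           # blocks until traffic arrives
        if message['type'] == 'message':
            payload = message['data']    # whatever the publisher sent
            break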
337 338 #### COMMAND EXECUTION AND PROTOCOL PARSING ####
339 - def execute_command(self, *args, **options):
340 "Execute a command and return a parsed response" 341 pool = self.connection_pool 342 command_name = args[0] 343 connection = pool.get_connection(command_name, **options) 344 try: 345 connection.send_command(*args) 346 return self.parse_response(connection, command_name, **options) 347 except ConnectionError: 348 connection.disconnect() 349 connection.send_command(*args) 350 return self.parse_response(connection, command_name, **options) 351 finally: 352 pool.release(connection)
353
354 - def parse_response(self, connection, command_name, **options):
355 "Parses a response from the Redis server" 356 response = connection.read_response() 357 if command_name in self.response_callbacks: 358 return self.response_callbacks[command_name](response, **options) 359 return response
360 361 #### SERVER INFORMATION ####
362 - def bgrewriteaof(self):
363 "Tell the Redis server to rewrite the AOF file from data in memory." 364 return self.execute_command('BGREWRITEAOF')
365
366 - def bgsave(self):
367 """ 368 Tell the Redis server to save its data to disk. Unlike save(), 369 this method is asynchronous and returns immediately. 370 """ 371 return self.execute_command('BGSAVE')
372
373 - def config_get(self, pattern="*"):
374 "Return a dictionary of configuration based on the ``pattern``" 375 return self.execute_command('CONFIG', 'GET', pattern, parse='GET')
376
377 - def config_set(self, name, value):
378 "Set config item ``name`` with ``value``" 379 return self.execute_command('CONFIG', 'SET', name, value, parse='SET')
380
381 - def dbsize(self):
382 "Returns the number of keys in the current database" 383 return self.execute_command('DBSIZE')
384
385 - def time(self):
386 """ 387 Returns the server time as a 2-item tuple of ints: 388 (seconds since epoch, microseconds into this second). 389 """ 390 return self.execute_command('TIME')
391
392 - def debug_object(self, key):
393 "Returns version-specific meta information about a given key" 394 return self.execute_command('DEBUG', 'OBJECT', key)
395
396 - def delete(self, *names):
397 "Delete one or more keys specified by ``names``" 398 return self.execute_command('DEL', *names)
399 __delitem__ = delete 400
401 - def echo(self, value):
402 "Echo the string back from the server" 403 return self.execute_command('ECHO', value)
404
405 - def flushall(self):
406 "Delete all keys in all databases on the current host" 407 return self.execute_command('FLUSHALL')
408
409 - def flushdb(self):
410 "Delete all keys in the current database" 411 return self.execute_command('FLUSHDB')
412
413 - def info(self):
414 "Returns a dictionary containing information about the Redis server" 415 return self.execute_command('INFO')
416
417 - def lastsave(self):
418 """ 419 Return a Python datetime object representing the last time the 420 Redis database was saved to disk 421 """ 422 return self.execute_command('LASTSAVE')
423
424 - def object(self, infotype, key):
425 "Return the encoding, idletime, or refcount about the key" 426 return self.execute_command('OBJECT', infotype, key, infotype=infotype)
427
428 - def ping(self):
429 "Ping the Redis server" 430 return self.execute_command('PING')
431
432 - def save(self):
433 """ 434 Tell the Redis server to save its data to disk, 435 blocking until the save is complete 436 """ 437 return self.execute_command('SAVE')
438
439 - def shutdown(self):
440 "Shutdown the server" 441 try: 442 self.execute_command('SHUTDOWN') 443 except ConnectionError: 444 # a ConnectionError here is expected 445 return 446 raise RedisError("SHUTDOWN seems to have failed.")
447
448 - def slaveof(self, host=None, port=None):
449 """ 450 Set the server to be a replicated slave of the instance identified 451 by the ``host`` and ``port``. If called without arguments, the 452 instance is promoted to a master instead. 453 """ 454 if host is None and port is None: 455 return self.execute_command("SLAVEOF", "NO", "ONE") 456 return self.execute_command("SLAVEOF", host, port)
457 458 #### BASIC KEY COMMANDS ####
459 - def append(self, key, value):
460 """ 461 Appends the string ``value`` to the value at ``key``. If ``key`` 462 doesn't already exist, create it with a value of ``value``. 463 Returns the new length of the value at ``key``. 464 """ 465 return self.execute_command('APPEND', key, value)
466
467 - def getrange(self, key, start, end):
468 """ 469 Returns the substring of the string value stored at ``key``, 470 determined by the offsets ``start`` and ``end`` (both are inclusive) 471 """ 472 return self.execute_command('GETRANGE', key, start, end)
473
474 - def bitcount(self, key, start=None, end=None):
475 """ 476 Returns the count of set bits in the value of ``key``. Optional 477 ``start`` and ``end`` parameters indicate which bytes to consider 478 """ 479 params = [key] 480 if start is not None and end is not None: 481 params.append(start) 482 params.append(end) 483 elif (start is not None) != (end is not None): 484 raise RedisError("Both start and end must be specified") 485 return self.execute_command('BITCOUNT', *params)
486
487 - def bitop(self, operation, dest, *keys):
488 """ 489 Perform a bitwise operation using ``operation`` between ``keys`` and 490 store the result in ``dest``. 491 """ 492 return self.execute_command('BITOP', operation, dest, *keys)
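A short sketch combining BITOP with BITCOUNT (the key names and values are made up; assumes a Redis server of at least 2.6, where both commands exist):

    client = StrictRedis()
    client.set('a', b'\xff\x0f')
    client.set('b', b'\x0f\x0f')
    client.bitop('AND', 'a_and_b', 'a', 'b')    # 'a_and_b' now holds b'\x0f\x0f'
    client.bitcount('a_and_b')                  # -> 8 set bits across the two bytes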
493
494 - def decr(self, name, amount=1):
495 """ 496 Decrements the value of ``key`` by ``amount``. If no key exists, 497 the value will be initialized as 0 - ``amount`` 498 """ 499 return self.execute_command('DECRBY', name, amount)
500
501 - def exists(self, name):
502 "Returns a boolean indicating whether key ``name`` exists" 503 return self.execute_command('EXISTS', name)
504 __contains__ = exists 505
506 - def expire(self, name, time):
507 """ 508 Set an expire flag on key ``name`` for ``time`` seconds. ``time`` 509 can be represented by an integer or a Python timedelta object. 510 """ 511 if isinstance(time, datetime.timedelta): 512 time = int(time.total_seconds()) 513 return self.execute_command('EXPIRE', name, time)
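A short sketch of the timedelta support (the key name is arbitrary; datetime is already imported at the top of this module). The timedelta is converted to whole seconds before EXPIRE is sent:

    client = StrictRedis()
    client.set('session:42', 'payload')
    client.expire('session:42', datetime.timedelta(minutes=5))  # same as expire(name, 300)
    client.ttl('session:42')                                    # -> roughly 300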
514
515 - def expireat(self, name, when):
516 """ 517 Set an expire flag on key ``name``. ``when`` can be represented 518 as an integer indicating unix time or a Python datetime object. 519 """ 520 if isinstance(when, datetime.datetime): 521 when = int(mod_time.mktime(when.timetuple())) 522 return self.execute_command('EXPIREAT', name, when)
523
524 - def get(self, name):
525 """ 526 Return the value at key ``name``, or None if the key doesn't exist 527 """ 528 return self.execute_command('GET', name)
529
530 - def __getitem__(self, name):
531 """ 532 Return the value at key ``name``, raises a KeyError if the key 533 doesn't exist. 534 """ 535 value = self.get(name) 536 if value: 537 return value 538 raise KeyError(name)
539
540 - def getbit(self, name, offset):
541 "Returns a boolean indicating the value of ``offset`` in ``name``" 542 return self.execute_command('GETBIT', name, offset)
543
544 - def getset(self, name, value):
545 """ 546 Atomically set the value at key ``name`` to ``value`` and return 547 the old value that was stored at key ``name`` 548 """ 549 return self.execute_command('GETSET', name, value)
550
551 - def incr(self, name, amount=1):
552 """ 553 Increments the value of ``key`` by ``amount``. If no key exists, 554 the value will be initialized as ``amount`` 555 """ 556 return self.execute_command('INCRBY', name, amount)
557
558 - def keys(self, pattern='*'):
559 "Returns a list of keys matching ``pattern``" 560 return self.execute_command('KEYS', pattern)
561
562 - def mget(self, keys, *args):
563 """ 564 Returns a list of values ordered identically to ``keys`` 565 """ 566 args = list_or_args(keys, args) 567 return self.execute_command('MGET', *args)
568
569 - def mset(self, mapping):
570 "Sets each key in the ``mapping`` dict to its corresponding value" 571 items = [] 572 for pair in iteritems(mapping): 573 items.extend(pair) 574 return self.execute_command('MSET', *items)
575
576 - def msetnx(self, mapping):
577 """ 578 Sets each key in the ``mapping`` dict to its corresponding value if 579 none of the keys are already set 580 """ 581 items = [] 582 for pair in iteritems(mapping): 583 items.extend(pair) 584 return self.execute_command('MSETNX', *items)
585
586 - def move(self, name, db):
587 "Moves the key ``name`` to a different Redis database ``db``" 588 return self.execute_command('MOVE', name, db)
589
590 - def persist(self, name):
591 "Removes an expiration on ``name``" 592 return self.execute_command('PERSIST', name)
593
594 - def randomkey(self):
595 "Returns the name of a random key" 596 return self.execute_command('RANDOMKEY')
597
598 - def rename(self, src, dst):
599 """ 600 Rename key ``src`` to ``dst`` 601 """ 602 return self.execute_command('RENAME', src, dst)
603
604 - def renamenx(self, src, dst):
605 "Rename key ``src`` to ``dst`` if ``dst`` doesn't already exist" 606 return self.execute_command('RENAMENX', src, dst)
607
608 - def set(self, name, value):
609 "Set the value at key ``name`` to ``value``" 610 return self.execute_command('SET', name, value)
611 __setitem__ = set 612
613 - def setbit(self, name, offset, value):
614 """ 615 Flag the ``offset`` in ``name`` as ``value``. Returns a boolean 616 indicating the previous value of ``offset``. 617 """ 618 value = value and 1 or 0 619 return self.execute_command('SETBIT', name, offset, value)
620
621 - def setex(self, name, time, value):
622 """ 623 Set the value of key ``name`` to ``value`` that expires in ``time`` 624 seconds. ``time`` can be represented by an integer or a Python 625 timedelta object. 626 """ 627 if isinstance(time, datetime.timedelta): 628 time = int(time.total_seconds()) 629 return self.execute_command('SETEX', name, time, value)
630
631 - def setnx(self, name, value):
632 "Set the value of key ``name`` to ``value`` if key doesn't exist" 633 return self.execute_command('SETNX', name, value)
634
635 - def setrange(self, name, offset, value):
636 """ 637 Overwrite bytes in the value of ``name`` starting at ``offset`` with 638 ``value``. If ``offset`` plus the length of ``value`` exceeds the 639 length of the original value, the new value will be larger than before. 640 If ``offset`` exceeds the length of the original value, null bytes 641 will be used to pad between the end of the previous value and the start 642 of what's being injected. 643 644 Returns the length of the new string. 645 """ 646 return self.execute_command('SETRANGE', name, offset, value)
647
648 - def strlen(self, name):
649 "Return the number of bytes stored in the value of ``name``" 650 return self.execute_command('STRLEN', name)
651
652 - def substr(self, name, start, end=-1):
653 """ 654 Return a substring of the string at key ``name``. ``start`` and ``end`` 655 are 0-based integers specifying the portion of the string to return. 656 """ 657 return self.execute_command('SUBSTR', name, start, end)
658
659 - def ttl(self, name):
660 "Returns the number of seconds until the key ``name`` will expire" 661 return self.execute_command('TTL', name)
662
663 - def type(self, name):
664 "Returns the type of key ``name``" 665 return self.execute_command('TYPE', name)
666
667 - def watch(self, *names):
668 """ 669 Watches the values at keys ``names`` 670 """ 671 warnings.warn(DeprecationWarning('Call WATCH from a Pipeline object'))
672
673 - def unwatch(self):
674 """ 675 Unwatches all previously watched keys 676 """ 677 warnings.warn( 678 DeprecationWarning('Call UNWATCH from a Pipeline object'))
679 680 #### LIST COMMANDS ####
681 - def blpop(self, keys, timeout=0):
682 """ 683 LPOP a value off of the first non-empty list 684 named in the ``keys`` list. 685 686 If none of the lists in ``keys`` has a value to LPOP, then block 687 for ``timeout`` seconds, or until a value gets pushed on to one 688 of the lists. 689 690 If timeout is 0, then block indefinitely. 691 """ 692 if timeout is None: 693 timeout = 0 694 if isinstance(keys, basestring): 695 keys = [keys] 696 else: 697 keys = list(keys) 698 keys.append(timeout) 699 return self.execute_command('BLPOP', *keys)
700
701 - def brpop(self, keys, timeout=0):
702 """ 703 RPOP a value off of the first non-empty list 704 named in the ``keys`` list. 705 706 If none of the lists in ``keys`` has a value to RPOP, then block 707 for ``timeout`` seconds, or until a value gets pushed on to one 708 of the lists. 709 710 If timeout is 0, then block indefinitely. 711 """ 712 if timeout is None: 713 timeout = 0 714 if isinstance(keys, basestring): 715 keys = [keys] 716 else: 717 keys = list(keys) 718 keys.append(timeout) 719 return self.execute_command('BRPOP', *keys)
720
721 - def brpoplpush(self, src, dst, timeout=0):
722 """ 723 Pop a value off the tail of ``src``, push it on the head of ``dst`` 724 and then return it. 725 726 This command blocks until a value is in ``src`` or until ``timeout`` 727 seconds elapse, whichever is first. A ``timeout`` value of 0 blocks 728 forever. 729 """ 730 if timeout is None: 731 timeout = 0 732 return self.execute_command('BRPOPLPUSH', src, dst, timeout)
733
734 - def lindex(self, name, index):
735 """ 736 Return the item from list ``name`` at position ``index`` 737 738 Negative indexes are supported and will return an item at the 739 end of the list 740 """ 741 return self.execute_command('LINDEX', name, index)
742
743 - def linsert(self, name, where, refvalue, value):
744 """ 745 Insert ``value`` in list ``name`` either immediately before or after 746 [``where``] ``refvalue`` 747 748 Returns the new length of the list on success or -1 if ``refvalue`` 749 is not in the list. 750 """ 751 return self.execute_command('LINSERT', name, where, refvalue, value)
752
753 - def llen(self, name):
754 "Return the length of the list ``name``" 755 return self.execute_command('LLEN', name)
756
757 - def lpop(self, name):
758 "Remove and return the first item of the list ``name``" 759 return self.execute_command('LPOP', name)
760
761 - def lpush(self, name, *values):
762 "Push ``values`` onto the head of the list ``name``" 763 return self.execute_command('LPUSH', name, *values)
764
765 - def lpushx(self, name, value):
766 "Push ``value`` onto the head of the list ``name`` if ``name`` exists" 767 return self.execute_command('LPUSHX', name, value)
768
769 - def lrange(self, name, start, end):
770 """ 771 Return a slice of the list ``name`` between 772 position ``start`` and ``end`` 773 774 ``start`` and ``end`` can be negative numbers just like 775 Python slicing notation 776 """ 777 return self.execute_command('LRANGE', name, start, end)
778
779 - def lrem(self, name, count, value):
780 """ 781 Remove the first ``count`` occurrences of elements equal to ``value`` 782 from the list stored at ``name``. 783 784 The count argument influences the operation in the following ways: 785 count > 0: Remove elements equal to value moving from head to tail. 786 count < 0: Remove elements equal to value moving from tail to head. 787 count = 0: Remove all elements equal to value. 788 """ 789 return self.execute_command('LREM', name, count, value)
790
791 - def lset(self, name, index, value):
792 "Set element at ``index`` of list ``name`` to ``value``" 793 return self.execute_command('LSET', name, index, value)
794
795 - def ltrim(self, name, start, end):
796 """ 797 Trim the list ``name``, removing all values not within the slice 798 between ``start`` and ``end`` 799 800 ``start`` and ``end`` can be negative numbers just like 801 Python slicing notation 802 """ 803 return self.execute_command('LTRIM', name, start, end)
804
805 - def rpop(self, name):
806 "Remove and return the last item of the list ``name``" 807 return self.execute_command('RPOP', name)
808
809 - def rpoplpush(self, src, dst):
810 """ 811 RPOP a value off of the ``src`` list and atomically LPUSH it 812 on to the ``dst`` list. Returns the value. 813 """ 814 return self.execute_command('RPOPLPUSH', src, dst)
815
816 - def rpush(self, name, *values):
817 "Push ``values`` onto the tail of the list ``name``" 818 return self.execute_command('RPUSH', name, *values)
819
820 - def rpushx(self, name, value):
821 "Push ``value`` onto the tail of the list ``name`` if ``name`` exists" 822 return self.execute_command('RPUSHX', name, value)
823
824 - def sort(self, name, start=None, num=None, by=None, get=None, 825 desc=False, alpha=False, store=None):
826 """ 827 Sort and return the list, set or sorted set at ``name``. 828 829 ``start`` and ``num`` allow for paging through the sorted data 830 831 ``by`` allows using an external key to weight and sort the items. 832 Use an "*" to indicate where in the key the item value is located 833 834 ``get`` allows for returning items from external keys rather than the 835 sorted data itself. Use an "*" to indicate where in the key 836 the item value is located 837 838 ``desc`` allows for reversing the sort 839 840 ``alpha`` allows for sorting lexicographically rather than numerically 841 842 ``store`` allows for storing the result of the sort into 843 the key ``store`` 844 """ 845 if (start is not None and num is None) or \ 846 (num is not None and start is None): 847 raise RedisError("``start`` and ``num`` must both be specified") 848 849 pieces = [name] 850 if by is not None: 851 pieces.append('BY') 852 pieces.append(by) 853 if start is not None and num is not None: 854 pieces.append('LIMIT') 855 pieces.append(start) 856 pieces.append(num) 857 if get is not None: 858 # If get is a string assume we want to get a single value. 859 # Otherwise assume it's an iterable and we want to get multiple 860 # values. We can't just iterate blindly because strings are 861 # iterable. 862 if isinstance(get, basestring): 863 pieces.append('GET') 864 pieces.append(get) 865 else: 866 for g in get: 867 pieces.append('GET') 868 pieces.append(g) 869 if desc: 870 pieces.append('DESC') 871 if alpha: 872 pieces.append('ALPHA') 873 if store is not None: 874 pieces.append('STORE') 875 pieces.append(store) 876 return self.execute_command('SORT', *pieces)
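An illustrative BY/GET sketch (all key names are made up). The '*' in the ``by`` and ``get`` patterns is replaced with each element of the list to locate the weighting key and the value to return:

    client = StrictRedis()
    client.rpush('user_ids', 3, 1, 2)
    client.mset({'weight_1': 10, 'weight_2': 30, 'weight_3': 20,
                 'name_1': 'ann', 'name_2': 'bob', 'name_3': 'cid'})
    client.sort('user_ids', by='weight_*', get='name_*')
    # -> [b'ann', b'cid', b'bob']   (ids ordered by their weight_* values)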
877 878 #### SET COMMANDS ####
879 - def sadd(self, name, *values):
880 "Add ``value(s)`` to set ``name``" 881 return self.execute_command('SADD', name, *values)
882
883 - def scard(self, name):
884 "Return the number of elements in set ``name``" 885 return self.execute_command('SCARD', name)
886
887 - def sdiff(self, keys, *args):
888 "Return the difference of sets specified by ``keys``" 889 args = list_or_args(keys, args) 890 return self.execute_command('SDIFF', *args)
891
892 - def sdiffstore(self, dest, keys, *args):
893 """ 894 Store the difference of sets specified by ``keys`` into a new 895 set named ``dest``. Returns the number of keys in the new set. 896 """ 897 args = list_or_args(keys, args) 898 return self.execute_command('SDIFFSTORE', dest, *args)
899
900 - def sinter(self, keys, *args):
901 "Return the intersection of sets specified by ``keys``" 902 args = list_or_args(keys, args) 903 return self.execute_command('SINTER', *args)
904
905 - def sinterstore(self, dest, keys, *args):
906 """ 907 Store the intersection of sets specified by ``keys`` into a new 908 set named ``dest``. Returns the number of keys in the new set. 909 """ 910 args = list_or_args(keys, args) 911 return self.execute_command('SINTERSTORE', dest, *args)
912
913 - def sismember(self, name, value):
914 "Return a boolean indicating if ``value`` is a member of set ``name``" 915 return self.execute_command('SISMEMBER', name, value)
916
917 - def smembers(self, name):
918 "Return all members of the set ``name``" 919 return self.execute_command('SMEMBERS', name)
920
921 - def smove(self, src, dst, value):
922 "Move ``value`` from set ``src`` to set ``dst`` atomically" 923 return self.execute_command('SMOVE', src, dst, value)
924
925 - def spop(self, name):
926 "Remove and return a random member of set ``name``" 927 return self.execute_command('SPOP', name)
928
929 - def srandmember(self, name):
930 "Return a random member of set ``name``" 931 return self.execute_command('SRANDMEMBER', name)
932
933 - def srem(self, name, *values):
934 "Remove ``values`` from set ``name``" 935 return self.execute_command('SREM', name, *values)
936
937 - def sunion(self, keys, *args):
938 "Return the union of sets specified by ``keys``" 939 args = list_or_args(keys, args) 940 return self.execute_command('SUNION', *args)
941
942 - def sunionstore(self, dest, keys, *args):
943 """ 944 Store the union of sets specified by ``keys`` into a new 945 set named ``dest``. Returns the number of keys in the new set. 946 """ 947 args = list_or_args(keys, args) 948 return self.execute_command('SUNIONSTORE', dest, *args)
949 950 #### SORTED SET COMMANDS ####
951 - def zadd(self, name, *args, **kwargs):
952 """ 953 Set any number of score, element-name pairs to the key ``name``. Pairs 954 can be specified in two ways: 955 956 As *args, in the form of: score1, name1, score2, name2, ... 957 or as **kwargs, in the form of: name1=score1, name2=score2, ... 958 959 The following example would add four values to the 'my-key' key: 960 redis.zadd('my-key', 1.1, 'name1', 2.2, 'name2', name3=3.3, name4=4.4) 961 """ 962 pieces = [] 963 if args: 964 if len(args) % 2 != 0: 965 raise RedisError("ZADD requires an equal number of " 966 "values and scores") 967 pieces.extend(args) 968 for pair in iteritems(kwargs): 969 pieces.append(pair[1]) 970 pieces.append(pair[0]) 971 return self.execute_command('ZADD', name, *pieces)
972
973 - def zcard(self, name):
974 "Return the number of elements in the sorted set ``name``" 975 return self.execute_command('ZCARD', name)
976
977 - def zcount(self, name, min, max):
978 return self.execute_command('ZCOUNT', name, min, max)
979
980 - def zincrby(self, name, value, amount=1):
981 "Increment the score of ``value`` in sorted set ``name`` by ``amount``" 982 return self.execute_command('ZINCRBY', name, amount, value)
983
984 - def zinterstore(self, dest, keys, aggregate=None):
985 """ 986 Intersect multiple sorted sets specified by ``keys`` into 987 a new sorted set, ``dest``. Scores in the destination will be 988 aggregated based on the ``aggregate``, or SUM if none is provided. 989 """ 990 return self._zaggregate('ZINTERSTORE', dest, keys, aggregate)
991
992 - def zrange(self, name, start, end, desc=False, withscores=False, 993 score_cast_func=float):
994 """ 995 Return a range of values from sorted set ``name`` between 996 ``start`` and ``end`` sorted in ascending order. 997 998 ``start`` and ``end`` can be negative, indicating the end of the range. 999 1000 ``desc`` a boolean indicating whether to sort the results in descending order 1001 1002 ``withscores`` indicates to return the scores along with the values. 1003 The return type is a list of (value, score) pairs 1004 1005 ``score_cast_func`` a callable used to cast the score return value 1006 """ 1007 if desc: 1008 return self.zrevrange(name, start, end, withscores, 1009 score_cast_func) 1010 pieces = ['ZRANGE', name, start, end] 1011 if withscores: 1012 pieces.append('withscores') 1013 options = { 1014 'withscores': withscores, 'score_cast_func': score_cast_func} 1015 return self.execute_command(*pieces, **options)
1016
1017 - def zrangebyscore(self, name, min, max, start=None, num=None, 1018 withscores=False, score_cast_func=float):
1019 """ 1020 Return a range of values from the sorted set ``name`` with scores 1021 between ``min`` and ``max``. 1022 1023 If ``start`` and ``num`` are specified, then return a slice 1024 of the range. 1025 1026 ``withscores`` indicates to return the scores along with the values. 1027 The return type is a list of (value, score) pairs 1028 1029 ``score_cast_func`` a callable used to cast the score return value 1030 """ 1031 if (start is not None and num is None) or \ 1032 (num is not None and start is None): 1033 raise RedisError("``start`` and ``num`` must both be specified") 1034 pieces = ['ZRANGEBYSCORE', name, min, max] 1035 if start is not None and num is not None: 1036 pieces.extend(['LIMIT', start, num]) 1037 if withscores: 1038 pieces.append('withscores') 1039 options = { 1040 'withscores': withscores, 'score_cast_func': score_cast_func} 1041 return self.execute_command(*pieces, **options)
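A short score-range sketch (the key and members are arbitrary). StrictRedis.zadd takes score-then-member pairs, and the server accepts '-inf'/'+inf' as open bounds:

    client = StrictRedis()
    client.zadd('scores', 1.0, 'a', 2.0, 'b', 3.0, 'c')
    client.zrangebyscore('scores', 1.5, '+inf', withscores=True)
    # -> [(b'b', 2.0), (b'c', 3.0)]
    client.zrangebyscore('scores', '-inf', '+inf', start=0, num=2)
    # -> [b'a', b'b']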
1042
1043 - def zrank(self, name, value):
1044 """ 1045 Returns a 0-based value indicating the rank of ``value`` in sorted set 1046 ``name`` 1047 """ 1048 return self.execute_command('ZRANK', name, value)
1049
1050 - def zrem(self, name, *values):
1051 "Remove member ``values`` from sorted set ``name``" 1052 return self.execute_command('ZREM', name, *values)
1053
1054 - def zremrangebyrank(self, name, min, max):
1055 """ 1056 Remove all elements in the sorted set ``name`` with ranks between 1057 ``min`` and ``max``. Values are 0-based, ordered from smallest score 1058 to largest. Values can be negative indicating the highest scores. 1059 Returns the number of elements removed 1060 """ 1061 return self.execute_command('ZREMRANGEBYRANK', name, min, max)
1062
1063 - def zremrangebyscore(self, name, min, max):
1064 """ 1065 Remove all elements in the sorted set ``name`` with scores 1066 between ``min`` and ``max``. Returns the number of elements removed. 1067 """ 1068 return self.execute_command('ZREMRANGEBYSCORE', name, min, max)
1069
1070 - def zrevrange(self, name, start, num, withscores=False, 1071 score_cast_func=float):
1072 """ 1073 Return a range of values from sorted set ``name`` between 1074 ``start`` and ``num`` sorted in descending order. 1075 1076 ``start`` and ``num`` can be negative, indicating the end of the range. 1077 1078 ``withscores`` indicates to return the scores along with the values 1079 The return type is a list of (value, score) pairs 1080 1081 ``score_cast_func`` a callable used to cast the score return value 1082 """ 1083 pieces = ['ZREVRANGE', name, start, num] 1084 if withscores: 1085 pieces.append('withscores') 1086 options = { 1087 'withscores': withscores, 'score_cast_func': score_cast_func} 1088 return self.execute_command(*pieces, **options)
1089
1090 - def zrevrangebyscore(self, name, max, min, start=None, num=None, 1091 withscores=False, score_cast_func=float):
1092 """ 1093 Return a range of values from the sorted set ``name`` with scores 1094 between ``min`` and ``max`` in descending order. 1095 1096 If ``start`` and ``num`` are specified, then return a slice 1097 of the range. 1098 1099 ``withscores`` indicates to return the scores along with the values. 1100 The return type is a list of (value, score) pairs 1101 1102 ``score_cast_func`` a callable used to cast the score return value 1103 """ 1104 if (start is not None and num is None) or \ 1105 (num is not None and start is None): 1106 raise RedisError("``start`` and ``num`` must both be specified") 1107 pieces = ['ZREVRANGEBYSCORE', name, max, min] 1108 if start is not None and num is not None: 1109 pieces.extend(['LIMIT', start, num]) 1110 if withscores: 1111 pieces.append('withscores') 1112 options = { 1113 'withscores': withscores, 'score_cast_func': score_cast_func} 1114 return self.execute_command(*pieces, **options)
1115
1116 - def zrevrank(self, name, value):
1117 """ 1118 Returns a 0-based value indicating the descending rank of 1119 ``value`` in sorted set ``name`` 1120 """ 1121 return self.execute_command('ZREVRANK', name, value)
1122
1123 - def zscore(self, name, value):
1124 "Return the score of element ``value`` in sorted set ``name``" 1125 return self.execute_command('ZSCORE', name, value)
1126
1127 - def zunionstore(self, dest, keys, aggregate=None):
1128 """ 1129 Union multiple sorted sets specified by ``keys`` into 1130 a new sorted set, ``dest``. Scores in the destination will be 1131 aggregated based on the ``aggregate``, or SUM if none is provided. 1132 """ 1133 return self._zaggregate('ZUNIONSTORE', dest, keys, aggregate)
1134
1135 - def _zaggregate(self, command, dest, keys, aggregate=None):
1136 pieces = [command, dest, len(keys)] 1137 if isinstance(keys, dict): 1138 keys, weights = dictkeys(keys), dictvalues(keys) 1139 else: 1140 weights = None 1141 pieces.extend(keys) 1142 if weights: 1143 pieces.append('WEIGHTS') 1144 pieces.extend(weights) 1145 if aggregate: 1146 pieces.append('AGGREGATE') 1147 pieces.append(aggregate) 1148 return self.execute_command(*pieces)
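A sketch of the dict form accepted by zunionstore/zinterstore (the keys and weights are made up). Passing a dict turns its values into WEIGHTS, and ``aggregate`` chooses how overlapping members are combined:

    client = StrictRedis()
    client.zadd('za', 1, 'x', 2, 'y')
    client.zadd('zb', 10, 'y', 20, 'z')
    client.zunionstore('dest', {'za': 2, 'zb': 0.5}, aggregate='MAX')
    client.zrange('dest', 0, -1, withscores=True)
    # -> [(b'x', 2.0), (b'y', 5.0), (b'z', 10.0)]   # y is MAX(2*2, 10*0.5) = 5.0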
1149 1150 #### HASH COMMANDS ####
1151 - def hdel(self, name, *keys):
1152 "Delete ``keys`` from hash ``name``" 1153 return self.execute_command('HDEL', name, *keys)
1154
1155 - def hexists(self, name, key):
1156 "Returns a boolean indicating if ``key`` exists within hash ``name``" 1157 return self.execute_command('HEXISTS', name, key)
1158
1159 - def hget(self, name, key):
1160 "Return the value of ``key`` within the hash ``name``" 1161 return self.execute_command('HGET', name, key)
1162
1163 - def hgetall(self, name):
1164 "Return a Python dict of the hash's name/value pairs" 1165 return self.execute_command('HGETALL', name)
1166
1167 - def hincrby(self, name, key, amount=1):
1168 "Increment the value of ``key`` in hash ``name`` by ``amount``" 1169 return self.execute_command('HINCRBY', name, key, amount)
1170
1171 - def hkeys(self, name):
1172 "Return the list of keys within hash ``name``" 1173 return self.execute_command('HKEYS', name)
1174
1175 - def hlen(self, name):
1176 "Return the number of elements in hash ``name``" 1177 return self.execute_command('HLEN', name)
1178
1179 - def hset(self, name, key, value):
1180 """ 1181 Set ``key`` to ``value`` within hash ``name`` 1182 Returns 1 if HSET created a new field, otherwise 0 1183 """ 1184 return self.execute_command('HSET', name, key, value)
1185
1186 - def hsetnx(self, name, key, value):
1187 """ 1188 Set ``key`` to ``value`` within hash ``name`` if ``key`` does not 1189 exist. Returns 1 if HSETNX created a field, otherwise 0. 1190 """ 1191 return self.execute_command("HSETNX", name, key, value)
1192
1193 - def hmset(self, name, mapping):
1194 """ 1195 Sets each key in the ``mapping`` dict to its corresponding value 1196 in the hash ``name`` 1197 """ 1198 if not mapping: 1199 raise DataError("'hmset' with 'mapping' of length 0") 1200 items = [] 1201 for pair in iteritems(mapping): 1202 items.extend(pair) 1203 return self.execute_command('HMSET', name, *items)
1204
1205 - def hmget(self, name, keys, *args):
1206 "Returns a list of values ordered identically to ``keys``" 1207 args = list_or_args(keys, args) 1208 return self.execute_command('HMGET', name, *args)
1209
1210 - def hvals(self, name):
1211 "Return the list of values within hash ``name``" 1212 return self.execute_command('HVALS', name)
1213
1214 - def publish(self, channel, message):
1215 """ 1216 Publish ``message`` on ``channel``. 1217 Returns the number of subscribers the message was delivered to. 1218 """ 1219 return self.execute_command('PUBLISH', channel, message)
1220
1221 - def eval(self, script, numkeys, *keys_and_args):
1222 """ 1223 Execute the LUA ``script``, specifying the ``numkeys`` the script 1224 will touch and the key names and argument values in ``keys_and_args``. 1225 Returns the result of the script. 1226 1227 In practice, use the object returned by ``register_script``. This 1228 function exists purely for Redis API completion. 1229 """ 1230 return self.execute_command('EVAL', script, numkeys, *keys_and_args)
1231
1232 - def evalsha(self, sha, numkeys, *keys_and_args):
1233 """ 1234 Use the ``sha`` to execute a LUA script already registered via EVAL 1235 or SCRIPT LOAD. Specify the ``numkeys`` the script will touch and the 1236 key names and argument values in ``keys_and_args``. Returns the result 1237 of the script. 1238 1239 In practice, use the object returned by ``register_script``. This 1240 function exists purely for Redis API completion. 1241 """ 1242 return self.execute_command('EVALSHA', sha, numkeys, *keys_and_args)
1243
1244 - def script_exists(self, *args):
1245 """ 1246 Check if a script exists in the script cache by specifying the SHAs of 1247 each script as ``args``. Returns a list of boolean values indicating 1248 whether each script already exists in the cache. 1249 """ 1250 options = {'parse': 'EXISTS'} 1251 return self.execute_command('SCRIPT', 'EXISTS', *args, **options)
1252
1253 - def script_flush(self):
1254 "Flush all scripts from the script cache" 1255 options = {'parse': 'FLUSH'} 1256 return self.execute_command('SCRIPT', 'FLUSH', **options)
1257
1258 - def script_kill(self):
1259 "Kill the currently executing LUA script" 1260 options = {'parse': 'KILL'} 1261 return self.execute_command('SCRIPT', 'KILL', **options)
1262
1263 - def script_load(self, script):
1264 "Load a LUA ``script`` into the script cache. Returns the SHA." 1265 options = {'parse': 'LOAD'} 1266 return self.execute_command('SCRIPT', 'LOAD', script, **options)
1267
1268 - def register_script(self, script):
1269 """ 1270 Register a LUA ``script`` specifying the ``keys`` it will touch. 1271 Returns a Script object that is callable and hides the complexity of 1272 dealing with scripts, keys, and shas. This is the preferred way to work 1273 with LUA scripts. 1274 """ 1275 return Script(self, script)
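A minimal Script usage sketch (the Lua snippet and key name are illustrative; requires a server with scripting support). The returned object calls EVALSHA and transparently re-loads the script if the server no longer has it cached:

    client = StrictRedis()
    incr_by = client.register_script(
        "return redis.call('INCRBY', KEYS[1], ARGV[1])")
    incr_by(keys=['hits'], args=[5])    # -> new value of 'hits'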
1276
1277 -class Redis(StrictRedis):
1278 """ 1279 Provides backwards compatibility with older versions of redis-py that 1280 changed arguments to some commands to be more Pythonic, sane, or by 1281 accident. 1282 """ 1283 # Overridden callbacks 1284 RESPONSE_CALLBACKS = dict_merge( 1285 StrictRedis.RESPONSE_CALLBACKS, 1286 { 1287 'TTL': lambda r: r != -1 and r or None, 1288 } 1289 ) 1290
1291 - def pipeline(self, transaction=True, shard_hint=None):
1292 """ 1293 Return a new pipeline object that can queue multiple commands for 1294 later execution. ``transaction`` indicates whether all commands 1295 should be executed atomically. Apart from making a group of operations 1296 atomic, pipelines are useful for reducing the back-and-forth overhead 1297 between the client and server. 1298 """ 1299 return Pipeline( 1300 self.connection_pool, 1301 self.response_callbacks, 1302 transaction, 1303 shard_hint)
1304
1305 - def setex(self, name, value, time):
1306 """ 1307 Set the value of key ``name`` to ``value`` that expires in ``time`` 1308 seconds. ``time`` can be represented by an integer or a Python 1309 timedelta object. 1310 """ 1311 if isinstance(time, datetime.timedelta): 1312 time = int(time.total_seconds()) 1313 return self.execute_command('SETEX', name, time, value)
1314
1315 - def lrem(self, name, value, num=0):
1316 """ 1317 Remove the first ``num`` occurrences of elements equal to ``value`` 1318 from the list stored at ``name``. 1319 1320 The ``num`` argument influences the operation in the following ways: 1321 num > 0: Remove elements equal to value moving from head to tail. 1322 num < 0: Remove elements equal to value moving from tail to head. 1323 num = 0: Remove all elements equal to value. 1324 """ 1325 return self.execute_command('LREM', name, num, value)
1326
1327 - def zadd(self, name, *args, **kwargs):
1328 """ 1329 NOTE: The order of arguments differs from that of the official ZADD 1330 command. For backwards compatibility, this method accepts arguments 1331 in the form of name1, score1, name2, score2, while the official Redis 1332 documentation expects score1, name1, score2, name2. 1333 1334 If you're looking to use the standard syntax, consider using the 1335 StrictRedis class. See the API Reference section of the docs for more 1336 information. 1337 1338 Set any number of element-name, score pairs to the key ``name``. Pairs 1339 can be specified in two ways: 1340 1341 As *args, in the form of: name1, score1, name2, score2, ... 1342 or as **kwargs, in the form of: name1=score1, name2=score2, ... 1343 1344 The following example would add four values to the 'my-key' key: 1345 redis.zadd('my-key', 'name1', 1.1, 'name2', 2.2, name3=3.3, name4=4.4) 1346 """ 1347 pieces = [] 1348 if args: 1349 if len(args) % 2 != 0: 1350 raise RedisError("ZADD requires an equal number of " 1351 "values and scores") 1352 pieces.extend(reversed(args)) 1353 for pair in iteritems(kwargs): 1354 pieces.append(pair[1]) 1355 pieces.append(pair[0]) 1356 return self.execute_command('ZADD', name, *pieces)
1357
1358 1359 -class PubSub(object):
1360 """ 1361 PubSub provides publish, subscribe and listen support to Redis channels. 1362 1363 After subscribing to one or more channels, the listen() method will block 1364 until a message arrives on one of the subscribed channels. That message 1365 will be returned and it's safe to start listening again. 1366 """
1367 - def __init__(self, connection_pool, shard_hint=None):
1368 self.connection_pool = connection_pool 1369 self.shard_hint = shard_hint 1370 self.connection = None 1371 self.channels = set() 1372 self.patterns = set() 1373 self.subscription_count = 0 1374 self.subscribe_commands = set( 1375 ('subscribe', 'psubscribe', 'unsubscribe', 'punsubscribe') 1376 )
1377
1378 - def __del__(self):
1379 try: 1380 # if this object went out of scope prior to shutting down 1381 # subscriptions, close the connection manually before 1382 # returning it to the connection pool 1383 if self.connection and (self.channels or self.patterns): 1384 self.connection.disconnect() 1385 self.reset() 1386 except: 1387 pass
1388
1389 - def reset(self):
1390 if self.connection: 1391 self.connection.disconnect() 1392 self.connection_pool.release(self.connection) 1393 self.connection = None
1394
1395 - def close(self):
1396 self.reset()
1397
1398 - def execute_command(self, *args, **kwargs):
1399 "Execute a publish/subscribe command" 1400 1401 # NOTE: don't parse the response in this function. it could pull a 1402 # legitimate message off the stack if the connection is already 1403 # subscribed to one or more channels 1404 1405 if self.connection is None: 1406 self.connection = self.connection_pool.get_connection( 1407 'pubsub', 1408 self.shard_hint 1409 ) 1410 connection = self.connection 1411 try: 1412 connection.send_command(*args) 1413 except ConnectionError: 1414 connection.disconnect() 1415 # Connect manually here. If the Redis server is down, this will 1416 # fail and raise a ConnectionError as desired. 1417 connection.connect() 1418 # resubscribe to all channels and patterns before 1419 # resending the current command 1420 for channel in self.channels: 1421 self.subscribe(channel) 1422 for pattern in self.patterns: 1423 self.psubscribe(pattern) 1424 connection.send_command(*args)
1425
1426 - def parse_response(self):
1427 "Parse the response from a publish/subscribe command" 1428 response = self.connection.read_response() 1429 if nativestr(response[0]) in self.subscribe_commands: 1430 self.subscription_count = response[2] 1431 # if we've just unsubscribed from the remaining channels, 1432 # release the connection back to the pool 1433 if not self.subscription_count: 1434 self.reset() 1435 return response
1436
1437 - def psubscribe(self, patterns):
1438 "Subscribe to all channels matching any pattern in ``patterns``" 1439 if isinstance(patterns, basestring): 1440 patterns = [patterns] 1441 for pattern in patterns: 1442 self.patterns.add(pattern) 1443 return self.execute_command('PSUBSCRIBE', *patterns)
1444
1445 - def punsubscribe(self, patterns=[]):
1446 """ 1447 Unsubscribe from any channel matching any pattern in ``patterns``. 1448 If empty, unsubscribe from all channels. 1449 """ 1450 if isinstance(patterns, basestring): 1451 patterns = [patterns] 1452 for pattern in patterns: 1453 try: 1454 self.patterns.remove(pattern) 1455 except KeyError: 1456 pass 1457 return self.execute_command('PUNSUBSCRIBE', *patterns)
1458
1459 - def subscribe(self, channels):
1460 "Subscribe to ``channels``, waiting for messages to be published" 1461 if isinstance(channels, basestring): 1462 channels = [channels] 1463 for channel in channels: 1464 self.channels.add(channel) 1465 return self.execute_command('SUBSCRIBE', *channels)
1466
1467 - def unsubscribe(self, channels=[]):
1468 """ 1469 Unsubscribe from ``channels``. If empty, unsubscribe 1470 from all channels 1471 """ 1472 if isinstance(channels, basestring): 1473 channels = [channels] 1474 for channel in channels: 1475 try: 1476 self.channels.remove(channel) 1477 except KeyError: 1478 pass 1479 return self.execute_command('UNSUBSCRIBE', *channels)
1480
1481 - def listen(self):
1482 "Listen for messages on channels this client has been subscribed to" 1483 while self.subscription_count or self.channels or self.patterns: 1484 if self.connection is None: 1485 break 1486 1487 r = self.parse_response() 1488 msg_type = nativestr(r[0]) 1489 if msg_type == 'pmessage': 1490 msg = { 1491 'type': msg_type, 1492 'pattern': nativestr(r[1]), 1493 'channel': nativestr(r[2]), 1494 'data': r[3] 1495 } 1496 else: 1497 msg = { 1498 'type': msg_type, 1499 'pattern': None, 1500 'channel': nativestr(r[1]), 1501 'data': r[2] 1502 } 1503 yield msg
1504
1505 1506 -class BasePipeline(object):
1507 """ 1508 Pipelines provide a way to transmit multiple commands to the Redis server 1509 in one transmission. This is convenient for batch processing, such as 1510 saving all the values in a list to Redis. 1511 1512 All commands executed within a pipeline are wrapped with MULTI and EXEC 1513 calls. This guarantees all commands executed in the pipeline will be 1514 executed atomically. 1515 1516 Any command raising an exception does *not* halt the execution of 1517 subsequent commands in the pipeline. Instead, the exception is caught 1518 and its instance is placed into the response list returned by execute(). 1519 Code iterating over the response list should be able to deal with an 1520 instance of an exception as a potential value. In general, these will be 1521 ResponseError exceptions, such as those raised when issuing a command 1522 on a key of a different datatype. 1523 """ 1524 1525 UNWATCH_COMMANDS = set(('DISCARD', 'EXEC', 'UNWATCH')) 1526
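A sketch of the behavior described above (key names are arbitrary; assumes a local server). A command that fails on the server does not abort the batch; its exception instance simply occupies that slot in the result list:

    client = StrictRedis()
    pipe = client.pipeline()
    pipe.set('k', 'not a number')
    pipe.incr('k')                   # fails server-side with a ResponseError
    pipe.get('k')
    results = pipe.execute()
    # results -> [True, ResponseError(...), b'not a number']
    errors = [r for r in results if isinstance(r, Exception)]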
1527 - def __init__(self, connection_pool, response_callbacks, transaction, 1528 shard_hint):
1529 self.connection_pool = connection_pool 1530 self.connection = None 1531 self.response_callbacks = response_callbacks 1532 self.transaction = transaction 1533 self.shard_hint = shard_hint 1534 1535 self.watching = False 1536 self.reset()
1537
1538 - def __enter__(self):
1539 return self
1540
1541 - def __exit__(self, exc_type, exc_value, traceback):
1542 self.reset()
1543
1544 - def __del__(self):
1545 try: 1546 self.reset() 1547 except: 1548 pass
1549
1550 - def reset(self):
1551 self.command_stack = [] 1552 self.scripts = set() 1553 # make sure to reset the connection state in the event that we were 1554 # watching something 1555 if self.watching and self.connection: 1556 try: 1557 # call this manually since our unwatch or 1558 # immediate_execute_command methods can call reset() 1559 self.connection.send_command('UNWATCH') 1560 self.connection.read_response() 1561 except ConnectionError: 1562 # disconnect will also remove any previous WATCHes 1563 self.connection.disconnect() 1564 # clean up the other instance attributes 1565 self.watching = False 1566 self.explicit_transaction = False 1567 # we can safely return the connection to the pool here since we're 1568 # sure we're no longer WATCHing anything 1569 if self.connection: 1570 self.connection_pool.release(self.connection) 1571 self.connection = None
1572
1573 - def multi(self):
1574 """ 1575 Start a transactional block of the pipeline after WATCH commands 1576 are issued. End the transactional block with `execute`. 1577 """ 1578 if self.explicit_transaction: 1579 raise RedisError('Cannot issue nested calls to MULTI') 1580 if self.command_stack: 1581 raise RedisError('Commands without an initial WATCH have already ' 1582 'been issued') 1583 self.explicit_transaction = True
1584
1585 - def execute_command(self, *args, **kwargs):
1586 if (self.watching or args[0] == 'WATCH') and \ 1587 not self.explicit_transaction: 1588 return self.immediate_execute_command(*args, **kwargs) 1589 return self.pipeline_execute_command(*args, **kwargs)
1590
1591 - def immediate_execute_command(self, *args, **options):
1592 """ 1593 Execute a command immediately, but don't auto-retry on a 1594 ConnectionError if we're already WATCHing a variable. Used when 1595 issuing WATCH or subsequent commands retrieving their values but before 1596 MULTI is called. 1597 """ 1598 command_name = args[0] 1599 conn = self.connection 1600 # if this is the first call, we need a connection 1601 if not conn: 1602 conn = self.connection_pool.get_connection(command_name, 1603 self.shard_hint) 1604 self.connection = conn 1605 try: 1606 conn.send_command(*args) 1607 return self.parse_response(conn, command_name, **options) 1608 except ConnectionError: 1609 conn.disconnect() 1610 # if we're not already watching, we can safely retry the command 1611 # assuming it was a connection timeout 1612 if not self.watching: 1613 conn.send_command(*args) 1614 return self.parse_response(conn, command_name, **options) 1615 self.reset() 1616 raise
1617
1618 - def pipeline_execute_command(self, *args, **options):
1619 """ 1620 Stage a command to be executed when execute() is next called 1621 1622 Returns the current Pipeline object back so commands can be 1623 chained together, such as: 1624 1625 pipe = pipe.set('foo', 'bar').incr('baz').decr('bang') 1626 1627 At some other point, you can then run: pipe.execute(), 1628 which will execute all commands queued in the pipe. 1629 """ 1630 self.command_stack.append((args, options)) 1631 return self
1632
1633 - def _execute_transaction(self, connection, commands):
1634 all_cmds = SYM_EMPTY.join( 1635 starmap(connection.pack_command, 1636 [args for args, options in commands])) 1637 connection.send_packed_command(all_cmds) 1638 # we don't care about the multi/exec any longer 1639 commands = commands[1:-1] 1640 # parse off the response for MULTI and all commands prior to EXEC. 1641 # the only data we care about is the response to the EXEC 1642 # which is the last command 1643 for i in range(len(commands) + 1): 1644 self.parse_response(connection, '_') 1645 # parse the EXEC. 1646 response = self.parse_response(connection, '_') 1647 1648 if response is None: 1649 raise WatchError("Watched variable changed.") 1650 1651 if len(response) != len(commands): 1652 raise ResponseError("Wrong number of response items from " 1653 "pipeline execution") 1654 # We have to run response callbacks manually 1655 data = [] 1656 for r, cmd in izip(response, commands): 1657 if not isinstance(r, Exception): 1658 args, options = cmd 1659 command_name = args[0] 1660 if command_name in self.response_callbacks: 1661 r = self.response_callbacks[command_name](r, **options) 1662 data.append(r) 1663 return data
1664
1665 - def _execute_pipeline(self, connection, commands):
1666 # build up all commands into a single request to increase network perf 1667 all_cmds = SYM_EMPTY.join( 1668 starmap(connection.pack_command, 1669 [args for args, options in commands])) 1670 connection.send_packed_command(all_cmds) 1671 return [self.parse_response(connection, args[0], **options) 1672 for args, options in commands]
1673 
1674      def parse_response(self, connection, command_name, **options):
1675          result = StrictRedis.parse_response(
1676              self, connection, command_name, **options)
1677          if command_name in self.UNWATCH_COMMANDS:
1678              self.watching = False
1679          elif command_name == 'WATCH':
1680              self.watching = True
1681          return result
1682 
1683      def load_scripts(self):
1684          # make sure all scripts that are about to be run on this pipeline exist
1685          scripts = list(self.scripts)
1686          immediate = self.immediate_execute_command
1687          shas = [s.sha for s in scripts]
1688          exists = immediate('SCRIPT', 'EXISTS', *shas, **{'parse': 'EXISTS'})
1689          if not all(exists):
1690              for s, exist in izip(scripts, exists):
1691                  if not exist:
1692                      immediate('SCRIPT', 'LOAD', s.script, **{'parse': 'LOAD'})
1693 
1694      def execute(self):
1695          "Execute all the commands in the current pipeline"
1696          if self.scripts:
1697              self.load_scripts()
1698          stack = self.command_stack
1699          if self.transaction or self.explicit_transaction:
1700              stack = [(('MULTI', ), {})] + stack + [(('EXEC', ), {})]
1701              execute = self._execute_transaction
1702          else:
1703              execute = self._execute_pipeline
1704 
1705          conn = self.connection
1706          if not conn:
1707              conn = self.connection_pool.get_connection('MULTI',
1708                                                         self.shard_hint)
1709              # assign to self.connection so reset() releases the connection
1710              # back to the pool after we're done
1711              self.connection = conn
1712 
1713          try:
1714              return execute(conn, stack)
1715          except ConnectionError:
1716              conn.disconnect()
1717              # if we were watching a variable, the watch is no longer valid
1718              # since this connection has died. raise a WatchError, which
1719              # indicates the user should retry the transaction. If this is more
1720              # than a temporary failure, the WATCH that the user next issues
1721              # will fail, propagating the real ConnectionError
1722              if self.watching:
1723                  raise WatchError("A ConnectionError occurred while watching "
1724                                   "one or more keys")
1725              # otherwise, it's safe to retry since the transaction isn't
1726              # predicated on any state
1727              return execute(conn, stack)
1728          finally:
1729              self.reset()
1730 
1731      def watch(self, *names):
1732          "Watches the values at keys ``names``"
1733          if self.explicit_transaction:
1734              raise RedisError('Cannot issue a WATCH after a MULTI')
1735          return self.execute_command('WATCH', *names)
1736 
1737      def unwatch(self):
1738          "Unwatches all previously specified keys"
1739          return self.watching and self.execute_command('UNWATCH') or True
1740 
1741      def script_load_for_pipeline(self, script):
1742          "Make sure scripts are loaded prior to pipeline execution"
1743          self.scripts.add(script)
1744
1745 
1746  class StrictPipeline(BasePipeline, StrictRedis):
1747      "Pipeline for the StrictRedis class"
1748      pass
1749 
1750 
1751  class Pipeline(BasePipeline, Redis):
1752      "Pipeline for the Redis class"
1753      pass
1754 
1755 
1756  class Script(object):
1757      "An executable Lua script object returned by ``register_script``"
1758 
1759      def __init__(self, registered_client, script):
1760          self.registered_client = registered_client
1761          self.script = script
1762          self.sha = registered_client.script_load(script)
1763 
1764      def __call__(self, keys=[], args=[], client=None):
1765          "Execute the script, passing any required ``args``"
1766          client = client or self.registered_client
1767          args = tuple(keys) + tuple(args)
1768          # make sure the Redis server knows about the script
1769          if isinstance(client, BasePipeline):
1770              # make sure this script is good to go on pipeline
1771              client.script_load_for_pipeline(self)
1772          try:
1773              return client.evalsha(self.sha, len(keys), *args)
1774          except NoScriptError:
1775              # Maybe the client is pointed to a different server than the one
1776              # that created this instance?
1777              self.sha = client.script_load(self.script)
1778              return client.evalsha(self.sha, len(keys), *args)
1779 
1780 
1781  class LockError(RedisError):
1782      "Errors thrown from the Lock"
1783      pass
1784 
1785 
1786  class Lock(object):
1787      """
1788      A shared, distributed Lock. Using Redis for locking allows the Lock
1789      to be shared across processes and/or machines.
1790 
1791      It's left to the user to resolve deadlock issues and make sure
1792      multiple clients play nicely together.
1793      """
1794 
1795      LOCK_FOREVER = float(2 ** 31 + 1)  # 1 past max unix time
1796 
1797      def __init__(self, redis, name, timeout=None, sleep=0.1):
1798          """
1799          Create a new Lock instance named ``name`` using the Redis client
1800          supplied by ``redis``.
1801 
1802          ``timeout`` indicates a maximum life for the lock.
1803          By default, it will remain locked until release() is called.
1804 
1805          ``sleep`` indicates the amount of time to sleep per loop iteration
1806          when the lock is in blocking mode and another client is currently
1807          holding the lock.
1808 
1809          Note: If using ``timeout``, you should make sure all the hosts
1810          that are running clients have their time synchronized with a network
1811          time service like ntp.
1812          """
1813          self.redis = redis
1814          self.name = name
1815          self.acquired_until = None
1816          self.timeout = timeout
1817          self.sleep = sleep
1818          if self.timeout and self.sleep > self.timeout:
1819              raise LockError("'sleep' must be less than 'timeout'")
1820 
1821      def __enter__(self):
1822          return self.acquire()
1823 
1824      def __exit__(self, exc_type, exc_value, traceback):
1825          self.release()
1826 
1827      def acquire(self, blocking=True):
1828          """
1829          Use Redis to hold a shared, distributed lock named ``name``.
1830          Returns True once the lock is acquired.
1831 
1832          If ``blocking`` is False, always return immediately. If the lock
1833          was acquired, return True, otherwise return False.
1834          """
1835          sleep = self.sleep
1836          timeout = self.timeout
1837          while 1:
1838              unixtime = int(mod_time.time())
1839              if timeout:
1840                  timeout_at = unixtime + timeout
1841              else:
1842                  timeout_at = Lock.LOCK_FOREVER
1843              timeout_at = float(timeout_at)
1844              if self.redis.setnx(self.name, timeout_at):
1845                  self.acquired_until = timeout_at
1846                  return True
1847              # we didn't acquire the lock
1848              # check to see if the current lock is expired
1849              existing = float(self.redis.get(self.name) or 1)
1850              if existing < unixtime:
1851                  # the previous lock is expired, attempt to overwrite it
1852                  existing = float(self.redis.getset(self.name, timeout_at) or 1)
1853                  if existing < unixtime:
1854                      # we successfully acquired the lock
1855                      self.acquired_until = timeout_at
1856                      return True
1857              if not blocking:
1858                  return False
1859              mod_time.sleep(sleep)
1860 
1861      def release(self):
1862          "Releases the already acquired lock"
1863          if self.acquired_until is None:
1864              raise ValueError("Cannot release an unlocked lock")
1865          existing = float(self.redis.get(self.name) or 1)
1866          # if the lock time is in the future, delete the lock
1867          if existing >= self.acquired_until:
1868              self.redis.delete(self.name)
1869          self.acquired_until = None
1870