1 from __future__ import with_statement
2 from itertools import starmap
3 import datetime
4 import warnings
5 import time as mod_time
6 from redis._compat import (b, izip, imap, iteritems, dictkeys, dictvalues,
7 basestring, long, nativestr, urlparse)
8 from redis.connection import ConnectionPool, UnixDomainSocketConnection
9 from redis.exceptions import (
10 ConnectionError,
11 DataError,
12 RedisError,
13 ResponseError,
14 WatchError,
15 NoScriptError
16 )
17
18 SYM_EMPTY = b('')
22
23 try:
24 iter(keys)
25
26
27 if isinstance(keys, basestring):
28 keys = [keys]
29 except TypeError:
30 keys = [keys]
31 if args:
32 keys.extend(args)
33 return keys
34
37 "Converts a unix timestamp to a Python datetime object"
38 if not response:
39 return None
40 try:
41 response = int(response)
42 except ValueError:
43 return None
44 return datetime.datetime.fromtimestamp(response)
45
48 return dict.fromkeys(key_string.split(), callback)
49
52 merged = {}
53 [merged.update(d) for d in dicts]
54 return merged
55
58 "Parse the results of Redis's DEBUG OBJECT command into a Python dict"
59
60
61 response = nativestr(response)
62 response = 'type:' + response
63 response = dict([kv.split(':') for kv in response.split()])
64
65
66
67 int_fields = ('refcount', 'serializedlength', 'lru', 'lru_seconds_idle')
68 for field in int_fields:
69 if field in response:
70 response[field] = int(response[field])
71
72 return response
73
76 "Parse the results of an OBJECT command"
77 if infotype in ('idletime', 'refcount'):
78 return int(response)
79 return response
80
83 "Parse the result of Redis's INFO command into a Python dict"
84 info = {}
85 response = nativestr(response)
86
87 def get_value(value):
88 if ',' not in value or '=' not in value:
89 return value
90
91 sub_dict = {}
92 for item in value.split(','):
93 k, v = item.rsplit('=', 1)
94 try:
95 sub_dict[k] = int(v)
96 except ValueError:
97 sub_dict[k] = v
98 return sub_dict
99
100 for line in response.splitlines():
101 if line and not line.startswith('#'):
102 key, value = line.split(':')
103 try:
104 if '.' in value:
105 info[key] = float(value)
106 else:
107 info[key] = int(value)
108 except ValueError:
109 info[key] = get_value(value)
110 return info
111
114 "Create a dict given a list of key/value pairs"
115 it = iter(response)
116 return dict(izip(it, it))
117
120 """
121 If ``withscores`` is specified in the options, return the response as
122 a list of (value, score) pairs
123 """
124 if not response or not options['withscores']:
125 return response
126 score_cast_func = options.get('score_cast_func', float)
127 it = iter(response)
128 return list(izip(it, imap(score_cast_func, it)))
129
132 if response is None:
133 return None
134 return int(response)
135
138 if response is None:
139 return None
140 return float(response)
141
144 if options['parse'] == 'GET':
145 response = [nativestr(i) if i is not None else None for i in response]
146 return response and pairs_to_dict(response) or {}
147 return nativestr(response) == 'OK'
148
151 parse = options['parse']
152 if parse in ('FLUSH', 'KILL'):
153 return response == 'OK'
154 if parse == 'EXISTS':
155 return list(imap(bool, response))
156 return response
157
160 """
161 Implementation of the Redis protocol.
162
163 This abstract class provides a Python interface to all Redis commands
164 and an implementation of the Redis protocol.
165
166 Connection and Pipeline derive from this, implementing how
167 the commands are sent and received to the Redis server
168 """
169 RESPONSE_CALLBACKS = dict_merge(
170 string_keys_to_dict(
171 'AUTH DEL EXISTS EXPIRE EXPIREAT HDEL HEXISTS HMSET MOVE MSETNX '
172 'PERSIST RENAMENX SISMEMBER SMOVE SETEX SETNX SREM ZREM',
173 bool
174 ),
175 string_keys_to_dict(
176 'BITCOUNT DECRBY GETBIT HLEN INCRBY LINSERT LLEN LPUSHX RPUSHX '
177 'SADD SCARD SDIFFSTORE SETBIT SETRANGE SINTERSTORE STRLEN '
178 'SUNIONSTORE ZADD ZCARD ZREMRANGEBYRANK ZREMRANGEBYSCORE',
179 int
180 ),
181 string_keys_to_dict(
182
183 'LPUSH RPUSH',
184 lambda r: isinstance(r, long) and r or nativestr(r) == 'OK'
185 ),
186 string_keys_to_dict('ZSCORE ZINCRBY', float_or_none),
187 string_keys_to_dict(
188 'FLUSHALL FLUSHDB LSET LTRIM MSET RENAME '
189 'SAVE SELECT SET SHUTDOWN SLAVEOF WATCH UNWATCH',
190 lambda r: nativestr(r) == 'OK'
191 ),
192 string_keys_to_dict('BLPOP BRPOP', lambda r: r and tuple(r) or None),
193 string_keys_to_dict(
194 'SDIFF SINTER SMEMBERS SUNION',
195 lambda r: r and set(r) or set()
196 ),
197 string_keys_to_dict(
198 'ZRANGE ZRANGEBYSCORE ZREVRANGE ZREVRANGEBYSCORE',
199 zset_score_pairs
200 ),
201 string_keys_to_dict('ZRANK ZREVRANK', int_or_none),
202 {
203 'BGREWRITEAOF': (
204 lambda r: r == 'Background rewriting of AOF file started'
205 ),
206 'BGSAVE': lambda r: r == 'Background saving started',
207 'BRPOPLPUSH': lambda r: r and r or None,
208 'CONFIG': parse_config,
209 'DEBUG': parse_debug_object,
210 'HGETALL': lambda r: r and pairs_to_dict(r) or {},
211 'INFO': parse_info,
212 'LASTSAVE': timestamp_to_datetime,
213 'OBJECT': parse_object,
214 'PING': lambda r: nativestr(r) == 'PONG',
215 'RANDOMKEY': lambda r: r and r or None,
216 'SCRIPT': parse_script,
217 'TIME': lambda x: (int(x[0]), int(x[1]))
218 }
219 )
220
221 @classmethod
222 - def from_url(cls, url, db=None, **kwargs):
223 """
224 Return a Redis client object configured from the given URL.
225
226 For example::
227
228 redis://username:password@localhost:6379/0
229
230 If ``db`` is None, this method will attempt to extract the database ID
231 from the URL path component.
232
233 Any additional keyword arguments will be passed along to the Redis
234 class's initializer.
235 """
236 url = urlparse(url)
237
238
239 assert url.scheme == 'redis' or not url.scheme
240
241
242 if db is None:
243 try:
244 db = int(url.path.replace('/', ''))
245 except (AttributeError, ValueError):
246 db = 0
247
248 return cls(host=url.hostname, port=url.port, db=db,
249 password=url.password, **kwargs)
250
251 - def __init__(self, host='localhost', port=6379,
252 db=0, password=None, socket_timeout=None,
253 connection_pool=None, charset='utf-8',
254 errors='strict', decode_responses=False,
255 unix_socket_path=None):
256 if not connection_pool:
257 kwargs = {
258 'db': db,
259 'password': password,
260 'socket_timeout': socket_timeout,
261 'encoding': charset,
262 'encoding_errors': errors,
263 'decode_responses': decode_responses,
264 }
265
266 if unix_socket_path:
267 kwargs.update({
268 'path': unix_socket_path,
269 'connection_class': UnixDomainSocketConnection
270 })
271 else:
272 kwargs.update({
273 'host': host,
274 'port': port
275 })
276 connection_pool = ConnectionPool(**kwargs)
277 self.connection_pool = connection_pool
278
279 self.response_callbacks = self.__class__.RESPONSE_CALLBACKS.copy()
280
282 "Set a custom Response Callback"
283 self.response_callbacks[command] = callback
284
285 - def pipeline(self, transaction=True, shard_hint=None):
286 """
287 Return a new pipeline object that can queue multiple commands for
288 later execution. ``transaction`` indicates whether all commands
289 should be executed atomically. Apart from making a group of operations
290 atomic, pipelines are useful for reducing the back-and-forth overhead
291 between the client and server.
292 """
293 return StrictPipeline(
294 self.connection_pool,
295 self.response_callbacks,
296 transaction,
297 shard_hint)
298
300 """
301 Convenience method for executing the callable `func` as a transaction
302 while watching all keys specified in `watches`. The 'func' callable
303 should expect a single arguement which is a Pipeline object.
304 """
305 shard_hint = kwargs.pop('shard_hint', None)
306 with self.pipeline(True, shard_hint) as pipe:
307 while 1:
308 try:
309 if watches:
310 pipe.watch(*watches)
311 func(pipe)
312 return pipe.execute()
313 except WatchError:
314 continue
315
316 - def lock(self, name, timeout=None, sleep=0.1):
317 """
318 Return a new Lock object using key ``name`` that mimics
319 the behavior of threading.Lock.
320
321 If specified, ``timeout`` indicates a maximum life for the lock.
322 By default, it will remain locked until release() is called.
323
324 ``sleep`` indicates the amount of time to sleep per loop iteration
325 when the lock is in blocking mode and another client is currently
326 holding the lock.
327 """
328 return Lock(self, name, timeout=timeout, sleep=sleep)
329
330 - def pubsub(self, shard_hint=None):
331 """
332 Return a Publish/Subscribe object. With this object, you can
333 subscribe to channels and listen for messages that get published to
334 them.
335 """
336 return PubSub(self.connection_pool, shard_hint)
337
338
353
355 "Parses a response from the Redis server"
356 response = connection.read_response()
357 if command_name in self.response_callbacks:
358 return self.response_callbacks[command_name](response, **options)
359 return response
360
361
363 "Tell the Redis server to rewrite the AOF file from data in memory."
364 return self.execute_command('BGREWRITEAOF')
365
367 """
368 Tell the Redis server to save its data to disk. Unlike save(),
369 this method is asynchronous and returns immediately.
370 """
371 return self.execute_command('BGSAVE')
372
374 "Return a dictionary of configuration based on the ``pattern``"
375 return self.execute_command('CONFIG', 'GET', pattern, parse='GET')
376
378 "Set config item ``name`` with ``value``"
379 return self.execute_command('CONFIG', 'SET', name, value, parse='SET')
380
382 "Returns the number of keys in the current database"
383 return self.execute_command('DBSIZE')
384
386 """
387 Returns the server time as a 2-item tuple of ints:
388 (seconds since epoch, microseconds into this second).
389 """
390 return self.execute_command('TIME')
391
393 "Returns version specific metainformation about a give key"
394 return self.execute_command('DEBUG', 'OBJECT', key)
395
397 "Delete one or more keys specified by ``names``"
398 return self.execute_command('DEL', *names)
399 __delitem__ = delete
400
401 - def echo(self, value):
402 "Echo the string back from the server"
403 return self.execute_command('ECHO', value)
404
406 "Delete all keys in all databases on the current host"
407 return self.execute_command('FLUSHALL')
408
410 "Delete all keys in the current database"
411 return self.execute_command('FLUSHDB')
412
414 "Returns a dictionary containing information about the Redis server"
415 return self.execute_command('INFO')
416
418 """
419 Return a Python datetime object representing the last time the
420 Redis database was saved to disk
421 """
422 return self.execute_command('LASTSAVE')
423
424 - def object(self, infotype, key):
425 "Return the encoding, idletime, or refcount about the key"
426 return self.execute_command('OBJECT', infotype, key, infotype=infotype)
427
431
433 """
434 Tell the Redis server to save its data to disk,
435 blocking until the save is complete
436 """
437 return self.execute_command('SAVE')
438
447
448 - def slaveof(self, host=None, port=None):
449 """
450 Set the server to be a replicated slave of the instance identified
451 by the ``host`` and ``port``. If called without arguements, the
452 instance is promoted to a master instead.
453 """
454 if host is None and port is None:
455 return self.execute_command("SLAVEOF", "NO", "ONE")
456 return self.execute_command("SLAVEOF", host, port)
457
458
459 - def append(self, key, value):
460 """
461 Appends the string ``value`` to the value at ``key``. If ``key``
462 doesn't already exist, create it with a value of ``value``.
463 Returns the new length of the value at ``key``.
464 """
465 return self.execute_command('APPEND', key, value)
466
468 """
469 Returns the substring of the string value stored at ``key``,
470 determined by the offsets ``start`` and ``end`` (both are inclusive)
471 """
472 return self.execute_command('GETRANGE', key, start, end)
473
474 - def bitcount(self, key, start=None, end=None):
475 """
476 Returns the count of set bits in the value of ``key``. Optional
477 ``start`` and ``end`` paramaters indicate which bytes to consider
478 """
479 params = [key]
480 if start and end:
481 params.append(start)
482 params.append(end)
483 elif (start and not end) or (end and not start):
484 raise RedisError("Both start and end must be specified")
485 return self.execute_command('BITCOUNT', *params)
486
487 - def bitop(self, operation, dest, *keys):
488 """
489 Perform a bitwise operation using ``operation`` between ``keys`` and
490 store the result in ``dest``.
491 """
492 return self.execute_command('BITOP', operation, dest, *keys)
493
494 - def decr(self, name, amount=1):
495 """
496 Decrements the value of ``key`` by ``amount``. If no key exists,
497 the value will be initialized as 0 - ``amount``
498 """
499 return self.execute_command('DECRBY', name, amount)
500
502 "Returns a boolean indicating whether key ``name`` exists"
503 return self.execute_command('EXISTS', name)
504 __contains__ = exists
505
506 - def expire(self, name, time):
507 """
508 Set an expire flag on key ``name`` for ``time`` seconds. ``time``
509 can be represented by an integer or a Python timedelta object.
510 """
511 if isinstance(time, datetime.timedelta):
512 time = int(time.total_seconds())
513 return self.execute_command('EXPIRE', name, time)
514
516 """
517 Set an expire flag on key ``name``. ``when`` can be represented
518 as an integer indicating unix time or a Python datetime object.
519 """
520 if isinstance(when, datetime.datetime):
521 when = int(mod_time.mktime(when.timetuple()))
522 return self.execute_command('EXPIREAT', name, when)
523
524 - def get(self, name):
525 """
526 Return the value at key ``name``, or None if the key doesn't exist
527 """
528 return self.execute_command('GET', name)
529
531 """
532 Return the value at key ``name``, raises a KeyError if the key
533 doesn't exist.
534 """
535 value = self.get(name)
536 if value:
537 return value
538 raise KeyError(name)
539
540 - def getbit(self, name, offset):
541 "Returns a boolean indicating the value of ``offset`` in ``name``"
542 return self.execute_command('GETBIT', name, offset)
543
544 - def getset(self, name, value):
545 """
546 Set the value at key ``name`` to ``value`` if key doesn't exist
547 Return the value at key ``name`` atomically
548 """
549 return self.execute_command('GETSET', name, value)
550
551 - def incr(self, name, amount=1):
552 """
553 Increments the value of ``key`` by ``amount``. If no key exists,
554 the value will be initialized as ``amount``
555 """
556 return self.execute_command('INCRBY', name, amount)
557
558 - def keys(self, pattern='*'):
559 "Returns a list of keys matching ``pattern``"
560 return self.execute_command('KEYS', pattern)
561
562 - def mget(self, keys, *args):
563 """
564 Returns a list of values ordered identically to ``keys``
565 """
566 args = list_or_args(keys, args)
567 return self.execute_command('MGET', *args)
568
569 - def mset(self, mapping):
570 "Sets each key in the ``mapping`` dict to its corresponding value"
571 items = []
572 for pair in iteritems(mapping):
573 items.extend(pair)
574 return self.execute_command('MSET', *items)
575
577 """
578 Sets each key in the ``mapping`` dict to its corresponding value if
579 none of the keys are already set
580 """
581 items = []
582 for pair in iteritems(mapping):
583 items.extend(pair)
584 return self.execute_command('MSETNX', *items)
585
586 - def move(self, name, db):
587 "Moves the key ``name`` to a different Redis database ``db``"
588 return self.execute_command('MOVE', name, db)
589
591 "Removes an expiration on ``name``"
592 return self.execute_command('PERSIST', name)
593
595 "Returns the name of a random key"
596 return self.execute_command('RANDOMKEY')
597
599 """
600 Rename key ``src`` to ``dst``
601 """
602 return self.execute_command('RENAME', src, dst)
603
605 "Rename key ``src`` to ``dst`` if ``dst`` doesn't already exist"
606 return self.execute_command('RENAMENX', src, dst)
607
608 - def set(self, name, value):
609 "Set the value at key ``name`` to ``value``"
610 return self.execute_command('SET', name, value)
611 __setitem__ = set
612
613 - def setbit(self, name, offset, value):
614 """
615 Flag the ``offset`` in ``name`` as ``value``. Returns a boolean
616 indicating the previous value of ``offset``.
617 """
618 value = value and 1 or 0
619 return self.execute_command('SETBIT', name, offset, value)
620
621 - def setex(self, name, time, value):
622 """
623 Set the value of key ``name`` to ``value`` that expires in ``time``
624 seconds. ``time`` can be represented by an integer or a Python
625 timedelta object.
626 """
627 if isinstance(time, datetime.timedelta):
628 time = int(time.total_seconds())
629 return self.execute_command('SETEX', name, time, value)
630
631 - def setnx(self, name, value):
632 "Set the value of key ``name`` to ``value`` if key doesn't exist"
633 return self.execute_command('SETNX', name, value)
634
635 - def setrange(self, name, offset, value):
636 """
637 Overwrite bytes in the value of ``name`` starting at ``offset`` with
638 ``value``. If ``offset`` plus the length of ``value`` exceeds the
639 length of the original value, the new value will be larger than before.
640 If ``offset`` exceeds the length of the original value, null bytes
641 will be used to pad between the end of the previous value and the start
642 of what's being injected.
643
644 Returns the length of the new string.
645 """
646 return self.execute_command('SETRANGE', name, offset, value)
647
649 "Return the number of bytes stored in the value of ``name``"
650 return self.execute_command('STRLEN', name)
651
652 - def substr(self, name, start, end=-1):
653 """
654 Return a substring of the string at key ``name``. ``start`` and ``end``
655 are 0-based integers specifying the portion of the string to return.
656 """
657 return self.execute_command('SUBSTR', name, start, end)
658
659 - def ttl(self, name):
660 "Returns the number of seconds until the key ``name`` will expire"
661 return self.execute_command('TTL', name)
662
663 - def type(self, name):
664 "Returns the type of key ``name``"
665 return self.execute_command('TYPE', name)
666
667 - def watch(self, *names):
668 """
669 Watches the values at keys ``names``, or None if the key doesn't exist
670 """
671 warnings.warn(DeprecationWarning('Call WATCH from a Pipeline object'))
672
674 """
675 Unwatches the value at key ``name``, or None of the key doesn't exist
676 """
677 warnings.warn(
678 DeprecationWarning('Call UNWATCH from a Pipeline object'))
679
680
681 - def blpop(self, keys, timeout=0):
682 """
683 LPOP a value off of the first non-empty list
684 named in the ``keys`` list.
685
686 If none of the lists in ``keys`` has a value to LPOP, then block
687 for ``timeout`` seconds, or until a value gets pushed on to one
688 of the lists.
689
690 If timeout is 0, then block indefinitely.
691 """
692 if timeout is None:
693 timeout = 0
694 if isinstance(keys, basestring):
695 keys = [keys]
696 else:
697 keys = list(keys)
698 keys.append(timeout)
699 return self.execute_command('BLPOP', *keys)
700
701 - def brpop(self, keys, timeout=0):
702 """
703 RPOP a value off of the first non-empty list
704 named in the ``keys`` list.
705
706 If none of the lists in ``keys`` has a value to LPOP, then block
707 for ``timeout`` seconds, or until a value gets pushed on to one
708 of the lists.
709
710 If timeout is 0, then block indefinitely.
711 """
712 if timeout is None:
713 timeout = 0
714 if isinstance(keys, basestring):
715 keys = [keys]
716 else:
717 keys = list(keys)
718 keys.append(timeout)
719 return self.execute_command('BRPOP', *keys)
720
722 """
723 Pop a value off the tail of ``src``, push it on the head of ``dst``
724 and then return it.
725
726 This command blocks until a value is in ``src`` or until ``timeout``
727 seconds elapse, whichever is first. A ``timeout`` value of 0 blocks
728 forever.
729 """
730 if timeout is None:
731 timeout = 0
732 return self.execute_command('BRPOPLPUSH', src, dst, timeout)
733
734 - def lindex(self, name, index):
735 """
736 Return the item from list ``name`` at position ``index``
737
738 Negative indexes are supported and will return an item at the
739 end of the list
740 """
741 return self.execute_command('LINDEX', name, index)
742
743 - def linsert(self, name, where, refvalue, value):
744 """
745 Insert ``value`` in list ``name`` either immediately before or after
746 [``where``] ``refvalue``
747
748 Returns the new length of the list on success or -1 if ``refvalue``
749 is not in the list.
750 """
751 return self.execute_command('LINSERT', name, where, refvalue, value)
752
753 - def llen(self, name):
754 "Return the length of the list ``name``"
755 return self.execute_command('LLEN', name)
756
757 - def lpop(self, name):
758 "Remove and return the first item of the list ``name``"
759 return self.execute_command('LPOP', name)
760
761 - def lpush(self, name, *values):
762 "Push ``values`` onto the head of the list ``name``"
763 return self.execute_command('LPUSH', name, *values)
764
765 - def lpushx(self, name, value):
766 "Push ``value`` onto the head of the list ``name`` if ``name`` exists"
767 return self.execute_command('LPUSHX', name, value)
768
769 - def lrange(self, name, start, end):
770 """
771 Return a slice of the list ``name`` between
772 position ``start`` and ``end``
773
774 ``start`` and ``end`` can be negative numbers just like
775 Python slicing notation
776 """
777 return self.execute_command('LRANGE', name, start, end)
778
779 - def lrem(self, name, count, value):
780 """
781 Remove the first ``count`` occurrences of elements equal to ``value``
782 from the list stored at ``name``.
783
784 The count argument influences the operation in the following ways:
785 count > 0: Remove elements equal to value moving from head to tail.
786 count < 0: Remove elements equal to value moving from tail to head.
787 count = 0: Remove all elements equal to value.
788 """
789 return self.execute_command('LREM', name, count, value)
790
791 - def lset(self, name, index, value):
792 "Set ``position`` of list ``name`` to ``value``"
793 return self.execute_command('LSET', name, index, value)
794
795 - def ltrim(self, name, start, end):
796 """
797 Trim the list ``name``, removing all values not within the slice
798 between ``start`` and ``end``
799
800 ``start`` and ``end`` can be negative numbers just like
801 Python slicing notation
802 """
803 return self.execute_command('LTRIM', name, start, end)
804
805 - def rpop(self, name):
806 "Remove and return the last item of the list ``name``"
807 return self.execute_command('RPOP', name)
808
810 """
811 RPOP a value off of the ``src`` list and atomically LPUSH it
812 on to the ``dst`` list. Returns the value.
813 """
814 return self.execute_command('RPOPLPUSH', src, dst)
815
816 - def rpush(self, name, *values):
817 "Push ``values`` onto the tail of the list ``name``"
818 return self.execute_command('RPUSH', name, *values)
819
820 - def rpushx(self, name, value):
821 "Push ``value`` onto the tail of the list ``name`` if ``name`` exists"
822 return self.execute_command('RPUSHX', name, value)
823
824 - def sort(self, name, start=None, num=None, by=None, get=None,
825 desc=False, alpha=False, store=None):
826 """
827 Sort and return the list, set or sorted set at ``name``.
828
829 ``start`` and ``num`` allow for paging through the sorted data
830
831 ``by`` allows using an external key to weight and sort the items.
832 Use an "*" to indicate where in the key the item value is located
833
834 ``get`` allows for returning items from external keys rather than the
835 sorted data itself. Use an "*" to indicate where int he key
836 the item value is located
837
838 ``desc`` allows for reversing the sort
839
840 ``alpha`` allows for sorting lexicographically rather than numerically
841
842 ``store`` allows for storing the result of the sort into
843 the key ``store``
844 """
845 if (start is not None and num is None) or \
846 (num is not None and start is None):
847 raise RedisError("``start`` and ``num`` must both be specified")
848
849 pieces = [name]
850 if by is not None:
851 pieces.append('BY')
852 pieces.append(by)
853 if start is not None and num is not None:
854 pieces.append('LIMIT')
855 pieces.append(start)
856 pieces.append(num)
857 if get is not None:
858
859
860
861
862 if isinstance(get, basestring):
863 pieces.append('GET')
864 pieces.append(get)
865 else:
866 for g in get:
867 pieces.append('GET')
868 pieces.append(g)
869 if desc:
870 pieces.append('DESC')
871 if alpha:
872 pieces.append('ALPHA')
873 if store is not None:
874 pieces.append('STORE')
875 pieces.append(store)
876 return self.execute_command('SORT', *pieces)
877
878
879 - def sadd(self, name, *values):
880 "Add ``value(s)`` to set ``name``"
881 return self.execute_command('SADD', name, *values)
882
884 "Return the number of elements in set ``name``"
885 return self.execute_command('SCARD', name)
886
887 - def sdiff(self, keys, *args):
891
893 """
894 Store the difference of sets specified by ``keys`` into a new
895 set named ``dest``. Returns the number of keys in the new set.
896 """
897 args = list_or_args(keys, args)
898 return self.execute_command('SDIFFSTORE', dest, *args)
899
900 - def sinter(self, keys, *args):
904
906 """
907 Store the intersection of sets specified by ``keys`` into a new
908 set named ``dest``. Returns the number of keys in the new set.
909 """
910 args = list_or_args(keys, args)
911 return self.execute_command('SINTERSTORE', dest, *args)
912
914 "Return a boolean indicating if ``value`` is a member of set ``name``"
915 return self.execute_command('SISMEMBER', name, value)
916
918 "Return all members of the set ``name``"
919 return self.execute_command('SMEMBERS', name)
920
921 - def smove(self, src, dst, value):
922 "Move ``value`` from set ``src`` to set ``dst`` atomically"
923 return self.execute_command('SMOVE', src, dst, value)
924
925 - def spop(self, name):
926 "Remove and return a random member of set ``name``"
927 return self.execute_command('SPOP', name)
928
930 "Return a random member of set ``name``"
931 return self.execute_command('SRANDMEMBER', name)
932
933 - def srem(self, name, *values):
934 "Remove ``values`` from set ``name``"
935 return self.execute_command('SREM', name, *values)
936
937 - def sunion(self, keys, *args):
941
943 """
944 Store the union of sets specified by ``keys`` into a new
945 set named ``dest``. Returns the number of keys in the new set.
946 """
947 args = list_or_args(keys, args)
948 return self.execute_command('SUNIONSTORE', dest, *args)
949
950
951 - def zadd(self, name, *args, **kwargs):
952 """
953 Set any number of score, element-name pairs to the key ``name``. Pairs
954 can be specified in two ways:
955
956 As *args, in the form of: score1, name1, score2, name2, ...
957 or as **kwargs, in the form of: name1=score1, name2=score2, ...
958
959 The following example would add four values to the 'my-key' key:
960 redis.zadd('my-key', 1.1, 'name1', 2.2, 'name2', name3=3.3, name4=4.4)
961 """
962 pieces = []
963 if args:
964 if len(args) % 2 != 0:
965 raise RedisError("ZADD requires an equal number of "
966 "values and scores")
967 pieces.extend(args)
968 for pair in iteritems(kwargs):
969 pieces.append(pair[1])
970 pieces.append(pair[0])
971 return self.execute_command('ZADD', name, *pieces)
972
974 "Return the number of elements in the sorted set ``name``"
975 return self.execute_command('ZCARD', name)
976
977 - def zcount(self, name, min, max):
979
980 - def zincrby(self, name, value, amount=1):
981 "Increment the score of ``value`` in sorted set ``name`` by ``amount``"
982 return self.execute_command('ZINCRBY', name, amount, value)
983
985 """
986 Intersect multiple sorted sets specified by ``keys`` into
987 a new sorted set, ``dest``. Scores in the destination will be
988 aggregated based on the ``aggregate``, or SUM if none is provided.
989 """
990 return self._zaggregate('ZINTERSTORE', dest, keys, aggregate)
991
992 - def zrange(self, name, start, end, desc=False, withscores=False,
993 score_cast_func=float):
994 """
995 Return a range of values from sorted set ``name`` between
996 ``start`` and ``end`` sorted in ascending order.
997
998 ``start`` and ``end`` can be negative, indicating the end of the range.
999
1000 ``desc`` a boolean indicating whether to sort the results descendingly
1001
1002 ``withscores`` indicates to return the scores along with the values.
1003 The return type is a list of (value, score) pairs
1004
1005 ``score_cast_func`` a callable used to cast the score return value
1006 """
1007 if desc:
1008 return self.zrevrange(name, start, end, withscores,
1009 score_cast_func)
1010 pieces = ['ZRANGE', name, start, end]
1011 if withscores:
1012 pieces.append('withscores')
1013 options = {
1014 'withscores': withscores, 'score_cast_func': score_cast_func}
1015 return self.execute_command(*pieces, **options)
1016
1017 - def zrangebyscore(self, name, min, max, start=None, num=None,
1018 withscores=False, score_cast_func=float):
1019 """
1020 Return a range of values from the sorted set ``name`` with scores
1021 between ``min`` and ``max``.
1022
1023 If ``start`` and ``num`` are specified, then return a slice
1024 of the range.
1025
1026 ``withscores`` indicates to return the scores along with the values.
1027 The return type is a list of (value, score) pairs
1028
1029 `score_cast_func`` a callable used to cast the score return value
1030 """
1031 if (start is not None and num is None) or \
1032 (num is not None and start is None):
1033 raise RedisError("``start`` and ``num`` must both be specified")
1034 pieces = ['ZRANGEBYSCORE', name, min, max]
1035 if start is not None and num is not None:
1036 pieces.extend(['LIMIT', start, num])
1037 if withscores:
1038 pieces.append('withscores')
1039 options = {
1040 'withscores': withscores, 'score_cast_func': score_cast_func}
1041 return self.execute_command(*pieces, **options)
1042
1043 - def zrank(self, name, value):
1044 """
1045 Returns a 0-based value indicating the rank of ``value`` in sorted set
1046 ``name``
1047 """
1048 return self.execute_command('ZRANK', name, value)
1049
1050 - def zrem(self, name, *values):
1051 "Remove member ``values`` from sorted set ``name``"
1052 return self.execute_command('ZREM', name, *values)
1053
1055 """
1056 Remove all elements in the sorted set ``name`` with ranks between
1057 ``min`` and ``max``. Values are 0-based, ordered from smallest score
1058 to largest. Values can be negative indicating the highest scores.
1059 Returns the number of elements removed
1060 """
1061 return self.execute_command('ZREMRANGEBYRANK', name, min, max)
1062
1064 """
1065 Remove all elements in the sorted set ``name`` with scores
1066 between ``min`` and ``max``. Returns the number of elements removed.
1067 """
1068 return self.execute_command('ZREMRANGEBYSCORE', name, min, max)
1069
1070 - def zrevrange(self, name, start, num, withscores=False,
1071 score_cast_func=float):
1072 """
1073 Return a range of values from sorted set ``name`` between
1074 ``start`` and ``num`` sorted in descending order.
1075
1076 ``start`` and ``num`` can be negative, indicating the end of the range.
1077
1078 ``withscores`` indicates to return the scores along with the values
1079 The return type is a list of (value, score) pairs
1080
1081 ``score_cast_func`` a callable used to cast the score return value
1082 """
1083 pieces = ['ZREVRANGE', name, start, num]
1084 if withscores:
1085 pieces.append('withscores')
1086 options = {
1087 'withscores': withscores, 'score_cast_func': score_cast_func}
1088 return self.execute_command(*pieces, **options)
1089
1090 - def zrevrangebyscore(self, name, max, min, start=None, num=None,
1091 withscores=False, score_cast_func=float):
1092 """
1093 Return a range of values from the sorted set ``name`` with scores
1094 between ``min`` and ``max`` in descending order.
1095
1096 If ``start`` and ``num`` are specified, then return a slice
1097 of the range.
1098
1099 ``withscores`` indicates to return the scores along with the values.
1100 The return type is a list of (value, score) pairs
1101
1102 ``score_cast_func`` a callable used to cast the score return value
1103 """
1104 if (start is not None and num is None) or \
1105 (num is not None and start is None):
1106 raise RedisError("``start`` and ``num`` must both be specified")
1107 pieces = ['ZREVRANGEBYSCORE', name, max, min]
1108 if start is not None and num is not None:
1109 pieces.extend(['LIMIT', start, num])
1110 if withscores:
1111 pieces.append('withscores')
1112 options = {
1113 'withscores': withscores, 'score_cast_func': score_cast_func}
1114 return self.execute_command(*pieces, **options)
1115
1117 """
1118 Returns a 0-based value indicating the descending rank of
1119 ``value`` in sorted set ``name``
1120 """
1121 return self.execute_command('ZREVRANK', name, value)
1122
1123 - def zscore(self, name, value):
1124 "Return the score of element ``value`` in sorted set ``name``"
1125 return self.execute_command('ZSCORE', name, value)
1126
1128 """
1129 Union multiple sorted sets specified by ``keys`` into
1130 a new sorted set, ``dest``. Scores in the destination will be
1131 aggregated based on the ``aggregate``, or SUM if none is provided.
1132 """
1133 return self._zaggregate('ZUNIONSTORE', dest, keys, aggregate)
1134
1135 - def _zaggregate(self, command, dest, keys, aggregate=None):
1136 pieces = [command, dest, len(keys)]
1137 if isinstance(keys, dict):
1138 keys, weights = dictkeys(keys), dictvalues(keys)
1139 else:
1140 weights = None
1141 pieces.extend(keys)
1142 if weights:
1143 pieces.append('WEIGHTS')
1144 pieces.extend(weights)
1145 if aggregate:
1146 pieces.append('AGGREGATE')
1147 pieces.append(aggregate)
1148 return self.execute_command(*pieces)
1149
1150
1151 - def hdel(self, name, *keys):
1152 "Delete ``keys`` from hash ``name``"
1153 return self.execute_command('HDEL', name, *keys)
1154
1156 "Returns a boolean indicating if ``key`` exists within hash ``name``"
1157 return self.execute_command('HEXISTS', name, key)
1158
1159 - def hget(self, name, key):
1160 "Return the value of ``key`` within the hash ``name``"
1161 return self.execute_command('HGET', name, key)
1162
1164 "Return a Python dict of the hash's name/value pairs"
1165 return self.execute_command('HGETALL', name)
1166
1167 - def hincrby(self, name, key, amount=1):
1168 "Increment the value of ``key`` in hash ``name`` by ``amount``"
1169 return self.execute_command('HINCRBY', name, key, amount)
1170
1172 "Return the list of keys within hash ``name``"
1173 return self.execute_command('HKEYS', name)
1174
1175 - def hlen(self, name):
1176 "Return the number of elements in hash ``name``"
1177 return self.execute_command('HLEN', name)
1178
1179 - def hset(self, name, key, value):
1180 """
1181 Set ``key`` to ``value`` within hash ``name``
1182 Returns 1 if HSET created a new field, otherwise 0
1183 """
1184 return self.execute_command('HSET', name, key, value)
1185
1186 - def hsetnx(self, name, key, value):
1187 """
1188 Set ``key`` to ``value`` within hash ``name`` if ``key`` does not
1189 exist. Returns 1 if HSETNX created a field, otherwise 0.
1190 """
1191 return self.execute_command("HSETNX", name, key, value)
1192
1193 - def hmset(self, name, mapping):
1194 """
1195 Sets each key in the ``mapping`` dict to its corresponding value
1196 in the hash ``name``
1197 """
1198 if not mapping:
1199 raise DataError("'hmset' with 'mapping' of length 0")
1200 items = []
1201 for pair in iteritems(mapping):
1202 items.extend(pair)
1203 return self.execute_command('HMSET', name, *items)
1204
1205 - def hmget(self, name, keys, *args):
1206 "Returns a list of values ordered identically to ``keys``"
1207 args = list_or_args(keys, args)
1208 return self.execute_command('HMGET', name, *args)
1209
1211 "Return the list of values within hash ``name``"
1212 return self.execute_command('HVALS', name)
1213
1214 - def publish(self, channel, message):
1215 """
1216 Publish ``message`` on ``channel``.
1217 Returns the number of subscribers the message was delivered to.
1218 """
1219 return self.execute_command('PUBLISH', channel, message)
1220
1221 - def eval(self, script, numkeys, *keys_and_args):
1222 """
1223 Execute the LUA ``script``, specifying the ``numkeys`` the script
1224 will touch and the key names and argument values in ``keys_and_args``.
1225 Returns the result of the script.
1226
1227 In practice, use the object returned by ``register_script``. This
1228 function exists purely for Redis API completion.
1229 """
1230 return self.execute_command('EVAL', script, numkeys, *keys_and_args)
1231
1232 - def evalsha(self, sha, numkeys, *keys_and_args):
1233 """
1234 Use the ``sha`` to execute a LUA script already registered via EVAL
1235 or SCRIPT LOAD. Specify the ``numkeys`` the script will touch and the
1236 key names and argument values in ``keys_and_args``. Returns the result
1237 of the script.
1238
1239 In practice, use the object returned by ``register_script``. This
1240 function exists purely for Redis API completion.
1241 """
1242 return self.execute_command('EVALSHA', sha, numkeys, *keys_and_args)
1243
1245 """
1246 Check if a script exists in the script cache by specifying the SHAs of
1247 each script as ``args``. Returns a list of boolean values indicating if
1248 if each already script exists in the cache.
1249 """
1250 options = {'parse': 'EXISTS'}
1251 return self.execute_command('SCRIPT', 'EXISTS', *args, **options)
1252
1254 "Flush all scripts from the script cache"
1255 options = {'parse': 'FLUSH'}
1256 return self.execute_command('SCRIPT', 'FLUSH', **options)
1257
1259 "Kill the currently executing LUA script"
1260 options = {'parse': 'KILL'}
1261 return self.execute_command('SCRIPT', 'KILL', **options)
1262
1264 "Load a LUA ``script`` into the script cache. Returns the SHA."
1265 options = {'parse': 'LOAD'}
1266 return self.execute_command('SCRIPT', 'LOAD', script, **options)
1267
1269 """
1270 Register a LUA ``script`` specifying the ``keys`` it will touch.
1271 Returns a Script object that is callable and hides the complexity of
1272 deal with scripts, keys, and shas. This is the preferred way to work
1273 with LUA scripts.
1274 """
1275 return Script(self, script)
1276
1277 -class Redis(StrictRedis):
1278 """
1279 Provides backwards compatibility with older versions of redis-py that
1280 changed arguments to some commands to be more Pythonic, sane, or by
1281 accident.
1282 """
1283
1284 RESPONSE_CALLBACKS = dict_merge(
1285 StrictRedis.RESPONSE_CALLBACKS,
1286 {
1287 'TTL': lambda r: r != -1 and r or None,
1288 }
1289 )
1290
1291 - def pipeline(self, transaction=True, shard_hint=None):
1292 """
1293 Return a new pipeline object that can queue multiple commands for
1294 later execution. ``transaction`` indicates whether all commands
1295 should be executed atomically. Apart from making a group of operations
1296 atomic, pipelines are useful for reducing the back-and-forth overhead
1297 between the client and server.
1298 """
1299 return Pipeline(
1300 self.connection_pool,
1301 self.response_callbacks,
1302 transaction,
1303 shard_hint)
1304
1305 - def setex(self, name, value, time):
1306 """
1307 Set the value of key ``name`` to ``value`` that expires in ``time``
1308 seconds. ``time`` can be represented by an integer or a Python
1309 timedelta object.
1310 """
1311 if isinstance(time, datetime.timedelta):
1312 time = int(time.total_seconds())
1313 return self.execute_command('SETEX', name, time, value)
1314
1315 - def lrem(self, name, value, num=0):
1316 """
1317 Remove the first ``num`` occurrences of elements equal to ``value``
1318 from the list stored at ``name``.
1319
1320 The ``num`` argument influences the operation in the following ways:
1321 num > 0: Remove elements equal to value moving from head to tail.
1322 num < 0: Remove elements equal to value moving from tail to head.
1323 num = 0: Remove all elements equal to value.
1324 """
1325 return self.execute_command('LREM', name, num, value)
1326
1327 - def zadd(self, name, *args, **kwargs):
1328 """
1329 NOTE: The order of arguments differs from that of the official ZADD
1330 command. For backwards compatability, this method accepts arguments
1331 in the form of name1, score1, name2, score2, while the official Redis
1332 documents expects score1, name1, score2, name2.
1333
1334 If you're looking to use the standard syntax, consider using the
1335 StrictRedis class. See the API Reference section of the docs for more
1336 information.
1337
1338 Set any number of element-name, score pairs to the key ``name``. Pairs
1339 can be specified in two ways:
1340
1341 As *args, in the form of: name1, score1, name2, score2, ...
1342 or as **kwargs, in the form of: name1=score1, name2=score2, ...
1343
1344 The following example would add four values to the 'my-key' key:
1345 redis.zadd('my-key', 'name1', 1.1, 'name2', 2.2, name3=3.3, name4=4.4)
1346 """
1347 pieces = []
1348 if args:
1349 if len(args) % 2 != 0:
1350 raise RedisError("ZADD requires an equal number of "
1351 "values and scores")
1352 pieces.extend(reversed(args))
1353 for pair in iteritems(kwargs):
1354 pieces.append(pair[1])
1355 pieces.append(pair[0])
1356 return self.execute_command('ZADD', name, *pieces)
1357
1360 """
1361 PubSub provides publish, subscribe and listen support to Redis channels.
1362
1363 After subscribing to one or more channels, the listen() method will block
1364 until a message arrives on one of the subscribed channels. That message
1365 will be returned and it's safe to start listening again.
1366 """
1367 - def __init__(self, connection_pool, shard_hint=None):
1368 self.connection_pool = connection_pool
1369 self.shard_hint = shard_hint
1370 self.connection = None
1371 self.channels = set()
1372 self.patterns = set()
1373 self.subscription_count = 0
1374 self.subscribe_commands = set(
1375 ('subscribe', 'psubscribe', 'unsubscribe', 'punsubscribe')
1376 )
1377
1388
1394
1397
1425
1427 "Parse the response from a publish/subscribe command"
1428 response = self.connection.read_response()
1429 if nativestr(response[0]) in self.subscribe_commands:
1430 self.subscription_count = response[2]
1431
1432
1433 if not self.subscription_count:
1434 self.reset()
1435 return response
1436
1438 "Subscribe to all channels matching any pattern in ``patterns``"
1439 if isinstance(patterns, basestring):
1440 patterns = [patterns]
1441 for pattern in patterns:
1442 self.patterns.add(pattern)
1443 return self.execute_command('PSUBSCRIBE', *patterns)
1444
1446 """
1447 Unsubscribe from any channel matching any pattern in ``patterns``.
1448 If empty, unsubscribe from all channels.
1449 """
1450 if isinstance(patterns, basestring):
1451 patterns = [patterns]
1452 for pattern in patterns:
1453 try:
1454 self.patterns.remove(pattern)
1455 except KeyError:
1456 pass
1457 return self.execute_command('PUNSUBSCRIBE', *patterns)
1458
1460 "Subscribe to ``channels``, waiting for messages to be published"
1461 if isinstance(channels, basestring):
1462 channels = [channels]
1463 for channel in channels:
1464 self.channels.add(channel)
1465 return self.execute_command('SUBSCRIBE', *channels)
1466
1468 """
1469 Unsubscribe from ``channels``. If empty, unsubscribe
1470 from all channels
1471 """
1472 if isinstance(channels, basestring):
1473 channels = [channels]
1474 for channel in channels:
1475 try:
1476 self.channels.remove(channel)
1477 except KeyError:
1478 pass
1479 return self.execute_command('UNSUBSCRIBE', *channels)
1480
1482 "Listen for messages on channels this client has been subscribed to"
1483 while self.subscription_count or self.channels or self.patterns:
1484 if self.connection == None:
1485 break
1486
1487 r = self.parse_response()
1488 msg_type = nativestr(r[0])
1489 if msg_type == 'pmessage':
1490 msg = {
1491 'type': msg_type,
1492 'pattern': nativestr(r[1]),
1493 'channel': nativestr(r[2]),
1494 'data': r[3]
1495 }
1496 else:
1497 msg = {
1498 'type': msg_type,
1499 'pattern': None,
1500 'channel': nativestr(r[1]),
1501 'data': r[2]
1502 }
1503 yield msg
1504
1507 """
1508 Pipelines provide a way to transmit multiple commands to the Redis server
1509 in one transmission. This is convenient for batch processing, such as
1510 saving all the values in a list to Redis.
1511
1512 All commands executed within a pipeline are wrapped with MULTI and EXEC
1513 calls. This guarantees all commands executed in the pipeline will be
1514 executed atomically.
1515
1516 Any command raising an exception does *not* halt the execution of
1517 subsequent commands in the pipeline. Instead, the exception is caught
1518 and its instance is placed into the response list returned by execute().
1519 Code iterating over the response list should be able to deal with an
1520 instance of an exception as a potential value. In general, these will be
1521 ResponseError exceptions, such as those raised when issuing a command
1522 on a key of a different datatype.
1523 """
1524
1525 UNWATCH_COMMANDS = set(('DISCARD', 'EXEC', 'UNWATCH'))
1526
1527 - def __init__(self, connection_pool, response_callbacks, transaction,
1528 shard_hint):
1529 self.connection_pool = connection_pool
1530 self.connection = None
1531 self.response_callbacks = response_callbacks
1532 self.transaction = transaction
1533 self.shard_hint = shard_hint
1534
1535 self.watching = False
1536 self.reset()
1537
1540
1541 - def __exit__(self, exc_type, exc_value, traceback):
1543
1545 try:
1546 self.reset()
1547 except:
1548 pass
1549
1572
1574 """
1575 Start a transactional block of the pipeline after WATCH commands
1576 are issued. End the transactional block with `execute`.
1577 """
1578 if self.explicit_transaction:
1579 raise RedisError('Cannot issue nested calls to MULTI')
1580 if self.command_stack:
1581 raise RedisError('Commands without an initial WATCH have already '
1582 'been issued')
1583 self.explicit_transaction = True
1584
1590
1617
1619 """
1620 Stage a command to be executed when execute() is next called
1621
1622 Returns the current Pipeline object back so commands can be
1623 chained together, such as:
1624
1625 pipe = pipe.set('foo', 'bar').incr('baz').decr('bang')
1626
1627 At some other point, you can then run: pipe.execute(),
1628 which will execute all commands queued in the pipe.
1629 """
1630 self.command_stack.append((args, options))
1631 return self
1632
1634 all_cmds = SYM_EMPTY.join(
1635 starmap(connection.pack_command,
1636 [args for args, options in commands]))
1637 connection.send_packed_command(all_cmds)
1638
1639 commands = commands[1:-1]
1640
1641
1642
1643 for i in range(len(commands) + 1):
1644 self.parse_response(connection, '_')
1645
1646 response = self.parse_response(connection, '_')
1647
1648 if response is None:
1649 raise WatchError("Watched variable changed.")
1650
1651 if len(response) != len(commands):
1652 raise ResponseError("Wrong number of response items from "
1653 "pipeline execution")
1654
1655 data = []
1656 for r, cmd in izip(response, commands):
1657 if not isinstance(r, Exception):
1658 args, options = cmd
1659 command_name = args[0]
1660 if command_name in self.response_callbacks:
1661 r = self.response_callbacks[command_name](r, **options)
1662 data.append(r)
1663 return data
1664
1673
1682
1684
1685 scripts = list(self.scripts)
1686 immediate = self.immediate_execute_command
1687 shas = [s.sha for s in scripts]
1688 exists = immediate('SCRIPT', 'EXISTS', *shas, **{'parse': 'EXISTS'})
1689 if not all(exists):
1690 for s, exist in izip(scripts, exists):
1691 if not exist:
1692 immediate('SCRIPT', 'LOAD', s.script, **{'parse': 'LOAD'})
1693
1695 "Execute all the commands in the current pipeline"
1696 if self.scripts:
1697 self.load_scripts()
1698 stack = self.command_stack
1699 if self.transaction or self.explicit_transaction:
1700 stack = [(('MULTI', ), {})] + stack + [(('EXEC', ), {})]
1701 execute = self._execute_transaction
1702 else:
1703 execute = self._execute_pipeline
1704
1705 conn = self.connection
1706 if not conn:
1707 conn = self.connection_pool.get_connection('MULTI',
1708 self.shard_hint)
1709
1710
1711 self.connection = conn
1712
1713 try:
1714 return execute(conn, stack)
1715 except ConnectionError:
1716 conn.disconnect()
1717
1718
1719
1720
1721
1722 if self.watching:
1723 raise WatchError("A ConnectionError occured on while watching "
1724 "one or more keys")
1725
1726
1727 return execute(conn, stack)
1728 finally:
1729 self.reset()
1730
1731 - def watch(self, *names):
1732 "Watches the values at keys ``names``"
1733 if self.explicit_transaction:
1734 raise RedisError('Cannot issue a WATCH after a MULTI')
1735 return self.execute_command('WATCH', *names)
1736
1738 "Unwatches all previously specified keys"
1739 return self.watching and self.execute_command('UNWATCH') or True
1740
1742 "Make sure scripts are loaded prior to pipeline execution"
1743 self.scripts.add(script)
1744
1747 "Pipeline for the StrictRedis class"
1748 pass
1749
1750
1751 -class Pipeline(BasePipeline, Redis):
1752 "Pipeline for the Redis class"
1753 pass
1754
1757 "An executable LUA script object returned by ``register_script``"
1758
1759 - def __init__(self, registered_client, script):
1760 self.registered_client = registered_client
1761 self.script = script
1762 self.sha = registered_client.script_load(script)
1763
1764 - def __call__(self, keys=[], args=[], client=None):
1779
1782 "Errors thrown from the Lock"
1783 pass
1784
1785
1786 -class Lock(object):
1787 """
1788 A shared, distributed Lock. Using Redis for locking allows the Lock
1789 to be shared across processes and/or machines.
1790
1791 It's left to the user to resolve deadlock issues and make sure
1792 multiple clients play nicely together.
1793 """
1794
1795 LOCK_FOREVER = float(2 ** 31 + 1)
1796
1797 - def __init__(self, redis, name, timeout=None, sleep=0.1):
1798 """
1799 Create a new Lock instnace named ``name`` using the Redis client
1800 supplied by ``redis``.
1801
1802 ``timeout`` indicates a maximum life for the lock.
1803 By default, it will remain locked until release() is called.
1804
1805 ``sleep`` indicates the amount of time to sleep per loop iteration
1806 when the lock is in blocking mode and another client is currently
1807 holding the lock.
1808
1809 Note: If using ``timeout``, you should make sure all the hosts
1810 that are running clients have their time synchronized with a network
1811 time service like ntp.
1812 """
1813 self.redis = redis
1814 self.name = name
1815 self.acquired_until = None
1816 self.timeout = timeout
1817 self.sleep = sleep
1818 if self.timeout and self.sleep > self.timeout:
1819 raise LockError("'sleep' must be less than 'timeout'")
1820
1823
1824 - def __exit__(self, exc_type, exc_value, traceback):
1826
1827 - def acquire(self, blocking=True):
1828 """
1829 Use Redis to hold a shared, distributed lock named ``name``.
1830 Returns True once the lock is acquired.
1831
1832 If ``blocking`` is False, always return immediately. If the lock
1833 was acquired, return True, otherwise return False.
1834 """
1835 sleep = self.sleep
1836 timeout = self.timeout
1837 while 1:
1838 unixtime = int(mod_time.time())
1839 if timeout:
1840 timeout_at = unixtime + timeout
1841 else:
1842 timeout_at = Lock.LOCK_FOREVER
1843 timeout_at = float(timeout_at)
1844 if self.redis.setnx(self.name, timeout_at):
1845 self.acquired_until = timeout_at
1846 return True
1847
1848
1849 existing = float(self.redis.get(self.name) or 1)
1850 if existing < unixtime:
1851
1852 existing = float(self.redis.getset(self.name, timeout_at) or 1)
1853 if existing < unixtime:
1854
1855 self.acquired_until = timeout_at
1856 return True
1857 if not blocking:
1858 return False
1859 mod_time.sleep(sleep)
1860
1862 "Releases the already acquired lock"
1863 if self.acquired_until is None:
1864 raise ValueError("Cannot release an unlocked lock")
1865 existing = float(self.redis.get(self.name) or 1)
1866
1867 if existing >= self.acquired_until:
1868 self.redis.delete(self.name)
1869 self.acquired_until = None
1870