Go to the documentation of this file.00001 from __future__ import with_statement
00002 import time
00003 import unittest
00004
00005 from redis.client import Lock, LockError
00006 import redis
00007
00008
00009 class LockTestCase(unittest.TestCase):
00010 def setUp(self):
00011 self.client = redis.Redis(host='localhost', port=6379, db=9)
00012 self.client.flushdb()
00013
00014 def tearDown(self):
00015 self.client.flushdb()
00016
00017 def test_lock(self):
00018 lock = self.client.lock('foo')
00019 self.assert_(lock.acquire())
00020 self.assertEquals(self.client['foo'], str(Lock.LOCK_FOREVER).encode())
00021 lock.release()
00022 self.assertEquals(self.client.get('foo'), None)
00023
00024 def test_competing_locks(self):
00025 lock1 = self.client.lock('foo')
00026 lock2 = self.client.lock('foo')
00027 self.assert_(lock1.acquire())
00028 self.assertFalse(lock2.acquire(blocking=False))
00029 lock1.release()
00030 self.assert_(lock2.acquire())
00031 self.assertFalse(lock1.acquire(blocking=False))
00032 lock2.release()
00033
00034 def test_timeouts(self):
00035 lock1 = self.client.lock('foo', timeout=1)
00036 lock2 = self.client.lock('foo')
00037 self.assert_(lock1.acquire())
00038 self.assertEquals(lock1.acquired_until, float(int(time.time())) + 1)
00039 self.assertEquals(lock1.acquired_until, float(self.client['foo']))
00040 self.assertFalse(lock2.acquire(blocking=False))
00041 time.sleep(2)
00042 self.assert_(lock2.acquire(blocking=False))
00043 lock2.release()
00044
00045 def test_non_blocking(self):
00046 lock1 = self.client.lock('foo')
00047 self.assert_(lock1.acquire(blocking=False))
00048 self.assert_(lock1.acquired_until)
00049 lock1.release()
00050 self.assert_(lock1.acquired_until is None)
00051
00052 def test_context_manager(self):
00053 with self.client.lock('foo'):
00054 self.assertEquals(
00055 self.client['foo'],
00056 str(Lock.LOCK_FOREVER).encode())
00057 self.assertEquals(self.client.get('foo'), None)
00058
00059 def test_float_timeout(self):
00060 lock1 = self.client.lock('foo', timeout=1.5)
00061 lock2 = self.client.lock('foo', timeout=1.5)
00062 self.assert_(lock1.acquire())
00063 self.assertFalse(lock2.acquire(blocking=False))
00064 lock1.release()
00065
00066 def test_high_sleep_raises_error(self):
00067 "If sleep is higher than timeout, it should raise an error"
00068 self.assertRaises(
00069 LockError,
00070 self.client.lock, 'foo', timeout=1, sleep=2
00071 )