lock.py
Go to the documentation of this file.
00001 from __future__ import with_statement
00002 import time
00003 import unittest
00004 
00005 import rocon_python_redis as redis
00006 from redis.client import Lock, LockError
00007 
00008 
class LockTestCase(unittest.TestCase):
    """Exercise the client's ``lock()`` helper against a live Redis server.

    Every test talks to the server configured in ``setUp`` (localhost:6379,
    database 9).  That database is flushed before and after each test, so
    it must not hold data worth keeping.
    """

    def setUp(self):
        """Connect to the test database and start from an empty keyspace."""
        self.client = redis.Redis(host='localhost', port=6379, db=9)
        self.client.flushdb()

    def tearDown(self):
        """Remove every key the test left behind."""
        self.client.flushdb()

    def test_lock(self):
        """Acquire stores the LOCK_FOREVER marker; release deletes the key."""
        lock = self.client.lock('foo')
        self.assertTrue(lock.acquire())
        self.assertEqual(
            self.client['foo'],
            str(Lock.LOCK_FOREVER).encode())
        lock.release()
        self.assertIsNone(self.client.get('foo'))

    def test_competing_locks(self):
        """Only one of two locks on the same key can be held at a time."""
        lock1 = self.client.lock('foo')
        lock2 = self.client.lock('foo')
        self.assertTrue(lock1.acquire())
        self.assertFalse(lock2.acquire(blocking=False))
        lock1.release()
        self.assertTrue(lock2.acquire())
        self.assertFalse(lock1.acquire(blocking=False))
        lock2.release()

    def test_timeouts(self):
        """A lock with a timeout can be taken by another lock once expired."""
        lock1 = self.client.lock('foo', timeout=1)
        lock2 = self.client.lock('foo')
        # Sample the clock on both sides of acquire(): comparing against a
        # single reading taken afterwards fails spuriously whenever a second
        # boundary falls between the acquire and the assertion.
        earliest = int(time.time())
        self.assertTrue(lock1.acquire())
        latest = int(time.time())
        self.assertLessEqual(earliest + 1, lock1.acquired_until)
        self.assertLessEqual(lock1.acquired_until, latest + 1)
        # The expiry is expected to be a whole-second timestamp.
        self.assertEqual(
            lock1.acquired_until,
            float(int(lock1.acquired_until)))
        self.assertEqual(lock1.acquired_until, float(self.client['foo']))
        self.assertFalse(lock2.acquire(blocking=False))
        time.sleep(2)  # need to wait up to 2 seconds for lock to timeout
        self.assertTrue(lock2.acquire(blocking=False))
        lock2.release()

    def test_non_blocking(self):
        """``acquired_until`` is set while held and None after release."""
        lock1 = self.client.lock('foo')
        self.assertTrue(lock1.acquire(blocking=False))
        self.assertTrue(lock1.acquired_until)
        lock1.release()
        self.assertIsNone(lock1.acquired_until)

    def test_context_manager(self):
        """The lock is held inside a ``with`` block and released on exit."""
        with self.client.lock('foo'):
            self.assertEqual(
                self.client['foo'],
                str(Lock.LOCK_FOREVER).encode())
        self.assertIsNone(self.client.get('foo'))

    def test_float_timeout(self):
        """A fractional timeout is accepted and still excludes a rival."""
        lock1 = self.client.lock('foo', timeout=1.5)
        lock2 = self.client.lock('foo', timeout=1.5)
        self.assertTrue(lock1.acquire())
        self.assertFalse(lock2.acquire(blocking=False))
        lock1.release()

    def test_high_sleep_raises_error(self):
        """If sleep is higher than timeout, it should raise an error"""
        self.assertRaises(
            LockError,
            self.client.lock, 'foo', timeout=1, sleep=2
        )


rocon_python_redis
Author(s): Andy McCurdy
autogenerated on Fri May 2 2014 10:35:49