Go to the documentation of this file.00001 import unittest
00002
00003 from redis._compat import b, next
00004 from redis.exceptions import ConnectionError
00005 import redis
00006
00007
00008 class PubSubTestCase(unittest.TestCase):
00009 def setUp(self):
00010 self.connection_pool = redis.ConnectionPool()
00011 self.client = redis.Redis(connection_pool=self.connection_pool)
00012 self.pubsub = self.client.pubsub()
00013
00014 def tearDown(self):
00015 self.connection_pool.disconnect()
00016
00017 def test_channel_subscribe(self):
00018
00019 self.assertEquals(
00020 self.pubsub.subscribe('foo'),
00021 None
00022 )
00023
00024 self.assertEquals(self.client.publish('foo', 'hello foo'), 1)
00025
00026
00027 self.assertEquals(
00028 next(self.pubsub.listen()),
00029 {
00030 'type': 'subscribe',
00031 'pattern': None,
00032 'channel': 'foo',
00033 'data': 1
00034 }
00035 )
00036 self.assertEquals(
00037 next(self.pubsub.listen()),
00038 {
00039 'type': 'message',
00040 'pattern': None,
00041 'channel': 'foo',
00042 'data': b('hello foo')
00043 }
00044 )
00045
00046
00047 self.assertEquals(
00048 self.pubsub.unsubscribe('foo'),
00049 None
00050 )
00051
00052 self.assertEquals(
00053 next(self.pubsub.listen()),
00054 {
00055 'type': 'unsubscribe',
00056 'pattern': None,
00057 'channel': 'foo',
00058 'data': 0
00059 }
00060 )
00061
00062 def test_pattern_subscribe(self):
00063
00064 self.assertEquals(
00065 self.pubsub.psubscribe('f*'),
00066 None
00067 )
00068
00069 self.assertEquals(self.client.publish('foo', 'hello foo'), 1)
00070
00071
00072 self.assertEquals(
00073 next(self.pubsub.listen()),
00074 {
00075 'type': 'psubscribe',
00076 'pattern': None,
00077 'channel': 'f*',
00078 'data': 1
00079 }
00080 )
00081 self.assertEquals(
00082 next(self.pubsub.listen()),
00083 {
00084 'type': 'pmessage',
00085 'pattern': 'f*',
00086 'channel': 'foo',
00087 'data': b('hello foo')
00088 }
00089 )
00090
00091
00092 self.assertEquals(
00093 self.pubsub.punsubscribe('f*'),
00094 None
00095 )
00096
00097 self.assertEquals(
00098 next(self.pubsub.listen()),
00099 {
00100 'type': 'punsubscribe',
00101 'pattern': None,
00102 'channel': 'f*',
00103 'data': 0
00104 }
00105 )
00106
00107
00108 class PubSubRedisDownTestCase(unittest.TestCase):
00109 def setUp(self):
00110 self.connection_pool = redis.ConnectionPool(port=6390)
00111 self.client = redis.Redis(connection_pool=self.connection_pool)
00112 self.pubsub = self.client.pubsub()
00113
00114 def tearDown(self):
00115 self.connection_pool.disconnect()
00116
00117 def test_channel_subscribe(self):
00118 got_exception = False
00119 try:
00120 self.pubsub.subscribe('foo')
00121 except ConnectionError:
00122 got_exception = True
00123 self.assertTrue(got_exception)