14 """Tests behavior around the compression mechanism."""
28 _GZIP_CHANNEL_ARGUMENT = (
'grpc.default_compression_algorithm', 2)
29 _GZIP_DISABLED_CHANNEL_ARGUMENT = (
'grpc.compression_enabled_algorithms_bitset',
31 _DEFLATE_DISABLED_CHANNEL_ARGUMENT = (
32 'grpc.compression_enabled_algorithms_bitset', 5)
34 _TEST_UNARY_UNARY =
'/test/TestUnaryUnary'
35 _TEST_SET_COMPRESSION =
'/test/TestSetCompression'
36 _TEST_DISABLE_COMPRESSION_UNARY =
'/test/TestDisableCompressionUnary'
37 _TEST_DISABLE_COMPRESSION_STREAM =
'/test/TestDisableCompressionStream'
39 _REQUEST = b
'\x01' * 100
40 _RESPONSE = b
'\x02' * 100
48 assert _REQUEST == await context.read()
49 context.set_compression(grpc.Compression.Deflate)
50 await context.write(_RESPONSE)
52 context.set_compression(grpc.Compression.Deflate)
61 'Expecting exceptions if set_compression is not effective')
65 assert _REQUEST == request
66 context.set_compression(grpc.Compression.Deflate)
67 context.disable_next_message_compression()
72 assert _REQUEST == await context.read()
73 context.set_compression(grpc.Compression.Deflate)
74 await context.write(_RESPONSE)
75 context.disable_next_message_compression()
76 await context.write(_RESPONSE)
77 await context.write(_RESPONSE)
83 _TEST_SET_COMPRESSION:
85 _TEST_DISABLE_COMPRESSION_UNARY:
87 _TEST_DISABLE_COMPRESSION_STREAM:
95 return _ROUTING_TABLE.get(handler_call_details.method)
99 server = aio.server(options=options)
100 port = server.add_insecure_port(
'[::]:0')
103 return f
'localhost:{port}', server
109 server_options = (_GZIP_DISABLED_CHANNEL_ARGUMENT,)
111 self.
_channel = aio.insecure_channel(self._address)
119 async
with aio.insecure_channel(
120 self._address, compression=grpc.Compression.Gzip)
as channel:
121 multicallable = channel.unary_unary(_TEST_UNARY_UNARY)
122 call = multicallable(_REQUEST)
123 with self.assertRaises(aio.AioRpcError)
as exception_context:
125 rpc_error = exception_context.exception
126 self.assertEqual(grpc.StatusCode.UNIMPLEMENTED, rpc_error.code())
130 async
with aio.insecure_channel(
131 self._address, compression=grpc.Compression.Deflate)
as channel:
132 multicallable = channel.unary_unary(_TEST_UNARY_UNARY)
133 call = multicallable(_REQUEST)
134 self.assertEqual(grpc.StatusCode.OK, await call.code())
140 call = multicallable(_REQUEST, compression=grpc.Compression.Gzip)
141 with self.assertRaises(aio.AioRpcError)
as exception_context:
143 rpc_error = exception_context.exception
144 self.assertEqual(grpc.StatusCode.UNIMPLEMENTED, rpc_error.code())
150 call = multicallable(_REQUEST, compression=grpc.Compression.Deflate)
151 self.assertEqual(grpc.StatusCode.OK, await call.code())
155 call = multicallable()
156 await call.write(_REQUEST)
157 await call.done_writing()
158 self.assertEqual(_RESPONSE, await call.read())
159 self.assertEqual(grpc.StatusCode.OK, await call.code())
163 _TEST_DISABLE_COMPRESSION_UNARY)
164 call = multicallable(_REQUEST)
165 self.assertEqual(_RESPONSE, await call)
166 self.assertEqual(grpc.StatusCode.OK, await call.code())
170 _TEST_DISABLE_COMPRESSION_STREAM)
171 call = multicallable()
172 await call.write(_REQUEST)
173 await call.done_writing()
174 self.assertEqual(_RESPONSE, await call.read())
175 self.assertEqual(_RESPONSE, await call.read())
176 self.assertEqual(_RESPONSE, await call.read())
177 self.assertEqual(grpc.StatusCode.OK, await call.code())
180 server = aio.server(compression=grpc.Compression.Deflate)
181 port = server.add_insecure_port(
'[::]:0')
185 async
with aio.insecure_channel(f
'localhost:{port}')
as channel:
186 multicallable = channel.unary_unary(_TEST_UNARY_UNARY)
187 call = multicallable(_REQUEST)
188 self.assertEqual(_RESPONSE, await call)
189 self.assertEqual(grpc.StatusCode.OK, await call.code())
191 await server.stop(
None)
# Script entry point: enable verbose logging, then run the tests in this module.
if __name__ == '__main__':
    logging.basicConfig(level=logging.DEBUG)
    unittest.main(verbosity=2)