compression_test.py
Go to the documentation of this file.
1 # Copyright 2020 The gRPC Authors
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 """Tests behavior around the compression mechanism."""
15 
16 import asyncio
17 import logging
18 import platform
19 import random
20 import unittest
21 
22 import grpc
23 from grpc.experimental import aio
24 
25 from tests_aio.unit import _common
26 from tests_aio.unit._test_base import AioTestBase
27 
28 _GZIP_CHANNEL_ARGUMENT = ('grpc.default_compression_algorithm', 2)
29 _GZIP_DISABLED_CHANNEL_ARGUMENT = ('grpc.compression_enabled_algorithms_bitset',
30  3)
31 _DEFLATE_DISABLED_CHANNEL_ARGUMENT = (
32  'grpc.compression_enabled_algorithms_bitset', 5)
33 
34 _TEST_UNARY_UNARY = '/test/TestUnaryUnary'
35 _TEST_SET_COMPRESSION = '/test/TestSetCompression'
36 _TEST_DISABLE_COMPRESSION_UNARY = '/test/TestDisableCompressionUnary'
37 _TEST_DISABLE_COMPRESSION_STREAM = '/test/TestDisableCompressionStream'
38 
39 _REQUEST = b'\x01' * 100
40 _RESPONSE = b'\x02' * 100
41 
42 
43 async def _test_unary_unary(unused_request, unused_context):
44  return _RESPONSE
45 
46 
47 async def _test_set_compression(unused_request_iterator, context):
48  assert _REQUEST == await context.read()
49  context.set_compression(grpc.Compression.Deflate)
50  await context.write(_RESPONSE)
51  try:
52  context.set_compression(grpc.Compression.Deflate)
53  except RuntimeError:
54  # NOTE(lidiz) Testing if the servicer context raises exception when
55  # the set_compression method is called after initial_metadata sent.
56  # After the initial_metadata sent, the server-side has no control over
57  # which compression algorithm it should use.
58  pass
59  else:
60  raise ValueError(
61  'Expecting exceptions if set_compression is not effective')
62 
63 
64 async def _test_disable_compression_unary(request, context):
65  assert _REQUEST == request
66  context.set_compression(grpc.Compression.Deflate)
67  context.disable_next_message_compression()
68  return _RESPONSE
69 
70 
71 async def _test_disable_compression_stream(unused_request_iterator, context):
72  assert _REQUEST == await context.read()
73  context.set_compression(grpc.Compression.Deflate)
74  await context.write(_RESPONSE)
75  context.disable_next_message_compression()
76  await context.write(_RESPONSE)
77  await context.write(_RESPONSE)
78 
79 
80 _ROUTING_TABLE = {
81  _TEST_UNARY_UNARY:
82  grpc.unary_unary_rpc_method_handler(_test_unary_unary),
83  _TEST_SET_COMPRESSION:
84  grpc.stream_stream_rpc_method_handler(_test_set_compression),
85  _TEST_DISABLE_COMPRESSION_UNARY:
86  grpc.unary_unary_rpc_method_handler(_test_disable_compression_unary),
87  _TEST_DISABLE_COMPRESSION_STREAM:
88  grpc.stream_stream_rpc_method_handler(_test_disable_compression_stream),
89 }
90 
91 
93 
94  def service(self, handler_call_details):
95  return _ROUTING_TABLE.get(handler_call_details.method)
96 
97 
98 async def _start_test_server(options=None):
99  server = aio.server(options=options)
100  port = server.add_insecure_port('[::]:0')
101  server.add_generic_rpc_handlers((_GenericHandler(),))
102  await server.start()
103  return f'localhost:{port}', server
104 
105 
107 
108  async def setUp(self):
109  server_options = (_GZIP_DISABLED_CHANNEL_ARGUMENT,)
110  self._address, self._server = await _start_test_server(server_options)
111  self._channel = aio.insecure_channel(self._address)
112 
113  async def tearDown(self):
114  await self._channel.close()
115  await self._server.stop(None)
116 
118  # GZIP is disabled, this call should fail
119  async with aio.insecure_channel(
120  self._address, compression=grpc.Compression.Gzip) as channel:
121  multicallable = channel.unary_unary(_TEST_UNARY_UNARY)
122  call = multicallable(_REQUEST)
123  with self.assertRaises(aio.AioRpcError) as exception_context:
124  await call
125  rpc_error = exception_context.exception
126  self.assertEqual(grpc.StatusCode.UNIMPLEMENTED, rpc_error.code())
127 
129  # Deflate is allowed, this call should succeed
130  async with aio.insecure_channel(
131  self._address, compression=grpc.Compression.Deflate) as channel:
132  multicallable = channel.unary_unary(_TEST_UNARY_UNARY)
133  call = multicallable(_REQUEST)
134  self.assertEqual(grpc.StatusCode.OK, await call.code())
135 
137  multicallable = self._channel.unary_unary(_TEST_UNARY_UNARY)
138 
139  # GZIP is disabled, this call should fail
140  call = multicallable(_REQUEST, compression=grpc.Compression.Gzip)
141  with self.assertRaises(aio.AioRpcError) as exception_context:
142  await call
143  rpc_error = exception_context.exception
144  self.assertEqual(grpc.StatusCode.UNIMPLEMENTED, rpc_error.code())
145 
147  multicallable = self._channel.unary_unary(_TEST_UNARY_UNARY)
148 
149  # Deflate is allowed, this call should succeed
150  call = multicallable(_REQUEST, compression=grpc.Compression.Deflate)
151  self.assertEqual(grpc.StatusCode.OK, await call.code())
152 
154  multicallable = self._channel.stream_stream(_TEST_SET_COMPRESSION)
155  call = multicallable()
156  await call.write(_REQUEST)
157  await call.done_writing()
158  self.assertEqual(_RESPONSE, await call.read())
159  self.assertEqual(grpc.StatusCode.OK, await call.code())
160 
162  multicallable = self._channel.unary_unary(
163  _TEST_DISABLE_COMPRESSION_UNARY)
164  call = multicallable(_REQUEST)
165  self.assertEqual(_RESPONSE, await call)
166  self.assertEqual(grpc.StatusCode.OK, await call.code())
167 
169  multicallable = self._channel.stream_stream(
170  _TEST_DISABLE_COMPRESSION_STREAM)
171  call = multicallable()
172  await call.write(_REQUEST)
173  await call.done_writing()
174  self.assertEqual(_RESPONSE, await call.read())
175  self.assertEqual(_RESPONSE, await call.read())
176  self.assertEqual(_RESPONSE, await call.read())
177  self.assertEqual(grpc.StatusCode.OK, await call.code())
178 
180  server = aio.server(compression=grpc.Compression.Deflate)
181  port = server.add_insecure_port('[::]:0')
182  server.add_generic_rpc_handlers((_GenericHandler(),))
183  await server.start()
184 
185  async with aio.insecure_channel(f'localhost:{port}') as channel:
186  multicallable = channel.unary_unary(_TEST_UNARY_UNARY)
187  call = multicallable(_REQUEST)
188  self.assertEqual(_RESPONSE, await call)
189  self.assertEqual(grpc.StatusCode.OK, await call.code())
190 
191  await server.stop(None)
192 
193 
194 if __name__ == '__main__':
195  logging.basicConfig(level=logging.DEBUG)
196  unittest.main(verbosity=2)
grpc.unary_unary_rpc_method_handler
def unary_unary_rpc_method_handler(behavior, request_deserializer=None, response_serializer=None)
Definition: src/python/grpcio/grpc/__init__.py:1510
tests_aio.unit
Definition: src/python/grpcio_tests/tests_aio/unit/__init__.py:1
tests_aio.unit.compression_test.TestCompression.test_channel_level_compression_allowed_compression
def test_channel_level_compression_allowed_compression(self)
Definition: compression_test.py:128
tests_aio.unit.compression_test._test_set_compression
def _test_set_compression(unused_request_iterator, context)
Definition: compression_test.py:47
tests_aio.unit.compression_test._GenericHandler.service
def service(self, handler_call_details)
Definition: compression_test.py:94
tests_aio.unit.compression_test.TestCompression.test_server_disable_compression_unary
def test_server_disable_compression_unary(self)
Definition: compression_test.py:161
tests_aio.unit._test_base
Definition: _test_base.py:1
tests_aio.unit.compression_test.TestCompression.tearDown
def tearDown(self)
Definition: compression_test.py:113
tests_aio.unit.compression_test.TestCompression.test_client_call_level_compression_allowed_compression
def test_client_call_level_compression_allowed_compression(self)
Definition: compression_test.py:146
tests_aio.unit.compression_test.TestCompression._channel
_channel
Definition: compression_test.py:111
tests_aio.unit.compression_test._start_test_server
def _start_test_server(options=None)
Definition: compression_test.py:98
grpc::experimental
Definition: include/grpcpp/channel.h:46
tests_aio.unit.compression_test.TestCompression.setUp
def setUp(self)
Definition: compression_test.py:108
grpc._simple_stubs.stream_stream
Iterator[ResponseType] stream_stream(Iterator[RequestType] request_iterator, str target, str method, Optional[Callable[[Any], bytes]] request_serializer=None, Optional[Callable[[bytes], Any]] response_deserializer=None, Sequence[Tuple[AnyStr, AnyStr]] options=(), Optional[grpc.ChannelCredentials] channel_credentials=None, bool insecure=False, Optional[grpc.CallCredentials] call_credentials=None, Optional[grpc.Compression] compression=None, Optional[bool] wait_for_ready=None, Optional[float] timeout=_DEFAULT_TIMEOUT, Optional[Sequence[Tuple[str, Union[str, bytes]]]] metadata=None)
Definition: _simple_stubs.py:410
grpc.GenericRpcHandler
Definition: src/python/grpcio/grpc/__init__.py:1333
close
#define close
Definition: test-fs.c:48
tests_aio.unit.compression_test.TestCompression.test_server_disable_compression_stream
def test_server_disable_compression_stream(self)
Definition: compression_test.py:168
tests_aio.unit.compression_test._GenericHandler
Definition: compression_test.py:92
grpc._simple_stubs.unary_unary
ResponseType unary_unary(RequestType request, str target, str method, Optional[Callable[[Any], bytes]] request_serializer=None, Optional[Callable[[bytes], Any]] response_deserializer=None, Sequence[Tuple[AnyStr, AnyStr]] options=(), Optional[grpc.ChannelCredentials] channel_credentials=None, bool insecure=False, Optional[grpc.CallCredentials] call_credentials=None, Optional[grpc.Compression] compression=None, Optional[bool] wait_for_ready=None, Optional[float] timeout=_DEFAULT_TIMEOUT, Optional[Sequence[Tuple[str, Union[str, bytes]]]] metadata=None)
Definition: _simple_stubs.py:169
tests_aio.unit.compression_test.TestCompression.test_client_call_level_compression_baned_compression
def test_client_call_level_compression_baned_compression(self)
Definition: compression_test.py:136
tests_aio.unit.compression_test._test_unary_unary
def _test_unary_unary(unused_request, unused_context)
Definition: compression_test.py:43
tests_aio.unit.compression_test.TestCompression.test_server_call_level_compression
def test_server_call_level_compression(self)
Definition: compression_test.py:153
tests_aio.unit.compression_test._test_disable_compression_stream
def _test_disable_compression_stream(unused_request_iterator, context)
Definition: compression_test.py:71
tests_aio.unit.compression_test.TestCompression.test_channel_level_compression_baned_compression
def test_channel_level_compression_baned_compression(self)
Definition: compression_test.py:117
grpc.stream_stream_rpc_method_handler
def stream_stream_rpc_method_handler(behavior, request_deserializer=None, response_serializer=None)
Definition: src/python/grpcio/grpc/__init__.py:1570
stop
static const char stop[]
Definition: benchmark-async-pummel.c:35
tests_aio.unit.compression_test.TestCompression
Definition: compression_test.py:106
tests_aio.unit.compression_test._test_disable_compression_unary
def _test_disable_compression_unary(request, context)
Definition: compression_test.py:64
tests_aio.unit.compression_test.TestCompression._server
_server
Definition: compression_test.py:110
tests_aio.unit.compression_test.TestCompression.test_server_default_compression_algorithm
def test_server_default_compression_algorithm(self)
Definition: compression_test.py:179
tests_aio.unit._test_base.AioTestBase
Definition: _test_base.py:49


grpc
Author(s):
autogenerated on Fri May 16 2025 02:58:00