_compression_test.py
Go to the documentation of this file.
1 # Copyright 2016 gRPC authors.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 """Tests server and client side compression."""
15 
16 from concurrent import futures
17 import contextlib
18 import functools
19 import itertools
20 import logging
21 import os
22 import unittest
23 
24 import grpc
25 from grpc import _grpcio_metadata
26 
27 from tests.unit import _tcp_proxy
28 from tests.unit.framework.common import test_constants
29 
30 _UNARY_UNARY = '/test/UnaryUnary'
31 _UNARY_STREAM = '/test/UnaryStream'
32 _STREAM_UNARY = '/test/StreamUnary'
33 _STREAM_STREAM = '/test/StreamStream'
34 
35 # Cut down on test time.
36 _STREAM_LENGTH = test_constants.STREAM_LENGTH // 16
37 
38 _HOST = 'localhost'
39 
40 _REQUEST = b'\x00' * 100
41 _COMPRESSION_RATIO_THRESHOLD = 0.05
42 _COMPRESSION_METHODS = (
43  None,
44  # Disabled for test tractability.
45  # grpc.Compression.NoCompression,
46  # grpc.Compression.Deflate,
47  grpc.Compression.Gzip,
48 )
49 _COMPRESSION_NAMES = {
50  None: 'Uncompressed',
51  grpc.Compression.NoCompression: 'NoCompression',
52  grpc.Compression.Deflate: 'DeflateCompression',
53  grpc.Compression.Gzip: 'GzipCompression',
54 }
55 
56 _TEST_OPTIONS = {
57  'client_streaming': (True, False),
58  'server_streaming': (True, False),
59  'channel_compression': _COMPRESSION_METHODS,
60  'multicallable_compression': _COMPRESSION_METHODS,
61  'server_compression': _COMPRESSION_METHODS,
62  'server_call_compression': _COMPRESSION_METHODS,
63 }
64 
65 
66 def _make_handle_unary_unary(pre_response_callback):
67 
68  def _handle_unary(request, servicer_context):
69  if pre_response_callback:
70  pre_response_callback(request, servicer_context)
71  return request
72 
73  return _handle_unary
74 
75 
76 def _make_handle_unary_stream(pre_response_callback):
77 
78  def _handle_unary_stream(request, servicer_context):
79  if pre_response_callback:
80  pre_response_callback(request, servicer_context)
81  for _ in range(_STREAM_LENGTH):
82  yield request
83 
84  return _handle_unary_stream
85 
86 
87 def _make_handle_stream_unary(pre_response_callback):
88 
89  def _handle_stream_unary(request_iterator, servicer_context):
90  if pre_response_callback:
91  pre_response_callback(request_iterator, servicer_context)
92  response = None
93  for request in request_iterator:
94  if not response:
95  response = request
96  return response
97 
98  return _handle_stream_unary
99 
100 
101 def _make_handle_stream_stream(pre_response_callback):
102 
103  def _handle_stream(request_iterator, servicer_context):
104  # TODO(issue:#6891) We should be able to remove this loop,
105  # and replace with return; yield
106  for request in request_iterator:
107  if pre_response_callback:
108  pre_response_callback(request, servicer_context)
109  yield request
110 
111  return _handle_stream
112 
113 
114 def set_call_compression(compression_method, request_or_iterator,
115  servicer_context):
116  del request_or_iterator
117  servicer_context.set_compression(compression_method)
118 
119 
120 def disable_next_compression(request, servicer_context):
121  del request
122  servicer_context.disable_next_message_compression()
123 
124 
125 def disable_first_compression(request, servicer_context):
126  if int(request.decode('ascii')) == 0:
127  servicer_context.disable_next_message_compression()
128 
129 
131 
132  def __init__(self, request_streaming, response_streaming,
133  pre_response_callback):
134  self.request_streaming = request_streaming
135  self.response_streaming = response_streaming
138  self.unary_unary = None
139  self.unary_stream = None
140  self.stream_unary = None
141  self.stream_stream = None
142 
143  if self.request_streaming and self.response_streaming:
145  pre_response_callback)
146  elif not self.request_streaming and not self.response_streaming:
147  self.unary_unary = _make_handle_unary_unary(pre_response_callback)
148  elif not self.request_streaming and self.response_streaming:
149  self.unary_stream = _make_handle_unary_stream(pre_response_callback)
150  else:
151  self.stream_unary = _make_handle_stream_unary(pre_response_callback)
152 
153 
155 
156  def __init__(self, pre_response_callback):
157  self._pre_response_callback = pre_response_callback
158 
159  def service(self, handler_call_details):
160  if handler_call_details.method == _UNARY_UNARY:
161  return _MethodHandler(False, False, self._pre_response_callback)
162  elif handler_call_details.method == _UNARY_STREAM:
163  return _MethodHandler(False, True, self._pre_response_callback)
164  elif handler_call_details.method == _STREAM_UNARY:
165  return _MethodHandler(True, False, self._pre_response_callback)
166  elif handler_call_details.method == _STREAM_STREAM:
167  return _MethodHandler(True, True, self._pre_response_callback)
168  else:
169  return None
170 
171 
172 @contextlib.contextmanager
173 def _instrumented_client_server_pair(channel_kwargs, server_kwargs,
174  server_handler):
175  server = grpc.server(futures.ThreadPoolExecutor(), **server_kwargs)
176  server.add_generic_rpc_handlers((server_handler,))
177  server_port = server.add_insecure_port('{}:0'.format(_HOST))
178  server.start()
179  with _tcp_proxy.TcpProxy(_HOST, _HOST, server_port) as proxy:
180  proxy_port = proxy.get_port()
181  with grpc.insecure_channel('{}:{}'.format(_HOST, proxy_port),
182  **channel_kwargs) as client_channel:
183  try:
184  yield client_channel, proxy, server
185  finally:
186  server.stop(None)
187 
188 
189 def _get_byte_counts(channel_kwargs, multicallable_kwargs, client_function,
190  server_kwargs, server_handler, message):
191  with _instrumented_client_server_pair(channel_kwargs, server_kwargs,
192  server_handler) as pipeline:
193  client_channel, proxy, server = pipeline
194  client_function(client_channel, multicallable_kwargs, message)
195  return proxy.get_byte_count()
196 
197 
198 def _get_compression_ratios(client_function, first_channel_kwargs,
199  first_multicallable_kwargs, first_server_kwargs,
200  first_server_handler, second_channel_kwargs,
201  second_multicallable_kwargs, second_server_kwargs,
202  second_server_handler, message):
203  first_bytes_sent, first_bytes_received = _get_byte_counts(
204  first_channel_kwargs, first_multicallable_kwargs, client_function,
205  first_server_kwargs, first_server_handler, message)
206  second_bytes_sent, second_bytes_received = _get_byte_counts(
207  second_channel_kwargs, second_multicallable_kwargs, client_function,
208  second_server_kwargs, second_server_handler, message)
209  return ((second_bytes_sent - first_bytes_sent) / float(first_bytes_sent),
210  (second_bytes_received - first_bytes_received) /
211  float(first_bytes_received))
212 
213 
214 def _unary_unary_client(channel, multicallable_kwargs, message):
215  multi_callable = channel.unary_unary(_UNARY_UNARY)
216  response = multi_callable(message, **multicallable_kwargs)
217  if response != message:
218  raise RuntimeError("Request '{}' != Response '{}'".format(
219  message, response))
220 
221 
222 def _unary_stream_client(channel, multicallable_kwargs, message):
223  multi_callable = channel.unary_stream(_UNARY_STREAM)
224  response_iterator = multi_callable(message, **multicallable_kwargs)
225  for response in response_iterator:
226  if response != message:
227  raise RuntimeError("Request '{}' != Response '{}'".format(
228  message, response))
229 
230 
231 def _stream_unary_client(channel, multicallable_kwargs, message):
232  multi_callable = channel.stream_unary(_STREAM_UNARY)
233  requests = (_REQUEST for _ in range(_STREAM_LENGTH))
234  response = multi_callable(requests, **multicallable_kwargs)
235  if response != message:
236  raise RuntimeError("Request '{}' != Response '{}'".format(
237  message, response))
238 
239 
240 def _stream_stream_client(channel, multicallable_kwargs, message):
241  multi_callable = channel.stream_stream(_STREAM_STREAM)
242  request_prefix = str(0).encode('ascii') * 100
243  requests = (
244  request_prefix + str(i).encode('ascii') for i in range(_STREAM_LENGTH))
245  response_iterator = multi_callable(requests, **multicallable_kwargs)
246  for i, response in enumerate(response_iterator):
247  if int(response.decode('ascii')) != i:
248  raise RuntimeError("Request '{}' != Response '{}'".format(
249  i, response))
250 
251 
252 class CompressionTest(unittest.TestCase):
253 
254  def assertCompressed(self, compression_ratio):
255  self.assertLess(
256  compression_ratio,
257  -1.0 * _COMPRESSION_RATIO_THRESHOLD,
258  msg='Actual compression ratio: {}'.format(compression_ratio))
259 
260  def assertNotCompressed(self, compression_ratio):
261  self.assertGreaterEqual(
262  compression_ratio,
263  -1.0 * _COMPRESSION_RATIO_THRESHOLD,
264  msg='Actual compession ratio: {}'.format(compression_ratio))
265 
266  def assertConfigurationCompressed(self, client_streaming, server_streaming,
267  channel_compression,
268  multicallable_compression,
269  server_compression,
270  server_call_compression):
271  client_side_compressed = channel_compression or multicallable_compression
272  server_side_compressed = server_compression or server_call_compression
273  channel_kwargs = {
274  'compression': channel_compression,
275  } if channel_compression else {}
276  multicallable_kwargs = {
277  'compression': multicallable_compression,
278  } if multicallable_compression else {}
279 
280  client_function = None
281  if not client_streaming and not server_streaming:
282  client_function = _unary_unary_client
283  elif not client_streaming and server_streaming:
284  client_function = _unary_stream_client
285  elif client_streaming and not server_streaming:
286  client_function = _stream_unary_client
287  else:
288  client_function = _stream_stream_client
289 
290  server_kwargs = {
291  'compression': server_compression,
292  } if server_compression else {}
293  server_handler = _GenericHandler(
294  functools.partial(set_call_compression, grpc.Compression.Gzip)
295  ) if server_call_compression else _GenericHandler(None)
296  _get_compression_ratios(client_function, {}, {}, {},
297  _GenericHandler(None), channel_kwargs,
298  multicallable_kwargs, server_kwargs,
299  server_handler, _REQUEST)
300 
302  server_kwargs = {
303  'compression': grpc.Compression.Deflate,
304  }
305  _get_compression_ratios(_stream_stream_client, {}, {}, {},
306  _GenericHandler(None), {}, {}, server_kwargs,
307  _GenericHandler(disable_next_compression),
308  _REQUEST)
309 
311  server_kwargs = {
312  'compression': grpc.Compression.Deflate,
313  }
314  _get_compression_ratios(_stream_stream_client, {}, {}, {},
315  _GenericHandler(None), {}, {}, server_kwargs,
316  _GenericHandler(disable_first_compression),
317  _REQUEST)
318 
319 
320 def _get_compression_str(name, value):
321  return '{}{}'.format(name, _COMPRESSION_NAMES[value])
322 
323 
324 def _get_compression_test_name(client_streaming, server_streaming,
325  channel_compression, multicallable_compression,
326  server_compression, server_call_compression):
327  client_arity = 'Stream' if client_streaming else 'Unary'
328  server_arity = 'Stream' if server_streaming else 'Unary'
329  arity = '{}{}'.format(client_arity, server_arity)
330  channel_compression_str = _get_compression_str('Channel',
331  channel_compression)
332  multicallable_compression_str = _get_compression_str(
333  'Multicallable', multicallable_compression)
334  server_compression_str = _get_compression_str('Server', server_compression)
335  server_call_compression_str = _get_compression_str('ServerCall',
336  server_call_compression)
337  return 'test{}{}{}{}{}'.format(arity, channel_compression_str,
338  multicallable_compression_str,
339  server_compression_str,
340  server_call_compression_str)
341 
342 
344  for test_parameters in itertools.product(*_TEST_OPTIONS.values()):
345  yield dict(zip(_TEST_OPTIONS.keys(), test_parameters))
346 
347 
348 for options in _test_options():
349 
350  def test_compression(**kwargs):
351 
352  def _test_compression(self):
353  self.assertConfigurationCompressed(**kwargs)
354 
355  return _test_compression
356 
357  setattr(CompressionTest, _get_compression_test_name(**options),
358  test_compression(**options))
359 
360 if __name__ == '__main__':
361  logging.basicConfig()
362  unittest.main(verbosity=2)
tests.unit._compression_test._test_options
def _test_options()
Definition: _compression_test.py:343
tests.unit._compression_test._get_compression_ratios
def _get_compression_ratios(client_function, first_channel_kwargs, first_multicallable_kwargs, first_server_kwargs, first_server_handler, second_channel_kwargs, second_multicallable_kwargs, second_server_kwargs, second_server_handler, message)
Definition: _compression_test.py:198
xds_interop_client.str
str
Definition: xds_interop_client.py:487
grpc._server._handle_stream_unary
def _handle_stream_unary(rpc_event, state, method_handler, default_thread_pool)
Definition: grpc/_server.py:654
grpc.insecure_channel
def insecure_channel(target, options=None, compression=None)
Definition: src/python/grpcio/grpc/__init__.py:1962
tests.unit._compression_test._MethodHandler.request_streaming
request_streaming
Definition: _compression_test.py:133
grpc._server._handle_unary_stream
def _handle_unary_stream(rpc_event, state, method_handler, default_thread_pool)
Definition: grpc/_server.py:643
http2_test_server.format
format
Definition: http2_test_server.py:118
tests.unit._compression_test._GenericHandler._pre_response_callback
_pre_response_callback
Definition: _compression_test.py:157
tests.unit._compression_test._get_byte_counts
def _get_byte_counts(channel_kwargs, multicallable_kwargs, client_function, server_kwargs, server_handler, message)
Definition: _compression_test.py:189
tests.unit._compression_test._make_handle_unary_stream
def _make_handle_unary_stream(pre_response_callback)
Definition: _compression_test.py:76
capstone.range
range
Definition: third_party/bloaty/third_party/capstone/bindings/python/capstone/__init__.py:6
tests.unit._compression_test._MethodHandler.stream_stream
stream_stream
Definition: _compression_test.py:140
tests.unit._tcp_proxy.TcpProxy
Definition: _tcp_proxy.py:41
tests.unit._compression_test._unary_unary_client
def _unary_unary_client(channel, multicallable_kwargs, message)
Definition: _compression_test.py:214
grpc._common.encode
def encode(s)
Definition: grpc/_common.py:68
tests.unit._compression_test._MethodHandler.unary_stream
unary_stream
Definition: _compression_test.py:138
tests.unit._compression_test.CompressionTest.assertCompressed
def assertCompressed(self, compression_ratio)
Definition: _compression_test.py:254
tests.unit._compression_test.disable_first_compression
def disable_first_compression(request, servicer_context)
Definition: _compression_test.py:125
tests.unit._compression_test._get_compression_test_name
def _get_compression_test_name(client_streaming, server_streaming, channel_compression, multicallable_compression, server_compression, server_call_compression)
Definition: _compression_test.py:324
tests.unit._exit_scenarios.multi_callable
multi_callable
Definition: _exit_scenarios.py:216
tests.unit._compression_test._make_handle_stream_unary
def _make_handle_stream_unary(pre_response_callback)
Definition: _compression_test.py:87
xds_interop_client.int
int
Definition: xds_interop_client.py:113
tests.unit._compression_test._MethodHandler
Definition: _compression_test.py:130
tests.unit._compression_test._MethodHandler.response_serializer
response_serializer
Definition: _compression_test.py:136
grpc.GenericRpcHandler
Definition: src/python/grpcio/grpc/__init__.py:1333
tests.unit._compression_test._MethodHandler.__init__
def __init__(self, request_streaming, response_streaming, pre_response_callback)
Definition: _compression_test.py:132
tests.unit._compression_test._get_compression_str
def _get_compression_str(name, value)
Definition: _compression_test.py:320
tests.unit._compression_test.set_call_compression
def set_call_compression(compression_method, request_or_iterator, servicer_context)
Definition: _compression_test.py:114
tests.unit._compression_test._make_handle_unary_unary
def _make_handle_unary_unary(pre_response_callback)
Definition: _compression_test.py:66
tests.unit._compression_test._stream_unary_client
def _stream_unary_client(channel, multicallable_kwargs, message)
Definition: _compression_test.py:231
grpc.server
def server(thread_pool, handlers=None, interceptors=None, options=None, maximum_concurrent_rpcs=None, compression=None, xds=False)
Definition: src/python/grpcio/grpc/__init__.py:2034
tests.unit._compression_test.disable_next_compression
def disable_next_compression(request, servicer_context)
Definition: _compression_test.py:120
tests.unit._compression_test.CompressionTest.assertConfigurationCompressed
def assertConfigurationCompressed(self, client_streaming, server_streaming, channel_compression, multicallable_compression, server_compression, server_call_compression)
Definition: _compression_test.py:266
tests.unit._compression_test._unary_stream_client
def _unary_stream_client(channel, multicallable_kwargs, message)
Definition: _compression_test.py:222
tests.unit
Definition: src/python/grpcio_tests/tests/unit/__init__.py:1
tests.unit._compression_test._MethodHandler.request_deserializer
request_deserializer
Definition: _compression_test.py:135
tests.unit._compression_test.CompressionTest
Definition: _compression_test.py:252
tests.unit._compression_test._stream_stream_client
def _stream_stream_client(channel, multicallable_kwargs, message)
Definition: _compression_test.py:240
tests.unit._compression_test._GenericHandler
Definition: _compression_test.py:154
tests.unit._compression_test.CompressionTest.testDisableNextCompressionStreaming
def testDisableNextCompressionStreaming(self)
Definition: _compression_test.py:301
tests.unit._compression_test.test_compression
def test_compression(**kwargs)
Definition: _compression_test.py:350
tests.unit._compression_test._GenericHandler.service
def service(self, handler_call_details)
Definition: _compression_test.py:159
tests.unit._compression_test.CompressionTest.assertNotCompressed
def assertNotCompressed(self, compression_ratio)
Definition: _compression_test.py:260
tests.unit.framework.common
Definition: src/python/grpcio_tests/tests/unit/framework/common/__init__.py:1
tests.unit._compression_test.CompressionTest.testDisableNextCompressionStreamingResets
def testDisableNextCompressionStreamingResets(self)
Definition: _compression_test.py:310
tests.unit._compression_test._make_handle_stream_stream
def _make_handle_stream_stream(pre_response_callback)
Definition: _compression_test.py:101
tests.unit._compression_test._MethodHandler.response_streaming
response_streaming
Definition: _compression_test.py:134
tests.unit._compression_test._MethodHandler.stream_unary
stream_unary
Definition: _compression_test.py:139
tests.unit._compression_test._GenericHandler.__init__
def __init__(self, pre_response_callback)
Definition: _compression_test.py:156
tests.unit._compression_test._MethodHandler.unary_unary
unary_unary
Definition: _compression_test.py:137
grpc.RpcMethodHandler
Definition: src/python/grpcio/grpc/__init__.py:1288
tests.unit._compression_test._instrumented_client_server_pair
def _instrumented_client_server_pair(channel_kwargs, server_kwargs, server_handler)
Definition: _compression_test.py:173


grpc
Author(s):
autogenerated on Thu Mar 13 2025 02:58:27