compression_example_test.py
Go to the documentation of this file.
1 # Copyright 2019 the gRPC authors.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 """Test for compression example."""
15 
16 import contextlib
17 import os
18 import socket
19 import subprocess
20 import unittest
21 
# Fully resolved path of the directory one level above the directory that
# holds this file. Both binary paths below are built relative to it.
_BINARY_DIR = os.path.realpath(
    os.path.join(os.path.dirname(os.path.abspath(__file__)), '..'))

# Paths of the example server and client executables that the test launches.
_SERVER_PATH, _CLIENT_PATH = (
    os.path.join(_BINARY_DIR, binary_name)
    for binary_name in ('server', 'client'))
26 
27 
@contextlib.contextmanager
def _get_port():
    """Yield the number of a free TCP port, holding it open meanwhile.

    An IPv6 stream socket is created with SO_REUSEPORT enabled and bound to
    port 0, so the operating system picks the port. The socket stays open
    for the whole body of the ``with`` statement and is closed on exit.
    NOTE(review): SO_REUSEPORT is presumably set so that the process under
    test can bind the same port while this socket still holds it -- confirm
    against the server binary.

    Yields:
        int: the port number the socket was bound to.

    Raises:
        RuntimeError: if SO_REUSEPORT reads back as 0 after being set.
    """
    sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
    # All setup happens inside the try block so the socket is closed even
    # when setting the option or binding fails, not only after the yield.
    try:
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
        if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT) == 0:
            raise RuntimeError("Failed to set SO_REUSEPORT.")
        sock.bind(('', 0))
        yield sock.getsockname()[1]
    finally:
        sock.close()
39 
40 
class CompressionExampleTest(unittest.TestCase):
    """End-to-end test that runs the compression example binaries."""

    def test_compression_example(self):
        """Run the example client against the example server with gzip.

        Starts the server binary on a port obtained from ``_get_port`` with
        ``--server_compression gzip``, then runs the client binary against
        ``localhost:<port>`` with ``--channel_compression gzip``. The test
        passes when the client exits with status 0 and the server process
        has not exited by the time the client finishes.
        """
        with _get_port() as test_port:
            server_process = subprocess.Popen(
                (_SERVER_PATH, '--port', str(test_port), '--server_compression',
                 'gzip'))
            try:
                server_target = 'localhost:{}'.format(test_port)
                client_process = subprocess.Popen(
                    (_CLIENT_PATH, '--server', server_target,
                     '--channel_compression', 'gzip'))
                client_return_code = client_process.wait()
                self.assertEqual(0, client_return_code)
                # poll() returns None only while the process is still
                # running, so this fails if the server exited on its own.
                self.assertIsNone(server_process.poll())
            finally:
                # Stop and reap the server whether or not the checks passed.
                server_process.kill()
                server_process.wait()
59 
60 
# When executed directly as a script, run every test in this module and
# report each test by name (verbosity level 2).
if __name__ == "__main__":
    unittest.main(verbosity=2)
xds_interop_client.str
str
Definition: xds_interop_client.py:487
http2_test_server.format
format
Definition: http2_test_server.py:118
compression_example_test.CompressionExampleTest
Definition: compression_example_test.py:41
compression_example_test._get_port
def _get_port()
Definition: compression_example_test.py:29
compression_example_test.CompressionExampleTest.test_compression_example
def test_compression_example(self)
Definition: compression_example_test.py:43


grpc
Author(s):
autogenerated on Thu Mar 13 2025 02:58:52