14 """Test for compression example."""
# Directory one level above this test file, with symlinks resolved.
_BINARY_DIR = os.path.realpath(
    os.path.join(
        os.path.dirname(os.path.abspath(__file__)),
        '..',
    ))
# Paths of the 'server' and 'client' entries directly under _BINARY_DIR.
_SERVER_PATH, _CLIENT_PATH = (
    os.path.join(_BINARY_DIR, binary) for binary in ('server', 'client'))
# Generator-based context manager that yields a port number.
# NOTE(review): this view is a fragment -- the `def` line and any statements
# between the visible lines (e.g. binding and closing the socket) are not
# shown here; confirm against the full file.
28 @contextlib.contextmanager
# Open an IPv6 TCP (stream) socket.
30 sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
# Request SO_REUSEPORT, then read the option back: a value of 0 means the
# option was not applied, which is reported as a RuntimeError.
31 sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
32 if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT) == 0:
33 raise RuntimeError(
"Failed to set SO_REUSEPORT.")
# Hand the caller the port component (index 1) of the socket's local address.
36 yield sock.getsockname()[1]
# Body of a test that runs the example server and client as subprocesses.
# NOTE(review): this view is a fragment -- the enclosing class/method headers,
# the origin of `test_port`, and the end of the first Popen call are not
# shown here; confirm against the full file.
# Start the server binary, passing it the port and the --server_compression
# flag (the flag's value is cut off in this view).
45 server_process = subprocess.Popen(
46 (_SERVER_PATH,
'--port',
str(test_port),
'--server_compression',
# Address the client connects to: localhost on the same port as the server.
49 server_target =
'localhost:{}'.
format(test_port)
# Start the client binary against that target with gzip channel compression.
50 client_process = subprocess.Popen(
51 (_CLIENT_PATH,
'--server', server_target,
52 '--channel_compression',
'gzip'))
# Block until the client exits; it must finish with exit status 0.
53 client_return_code = client_process.wait()
54 self.assertEqual(0, client_return_code)
# poll() returns None while a process has not terminated, so this asserts the
# server is still running after the client has finished.
55 self.assertIsNone(server_process.poll())
# Script entry point: run this module's tests with verbose (per-test) output.
if __name__ == '__main__':
    unittest.main(verbosity=2)