14 """A smoke test for memory leaks on short-lived channels without close.
16 This test doesn't guarantee all resources are cleaned if `Channel.close` is not
17 explicitly invoked. The recommended way of using Channel object is using `with`
18 clause, and let context manager automatically close the channel.
21 from concurrent.futures
import ThreadPoolExecutor
30 _TEST_METHOD =
'/test/Test'
31 _REQUEST = b
'\x23\x33'
32 _LARGE_NUM_OF_ITERATIONS = 5000
35 _FAIL_THRESHOLD = 25 * 1024 * 1024
39 return resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
43 if x > 1024 * 1024 * 1024:
44 return "%.2f GiB" % (x / 1024.0 / 1024 / 1024)
46 return "%.2f MiB" % (x / 1024.0 / 1024)
48 return "%.2f KiB" % (x / 1024.0)
56 if handler_call_details.method == _TEST_METHOD:
61 server =
grpc.server(ThreadPoolExecutor(max_workers=1),
62 options=((
'grpc.so_reuseport', 0),))
64 port = server.add_insecure_port(
'localhost:0')
66 return 'localhost:%d' % port, server
71 multicallable = channel.unary_unary(_TEST_METHOD)
72 response = multicallable(_REQUEST)
73 assert _REQUEST == response
85 for n
in range(_LARGE_NUM_OF_ITERATIONS):
90 if diff > _FAIL_THRESHOLD:
91 self.fail(
"Max RSS inflated {} > {}".
format(
96 if __name__ ==
"__main__":
97 logging.basicConfig(level=logging.DEBUG)
98 unittest.main(verbosity=2)