_fork_interop_test.py
Go to the documentation of this file.
1 # Copyright 2019 gRPC authors.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 """Client-side fork interop tests as a unit test."""
15 
16 import subprocess
17 import sys
18 import tempfile
19 import threading
20 import unittest
21 
22 from grpc._cython import cygrpc
23 import six
24 
25 from tests.fork import methods
26 
# A new multiprocessing.Process started via fork without exec can and will
# freeze if the Python process has any other threads running. This includes
# the additional thread spawned by our _runner.py class. So, in order to test
# our compatibility with multiprocessing, we first fork+exec a fresh Python
# process to ensure there are no conflicting background threads.
#
# The template below is the program run in that fresh process (passed to
# `python -c`). The leading `if True:` allows the remaining lines to be
# indented. It takes two %-format arguments, in order: the name of the
# methods.TestCase member to run (%s) and the port of the interop server (%d).
_CLIENT_FORK_SCRIPT_TEMPLATE = """if True:
 import os
 import sys
 from grpc._cython import cygrpc
 from tests.fork import methods

 cygrpc._GRPC_ENABLE_FORK_SUPPORT = True
 os.environ['GRPC_POLL_STRATEGY'] = 'epoll1'
 methods.TestCase.%s.run_test({
 'server_host': 'localhost',
 'server_port': %d,
 'use_tls': False
 })
"""
# Delay, in seconds, given to the threading.Timer watchdogs that kill a spawned
# subprocess (the interop server during start-up, and each client script).
_SUBPROCESS_TIMEOUT_S = 30
47 
48 
@unittest.skipUnless(
    sys.platform.startswith("linux"),
    "not supported on windows, and fork+exec networking blocked on mac")
@unittest.skipUnless(six.PY2, "https://github.com/grpc/grpc/issues/18075")
class ForkInteropTest(unittest.TestCase):
    """Runs each client-side fork interop case in a dedicated subprocess.

    ``setUp`` launches an interop test server in a child Python process and
    records the port it prints. Each ``test*`` method then renders
    ``_CLIENT_FORK_SCRIPT_TEMPLATE`` for one ``methods.TestCase`` member, runs
    it with ``python -c`` and asserts that the child exits with status 0.
    """

    # Seconds to sleep between polls of the server's stdout while waiting for
    # it to report its port.
    _PORT_POLL_INTERVAL_S = 0.1

    def setUp(self):
        """Start the interop server and store its port in ``self._port``.

        Raises:
            Exception: if the server exits (or is killed by the start-up
                watchdog after ``_SUBPROCESS_TIMEOUT_S`` seconds) before
                printing a port, or if what it printed is not an integer.
        """
        # The leading `if True:` lets the script body be indented.
        start_server_script = """if True:
            import sys
            import time

            import grpc
            from src.proto.grpc.testing import test_pb2_grpc
            from tests.interop import service as interop_service
            from tests.unit import test_common

            server = test_common.test_server()
            test_pb2_grpc.add_TestServiceServicer_to_server(
                interop_service.TestService(), server)
            port = server.add_insecure_port('[::]:0')
            server.start()
            print(port)
            sys.stdout.flush()
            while True:
                time.sleep(1)
        """
        # The streams must outlive setUp (the server keeps them as its
        # stdout/stderr), so they are stored and closed in _stop_server().
        self._streams = tuple(tempfile.TemporaryFile() for _ in range(2))
        self._server_process = subprocess.Popen(
            [sys.executable, '-c', start_server_script],
            stdout=self._streams[0],
            stderr=self._streams[1])
        timer = threading.Timer(_SUBPROCESS_TIMEOUT_S,
                                self._server_process.kill)
        timer.start()
        try:
            try:
                self._port = self._wait_for_port()
            finally:
                # Cancel before any cleanup so the watchdog cannot fire on a
                # process that has already been reaped.
                timer.cancel()
        except Exception:
            # unittest does not call tearDown() when setUp() raises, so the
            # server and its streams have to be released here.
            self._stop_server()
            raise

    def testConnectivityWatch(self):
        self._verifyTestCase(methods.TestCase.CONNECTIVITY_WATCH)

    def testCloseChannelBeforeFork(self):
        self._verifyTestCase(methods.TestCase.CLOSE_CHANNEL_BEFORE_FORK)

    def testAsyncUnarySameChannel(self):
        self._verifyTestCase(methods.TestCase.ASYNC_UNARY_SAME_CHANNEL)

    def testAsyncUnaryNewChannel(self):
        self._verifyTestCase(methods.TestCase.ASYNC_UNARY_NEW_CHANNEL)

    def testBlockingUnarySameChannel(self):
        self._verifyTestCase(methods.TestCase.BLOCKING_UNARY_SAME_CHANNEL)

    def testBlockingUnaryNewChannel(self):
        self._verifyTestCase(methods.TestCase.BLOCKING_UNARY_NEW_CHANNEL)

    def testInProgressBidiContinueCall(self):
        self._verifyTestCase(methods.TestCase.IN_PROGRESS_BIDI_CONTINUE_CALL)

    def testInProgressBidiSameChannelAsyncCall(self):
        self._verifyTestCase(
            methods.TestCase.IN_PROGRESS_BIDI_SAME_CHANNEL_ASYNC_CALL)

    def testInProgressBidiSameChannelBlockingCall(self):
        self._verifyTestCase(
            methods.TestCase.IN_PROGRESS_BIDI_SAME_CHANNEL_BLOCKING_CALL)

    def testInProgressBidiNewChannelAsyncCall(self):
        self._verifyTestCase(
            methods.TestCase.IN_PROGRESS_BIDI_NEW_CHANNEL_ASYNC_CALL)

    def testInProgressBidiNewChannelBlockingCall(self):
        self._verifyTestCase(
            methods.TestCase.IN_PROGRESS_BIDI_NEW_CHANNEL_BLOCKING_CALL)

    def tearDown(self):
        """Stop the interop server started by ``setUp``."""
        self._stop_server()

    def _wait_for_port(self):
        """Poll the server's stdout until it has printed its port.

        Returns:
            The port number parsed from the first complete line of output.

        Raises:
            Exception: if the first line is not an integer, or if the server
                process has exited without printing a complete line.
        """
        # Function-scope import: the module's import block does not provide
        # `time`, and it is only needed for the polling delay.
        import time

        stdout, stderr = self._streams
        while True:
            # Sample liveness *before* reading: if the process is already
            # gone, everything it ever wrote is in the file by now, so an
            # empty read below is conclusive rather than a race.
            exited = self._server_process.poll() is not None
            stdout.seek(0)
            line = stdout.readline()
            # Only trust a newline-terminated line; a partial write could
            # otherwise be parsed as a truncated port number.
            if line.endswith(b'\n'):
                try:
                    return int(line)
                except ValueError:
                    raise Exception('Failed to get port from server')
            if exited:
                stderr.seek(0)
                raise Exception(
                    'Failed to get port from server '
                    '(exit code: %s, stderr: "%s")' %
                    (self._server_process.returncode, stderr.read()))
            time.sleep(self._PORT_POLL_INTERVAL_S)

    def _stop_server(self):
        """Kill and reap the server process, then close its output streams."""
        process = self._server_process
        # Only signal a process that has not been reaped yet; on Python 2
        # Popen.kill() on a reaped pid raises (or could hit a recycled pid).
        if process.poll() is None:
            process.kill()
        process.wait()
        for stream in self._streams:
            stream.close()

    def _verifyTestCase(self, test_case):
        """Run one fork test case in a fresh interpreter and check it passes.

        Args:
            test_case: a ``methods.TestCase`` member; its ``name`` is
                substituted into ``_CLIENT_FORK_SCRIPT_TEMPLATE`` together
                with the server port.

        The child is killed if it is still running after
        ``_SUBPROCESS_TIMEOUT_S`` seconds; its captured stdout and stderr are
        included in the assertion message on a non-zero exit code.
        """
        script = _CLIENT_FORK_SCRIPT_TEMPLATE % (test_case.name, self._port)
        with tempfile.TemporaryFile() as stdout, \
                tempfile.TemporaryFile() as stderr:
            process = subprocess.Popen([sys.executable, '-c', script],
                                       stdout=stdout,
                                       stderr=stderr)
            timer = threading.Timer(_SUBPROCESS_TIMEOUT_S, process.kill)
            timer.start()
            try:
                process.wait()
            finally:
                timer.cancel()
            outputs = []
            for stream in (stdout, stderr):
                stream.seek(0)
                outputs.append(stream.read())
        self.assertEqual(
            0, process.returncode,
            'process failed with exit code %d (stdout: "%s", stderr: "%s")' %
            (process.returncode, outputs[0], outputs[1]))
156 
# Run this module's tests with verbose output when it is executed as a script.
if __name__ == '__main__':
    unittest.main(verbosity=2)
tests.fork
Definition: src/python/grpcio_tests/tests/fork/__init__.py:1
capstone.range
range
Definition: third_party/bloaty/third_party/capstone/bindings/python/capstone/__init__.py:6
tests.fork._fork_interop_test.ForkInteropTest.testCloseChannelBeforeFork
def testCloseChannelBeforeFork(self)
Definition: _fork_interop_test.py:100
tests.fork._fork_interop_test.ForkInteropTest.testInProgressBidiSameChannelBlockingCall
def testInProgressBidiSameChannelBlockingCall(self)
Definition: _fork_interop_test.py:122
tests.fork._fork_interop_test.ForkInteropTest.testBlockingUnaryNewChannel
def testBlockingUnaryNewChannel(self)
Definition: _fork_interop_test.py:112
tests.fork._fork_interop_test.ForkInteropTest._port
_port
Definition: _fork_interop_test.py:90
tests.fork._fork_interop_test.ForkInteropTest.testBlockingUnarySameChannel
def testBlockingUnarySameChannel(self)
Definition: _fork_interop_test.py:109
xds_interop_client.int
int
Definition: xds_interop_client.py:113
tests.fork._fork_interop_test.ForkInteropTest
Definition: _fork_interop_test.py:53
tests.fork._fork_interop_test.ForkInteropTest.testAsyncUnarySameChannel
def testAsyncUnarySameChannel(self)
Definition: _fork_interop_test.py:103
tests.fork._fork_interop_test.ForkInteropTest.tearDown
def tearDown(self)
Definition: _fork_interop_test.py:134
tests.fork._fork_interop_test.ForkInteropTest.testInProgressBidiNewChannelBlockingCall
def testInProgressBidiNewChannelBlockingCall(self)
Definition: _fork_interop_test.py:130
tests.fork._fork_interop_test.ForkInteropTest.testInProgressBidiNewChannelAsyncCall
def testInProgressBidiNewChannelAsyncCall(self)
Definition: _fork_interop_test.py:126
tests.fork._fork_interop_test.ForkInteropTest.setUp
def setUp(self)
Definition: _fork_interop_test.py:55
tests.fork._fork_interop_test.ForkInteropTest.testInProgressBidiContinueCall
def testInProgressBidiContinueCall(self)
Definition: _fork_interop_test.py:115
tests.fork._fork_interop_test.ForkInteropTest.testConnectivityWatch
def testConnectivityWatch(self)
Definition: _fork_interop_test.py:97
tests.fork._fork_interop_test.ForkInteropTest._server_process
_server_process
Definition: _fork_interop_test.py:76
grpc._cython
Definition: src/python/grpcio/grpc/_cython/__init__.py:1
tests.fork._fork_interop_test.ForkInteropTest._verifyTestCase
def _verifyTestCase(self, test_case)
Definition: _fork_interop_test.py:137
tests.fork._fork_interop_test.ForkInteropTest.testInProgressBidiSameChannelAsyncCall
def testInProgressBidiSameChannelAsyncCall(self)
Definition: _fork_interop_test.py:118
tests.fork._fork_interop_test.ForkInteropTest.testAsyncUnaryNewChannel
def testAsyncUnaryNewChannel(self)
Definition: _fork_interop_test.py:106


grpc
Author(s):
autogenerated on Thu Mar 13 2025 02:58:27