_signal_handling_test.py
Go to the documentation of this file.
1 # Copyright 2019 the gRPC authors.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 """Test of responsiveness to signals."""
15 
16 import logging
17 import os
18 import signal
19 import subprocess
20 import sys
21 import tempfile
22 import threading
23 import unittest
24 
25 import grpc
26 
27 from tests.unit import _signal_client
28 from tests.unit import test_common
29 
30 _CLIENT_PATH = None
31 if sys.executable is not None:
32  _CLIENT_PATH = os.path.abspath(os.path.realpath(_signal_client.__file__))
33 else:
34  # NOTE(rbellevi): For compatibility with internal testing.
35  if len(sys.argv) != 2:
36  raise RuntimeError("Must supply path to executable client.")
37  client_name = sys.argv[1].split("/")[-1]
38  del sys.argv[1] # For compatibility with test runner.
39  _CLIENT_PATH = os.path.realpath(
40  os.path.join(os.path.dirname(os.path.abspath(__file__)), client_name))
41 
42 _HOST = 'localhost'
43 
44 # The gevent test harness cannot run the monkeypatch code for the child process,
45 # so we need to instrument it manually.
46 _GEVENT_ARG = ("--gevent",) if test_common.running_under_gevent() else ()
47 
48 
50 
51  def __init__(self):
52  self._connected_clients_lock = threading.RLock()
53  self._connected_clients_event = threading.Event()
55 
60 
61  def _on_client_connect(self):
62  with self._connected_clients_lock:
63  self._connected_clients += 1
65 
67  with self._connected_clients_lock:
68  self._connected_clients -= 1
69  if self._connected_clients == 0:
70  self._connected_clients_event.clear()
71 
73  """Blocks until a client connects to the server."""
75 
76  def _handle_unary_unary(self, request, servicer_context):
77  """Handles a unary RPC.
78 
79  Blocks until the client disconnects and then echoes.
80  """
81  stop_event = threading.Event()
82 
83  def on_rpc_end():
85  stop_event.set()
86 
87  servicer_context.add_callback(on_rpc_end)
88  self._on_client_connect()
89  stop_event.wait()
90  return request
91 
92  def _handle_unary_stream(self, request, servicer_context):
93  """Handles a server streaming RPC.
94 
95  Blocks until the client disconnects and then echoes.
96  """
97  stop_event = threading.Event()
98 
99  def on_rpc_end():
100  self._on_client_disconnect()
101  stop_event.set()
102 
103  servicer_context.add_callback(on_rpc_end)
104  self._on_client_connect()
105  stop_event.wait()
106  yield request
107 
108  def service(self, handler_call_details):
109  if handler_call_details.method == _signal_client.UNARY_UNARY:
110  return self._unary_unary_handler
111  elif handler_call_details.method == _signal_client.UNARY_STREAM:
112  return self._unary_stream_handler
113  else:
114  return None
115 
116 
117 def _read_stream(stream):
118  stream.seek(0)
119  return stream.read()
120 
121 
122 def _start_client(args, stdout, stderr):
123  invocation = None
124  if sys.executable is not None:
125  invocation = (sys.executable, _CLIENT_PATH) + tuple(args)
126  else:
127  invocation = (_CLIENT_PATH,) + tuple(args)
128  return subprocess.Popen(invocation, stdout=stdout, stderr=stderr)
129 
130 
131 class SignalHandlingTest(unittest.TestCase):
132 
133  def setUp(self):
134  self._server = test_common.test_server()
135  self._port = self._server.add_insecure_port('{}:0'.format(_HOST))
137  self._server.add_generic_rpc_handlers((self._handler,))
138  self._server.start()
139 
140  def tearDown(self):
141  self._server.stop(None)
142 
143  @unittest.skipIf(os.name == 'nt', 'SIGINT not supported on windows')
144  def testUnary(self):
145  """Tests that the server unary code path does not stall signal handlers."""
146  server_target = '{}:{}'.format(_HOST, self._port)
147  with tempfile.TemporaryFile(mode='r') as client_stdout:
148  with tempfile.TemporaryFile(mode='r') as client_stderr:
149  client = _start_client((server_target, 'unary') + _GEVENT_ARG,
150  client_stdout, client_stderr)
151  self._handler.await_connected_client()
152  client.send_signal(signal.SIGINT)
153  self.assertFalse(client.wait(), msg=_read_stream(client_stderr))
154  client_stdout.seek(0)
155  self.assertIn(_signal_client.SIGTERM_MESSAGE,
156  client_stdout.read())
157 
158  @unittest.skipIf(os.name == 'nt', 'SIGINT not supported on windows')
159  def testStreaming(self):
160  """Tests that the server streaming code path does not stall signal handlers."""
161  server_target = '{}:{}'.format(_HOST, self._port)
162  with tempfile.TemporaryFile(mode='r') as client_stdout:
163  with tempfile.TemporaryFile(mode='r') as client_stderr:
164  client = _start_client(
165  (server_target, 'streaming') + _GEVENT_ARG, client_stdout,
166  client_stderr)
167  self._handler.await_connected_client()
168  client.send_signal(signal.SIGINT)
169  self.assertFalse(client.wait(), msg=_read_stream(client_stderr))
170  client_stdout.seek(0)
171  self.assertIn(_signal_client.SIGTERM_MESSAGE,
172  client_stdout.read())
173 
174  @unittest.skipIf(os.name == 'nt', 'SIGINT not supported on windows')
176  server_target = '{}:{}'.format(_HOST, self._port)
177  with tempfile.TemporaryFile(mode='r') as client_stdout:
178  with tempfile.TemporaryFile(mode='r') as client_stderr:
179  client = _start_client(
180  ('--exception', server_target, 'unary') + _GEVENT_ARG,
181  client_stdout, client_stderr)
182  self._handler.await_connected_client()
183  client.send_signal(signal.SIGINT)
184  client.wait()
185  self.assertEqual(0, client.returncode)
186 
187  @unittest.skipIf(os.name == 'nt', 'SIGINT not supported on windows')
189  server_target = '{}:{}'.format(_HOST, self._port)
190  with tempfile.TemporaryFile(mode='r') as client_stdout:
191  with tempfile.TemporaryFile(mode='r') as client_stderr:
192  client = _start_client(
193  ('--exception', server_target, 'streaming') + _GEVENT_ARG,
194  client_stdout, client_stderr)
195  self._handler.await_connected_client()
196  client.send_signal(signal.SIGINT)
197  client.wait()
198  print(_read_stream(client_stderr))
199  self.assertEqual(0, client.returncode)
200 
201 
202 if __name__ == '__main__':
203  logging.basicConfig()
204  unittest.main(verbosity=2)
grpc.unary_unary_rpc_method_handler
def unary_unary_rpc_method_handler(behavior, request_deserializer=None, response_serializer=None)
Definition: src/python/grpcio/grpc/__init__.py:1510
tests.unit._signal_handling_test._GenericHandler._connected_clients
_connected_clients
Definition: _signal_handling_test.py:54
http2_test_server.format
format
Definition: http2_test_server.py:118
tests.unit._signal_handling_test.SignalHandlingTest._port
_port
Definition: _signal_handling_test.py:135
tests.unit._signal_handling_test.SignalHandlingTest.testStreaming
def testStreaming(self)
Definition: _signal_handling_test.py:159
grpc.unary_stream_rpc_method_handler
def unary_stream_rpc_method_handler(behavior, request_deserializer=None, response_serializer=None)
Definition: src/python/grpcio/grpc/__init__.py:1530
tests.unit._signal_handling_test.SignalHandlingTest.setUp
def setUp(self)
Definition: _signal_handling_test.py:133
tests.unit._signal_handling_test._GenericHandler
Definition: _signal_handling_test.py:49
tests.unit._signal_handling_test.SignalHandlingTest.testUnary
def testUnary(self)
Definition: _signal_handling_test.py:144
tests.unit._signal_handling_test._GenericHandler._handle_unary_stream
def _handle_unary_stream(self, request, servicer_context)
Definition: _signal_handling_test.py:92
tests.unit._signal_handling_test.SignalHandlingTest.testUnaryWithException
def testUnaryWithException(self)
Definition: _signal_handling_test.py:175
start
static uint64_t start
Definition: benchmark-pound.c:74
tests.unit._signal_handling_test._start_client
def _start_client(args, stdout, stderr)
Definition: _signal_handling_test.py:122
tests.unit._signal_handling_test.SignalHandlingTest._handler
_handler
Definition: _signal_handling_test.py:136
tests.unit._signal_handling_test.SignalHandlingTest
Definition: _signal_handling_test.py:131
grpc.GenericRpcHandler
Definition: src/python/grpcio/grpc/__init__.py:1333
tests.unit._signal_handling_test._GenericHandler._handle_unary_unary
def _handle_unary_unary(self, request, servicer_context)
Definition: _signal_handling_test.py:76
tests.unit._signal_handling_test._read_stream
def _read_stream(stream)
Definition: _signal_handling_test.py:117
tests.unit._signal_handling_test._GenericHandler._unary_unary_handler
_unary_unary_handler
Definition: _signal_handling_test.py:56
tests.unit._signal_handling_test._GenericHandler.await_connected_client
def await_connected_client(self)
Definition: _signal_handling_test.py:72
tests.unit._signal_handling_test._GenericHandler._connected_clients_lock
_connected_clients_lock
Definition: _signal_handling_test.py:52
tests.unit._signal_handling_test.SignalHandlingTest.testStreamingHandlerWithException
def testStreamingHandlerWithException(self)
Definition: _signal_handling_test.py:188
tests.unit
Definition: src/python/grpcio_tests/tests/unit/__init__.py:1
tests.unit._signal_handling_test._GenericHandler._on_client_connect
def _on_client_connect(self)
Definition: _signal_handling_test.py:61
tests.unit._signal_handling_test.SignalHandlingTest._server
_server
Definition: _signal_handling_test.py:134
tests.unit._signal_handling_test.SignalHandlingTest.tearDown
def tearDown(self)
Definition: _signal_handling_test.py:140
cpp.gmock_class.set
set
Definition: bloaty/third_party/googletest/googlemock/scripts/generator/cpp/gmock_class.py:44
wait
static void wait(notification *n)
Definition: alts_tsi_handshaker_test.cc:114
tests.unit._signal_handling_test._GenericHandler._connected_clients_event
_connected_clients_event
Definition: _signal_handling_test.py:53
stop
static const char stop[]
Definition: benchmark-async-pummel.c:35
tests.unit._signal_handling_test._GenericHandler._unary_stream_handler
_unary_stream_handler
Definition: _signal_handling_test.py:58
tests.unit._signal_handling_test._GenericHandler.__init__
def __init__(self)
Definition: _signal_handling_test.py:51
len
int len
Definition: abseil-cpp/absl/base/internal/low_level_alloc_test.cc:46
split
static void split(const char *s, char ***ss, size_t *ns)
Definition: debug/trace.cc:111
tests.unit._signal_handling_test._GenericHandler._on_client_disconnect
def _on_client_disconnect(self)
Definition: _signal_handling_test.py:66
tests.unit._signal_handling_test._GenericHandler.service
def service(self, handler_call_details)
Definition: _signal_handling_test.py:108


grpc
Author(s):
autogenerated on Thu Mar 13 2025 02:58:27