tests_gevent/unit/close_channel_test.py
Go to the documentation of this file.
1 # Copyright 2021 The gRPC Authors
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 
15 import sys
16 import unittest
17 
18 import gevent
19 from gevent.pool import Group
20 import grpc
21 
22 from src.proto.grpc.testing import messages_pb2
23 from src.proto.grpc.testing import test_pb2_grpc
24 from tests_gevent.unit._test_server import start_test_server
25 
26 _UNARY_CALL_METHOD_WITH_SLEEP = '/grpc.testing.TestService/UnaryCallWithSleep'
27 
28 
29 class CloseChannelTest(unittest.TestCase):
30 
31  def setUp(self):
32  self._server_target, self._server = start_test_server()
33  self._channel = grpc.insecure_channel(self._server_target)
34  self._unhandled_exception = False
35  sys.excepthook = self._global_exception_handler
36 
37  def tearDown(self):
38  self._channel.close()
39  self._server.stop(None)
40 
42  stub = test_pb2_grpc.TestServiceStub(self._channel)
43  _, response = stub.UnaryCall.with_call(messages_pb2.SimpleRequest())
44 
45  self._channel.close()
46 
47  self.assertEqual(grpc.StatusCode.OK, response.code())
48 
50  group = Group()
51  stub = test_pb2_grpc.TestServiceStub(self._channel)
52  greenlet = group.spawn(self._run_client, stub.UnaryCall)
53  # release loop so that greenlet can take control
54  gevent.sleep()
55  self._channel.close()
56  group.killone(greenlet)
57  self.assertFalse(self._unhandled_exception, "Unhandled GreenletExit")
58  try:
59  greenlet.get()
60  except Exception as e: # pylint: disable=broad-except
61  self.fail(f"Unexpected exception in greenlet: {e}")
62 
64  group = Group()
65  UnaryCallWithSleep = self._channel.unary_unary(
66  _UNARY_CALL_METHOD_WITH_SLEEP,
67  request_serializer=messages_pb2.SimpleRequest.SerializeToString,
68  response_deserializer=messages_pb2.SimpleResponse.FromString,
69  )
70  greenlet = group.spawn(self._run_client, UnaryCallWithSleep)
71  # release loop so that greenlet can take control
72  gevent.sleep()
73  group.killone(greenlet)
74  self.assertFalse(self._unhandled_exception, "Unhandled GreenletExit")
75 
77  group = Group()
78  UnaryCallWithSleep = self._channel.unary_unary(
79  _UNARY_CALL_METHOD_WITH_SLEEP,
80  request_serializer=messages_pb2.SimpleRequest.SerializeToString,
81  response_deserializer=messages_pb2.SimpleResponse.FromString,
82  )
83  greenlet = group.spawn(self._run_client, UnaryCallWithSleep)
84  # release loop so that greenlet can take control
85  gevent.sleep()
86  group.killone(greenlet, exception=Exception)
87  self.assertFalse(self._unhandled_exception, "Unhandled exception")
88  self.assertRaises(Exception, greenlet.get)
89 
90  def _run_client(self, call):
91  try:
92  call.with_call(messages_pb2.SimpleRequest())
93  except grpc.RpcError as e:
94  if e.code() != grpc.StatusCode.CANCELLED:
95  raise
96 
97  def _global_exception_handler(self, exctype, value, tb):
98  if exctype == gevent.GreenletExit:
99  self._unhandled_exception = True
100  return
101  sys.__excepthook__(exctype, value, tb)
102 
103 
104 if __name__ == '__main__':
105  unittest.main(verbosity=2)
tests_gevent.unit.close_channel_test.CloseChannelTest
Definition: tests_gevent/unit/close_channel_test.py:29
tests_gevent.unit.close_channel_test.CloseChannelTest.test_kill_greenlet_with_generic_exception
def test_kill_greenlet_with_generic_exception(self)
Definition: tests_gevent/unit/close_channel_test.py:76
messages_pb2.SimpleRequest
SimpleRequest
Definition: messages_pb2.py:597
grpc.insecure_channel
def insecure_channel(target, options=None, compression=None)
Definition: src/python/grpcio/grpc/__init__.py:1962
tests_gevent.unit.close_channel_test.CloseChannelTest._channel
_channel
Definition: tests_gevent/unit/close_channel_test.py:33
tests_gevent.unit.close_channel_test.CloseChannelTest.test_ungraceful_close_in_greenlet
def test_ungraceful_close_in_greenlet(self)
Definition: tests_gevent/unit/close_channel_test.py:63
tests_gevent.unit.close_channel_test.CloseChannelTest.test_graceful_close
def test_graceful_close(self)
Definition: tests_gevent/unit/close_channel_test.py:41
Group
TypeAndValue Group(UnknownFields nested)
Definition: upb/upb/util/compare_test.cc:101
tests_aio.unit._test_server.start_test_server
def start_test_server(port=0, secure=False, server_credentials=None, interceptors=None)
Definition: tests_aio/unit/_test_server.py:128
tests_gevent.unit.close_channel_test.CloseChannelTest._unhandled_exception
_unhandled_exception
Definition: tests_gevent/unit/close_channel_test.py:34
grpc.RpcError
Definition: src/python/grpcio/grpc/__init__.py:302
tests_gevent.unit.close_channel_test.CloseChannelTest._run_client
def _run_client(self, call)
Definition: tests_gevent/unit/close_channel_test.py:90
close
#define close
Definition: test-fs.c:48
tests_gevent.unit.close_channel_test.CloseChannelTest._global_exception_handler
def _global_exception_handler(self, exctype, value, tb)
Definition: tests_gevent/unit/close_channel_test.py:97
grpc._simple_stubs.unary_unary
ResponseType unary_unary(RequestType request, str target, str method, Optional[Callable[[Any], bytes]] request_serializer=None, Optional[Callable[[bytes], Any]] response_deserializer=None, Sequence[Tuple[AnyStr, AnyStr]] options=(), Optional[grpc.ChannelCredentials] channel_credentials=None, bool insecure=False, Optional[grpc.CallCredentials] call_credentials=None, Optional[grpc.Compression] compression=None, Optional[bool] wait_for_ready=None, Optional[float] timeout=_DEFAULT_TIMEOUT, Optional[Sequence[Tuple[str, Union[str, bytes]]]] metadata=None)
Definition: _simple_stubs.py:169
tests_gevent.unit.close_channel_test.CloseChannelTest.setUp
def setUp(self)
Definition: tests_gevent/unit/close_channel_test.py:31
tests_gevent.unit.close_channel_test.CloseChannelTest.tearDown
def tearDown(self)
Definition: tests_gevent/unit/close_channel_test.py:37
stop
static const char stop[]
Definition: benchmark-async-pummel.c:35
tests_gevent.unit._test_server
Definition: tests_gevent/unit/_test_server.py:1
tests_gevent.unit.close_channel_test.CloseChannelTest.test_graceful_close_in_greenlet
def test_graceful_close_in_greenlet(self)
Definition: tests_gevent/unit/close_channel_test.py:49
tests_gevent.unit.close_channel_test.CloseChannelTest._server
_server
Definition: tests_gevent/unit/close_channel_test.py:32


grpc
Author(s):
autogenerated on Fri May 16 2025 02:57:56