_reconnect_test.py
Go to the documentation of this file.
1 # Copyright 2017 gRPC authors.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 """Tests that a channel will reconnect if a connection is dropped"""
15 
16 import logging
17 import socket
18 import time
19 import unittest
20 
21 import grpc
22 from grpc.framework.foundation import logging_pool
23 
24 from tests.unit.framework.common import bound_socket
25 from tests.unit.framework.common import test_constants
26 
27 _REQUEST = b'\x00\x00\x00'
28 _RESPONSE = b'\x00\x00\x01'
29 
30 _UNARY_UNARY = '/test/UnaryUnary'
31 
32 
def _handle_unary_unary(unused_request, unused_servicer_context):
    """Return the fixed ``_RESPONSE`` payload, ignoring both arguments."""
    return _RESPONSE
35 
36 
class ReconnectTest(unittest.TestCase):
    """Checks that a channel keeps working after its server is restarted."""

    def test_reconnect(self):
        """Call a server, restart it on the same address, and call it again.

        The same channel and multi-callable are reused for both calls, so the
        second call only succeeds if the channel reconnects to the new server.
        """
        server_pool = logging_pool.pool(test_constants.THREAD_CONCURRENCY)
        handler = grpc.method_handlers_generic_handler('test', {
            'UnaryUnary':
                grpc.unary_unary_rpc_method_handler(_handle_unary_unary)
        })
        # NOTE(review): presumably needed so each server can bind the port
        # that bound_socket() is still holding -- confirm against the helper.
        options = (('grpc.so_reuseport', 1),)
        with bound_socket() as (host, port):
            addr = '{}:{}'.format(host, port)
            server = grpc.server(server_pool, (handler,), options=options)
            channel = None
            try:
                server.add_insecure_port(addr)
                server.start()
                channel = grpc.insecure_channel(addr)
                multi_callable = channel.unary_unary(_UNARY_UNARY)
                self.assertEqual(_RESPONSE, multi_callable(_REQUEST))
                server.stop(None)
                # Already stopped; keep the cleanup below from stopping it
                # a second time if anything fails before the restart.
                server = None
                # By default, the channel connectivity is checked every 5s.
                # GRPC_CLIENT_CHANNEL_BACKUP_POLL_INTERVAL_MS can be set to
                # change this.
                time.sleep(5.1)
                server = grpc.server(server_pool, (handler,), options=options)
                server.add_insecure_port(addr)
                server.start()
                self.assertEqual(_RESPONSE, multi_callable(_REQUEST))
            finally:
                # Release the server and the channel even when an assertion
                # or RPC fails, so a failing run does not leak them.
                if server is not None:
                    server.stop(None)
                if channel is not None:
                    channel.close()
65 
66 
# Script entry point: set up default logging, then run the tests verbosely.
if __name__ == '__main__':
    logging.basicConfig()
    unittest.main(verbosity=2)
grpc.unary_unary_rpc_method_handler
def unary_unary_rpc_method_handler(behavior, request_deserializer=None, response_serializer=None)
Definition: src/python/grpcio/grpc/__init__.py:1510
grpc.insecure_channel
def insecure_channel(target, options=None, compression=None)
Definition: src/python/grpcio/grpc/__init__.py:1962
http2_test_server.format
format
Definition: http2_test_server.py:118
grpc.framework.foundation
Definition: src/python/grpcio/grpc/framework/foundation/__init__.py:1
tests.unit._reconnect_test.ReconnectTest
Definition: _reconnect_test.py:37
tests.unit._exit_scenarios.multi_callable
multi_callable
Definition: _exit_scenarios.py:216
tests.unit._reconnect_test._handle_unary_unary
def _handle_unary_unary(unused_request, unused_servicer_context)
Definition: _reconnect_test.py:33
grpc.server
def server(thread_pool, handlers=None, interceptors=None, options=None, maximum_concurrent_rpcs=None, compression=None, xds=False)
Definition: src/python/grpcio/grpc/__init__.py:2034
tests.unit._reconnect_test.ReconnectTest.test_reconnect
def test_reconnect(self)
Definition: _reconnect_test.py:39
grpc.method_handlers_generic_handler
def method_handlers_generic_handler(service, method_handlers)
Definition: src/python/grpcio/grpc/__init__.py:1590
tests.unit.framework.common.bound_socket
def bound_socket(bind_address='localhost', port=0, listen=True, sock_options=_DEFAULT_SOCK_OPTIONS)
Definition: src/python/grpcio_tests/tests/unit/framework/common/__init__.py:76
tests.unit.framework.common
Definition: src/python/grpcio_tests/tests/unit/framework/common/__init__.py:1


grpc
Author(s):
autogenerated on Fri May 16 2025 02:57:38