unit/_server_test.py
Go to the documentation of this file.
1 # Copyright 2018 The gRPC Authors.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 
15 from concurrent import futures
16 import logging
17 import unittest
18 
19 import grpc
20 
21 from tests.unit import resources
22 
23 
25 
26  def service(self, handler_call_details):
27  return None
28 
29 
30 class ServerTest(unittest.TestCase):
31 
33  with self.assertRaises(AttributeError) as exception_context:
34  grpc.server(futures.ThreadPoolExecutor(max_workers=5),
35  handlers=[
37  object(),
38  ])
39  self.assertIn('grpc.GenericRpcHandler',
40  str(exception_context.exception))
41 
43  server = grpc.server(futures.ThreadPoolExecutor(max_workers=5))
44  with self.assertRaises(AttributeError) as exception_context:
45  server.add_generic_rpc_handlers([
47  object(),
48  ])
49  self.assertIn('grpc.GenericRpcHandler',
50  str(exception_context.exception))
51 
53  server = grpc.server(None, options=(('grpc.so_reuseport', 0),))
54  port = server.add_insecure_port('localhost:0')
55  bind_address = "localhost:%d" % port
56 
57  with self.assertRaises(RuntimeError):
58  server.add_insecure_port(bind_address)
59 
60  server_credentials = grpc.ssl_server_credentials([
61  (resources.private_key(), resources.certificate_chain())
62  ])
63  with self.assertRaises(RuntimeError):
64  server.add_secure_port(bind_address, server_credentials)
65 
66 
67 if __name__ == '__main__':
68  logging.basicConfig()
69  unittest.main(verbosity=2)
xds_interop_client.str
str
Definition: xds_interop_client.py:487
tests.unit._server_test._ActualGenericRpcHandler
Definition: unit/_server_test.py:24
tests.unit._server_test.ServerTest
Definition: unit/_server_test.py:30
tests.unit._server_test._ActualGenericRpcHandler.service
def service(self, handler_call_details)
Definition: unit/_server_test.py:26
grpc.ssl_server_credentials
def ssl_server_credentials(private_key_certificate_chain_pairs, root_certificates=None, require_client_auth=False)
Definition: src/python/grpcio/grpc/__init__.py:1709
grpc.GenericRpcHandler
Definition: src/python/grpcio/grpc/__init__.py:1333
grpc.server
def server(thread_pool, handlers=None, interceptors=None, options=None, maximum_concurrent_rpcs=None, compression=None, xds=False)
Definition: src/python/grpcio/grpc/__init__.py:2034
tests.unit._server_test.ServerTest.test_not_a_generic_rpc_handler_at_construction
def test_not_a_generic_rpc_handler_at_construction(self)
Definition: unit/_server_test.py:32
tests.unit
Definition: src/python/grpcio_tests/tests/unit/__init__.py:1
tests.unit._server_test.ServerTest.test_failed_port_binding_exception
def test_failed_port_binding_exception(self)
Definition: unit/_server_test.py:52
tests.unit._server_test.ServerTest.test_not_a_generic_rpc_handler_after_construction
def test_not_a_generic_rpc_handler_after_construction(self)
Definition: unit/_server_test.py:42


grpc
Author(s):
autogenerated on Thu Mar 13 2025 02:58:27