_debug_example_test.py
Go to the documentation of this file.
1 # Copyright 2019 The gRPC Authors
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 """Test for gRPC Python debug example."""
15 
16 import asyncio
17 import logging
18 import unittest
19 
20 from examples.python.debug import asyncio_debug_server
21 from examples.python.debug import asyncio_get_stats
22 from examples.python.debug import asyncio_send_message
23 from examples.python.debug import debug_server
24 from examples.python.debug import get_stats
25 from examples.python.debug import send_message
26 
27 _LOGGER = logging.getLogger(__name__)
28 _LOGGER.setLevel(logging.INFO)
29 
30 _FAILURE_RATE = 0.5
31 _NUMBER_OF_MESSAGES = 100
32 
33 _ADDR_TEMPLATE = 'localhost:%d'
34 
35 
36 class DebugExampleTest(unittest.TestCase):
37 
39  server = debug_server.create_server(addr='[::]:0',
40  failure_rate=_FAILURE_RATE)
41  port = server.add_insecure_port('[::]:0')
42  server.start()
43  address = _ADDR_TEMPLATE % port
44 
45  send_message.run(addr=address, n=_NUMBER_OF_MESSAGES)
46  get_stats.run(addr=address)
47  server.stop(None)
48  # No unhandled exception raised, test passed!
49 
51 
52  async def body():
54  addr='[::]:0', failure_rate=_FAILURE_RATE)
55  port = server.add_insecure_port('[::]:0')
56  await server.start()
57  address = _ADDR_TEMPLATE % port
58 
59  await asyncio_send_message.run(addr=address, n=_NUMBER_OF_MESSAGES)
60  await asyncio_get_stats.run(addr=address)
61  await server.stop(None)
62  # No unhandled exception raised, test passed!
63 
64  asyncio.get_event_loop().run_until_complete(body())
65 
66 
67 if __name__ == '__main__':
68  logging.basicConfig(level=logging.DEBUG)
69  unittest.main(verbosity=2)
asyncio_debug_server.create_server
grpc.aio.Server create_server(str addr, float failure_rate)
Definition: asyncio_debug_server.py:49
_debug_example_test.DebugExampleTest.test_asyncio_channelz_example
def test_asyncio_channelz_example(self)
Definition: _debug_example_test.py:50
_debug_example_test.DebugExampleTest.test_channelz_example
def test_channelz_example(self)
Definition: _debug_example_test.py:38
asyncio_get_stats.run
None run(str addr)
Definition: asyncio_get_stats.py:25
asyncio_send_message.run
None run(str addr, int n)
Definition: asyncio_send_message.py:36
get_stats.run
def run(addr)
Definition: get_stats.py:28
_debug_example_test.DebugExampleTest
Definition: _debug_example_test.py:36
send_message.run
def run(addr, n)
Definition: send_message.py:38
debug_server.create_server
def create_server(addr, failure_rate)
Definition: debug_server.py:51


grpc
Author(s):
autogenerated on Thu Mar 13 2025 02:58:27