wait_for_ready_example.py
Go to the documentation of this file.
1 # Copyright 2019 The gRPC Authors
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 """The Python example of utilizing wait-for-ready flag."""
15 
16 from concurrent import futures
17 from contextlib import contextmanager
18 import logging
19 import socket
20 import threading
21 
22 import grpc
23 
24 helloworld_pb2, helloworld_pb2_grpc = grpc.protos_and_services(
25  "helloworld.proto")
26 
27 _LOGGER = logging.getLogger(__name__)
28 _LOGGER.setLevel(logging.INFO)
29 
30 
31 @contextmanager
33  if socket.has_ipv6:
34  tcp_socket = socket.socket(socket.AF_INET6)
35  else:
36  tcp_socket = socket.socket(socket.AF_INET)
37  tcp_socket.bind(('', 0))
38  address_tuple = tcp_socket.getsockname()
39  yield "localhost:%s" % (address_tuple[1])
40  tcp_socket.close()
41 
42 
44 
45  def SayHello(self, request, unused_context):
46  return helloworld_pb2.HelloReply(message='Hello, %s!' % request.name)
47 
48 
49 def create_server(server_address):
50  server = grpc.server(futures.ThreadPoolExecutor())
52  bound_port = server.add_insecure_port(server_address)
53  assert bound_port == int(server_address.split(':')[-1])
54  return server
55 
56 
57 def process(stub, wait_for_ready=None):
58  try:
59  response = stub.SayHello(helloworld_pb2.HelloRequest(name='you'),
60  wait_for_ready=wait_for_ready)
61  message = response.message
62  except grpc.RpcError as rpc_error:
63  assert rpc_error.code() == grpc.StatusCode.UNAVAILABLE
64  assert not wait_for_ready
65  message = rpc_error
66  else:
67  assert wait_for_ready
68  _LOGGER.info("Wait-for-ready %s, client received: %s",
69  "enabled" if wait_for_ready else "disabled", message)
70 
71 
72 def main():
73  # Pick a random free port
74  with get_free_loopback_tcp_port() as server_address:
75 
76  # Register connectivity event to notify main thread
77  transient_failure_event = threading.Event()
78 
79  def wait_for_transient_failure(channel_connectivity):
80  if channel_connectivity == grpc.ChannelConnectivity.TRANSIENT_FAILURE:
81  transient_failure_event.set()
82 
83  # Create gRPC channel
84  channel = grpc.insecure_channel(server_address)
85  channel.subscribe(wait_for_transient_failure)
86  stub = helloworld_pb2_grpc.GreeterStub(channel)
87 
88  # Fire an RPC without wait_for_ready
89  thread_disabled_wait_for_ready = threading.Thread(target=process,
90  args=(stub, False))
91  thread_disabled_wait_for_ready.start()
92  # Fire an RPC with wait_for_ready
93  thread_enabled_wait_for_ready = threading.Thread(target=process,
94  args=(stub, True))
95  thread_enabled_wait_for_ready.start()
96 
97  # Wait for the channel entering TRANSIENT FAILURE state.
98  transient_failure_event.wait()
99  server = create_server(server_address)
100  server.start()
101 
102  # Expected to fail with StatusCode.UNAVAILABLE.
103  thread_disabled_wait_for_ready.join()
104  # Expected to success.
105  thread_enabled_wait_for_ready.join()
106 
107  server.stop(None)
108  channel.close()
109 
110 
111 if __name__ == '__main__':
112  logging.basicConfig(level=logging.INFO)
113  main()
wait_for_ready_example.main
def main()
Definition: wait_for_ready_example.py:72
helloworld_pb2_grpc.GreeterStub
Definition: helloworld/helloworld_pb2_grpc.py:7
grpc.insecure_channel
def insecure_channel(target, options=None, compression=None)
Definition: src/python/grpcio/grpc/__init__.py:1962
helloworld_pb2.HelloRequest
HelloRequest
Definition: helloworld/helloworld_pb2.py:93
grpc.RpcError
Definition: src/python/grpcio/grpc/__init__.py:302
wait_for_ready_example.process
def process(stub, wait_for_ready=None)
Definition: wait_for_ready_example.py:57
xds_interop_client.int
int
Definition: xds_interop_client.py:113
wait_for_ready_example.create_server
def create_server(server_address)
Definition: wait_for_ready_example.py:49
helloworld_pb2.HelloReply
HelloReply
Definition: helloworld/helloworld_pb2.py:100
grpc.server
def server(thread_pool, handlers=None, interceptors=None, options=None, maximum_concurrent_rpcs=None, compression=None, xds=False)
Definition: src/python/grpcio/grpc/__init__.py:2034
helloworld_pb2_grpc.GreeterServicer
Definition: helloworld/helloworld_pb2_grpc.py:24
wait_for_ready_example.Greeter.SayHello
def SayHello(self, request, unused_context)
Definition: wait_for_ready_example.py:45
main
Definition: main.py:1
helloworld_pb2_grpc.add_GreeterServicer_to_server
def add_GreeterServicer_to_server(servicer, server)
Definition: helloworld/helloworld_pb2_grpc.py:36
wait_for_ready_example.get_free_loopback_tcp_port
def get_free_loopback_tcp_port()
Definition: wait_for_ready_example.py:32
wait_for_ready_example.Greeter
Definition: wait_for_ready_example.py:43


grpc
Author(s):
autogenerated on Thu Mar 13 2025 03:01:51