14 """Reference implementation for health checking in gRPC Python."""
18 from typing
import MutableMapping
26 """An AsyncIO implementation of health checking servicer."""
27 _server_status: MutableMapping[
28 str,
'_health_pb2.HealthCheckResponse.ServingStatus']
29 _server_watchers: MutableMapping[str, asyncio.Condition]
30 _gracefully_shutting_down: bool
37 async
def Check(self, request: _health_pb2.HealthCheckRequest,
42 await context.abort(grpc.StatusCode.NOT_FOUND)
44 return _health_pb2.HealthCheckResponse(status=status)
46 async
def Watch(self, request: _health_pb2.HealthCheckRequest,
55 _health_pb2.HealthCheckResponse.SERVICE_UNKNOWN)
60 if status != last_status:
63 _health_pb2.HealthCheckResponse(status=status))
69 await condition.wait()
76 status: _health_pb2.HealthCheckResponse.ServingStatus) ->
None:
81 condition.notify_all()
87 status: _health_pb2.HealthCheckResponse.ServingStatus) ->
None:
88 """Sets the status of a service.
91 service: string, the name of the service.
92 status: HealthCheckResponse.status enum value indicating the status of
98 await self.
_set(service, status)
101 """Permanently sets the status of all services to NOT_SERVING.
103 This should be invoked when the server is entering a graceful shutdown
104 period. After this method is invoked, future attempts to set the status
105 of a service will be ignored.
112 await self.
_set(service,
113 _health_pb2.HealthCheckResponse.NOT_SERVING)