15 This contains helpers for gRPC services defined in
16 https://github.com/grpc/grpc-proto/blob/master/grpc/channelz/v1/channelz.proto
20 from typing
import Iterator, Optional
28 logger = logging.getLogger(__name__)
32 Channel = channelz_pb2.Channel
33 ChannelConnectivityState = channelz_pb2.ChannelConnectivityState
34 ChannelState = ChannelConnectivityState.State
35 _GetTopChannelsRequest = channelz_pb2.GetTopChannelsRequest
36 _GetTopChannelsResponse = channelz_pb2.GetTopChannelsResponse
38 Subchannel = channelz_pb2.Subchannel
39 _GetSubchannelRequest = channelz_pb2.GetSubchannelRequest
40 _GetSubchannelResponse = channelz_pb2.GetSubchannelResponse
42 Server = channelz_pb2.Server
43 _GetServersRequest = channelz_pb2.GetServersRequest
44 _GetServersResponse = channelz_pb2.GetServersResponse
46 Socket = channelz_pb2.Socket
47 SocketRef = channelz_pb2.SocketRef
48 _GetSocketRequest = channelz_pb2.GetSocketRequest
49 _GetSocketResponse = channelz_pb2.GetSocketResponse
50 Address = channelz_pb2.Address
51 Security = channelz_pb2.Security
53 _GetServerSocketsRequest = channelz_pb2.GetServerSocketsRequest
54 _GetServerSocketsResponse = channelz_pb2.GetServerSocketsResponse
58 stub: channelz_pb2_grpc.ChannelzStub
61 super().
__init__(channel, channelz_pb2_grpc.ChannelzStub)
65 return address.WhichOneof(
'address') ==
'tcpip_address'
68 def is_ipv4(tcpip_address: Address.TcpIpAddress):
71 return len(tcpip_address.ip_address) == 4
76 tcpip_address: Address.TcpIpAddress = address.tcpip_address
78 ip = ipaddress.IPv4Address(tcpip_address.ip_address)
80 ip = ipaddress.IPv6Address(tcpip_address.ip_address)
81 return f
'{ip}:{tcpip_address.port}'
83 raise NotImplementedError(
'Only tcpip_address implemented')
87 return (f
'local={cls.sock_address_to_str(socket.local)}, '
88 f
'remote={cls.sock_address_to_str(socket.remote)}')
92 client_socket: Socket) -> Socket:
93 for server_socket
in server_sockets:
94 if server_socket.remote == client_socket.local:
99 **kwargs) -> Iterator[Channel]:
101 if channel.data.target == target)
104 **kwargs) -> Optional[Server]:
106 listen_socket_ref: SocketRef
107 for listen_socket_ref
in server.listen_socket:
108 listen_socket = self.
get_socket(listen_socket_ref.socket_id,
110 listen_address: Address = listen_socket.local
112 listen_address.tcpip_address.port == port):
118 Iterate over all pages of all root channels.
120 Root channels are those which application has directly created.
121 This does not include subchannels nor non-top level channels.
124 response: Optional[_GetTopChannelsResponse] =
None
125 while start < 0
or not response.end:
130 rpc=
'GetTopChannels',
133 for channel
in response.channel:
134 start =
max(start, channel.ref.channel_id)
138 """Iterate over all pages of all servers that exist in the process."""
140 response: Optional[_GetServersResponse] =
None
141 while start < 0
or not response.end:
149 for server
in response.server:
150 start =
max(start, server.ref.server_id)
154 """List all server sockets that exist in server process.
156 Iterating over the results will resolve additional pages automatically.
159 response: Optional[_GetServerSocketsResponse] =
None
160 while start < 0
or not response.end:
165 rpc=
'GetServerSockets',
167 start_socket_id=start),
169 socket_ref: SocketRef
170 for socket_ref
in response.socket_ref:
171 start =
max(start, socket_ref.socket_id)
173 yield self.
get_socket(socket_ref.socket_id, **kwargs)
176 **kwargs) -> Iterator[Socket]:
177 """List all sockets of all subchannels of a given channel."""
182 **kwargs) -> Iterator[Subchannel]:
183 """List all subchannels of a given channel."""
184 for subchannel_ref
in channel.subchannel_ref:
188 **kwargs) -> Iterator[Socket]:
189 """List all sockets of a given subchannel."""
190 for socket_ref
in subchannel.socket_ref:
191 yield self.
get_socket(socket_ref.socket_id, **kwargs)
194 """Return a single Subchannel, otherwise raises RpcError."""
199 return response.subchannel
202 """Return a single Socket, otherwise raises RpcError."""
207 return response.socket