grpc.py
Go to the documentation of this file.
1 # Copyright 2020 gRPC authors.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 import logging
15 import re
16 from typing import Any, Dict, Optional
17 
18 from google.protobuf import json_format
20 import grpc
21 
22 logger = logging.getLogger(__name__)
23 
24 # Type aliases
26 
27 
29  channel: grpc.Channel
30  DEFAULT_RPC_DEADLINE_SEC = 90
31 
32  def __init__(self, channel: grpc.Channel, stub_class: Any):
33  self.channel = channel
34  self.stub = stub_class(channel)
35  # This is purely cosmetic to make RPC logs look like method calls.
36  self.log_service_name = re.sub('Stub$', '',
37  self.stub.__class__.__name__)
38 
40  self,
41  *,
42  rpc: str,
43  req: Message,
44  deadline_sec: Optional[int] = DEFAULT_RPC_DEADLINE_SEC,
45  log_level: Optional[int] = logging.DEBUG) -> Message:
46  if deadline_sec is None:
47  deadline_sec = self.DEFAULT_RPC_DEADLINE_SEC
48 
49  call_kwargs = dict(wait_for_ready=True, timeout=deadline_sec)
50  self._log_rpc_request(rpc, req, call_kwargs, log_level)
51 
52  # Call RPC, e.g. RpcStub(channel).RpcMethod(req, ...options)
53  rpc_callable: grpc.UnaryUnaryMultiCallable = getattr(self.stub, rpc)
54  return rpc_callable(req, **call_kwargs)
55 
56  def _log_rpc_request(self, rpc, req, call_kwargs, log_level=logging.DEBUG):
57  logger.log(logging.DEBUG if log_level is None else log_level,
58  'RPC %s.%s(request=%s(%r), %s)', self.log_service_name, rpc,
59  req.__class__.__name__, json_format.MessageToDict(req),
60  ', '.join({f'{k}={v}' for k, v in call_kwargs.items()}))
61 
62 
63 class GrpcApp:
64  channels: Dict[int, grpc.Channel]
65 
66  class NotFound(Exception):
67  """Requested resource not found"""
68 
69  def __init__(self, message):
70  self.message = message
71  super().__init__(message)
72 
73  def __init__(self, rpc_host):
74  self.rpc_host = rpc_host
75  # Cache gRPC channels per port
76  self.channels = dict()
77 
78  def _make_channel(self, port) -> grpc.Channel:
79  if port not in self.channels:
80  target = f'{self.rpc_host}:{port}'
81  self.channels[port] = grpc.insecure_channel(target)
82  return self.channels[port]
83 
84  def close(self):
85  # Close all channels
86  for channel in self.channels.values():
87  channel.close()
88 
89  def __enter__(self):
90  return self
91 
92  def __exit__(self, exc_type, exc_val, exc_tb):
93  self.close()
94  return False
95 
96  def __del__(self):
97  self.close()
grpc.insecure_channel
def insecure_channel(target, options=None, compression=None)
Definition: src/python/grpcio/grpc/__init__.py:1962
framework.rpc.grpc.GrpcApp.__init__
def __init__(self, rpc_host)
Definition: grpc.py:73
framework.rpc.grpc.GrpcClientHelper
Definition: grpc.py:28
google::protobuf
Definition: bloaty/third_party/protobuf/benchmarks/util/data_proto2_to_proto3_util.h:12
framework.rpc.grpc.GrpcClientHelper.__init__
def __init__(self, grpc.Channel channel, Any stub_class)
Definition: grpc.py:32
grpc::Channel
Channels represent a connection to an endpoint. Created by CreateChannel.
Definition: include/grpcpp/channel.h:54
framework.rpc.grpc.GrpcApp.channels
channels
Definition: grpc.py:76
framework.rpc.grpc.GrpcClientHelper.channel
channel
Definition: grpc.py:33
framework.rpc.grpc.GrpcApp
Definition: grpc.py:63
framework.rpc.grpc.GrpcClientHelper.call_unary_with_deadline
Message call_unary_with_deadline(self, *str rpc, Message req, Optional[int] deadline_sec=DEFAULT_RPC_DEADLINE_SEC, Optional[int] log_level=logging.DEBUG)
Definition: grpc.py:39
framework.rpc.grpc.GrpcApp._make_channel
grpc.Channel _make_channel(self, port)
Definition: grpc.py:78
framework.rpc.grpc.GrpcApp.NotFound
Definition: grpc.py:66
framework.rpc.grpc.GrpcApp.close
def close(self)
Definition: grpc.py:84
grpc.UnaryUnaryMultiCallable
Definition: src/python/grpcio/grpc/__init__.py:663
framework.rpc.grpc.GrpcClientHelper._log_rpc_request
def _log_rpc_request(self, rpc, req, call_kwargs, log_level=logging.DEBUG)
Definition: grpc.py:56
framework.rpc.grpc.GrpcClientHelper.log_service_name
log_service_name
Definition: grpc.py:36
google::protobuf.message
Definition: bloaty/third_party/protobuf/python/google/protobuf/message.py:1
values
std::array< int64_t, Size > values
Definition: abseil-cpp/absl/container/btree_benchmark.cc:608
framework.rpc.grpc.GrpcClientHelper.stub
stub
Definition: grpc.py:34
framework.rpc.grpc.GrpcApp.NotFound.__init__
def __init__(self, message)
Definition: grpc.py:69
framework.rpc.grpc.GrpcClientHelper.DEFAULT_RPC_DEADLINE_SEC
int DEFAULT_RPC_DEADLINE_SEC
Definition: grpc.py:30
framework.rpc.grpc.GrpcApp.rpc_host
rpc_host
Definition: grpc.py:74
framework.rpc.grpc.GrpcApp.NotFound.message
message
Definition: grpc.py:70
framework.rpc.grpc.GrpcApp.__enter__
def __enter__(self)
Definition: grpc.py:89
google::protobuf.message.Message
Definition: bloaty/third_party/protobuf/python/google/protobuf/message.py:44
framework.rpc.grpc.GrpcApp.__del__
def __del__(self)
Definition: grpc.py:96
framework.rpc.grpc.GrpcApp.__exit__
def __exit__(self, exc_type, exc_val, exc_tb)
Definition: grpc.py:92


grpc
Author(s):
autogenerated on Thu Mar 13 2025 02:59:47