16 from typing
import Any, Dict, Optional
22 logger = logging.getLogger(__name__)
30 DEFAULT_RPC_DEADLINE_SEC = 90
32 def __init__(self, channel: grpc.Channel, stub_class: Any):
34 self.
stub = stub_class(channel)
37 self.
stub.__class__.__name__)
44 deadline_sec: Optional[int] = DEFAULT_RPC_DEADLINE_SEC,
45 log_level: Optional[int] = logging.DEBUG) -> Message:
46 if deadline_sec
is None:
49 call_kwargs = dict(wait_for_ready=
True, timeout=deadline_sec)
54 return rpc_callable(req, **call_kwargs)
57 logger.log(logging.DEBUG
if log_level
is None else log_level,
59 req.__class__.__name__, json_format.MessageToDict(req),
60 ', '.join({f
'{k}={v}' for k, v
in call_kwargs.items()}))
67 """Requested resource not found"""
80 target = f
'{self.rpc_host}:{port}'