14 """Shared implementation."""
# Module-level logger, named after this module via __name__.
_LOGGER = logging.getLogger(__name__)
# Maps each cygrpc.ConnectivityState member to the grpc.ChannelConnectivity
# member of the same name.
CYGRPC_CONNECTIVITY_STATE_TO_CHANNEL_CONNECTIVITY = {
    cygrpc.ConnectivityState.idle:
        grpc.ChannelConnectivity.IDLE,
    cygrpc.ConnectivityState.connecting:
        grpc.ChannelConnectivity.CONNECTING,
    cygrpc.ConnectivityState.ready:
        grpc.ChannelConnectivity.READY,
    cygrpc.ConnectivityState.transient_failure:
        grpc.ChannelConnectivity.TRANSIENT_FAILURE,
    cygrpc.ConnectivityState.shutdown:
        grpc.ChannelConnectivity.SHUTDOWN,
}
# Maps each cygrpc.StatusCode member to the grpc.StatusCode member of the
# same name.
CYGRPC_STATUS_CODE_TO_STATUS_CODE = {
    cygrpc.StatusCode.ok: grpc.StatusCode.OK,
    cygrpc.StatusCode.cancelled: grpc.StatusCode.CANCELLED,
    cygrpc.StatusCode.unknown: grpc.StatusCode.UNKNOWN,
    cygrpc.StatusCode.invalid_argument: grpc.StatusCode.INVALID_ARGUMENT,
    cygrpc.StatusCode.deadline_exceeded: grpc.StatusCode.DEADLINE_EXCEEDED,
    cygrpc.StatusCode.not_found: grpc.StatusCode.NOT_FOUND,
    cygrpc.StatusCode.already_exists: grpc.StatusCode.ALREADY_EXISTS,
    cygrpc.StatusCode.permission_denied: grpc.StatusCode.PERMISSION_DENIED,
    cygrpc.StatusCode.unauthenticated: grpc.StatusCode.UNAUTHENTICATED,
    cygrpc.StatusCode.resource_exhausted: grpc.StatusCode.RESOURCE_EXHAUSTED,
    cygrpc.StatusCode.failed_precondition: grpc.StatusCode.FAILED_PRECONDITION,
    cygrpc.StatusCode.aborted: grpc.StatusCode.ABORTED,
    cygrpc.StatusCode.out_of_range: grpc.StatusCode.OUT_OF_RANGE,
    cygrpc.StatusCode.unimplemented: grpc.StatusCode.UNIMPLEMENTED,
    cygrpc.StatusCode.internal: grpc.StatusCode.INTERNAL,
    cygrpc.StatusCode.unavailable: grpc.StatusCode.UNAVAILABLE,
    cygrpc.StatusCode.data_loss: grpc.StatusCode.DATA_LOSS,
}
# Inverse of CYGRPC_STATUS_CODE_TO_STATUS_CODE: keyed by the grpc.StatusCode
# value, yielding the corresponding cygrpc.StatusCode.
STATUS_CODE_TO_CYGRPC_STATUS_CODE = {
    grpc_code: cygrpc_code for cygrpc_code, grpc_code in six.iteritems(
        CYGRPC_STATUS_CODE_TO_STATUS_CODE)
}
# Longest single blocking interval, in seconds, that `wait` passes to the
# underlying wait function, so signal handlers are never delayed beyond it.
MAXIMUM_WAIT_TIMEOUT = 0.1

# %-format template; the single %s placeholder receives the address string.
_ERROR_MESSAGE_PORT_BINDING_FAILED = (
    'Failed to bind to address %s; set '
    'GRPC_VERBOSITY=debug environment variable to see detailed error message.')
69 if isinstance(s, bytes):
72 return s.encode(
'utf8')
76 if isinstance(b, bytes):
77 return b.decode(
'utf-8',
'replace')
def _transform(message, transformer, exception_message):
    """Applies `transformer` to `message`, logging rather than raising.

    Args:
      message: The value to be transformed.
      transformer: A callable taking `message` and returning the transformed
        value, or None to pass `message` through unchanged.
      exception_message: Text logged, together with the traceback, if
        `transformer` raises.

    Returns:
      `message` itself when `transformer` is None; otherwise the result of
      `transformer(message)`, or None if the transformer raised.
    """
    if transformer is None:
        return message
    try:
        return transformer(message)
    except Exception:  # pylint: disable=broad-except
        # The transformer is caller-supplied and may fail in arbitrary ways;
        # record the failure and signal it with None instead of propagating.
        _LOGGER.exception(exception_message)
        return None
93 return _transform(message, serializer,
'Exception serializing message!')
97 return _transform(serialized_message, deserializer,
98 'Exception deserializing message!')
102 return '/{}/{}'.
format(group, method)
def _wait_once(wait_fn, timeout, spin_cb):
    """Performs a single bounded wait, then runs the optional spin callback.

    Args:
      wait_fn: A callable accepting a float-valued kwarg named `timeout`.
      timeout: The number of seconds handed to `wait_fn` for this one wait.
      spin_cb: An optional callable taking no arguments, invoked after
        `wait_fn` returns. Skipped when None.
    """
    wait_fn(timeout=timeout)
    if spin_cb is not None:
        spin_cb()
def wait(wait_fn, wait_complete_fn, timeout=None, spin_cb=None):
    """Blocks waiting for an event without blocking the thread indefinitely.

    See https://github.com/grpc/grpc/issues/19464 for full context. CPython's
    `threading.Event.wait` and `threading.Condition.wait` methods, if invoked
    without a timeout kwarg, may block the calling thread indefinitely. If the
    call is made from the main thread, this means that signal handlers may not
    run for an arbitrarily long period of time.

    This wrapper calls the supplied wait function with an arbitrary short
    timeout to ensure that no signal handler has to wait longer than
    MAXIMUM_WAIT_TIMEOUT before executing.

    Args:
      wait_fn: A callable accepting a single float-valued kwarg named
        `timeout`. This function is expected to be one of `threading.Event.wait`
        or `threading.Condition.wait`.
      wait_complete_fn: A callable taking no arguments and returning a bool.
        When this function returns true, it indicates that waiting should cease.
      timeout: An optional float-valued number of seconds after which the wait
        should cease.
      spin_cb: An optional Callable taking no arguments and returning nothing.
        This callback will be called on each iteration of the spin. This may be
        used for, e.g. work related to forking.

    Returns:
      True if a timeout was supplied and it was reached. False otherwise.
    """
    if timeout is None:
        # No deadline: keep waiting in short slices until the event completes.
        while not wait_complete_fn():
            _wait_once(wait_fn, MAXIMUM_WAIT_TIMEOUT, spin_cb)
        return False

    end = time.time() + timeout
    while not wait_complete_fn():
        # Sleep for one slice at most, and never beyond the deadline.
        remaining = min(end - time.time(), MAXIMUM_WAIT_TIMEOUT)
        if remaining < 0:
            return True
        _wait_once(wait_fn, remaining, spin_cb)
    return False
153 """Validates if the port binding succeed.
155 If the port returned by Core is 0, the binding is failed. However, in that
156 case, the Core API doesn't return a detailed failing reason. The best we
157 can do is raising an exception to prevent further confusion.
160 address: The address string to be bound.
161 port: An int returned by core
166 raise RuntimeError(_ERROR_MESSAGE_PORT_BINDING_FAILED % address)