grpc/_common.py
Go to the documentation of this file.
1 # Copyright 2016 gRPC authors.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 """Shared implementation."""
15 
16 import logging
17 import time
18 
19 import grpc
20 from grpc._cython import cygrpc
21 import six
22 
# Module-level logger; _transform uses it to record transformer failures.
_LOGGER = logging.getLogger(__name__)

# Lookup table from the Cython layer's connectivity states to the
# corresponding public grpc.ChannelConnectivity members.
CYGRPC_CONNECTIVITY_STATE_TO_CHANNEL_CONNECTIVITY = dict((
    (cygrpc.ConnectivityState.idle, grpc.ChannelConnectivity.IDLE),
    (cygrpc.ConnectivityState.connecting,
     grpc.ChannelConnectivity.CONNECTING),
    (cygrpc.ConnectivityState.ready, grpc.ChannelConnectivity.READY),
    (cygrpc.ConnectivityState.transient_failure,
     grpc.ChannelConnectivity.TRANSIENT_FAILURE),
    (cygrpc.ConnectivityState.shutdown, grpc.ChannelConnectivity.SHUTDOWN),
))
37 
# Lookup table from the Cython layer's status codes to the corresponding
# public grpc.StatusCode members.
CYGRPC_STATUS_CODE_TO_STATUS_CODE = dict((
    (cygrpc.StatusCode.ok, grpc.StatusCode.OK),
    (cygrpc.StatusCode.cancelled, grpc.StatusCode.CANCELLED),
    (cygrpc.StatusCode.unknown, grpc.StatusCode.UNKNOWN),
    (cygrpc.StatusCode.invalid_argument, grpc.StatusCode.INVALID_ARGUMENT),
    (cygrpc.StatusCode.deadline_exceeded, grpc.StatusCode.DEADLINE_EXCEEDED),
    (cygrpc.StatusCode.not_found, grpc.StatusCode.NOT_FOUND),
    (cygrpc.StatusCode.already_exists, grpc.StatusCode.ALREADY_EXISTS),
    (cygrpc.StatusCode.permission_denied, grpc.StatusCode.PERMISSION_DENIED),
    (cygrpc.StatusCode.unauthenticated, grpc.StatusCode.UNAUTHENTICATED),
    (cygrpc.StatusCode.resource_exhausted,
     grpc.StatusCode.RESOURCE_EXHAUSTED),
    (cygrpc.StatusCode.failed_precondition,
     grpc.StatusCode.FAILED_PRECONDITION),
    (cygrpc.StatusCode.aborted, grpc.StatusCode.ABORTED),
    (cygrpc.StatusCode.out_of_range, grpc.StatusCode.OUT_OF_RANGE),
    (cygrpc.StatusCode.unimplemented, grpc.StatusCode.UNIMPLEMENTED),
    (cygrpc.StatusCode.internal, grpc.StatusCode.INTERNAL),
    (cygrpc.StatusCode.unavailable, grpc.StatusCode.UNAVAILABLE),
    (cygrpc.StatusCode.data_loss, grpc.StatusCode.DATA_LOSS),
))
# Inverse of the table above: public status code -> Cython-layer status code.
STATUS_CODE_TO_CYGRPC_STATUS_CODE = dict(
    (public_code, core_code)
    for core_code, public_code in six.iteritems(
        CYGRPC_STATUS_CODE_TO_STATUS_CODE))
61 
# Upper bound, in seconds, on any single blocking call made by wait(); it
# caps how long a pending signal handler can be kept from running.
MAXIMUM_WAIT_TIMEOUT = 0.1

# %-style template; the single %s placeholder receives the address that
# could not be bound.
_ERROR_MESSAGE_PORT_BINDING_FAILED = (
    'Failed to bind to address %s; set '
    'GRPC_VERBOSITY=debug environment variable to see detailed error message.')
66 
67 
def encode(s):
    """Returns `s` as bytes, UTF-8 encoding it unless it already is bytes.

    Args:
      s: A bytes object, or an object with an `encode` method (in practice a
        text string).

    Returns:
      `s` itself when it is a bytes instance; otherwise the result of
      `s.encode('utf8')`.
    """
    return s if isinstance(s, bytes) else s.encode('utf8')
73 
74 
def decode(b):
    """Returns `b` as text, decoding it from UTF-8 when it is bytes.

    Bytes that are not valid UTF-8 do not raise; the 'replace' error handler
    substitutes a replacement marker for them.

    Args:
      b: A bytes object to decode, or any other object.

    Returns:
      The decoded text when `b` is a bytes instance; otherwise `b` unchanged.
    """
    if not isinstance(b, bytes):
        return b
    return b.decode('utf-8', 'replace')
79 
80 
def _transform(message, transformer, exception_message):
    """Applies `transformer` to `message`, logging instead of raising.

    Args:
      message: The value to transform.
      transformer: A one-argument callable, or None to pass `message`
        through untouched.
      exception_message: Text logged (with traceback) if `transformer`
        raises.

    Returns:
      `message` when `transformer` is None, the transformer's result when it
      succeeds, or None when it raises an `Exception`.
    """
    if transformer is None:
        return message
    try:
        result = transformer(message)
    except Exception:  # pylint: disable=broad-except
        # Deliberately broad: any transformer failure is logged and reported
        # to the caller as None rather than propagated.
        _LOGGER.exception(exception_message)
        result = None
    return result
90 
91 
def serialize(message, serializer):
    """Serializes `message` with `serializer` via `_transform`.

    Args:
      message: The value to serialize.
      serializer: A one-argument callable, or None to return `message` as is.

    Returns:
      Whatever `_transform` returns: the serializer's result, `message`
      itself when `serializer` is None, or None if the serializer raised.
    """
    return _transform(
        message=message,
        transformer=serializer,
        exception_message='Exception serializing message!')
94 
95 
def deserialize(serialized_message, deserializer):
    """Deserializes `serialized_message` with `deserializer` via `_transform`.

    Args:
      serialized_message: The value to deserialize.
      deserializer: A one-argument callable, or None to return
        `serialized_message` as is.

    Returns:
      Whatever `_transform` returns: the deserializer's result,
      `serialized_message` itself when `deserializer` is None, or None if
      the deserializer raised.
    """
    return _transform(
        message=serialized_message,
        transformer=deserializer,
        exception_message='Exception deserializing message!')
99 
100 
def fully_qualified_method(group, method):
    """Builds the '/<group>/<method>' string for the given names.

    Args:
      group: The first path component.
      method: The second path component.

    Returns:
      A string consisting of '/', `group`, '/', and `method`.
    """
    template = '/{}/{}'
    return template.format(group, method)
103 
104 
def _wait_once(wait_fn, timeout, spin_cb):
    """Performs one bounded wait, then runs the optional spin callback.

    Args:
      wait_fn: A callable invoked as `wait_fn(timeout=timeout)`.
      timeout: The value forwarded to `wait_fn` as its `timeout` kwarg.
      spin_cb: A zero-argument callable run after the wait, or None to skip.
    """
    wait_fn(timeout=timeout)
    if spin_cb is None:
        return
    spin_cb()
109 
110 
def wait(wait_fn, wait_complete_fn, timeout=None, spin_cb=None):
    """Waits for an event in short slices so the thread never blocks for long.

    See https://github.com/grpc/grpc/issues/19464 for full context. CPython's
    `threading.Event.wait` and `threading.Condition.wait`, when called
    without a timeout kwarg, may block the calling thread indefinitely. On
    the main thread that can keep signal handlers from running for an
    arbitrarily long time.

    Instead of one unbounded wait, this function repeatedly calls `wait_fn`
    with a timeout of at most MAXIMUM_WAIT_TIMEOUT, re-checking
    `wait_complete_fn` between calls, so no signal handler is delayed by
    more than MAXIMUM_WAIT_TIMEOUT.

    Args:
      wait_fn: A callable accepting a single float-valued kwarg named
        `timeout`. Expected to be one of `threading.Event.wait` or
        `threading.Condition.wait`.
      wait_complete_fn: A callable taking no arguments and returning a bool.
        A true return value means waiting should stop.
      timeout: An optional float-valued number of seconds after which the
        wait should stop.
      spin_cb: An optional callable taking no arguments and returning
        nothing. It is invoked on every iteration of the spin; it may be
        used for, e.g., work related to forking.

    Returns:
      True if a timeout was supplied and it was reached. False otherwise.
    """
    if timeout is None:
        # No deadline: spin until the completion predicate reports done.
        while True:
            if wait_complete_fn():
                return False
            _wait_once(wait_fn, MAXIMUM_WAIT_TIMEOUT, spin_cb)

    deadline = time.time() + timeout
    while not wait_complete_fn():
        # Never wait longer than the cap, nor past the deadline.
        time_left = min(deadline - time.time(), MAXIMUM_WAIT_TIMEOUT)
        if time_left < 0:
            return True
        _wait_once(wait_fn, time_left, spin_cb)
    return False
150 
151 
def validate_port_binding_result(address, port):
    """Checks the outcome of a port binding and passes the port through.

    Core reports a failed binding by returning port 0 and gives no detailed
    reason, so the failure is surfaced as an exception here to prevent
    further confusion.

    Args:
      address: The address string that was to be bound.
      port: An int returned by Core.

    Returns:
      `port`, unchanged, when it is non-zero.

    Raises:
      RuntimeError: If `port` is 0.
    """
    if port == 0:
        # Core supplies no failure message, so the error text points at
        # GRPC_VERBOSITY=debug as the way to see the underlying reason.
        raise RuntimeError(_ERROR_MESSAGE_PORT_BINDING_FAILED % address)
    return port
grpc._common.fully_qualified_method
def fully_qualified_method(group, method)
Definition: grpc/_common.py:101
http2_test_server.format
format
Definition: http2_test_server.py:118
grpc._common._transform
def _transform(message, transformer, exception_message)
Definition: grpc/_common.py:81
grpc._common.encode
def encode(s)
Definition: grpc/_common.py:68
grpc._common.validate_port_binding_result
def validate_port_binding_result(address, port)
Definition: grpc/_common.py:152
grpc._common.deserialize
def deserialize(serialized_message, deserializer)
Definition: grpc/_common.py:96
grpc._common.wait
def wait(wait_fn, wait_complete_fn, timeout=None, spin_cb=None)
Definition: grpc/_common.py:111
grpc._common._wait_once
def _wait_once(wait_fn, timeout, spin_cb)
Definition: grpc/_common.py:105
min
#define min(a, b)
Definition: qsort.h:83
grpc._common.serialize
def serialize(message, serializer)
Definition: grpc/_common.py:92
grpc._common.decode
def decode(b)
Definition: grpc/_common.py:75
grpc._cython
Definition: src/python/grpcio/grpc/_cython/__init__.py:1


grpc
Author(s):
autogenerated on Thu Mar 13 2025 02:58:27