grpc/_channel.py
Go to the documentation of this file.
1 # Copyright 2016 gRPC authors.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 """Invocation-side implementation of gRPC Python."""
15 
16 import copy
17 import functools
18 import logging
19 import os
20 import sys
21 import threading
22 import time
23 
24 import grpc
25 from grpc import _common
26 from grpc import _compression
27 from grpc import _grpcio_metadata
28 from grpc._cython import cygrpc
29 import grpc.experimental
30 
31 _LOGGER = logging.getLogger(__name__)
32 
33 _USER_AGENT = 'grpc-python/{}'.format(_grpcio_metadata.__version__)
34 
35 _EMPTY_FLAGS = 0
36 
37 # NOTE(rbellevi): No guarantees are given about the maintenance of this
38 # environment variable.
39 _DEFAULT_SINGLE_THREADED_UNARY_STREAM = os.getenv(
40  "GRPC_SINGLE_THREADED_UNARY_STREAM") is not None
41 
42 _UNARY_UNARY_INITIAL_DUE = (
43  cygrpc.OperationType.send_initial_metadata,
44  cygrpc.OperationType.send_message,
45  cygrpc.OperationType.send_close_from_client,
46  cygrpc.OperationType.receive_initial_metadata,
47  cygrpc.OperationType.receive_message,
48  cygrpc.OperationType.receive_status_on_client,
49 )
50 _UNARY_STREAM_INITIAL_DUE = (
51  cygrpc.OperationType.send_initial_metadata,
52  cygrpc.OperationType.send_message,
53  cygrpc.OperationType.send_close_from_client,
54  cygrpc.OperationType.receive_initial_metadata,
55  cygrpc.OperationType.receive_status_on_client,
56 )
57 _STREAM_UNARY_INITIAL_DUE = (
58  cygrpc.OperationType.send_initial_metadata,
59  cygrpc.OperationType.receive_initial_metadata,
60  cygrpc.OperationType.receive_message,
61  cygrpc.OperationType.receive_status_on_client,
62 )
63 _STREAM_STREAM_INITIAL_DUE = (
64  cygrpc.OperationType.send_initial_metadata,
65  cygrpc.OperationType.receive_initial_metadata,
66  cygrpc.OperationType.receive_status_on_client,
67 )
68 
69 _CHANNEL_SUBSCRIPTION_CALLBACK_ERROR_LOG_MESSAGE = (
70  'Exception calling channel subscription callback!')
71 
72 _OK_RENDEZVOUS_REPR_FORMAT = ('<{} of RPC that terminated with:\n'
73  '\tstatus = {}\n'
74  '\tdetails = "{}"\n'
75  '>')
76 
77 _NON_OK_RENDEZVOUS_REPR_FORMAT = ('<{} of RPC that terminated with:\n'
78  '\tstatus = {}\n'
79  '\tdetails = "{}"\n'
80  '\tdebug_error_string = "{}"\n'
81  '>')
82 
83 
def _deadline(timeout):
    """Turn a relative timeout into an absolute wall-clock deadline.

    Args:
      timeout: A duration in seconds, or None when no deadline applies.

    Returns:
      The current `time.time()` value plus `timeout`, or None when `timeout`
      is None.
    """
    if timeout is None:
        return None
    return time.time() + timeout
86 
87 
def _unknown_code_details(unknown_cygrpc_code, details):
    """Build the details text used when a status code cannot be mapped.

    Args:
      unknown_cygrpc_code: The status code value that was not recognized.
      details: The details that accompanied that status code.

    Returns:
      A string embedding both values.
    """
    template = 'Server sent unknown code {} and details "{}"'
    return template.format(unknown_cygrpc_code, details)
91 
92 
93 class _RPCState(object):
94 
95  def __init__(self, due, initial_metadata, trailing_metadata, code, details):
96  # `condition` guards all members of _RPCState. `notify_all` is called on
97  # `condition` when the state of the RPC has changed.
98  self.condition = threading.Condition()
99 
100  # The cygrpc.OperationType objects representing events due from the RPC's
101  # completion queue. If an operation is in `due`, it is guaranteed that
102  # `operate()` has been called on a corresponding operation. But the
103  # converse is not true. That is, in the case of failed `operate()`
104  # calls, there may briefly be events in `due` that do not correspond to
105  # operations submitted to Core.
106  self.due = set(due)
107  self.initial_metadata = initial_metadata
108  self.response = None
109  self.trailing_metadata = trailing_metadata
110  self.code = code
111  self.details = details
112  self.debug_error_string = None
113 
114  # The semantics of grpc.Future.cancel and grpc.Future.cancelled are
115  # slightly wonky, so they have to be tracked separately from the rest of the
116  # result of the RPC. This field tracks whether cancellation was requested
117  # prior to termination of the RPC.
118  self.cancelled = False
119  self.callbacks = []
120  self.fork_epoch = cygrpc.get_fork_epoch()
121 
123  self.condition = threading.Condition()
124 
125 
def _abort(state, code, details):
    """Record a locally-determined terminal status on `state`.

    Does nothing when `state.code` is already set, so an existing status is
    never overwritten.

    Args:
      state: The RPC state object to update in place.
      code: The status code to record.
      details: The status details to record.
    """
    if state.code is not None:
        return
    state.code, state.details = code, details
    state.trailing_metadata = ()
    # Initial metadata that already arrived is kept; otherwise it becomes an
    # empty tuple rather than staying None.
    if state.initial_metadata is None:
        state.initial_metadata = ()
133 
134 
def _handle_event(event, state, response_deserializer):
    """Fold the completed operations of one event into `state`.

    This function does not acquire `state.condition` itself.

    Args:
      event: An object whose `batch_operations` attribute iterates over the
        completed operations to process.
      state: The _RPCState to update in place.
      response_deserializer: Passed to `_common.deserialize` together with
        each received message.

    Returns:
      A list of the callbacks that were registered on `state` at the moment
      the RPC's status was received; empty if the event carried no status.

    Raises:
      KeyError: If an operation's type is not present in `state.due`.
    """
    callbacks = []
    for batch_operation in event.batch_operations:
        operation_type = batch_operation.type()
        state.due.remove(operation_type)
        if operation_type == cygrpc.OperationType.receive_initial_metadata:
            state.initial_metadata = batch_operation.initial_metadata()
        elif operation_type == cygrpc.OperationType.receive_message:
            serialized_response = batch_operation.message()
            if serialized_response is not None:
                response = _common.deserialize(serialized_response,
                                               response_deserializer)
                if response is None:
                    # A None result is treated as a deserialization failure.
                    details = 'Exception deserializing response!'
                    _abort(state, grpc.StatusCode.INTERNAL, details)
                else:
                    state.response = response
        elif operation_type == cygrpc.OperationType.receive_status_on_client:
            state.trailing_metadata = batch_operation.trailing_metadata()
            # A status recorded earlier (for example by _abort) wins over the
            # one carried by this operation.
            if state.code is None:
                cygrpc_code = batch_operation.code()
                code = _common.CYGRPC_STATUS_CODE_TO_STATUS_CODE.get(
                    cygrpc_code)
                if code is None:
                    state.code = grpc.StatusCode.UNKNOWN
                    # Report the raw value that failed to map; the mapped
                    # `code` is always None on this branch.
                    state.details = _unknown_code_details(
                        cygrpc_code, batch_operation.details())
                else:
                    state.code = code
                    state.details = batch_operation.details()
                    state.debug_error_string = batch_operation.error_string()
            callbacks.extend(state.callbacks)
            # None marks that the RPC has terminated and no further callbacks
            # can be registered.
            state.callbacks = None
    return callbacks
168 
169 
def _event_handler(state, response_deserializer):
    """Create a callable that applies completion-queue events to `state`.

    Args:
      state: The _RPCState updated by the returned handler.
      response_deserializer: Forwarded to `_handle_event` for every event.

    Returns:
      A function taking one event. It updates `state` under
      `state.condition`, notifies all waiters, then runs any callbacks
      returned by `_handle_event` after releasing the lock. Its result is
      True only when no operations remain in `state.due` and
      `state.fork_epoch` is not older than the current fork epoch.
    """

    def handle_event(event):
        with state.condition:
            callbacks = _handle_event(event, state, response_deserializer)
            state.condition.notify_all()
            done = not state.due
        for callback in callbacks:
            try:
                callback()
            except Exception as e:  # pylint: disable=broad-except
                # NOTE(rbellevi): We suppress but log errors here so as not to
                # kill the channel spin thread.
                # Only functools.partial callbacks carry `.func`; fall back to
                # the callback itself so that logging can never raise here.
                _LOGGER.error('Exception in callback %s: %s',
                              repr(getattr(callback, 'func', callback)),
                              repr(e))
        return done and state.fork_epoch >= cygrpc.get_fork_epoch()

    return handle_event
188 
189 
190 #pylint: disable=too-many-statements
def _consume_request_iterator(request_iterator, state, call, request_serializer,
                              event_handler):
    """Consume a request iterator supplied by the user.

    Starts a daemon cygrpc.ForkManagedThread that pulls requests from
    `request_iterator`, serializes each one and submits it on `call`, waiting
    for each send to finish before fetching the next request. This function
    returns as soon as the thread has been started.

    Args:
      request_iterator: The user's iterator of request values.
      state: The _RPCState of the RPC the requests belong to.
      call: The call object; its `operate` and `cancel` methods are used.
      request_serializer: Passed to `_common.serialize` with each request.
      event_handler: Passed to `call.operate` with every submitted operation.
    """

    def consume_request_iterator():  # pylint: disable=too-many-branches
        # Iterate over the request iterator until it is exhausted or an error
        # condition is encountered.
        while True:
            # Tracks whether the `except Exception` branch already made the
            # matching return_from_user_request_generator() call, so that the
            # `finally` clause does not make it a second time.
            return_from_user_request_generator_invoked = False
            try:
                # The thread may die in user-code. Do not block fork for this.
                cygrpc.enter_user_request_generator()
                request = next(request_iterator)
            except StopIteration:
                # Exhausted: fall through to the half-close after the loop.
                break
            except Exception:  # pylint: disable=broad-except
                cygrpc.return_from_user_request_generator()
                return_from_user_request_generator_invoked = True
                code = grpc.StatusCode.UNKNOWN
                details = 'Exception iterating requests!'
                _LOGGER.exception(details)
                call.cancel(_common.STATUS_CODE_TO_CYGRPC_STATUS_CODE[code],
                            details)
                # NOTE(review): unlike the serialization-failure path below,
                # this _abort runs without holding state.condition.
                _abort(state, code, details)
                return
            finally:
                if not return_from_user_request_generator_invoked:
                    cygrpc.return_from_user_request_generator()
            serialized_request = _common.serialize(request, request_serializer)
            with state.condition:
                if state.code is None and not state.cancelled:
                    if serialized_request is None:
                        # A None result is treated as a serialization failure.
                        code = grpc.StatusCode.INTERNAL
                        details = 'Exception serializing request!'
                        call.cancel(
                            _common.STATUS_CODE_TO_CYGRPC_STATUS_CODE[code],
                            details)
                        _abort(state, code, details)
                        return
                    else:
                        # Register the operation as due before submitting it
                        # and take it back out if the submission fails.
                        state.due.add(cygrpc.OperationType.send_message)
                        operations = (cygrpc.SendMessageOperation(
                            serialized_request, _EMPTY_FLAGS),)
                        operating = call.operate(operations, event_handler)
                        if not operating:
                            state.due.remove(cygrpc.OperationType.send_message)
                            return

                        def _done():
                            # Finished waiting once the RPC has a status or
                            # the send is no longer outstanding.
                            return (state.code is not None or
                                    cygrpc.OperationType.send_message
                                    not in state.due)

                        _common.wait(state.condition.wait,
                                     _done,
                                     spin_cb=functools.partial(
                                         cygrpc.block_if_fork_in_progress,
                                         state))
                        if state.code is not None:
                            return
                else:
                    # The RPC already has a status or was cancelled; stop
                    # without sending anything further.
                    return
        # The iterator is exhausted: half-close the call, provided the RPC has
        # not terminated in the meantime.
        with state.condition:
            if state.code is None:
                state.due.add(cygrpc.OperationType.send_close_from_client)
                operations = (
                    cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS),)
                operating = call.operate(operations, event_handler)
                if not operating:
                    state.due.remove(
                        cygrpc.OperationType.send_close_from_client)

    consumption_thread = cygrpc.ForkManagedThread(
        target=consume_request_iterator)
    consumption_thread.setDaemon(True)
    consumption_thread.start()
267 
268 
def _rpc_state_string(class_name, rpc_state):
    """Render an RPC state as a human-readable string.

    Args:
      class_name: The name to show for the object being described.
      rpc_state: The RPC state to describe; it is read while holding
        `rpc_state.condition`.

    Returns:
      A short placeholder while no status code is set, otherwise a multi-line
      summary of the status and details, which for any status other than OK
      also includes the debug error string.
    """
    with rpc_state.condition:
        status = rpc_state.code
        if status is None:
            return '<{} object>'.format(class_name)
        if status is grpc.StatusCode.OK:
            return _OK_RENDEZVOUS_REPR_FORMAT.format(class_name, status,
                                                     rpc_state.details)
        return _NON_OK_RENDEZVOUS_REPR_FORMAT.format(
            class_name, status, rpc_state.details,
            rpc_state.debug_error_string)
281 
282 
284  """An RPC error not tied to the execution of a particular RPC.
285 
286  The RPC represented by the state object must not be in-progress or
287  cancelled.
288 
289  Attributes:
290  _state: An instance of _RPCState.
291  """
292 
293  def __init__(self, state):
294  with state.condition:
295  self._state = _RPCState((), copy.deepcopy(state.initial_metadata),
296  copy.deepcopy(state.trailing_metadata),
297  state.code, copy.deepcopy(state.details))
298  self._state.response = copy.copy(state.response)
299  self._state.debug_error_string = copy.copy(state.debug_error_string)
300 
301  def initial_metadata(self):
302  return self._state.initial_metadata
303 
304  def trailing_metadata(self):
305  return self._state.trailing_metadata
306 
307  def code(self):
308  return self._state.code
309 
310  def details(self):
311  return _common.decode(self._state.details)
312 
314  return _common.decode(self._state.debug_error_string)
315 
316  def _repr(self):
317  return _rpc_state_string(self.__class__.__name__, self._state)
318 
319  def __repr__(self):
320  return self._repr()
321 
322  def __str__(self):
323  return self._repr()
324 
325  def cancel(self):
326  """See grpc.Future.cancel."""
327  return False
328 
329  def cancelled(self):
330  """See grpc.Future.cancelled."""
331  return False
332 
333  def running(self):
334  """See grpc.Future.running."""
335  return False
336 
337  def done(self):
338  """See grpc.Future.done."""
339  return True
340 
341  def result(self, timeout=None): # pylint: disable=unused-argument
342  """See grpc.Future.result."""
343  raise self
344 
345  def exception(self, timeout=None): # pylint: disable=unused-argument
346  """See grpc.Future.exception."""
347  return self
348 
349  def traceback(self, timeout=None): # pylint: disable=unused-argument
350  """See grpc.Future.traceback."""
351  try:
352  raise self
353  except grpc.RpcError:
354  return sys.exc_info()[2]
355 
356  def add_done_callback(self, fn, timeout=None): # pylint: disable=unused-argument
357  """See grpc.Future.add_done_callback."""
358  fn(self)
359 
360 
362  """An RPC iterator.
363 
364  Attributes:
365  _state: An instance of _RPCState.
366  _call: An instance of SegregatedCall or IntegratedCall.
367  In either case, the _call object is expected to have operate, cancel,
368  and next_event methods.
369  _response_deserializer: A callable taking bytes and return a Python
370  object.
371  _deadline: A float representing the deadline of the RPC in seconds. Or
372  possibly None, to represent an RPC with no deadline at all.
373  """
374 
375  def __init__(self, state, call, response_deserializer, deadline):
376  super(_Rendezvous, self).__init__()
377  self._state = state
378  self._call = call
379  self._response_deserializer = response_deserializer
380  self._deadline = deadline
381 
382  def is_active(self):
383  """See grpc.RpcContext.is_active"""
384  with self._state.condition:
385  return self._state.code is None
386 
387  def time_remaining(self):
388  """See grpc.RpcContext.time_remaining"""
389  with self._state.condition:
390  if self._deadline is None:
391  return None
392  else:
393  return max(self._deadline - time.time(), 0)
394 
395  def cancel(self):
396  """See grpc.RpcContext.cancel"""
397  with self._state.condition:
398  if self._state.code is None:
399  code = grpc.StatusCode.CANCELLED
400  details = 'Locally cancelled by application!'
401  self._call.cancel(
402  _common.STATUS_CODE_TO_CYGRPC_STATUS_CODE[code], details)
403  self._state.cancelled = True
404  _abort(self._state, code, details)
405  self._state.condition.notify_all()
406  return True
407  else:
408  return False
409 
410  def add_callback(self, callback):
411  """See grpc.RpcContext.add_callback"""
412  with self._state.condition:
413  if self._state.callbacks is None:
414  return False
415  else:
416  self._state.callbacks.append(callback)
417  return True
418 
419  def __iter__(self):
420  return self
421 
422  def next(self):
423  return self._next()
424 
425  def __next__(self):
426  return self._next()
427 
428  def _next(self):
429  raise NotImplementedError()
430 
432  raise NotImplementedError()
433 
434  def _repr(self):
435  return _rpc_state_string(self.__class__.__name__, self._state)
436 
437  def __repr__(self):
438  return self._repr()
439 
440  def __str__(self):
441  return self._repr()
442 
443  def __del__(self):
444  with self._state.condition:
445  if self._state.code is None:
446  self._state.code = grpc.StatusCode.CANCELLED
447  self._state.details = 'Cancelled upon garbage collection!'
448  self._state.cancelled = True
449  self._call.cancel(
450  _common.STATUS_CODE_TO_CYGRPC_STATUS_CODE[self._state.code],
451  self._state.details)
452  self._state.condition.notify_all()
453 
454 
455 class _SingleThreadedRendezvous(_Rendezvous, grpc.Call, grpc.Future): # pylint: disable=too-many-ancestors
456  """An RPC iterator operating entirely on a single thread.
457 
458  The __next__ method of _SingleThreadedRendezvous does not depend on the
459  existence of any other thread, including the "channel spin thread".
460  However, this means that its interface is entirely synchronous. So this
461  class cannot completely fulfill the grpc.Future interface. The result,
462  exception, and traceback methods will never block and will instead raise
463  an exception if calling the method would result in blocking.
464 
465  This means that these methods are safe to call from add_done_callback
466  handlers.
467  """
468 
469  def _is_complete(self):
470  return self._state.code is not None
471 
472  def cancelled(self):
473  with self._state.condition:
474  return self._state.cancelled
475 
476  def running(self):
477  with self._state.condition:
478  return self._state.code is None
479 
480  def done(self):
481  with self._state.condition:
482  return self._state.code is not None
483 
484  def result(self, timeout=None):
485  """Returns the result of the computation or raises its exception.
486 
487  This method will never block. Instead, it will raise an exception
488  if calling this method would otherwise result in blocking.
489 
490  Since this method will never block, any `timeout` argument passed will
491  be ignored.
492  """
493  del timeout
494  with self._state.condition:
495  if not self._is_complete():
497  "_SingleThreadedRendezvous only supports result() when the RPC is complete."
498  )
499  if self._state.code is grpc.StatusCode.OK:
500  return self._state.response
501  elif self._state.cancelled:
503  else:
504  raise self
505 
506  def exception(self, timeout=None):
507  """Return the exception raised by the computation.
508 
509  This method will never block. Instead, it will raise an exception
510  if calling this method would otherwise result in blocking.
511 
512  Since this method will never block, any `timeout` argument passed will
513  be ignored.
514  """
515  del timeout
516  with self._state.condition:
517  if not self._is_complete():
519  "_SingleThreadedRendezvous only supports exception() when the RPC is complete."
520  )
521  if self._state.code is grpc.StatusCode.OK:
522  return None
523  elif self._state.cancelled:
525  else:
526  return self
527 
528  def traceback(self, timeout=None):
529  """Access the traceback of the exception raised by the computation.
530 
531  This method will never block. Instead, it will raise an exception
532  if calling this method would otherwise result in blocking.
533 
534  Since this method will never block, any `timeout` argument passed will
535  be ignored.
536  """
537  del timeout
538  with self._state.condition:
539  if not self._is_complete():
541  "_SingleThreadedRendezvous only supports traceback() when the RPC is complete."
542  )
543  if self._state.code is grpc.StatusCode.OK:
544  return None
545  elif self._state.cancelled:
547  else:
548  try:
549  raise self
550  except grpc.RpcError:
551  return sys.exc_info()[2]
552 
553  def add_done_callback(self, fn):
554  with self._state.condition:
555  if self._state.code is None:
556  self._state.callbacks.append(functools.partial(fn, self))
557  return
558 
559  fn(self)
560 
561  def initial_metadata(self):
562  """See grpc.Call.initial_metadata"""
563  with self._state.condition:
564  # NOTE(gnossen): Based on our initial call batch, we are guaranteed
565  # to receive initial metadata before any messages.
566  while self._state.initial_metadata is None:
567  self._consume_next_event()
568  return self._state.initial_metadata
569 
570  def trailing_metadata(self):
571  """See grpc.Call.trailing_metadata"""
572  with self._state.condition:
573  if self._state.trailing_metadata is None:
575  "Cannot get trailing metadata until RPC is completed.")
576  return self._state.trailing_metadata
577 
578  def code(self):
579  """See grpc.Call.code"""
580  with self._state.condition:
581  if self._state.code is None:
583  "Cannot get code until RPC is completed.")
584  return self._state.code
585 
586  def details(self):
587  """See grpc.Call.details"""
588  with self._state.condition:
589  if self._state.details is None:
591  "Cannot get details until RPC is completed.")
592  return _common.decode(self._state.details)
593 
595  event = self._call.next_event()
596  with self._state.condition:
597  callbacks = _handle_event(event, self._state,
599  for callback in callbacks:
600  # NOTE(gnossen): We intentionally allow exceptions to bubble up
601  # to the user when running on a single thread.
602  callback()
603  return event
604 
605  def _next_response(self):
606  while True:
607  self._consume_next_event()
608  with self._state.condition:
609  if self._state.response is not None:
610  response = self._state.response
611  self._state.response = None
612  return response
613  elif cygrpc.OperationType.receive_message not in self._state.due:
614  if self._state.code is grpc.StatusCode.OK:
615  raise StopIteration()
616  elif self._state.code is not None:
617  raise self
618 
619  def _next(self):
620  with self._state.condition:
621  if self._state.code is None:
622  # We tentatively add the operation as expected and remove
623  # it if the enqueue operation fails. This allows us to guarantee that
624  # if an event has been submitted to the core completion queue,
625  # it is in `due`. If we waited until after a successful
626  # enqueue operation then a signal could interrupt this
627  # thread between the enqueue operation and the addition of the
628  # operation to `due`. This would cause an exception on the
629  # channel spin thread when the operation completes and no
630  # corresponding operation would be present in state.due.
631  # Note that, since `condition` is held through this block, there is
632  # no data race on `due`.
633  self._state.due.add(cygrpc.OperationType.receive_message)
634  operating = self._call.operate(
635  (cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),), None)
636  if not operating:
637  self._state.due.remove(cygrpc.OperationType.receive_message)
638  elif self._state.code is grpc.StatusCode.OK:
639  raise StopIteration()
640  else:
641  raise self
642  return self._next_response()
643 
645  with self._state.condition:
646  if self._state.debug_error_string is None:
648  "Cannot get debug error string until RPC is completed.")
649  return _common.decode(self._state.debug_error_string)
650 
651 
652 class _MultiThreadedRendezvous(_Rendezvous, grpc.Call, grpc.Future): # pylint: disable=too-many-ancestors
653  """An RPC iterator that depends on a channel spin thread.
654 
655  This iterator relies upon a per-channel thread running in the background,
656  dequeueing events from the completion queue, and notifying threads waiting
657  on the threading.Condition object in the _RPCState object.
658 
659  This extra thread allows _MultiThreadedRendezvous to fulfill the grpc.Future interface
660  and to mediate a bidirection streaming RPC.
661  """
662 
663  def initial_metadata(self):
664  """See grpc.Call.initial_metadata"""
665  with self._state.condition:
666 
667  def _done():
668  return self._state.initial_metadata is not None
669 
670  _common.wait(self._state.condition.wait, _done)
671  return self._state.initial_metadata
672 
673  def trailing_metadata(self):
674  """See grpc.Call.trailing_metadata"""
675  with self._state.condition:
676 
677  def _done():
678  return self._state.trailing_metadata is not None
679 
680  _common.wait(self._state.condition.wait, _done)
681  return self._state.trailing_metadata
682 
683  def code(self):
684  """See grpc.Call.code"""
685  with self._state.condition:
686 
687  def _done():
688  return self._state.code is not None
689 
690  _common.wait(self._state.condition.wait, _done)
691  return self._state.code
692 
693  def details(self):
694  """See grpc.Call.details"""
695  with self._state.condition:
696 
697  def _done():
698  return self._state.details is not None
699 
700  _common.wait(self._state.condition.wait, _done)
701  return _common.decode(self._state.details)
702 
704  with self._state.condition:
705 
706  def _done():
707  return self._state.debug_error_string is not None
708 
709  _common.wait(self._state.condition.wait, _done)
710  return _common.decode(self._state.debug_error_string)
711 
712  def cancelled(self):
713  with self._state.condition:
714  return self._state.cancelled
715 
716  def running(self):
717  with self._state.condition:
718  return self._state.code is None
719 
720  def done(self):
721  with self._state.condition:
722  return self._state.code is not None
723 
724  def _is_complete(self):
725  return self._state.code is not None
726 
727  def result(self, timeout=None):
728  """Returns the result of the computation or raises its exception.
729 
730  See grpc.Future.result for the full API contract.
731  """
732  with self._state.condition:
733  timed_out = _common.wait(self._state.condition.wait,
734  self._is_complete,
735  timeout=timeout)
736  if timed_out:
738  else:
739  if self._state.code is grpc.StatusCode.OK:
740  return self._state.response
741  elif self._state.cancelled:
743  else:
744  raise self
745 
746  def exception(self, timeout=None):
747  """Return the exception raised by the computation.
748 
749  See grpc.Future.exception for the full API contract.
750  """
751  with self._state.condition:
752  timed_out = _common.wait(self._state.condition.wait,
753  self._is_complete,
754  timeout=timeout)
755  if timed_out:
757  else:
758  if self._state.code is grpc.StatusCode.OK:
759  return None
760  elif self._state.cancelled:
762  else:
763  return self
764 
765  def traceback(self, timeout=None):
766  """Access the traceback of the exception raised by the computation.
767 
768  See grpc.future.traceback for the full API contract.
769  """
770  with self._state.condition:
771  timed_out = _common.wait(self._state.condition.wait,
772  self._is_complete,
773  timeout=timeout)
774  if timed_out:
776  else:
777  if self._state.code is grpc.StatusCode.OK:
778  return None
779  elif self._state.cancelled:
781  else:
782  try:
783  raise self
784  except grpc.RpcError:
785  return sys.exc_info()[2]
786 
787  def add_done_callback(self, fn):
788  with self._state.condition:
789  if self._state.code is None:
790  self._state.callbacks.append(functools.partial(fn, self))
791  return
792 
793  fn(self)
794 
795  def _next(self):
796  with self._state.condition:
797  if self._state.code is None:
798  event_handler = _event_handler(self._state,
800  self._state.due.add(cygrpc.OperationType.receive_message)
801  operating = self._call.operate(
802  (cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),),
803  event_handler)
804  if not operating:
805  self._state.due.remove(cygrpc.OperationType.receive_message)
806  elif self._state.code is grpc.StatusCode.OK:
807  raise StopIteration()
808  else:
809  raise self
810 
811  def _response_ready():
812  return (self._state.response is not None or
813  (cygrpc.OperationType.receive_message
814  not in self._state.due and
815  self._state.code is not None))
816 
817  _common.wait(self._state.condition.wait, _response_ready)
818  if self._state.response is not None:
819  response = self._state.response
820  self._state.response = None
821  return response
822  elif cygrpc.OperationType.receive_message not in self._state.due:
823  if self._state.code is grpc.StatusCode.OK:
824  raise StopIteration()
825  elif self._state.code is not None:
826  raise self
827 
828 
def _start_unary_request(request, timeout, request_serializer):
    """Compute the deadline and serialize the request for a unary RPC.

    Args:
      request: The request value to serialize.
      timeout: A duration in seconds, or None.
      request_serializer: Passed to `_common.serialize` with `request`.

    Returns:
      A `(deadline, serialized_request, error)` tuple. Exactly one of the last
      two elements is None: on success `error` is None, and when
      serialization yields None `serialized_request` is None and `error` is
      an _InactiveRpcError carrying StatusCode.INTERNAL.
    """
    deadline = _deadline(timeout)
    serialized_request = _common.serialize(request, request_serializer)
    if serialized_request is not None:
        return deadline, serialized_request, None
    failed_state = _RPCState((), (), (), grpc.StatusCode.INTERNAL,
                             'Exception serializing request!')
    return deadline, None, _InactiveRpcError(failed_state)
839 
840 
def _end_unary_response_blocking(state, call, with_call, deadline):
    """Turn the final state of a blocking unary-response RPC into its outcome.

    Args:
      state: The RPC state after the RPC has finished.
      call: The call object the RPC ran on.
      with_call: Whether the caller also wants a call object back.
      deadline: The deadline handed to the returned call object.

    Returns:
      `state.response` alone, or a `(state.response, rendezvous)` pair when
      `with_call` is true.

    Raises:
      _InactiveRpcError: If the RPC's status code is anything but OK.
    """
    if state.code is not grpc.StatusCode.OK:
        raise _InactiveRpcError(state)
    if not with_call:
        return state.response
    rendezvous = _MultiThreadedRendezvous(state, call, None, deadline)
    return state.response, rendezvous
850 
851 
def _stream_unary_invocation_operationses(metadata, initial_metadata_flags):
    """Build the initial operation batches for a stream-unary invocation.

    Args:
      metadata: The metadata for the send-initial-metadata operation.
      initial_metadata_flags: The flags for that same operation.

    Returns:
      A 2-tuple of operation tuples: the first sends initial metadata and
      receives the message and the status, the second receives the initial
      metadata.
    """
    send_and_receive_result = (
        cygrpc.SendInitialMetadataOperation(metadata, initial_metadata_flags),
        cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),
        cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),
    )
    receive_initial_metadata = (
        cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),)
    return (send_and_receive_result, receive_initial_metadata)
862 
863 
865  initial_metadata_flags):
866  return tuple((
867  operations,
868  None,
869  ) for operations in _stream_unary_invocation_operationses(
870  metadata, initial_metadata_flags))
871 
872 
def _determine_deadline(user_deadline):
    """Combine the caller's deadline with the one found in the context.

    Args:
      user_deadline: The deadline requested by the caller, or None.

    Returns:
      The earlier of `user_deadline` and the value returned by
      `cygrpc.get_deadline_from_context()`; whichever one is set when only
      one is; None when neither is.
    """
    parent_deadline = cygrpc.get_deadline_from_context()
    if parent_deadline is None:
        return user_deadline
    if user_deadline is None:
        return parent_deadline
    return min(parent_deadline, user_deadline)
883 
884 
886 
887  # pylint: disable=too-many-arguments
888  def __init__(self, channel, managed_call, method, request_serializer,
889  response_deserializer):
890  self._channel = channel
891  self._managed_call = managed_call
892  self._method = method
893  self._request_serializer = request_serializer
894  self._response_deserializer = response_deserializer
895  self._context = cygrpc.build_census_context()
896 
897  def _prepare(self, request, timeout, metadata, wait_for_ready, compression):
898  deadline, serialized_request, rendezvous = _start_unary_request(
899  request, timeout, self._request_serializer)
900  initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready(
901  wait_for_ready)
902  augmented_metadata = _compression.augment_metadata(
903  metadata, compression)
904  if serialized_request is None:
905  return None, None, None, rendezvous
906  else:
907  state = _RPCState(_UNARY_UNARY_INITIAL_DUE, None, None, None, None)
908  operations = (
909  cygrpc.SendInitialMetadataOperation(augmented_metadata,
910  initial_metadata_flags),
911  cygrpc.SendMessageOperation(serialized_request, _EMPTY_FLAGS),
912  cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS),
913  cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),
914  cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),
915  cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),
916  )
917  return state, operations, deadline, None
918 
919  def _blocking(self, request, timeout, metadata, credentials, wait_for_ready,
920  compression):
921  state, operations, deadline, rendezvous = self._prepare(
922  request, timeout, metadata, wait_for_ready, compression)
923  if state is None:
924  raise rendezvous # pylint: disable-msg=raising-bad-type
925  else:
926  call = self._channel.segregated_call(
927  cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS,
928  self._method, None, _determine_deadline(deadline), metadata,
929  None if credentials is None else credentials._credentials, ((
930  operations,
931  None,
932  ),), self._context)
933  event = call.next_event()
934  _handle_event(event, state, self._response_deserializer)
935  return state, call
936 
937  def __call__(self,
938  request,
939  timeout=None,
940  metadata=None,
941  credentials=None,
942  wait_for_ready=None,
943  compression=None):
944  state, call, = self._blocking(request, timeout, metadata, credentials,
945  wait_for_ready, compression)
946  return _end_unary_response_blocking(state, call, False, None)
947 
948  def with_call(self,
949  request,
950  timeout=None,
951  metadata=None,
952  credentials=None,
953  wait_for_ready=None,
954  compression=None):
955  state, call, = self._blocking(request, timeout, metadata, credentials,
956  wait_for_ready, compression)
957  return _end_unary_response_blocking(state, call, True, None)
958 
959  def future(self,
960  request,
961  timeout=None,
962  metadata=None,
963  credentials=None,
964  wait_for_ready=None,
965  compression=None):
966  state, operations, deadline, rendezvous = self._prepare(
967  request, timeout, metadata, wait_for_ready, compression)
968  if state is None:
969  raise rendezvous # pylint: disable-msg=raising-bad-type
970  else:
971  event_handler = _event_handler(state, self._response_deserializer)
972  call = self._managed_call(
973  cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS,
974  self._method, None, deadline, metadata,
975  None if credentials is None else credentials._credentials,
976  (operations,), event_handler, self._context)
977  return _MultiThreadedRendezvous(state, call,
979  deadline)
980 
981 
983 
984  # pylint: disable=too-many-arguments
def __init__(self, channel, method, request_serializer,
             response_deserializer):
    """Store the channel, method and (de)serializers for later calls.

    A census context is created once here via
    ``cygrpc.build_census_context()`` and kept on the instance.
    """
    self._channel, self._method = channel, method
    self._request_serializer = request_serializer
    self._response_deserializer = response_deserializer
    self._context = cygrpc.build_census_context()
992 
def __call__(  # pylint: disable=too-many-locals
        self,
        request,
        timeout=None,
        metadata=None,
        credentials=None,
        wait_for_ready=None,
        compression=None):
    """Start a unary-stream RPC on a segregated call.

    The request is serialized with ``self._request_serializer``. Three
    operation batches are started, each with a ``None`` tag, and the
    resulting call is wrapped in a ``_SingleThreadedRendezvous``.

    Raises:
      _InactiveRpcError: With ``grpc.StatusCode.INTERNAL`` when
        serialization yields ``None``.
    """
    deadline = _deadline(timeout)
    payload = _common.serialize(request, self._request_serializer)
    if payload is None:
        raise _InactiveRpcError(
            _RPCState((), (), (), grpc.StatusCode.INTERNAL,
                      'Exception serializing request!'))

    state = _RPCState(_UNARY_STREAM_INITIAL_DUE, None, None, None, None)
    if credentials is None:
        call_credentials = None
    else:
        call_credentials = credentials._credentials
    flags = _InitialMetadataFlags().with_wait_for_ready(wait_for_ready)
    outgoing_metadata = _compression.augment_metadata(metadata, compression)

    send_batch = (
        cygrpc.SendInitialMetadataOperation(outgoing_metadata, flags),
        cygrpc.SendMessageOperation(payload, _EMPTY_FLAGS),
        cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS),
    )
    status_batch = (cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),)
    initial_metadata_batch = (
        cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),)
    batches_and_tags = tuple(
        (batch, None)
        for batch in (send_batch, status_batch, initial_metadata_batch))

    call = self._channel.segregated_call(
        cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS, self._method,
        None, _determine_deadline(deadline), metadata, call_credentials,
        batches_and_tags, self._context)
    return _SingleThreadedRendezvous(state, call,
                                     self._response_deserializer, deadline)
1030 
1031 
1033 
1034  # pylint: disable=too-many-arguments
def __init__(self, channel, managed_call, method, request_serializer,
             response_deserializer):
    """Store the channel, managed-call factory, method and (de)serializers.

    A census context is created once here via
    ``cygrpc.build_census_context()`` and kept on the instance.
    """
    self._channel, self._managed_call = channel, managed_call
    self._method = method
    self._request_serializer = request_serializer
    self._response_deserializer = response_deserializer
    self._context = cygrpc.build_census_context()
1043 
def __call__(  # pylint: disable=too-many-locals
        self,
        request,
        timeout=None,
        metadata=None,
        credentials=None,
        wait_for_ready=None,
        compression=None):
    """Start a unary-stream RPC on a managed call.

    ``_start_unary_request`` supplies the deadline, the serialized request
    and, on failure, an object to raise. Two operation batches are started
    through ``self._managed_call`` with an event handler bound to a fresh
    ``_RPCState``.

    Returns:
      A ``_MultiThreadedRendezvous`` built from the state, the call,
      ``self._response_deserializer`` and the deadline.

    Raises:
      The third value returned by ``_start_unary_request`` when the
      serialized request is ``None``.
    """
    deadline, serialized_request, rendezvous = _start_unary_request(
        request, timeout, self._request_serializer)
    initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready(
        wait_for_ready)
    if serialized_request is None:
        raise rendezvous  # pylint: disable-msg=raising-bad-type

    augmented_metadata = _compression.augment_metadata(
        metadata, compression)
    state = _RPCState(_UNARY_STREAM_INITIAL_DUE, None, None, None, None)
    operationses = (
        (
            cygrpc.SendInitialMetadataOperation(augmented_metadata,
                                                initial_metadata_flags),
            cygrpc.SendMessageOperation(serialized_request,
                                        _EMPTY_FLAGS),
            cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS),
            cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),
        ),
        (cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),),
    )
    call = self._managed_call(
        cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS,
        self._method, None, _determine_deadline(deadline), metadata,
        None if credentials is None else credentials._credentials,
        operationses, _event_handler(state,
                                     self._response_deserializer),
        self._context)
    # The rendezvous takes (state, call, response_deserializer, deadline);
    # the deserializer argument must not be omitted.
    return _MultiThreadedRendezvous(state, call,
                                    self._response_deserializer, deadline)
1083 
1084 
1086 
1087  # pylint: disable=too-many-arguments
def __init__(self, channel, managed_call, method, request_serializer,
             response_deserializer):
    """Store the channel, managed-call factory, method and (de)serializers.

    A census context is created once here via
    ``cygrpc.build_census_context()`` and kept on the instance.
    """
    self._channel, self._managed_call = channel, managed_call
    self._method = method
    self._request_serializer = request_serializer
    self._response_deserializer = response_deserializer
    self._context = cygrpc.build_census_context()
1096 
def _blocking(self, request_iterator, timeout, metadata, credentials,
              wait_for_ready, compression):
    """Drive a stream-unary RPC on a segregated call until nothing is due.

    The call is started with the batches produced by
    ``_stream_unary_invocation_operationses_and_tags``, the request iterator
    is handed to ``_consume_request_iterator``, and events are then pulled
    from the call and applied to the state until ``state.due`` is empty.

    Returns:
      A ``(state, call)`` tuple.
    """
    deadline = _deadline(timeout)
    state = _RPCState(_STREAM_UNARY_INITIAL_DUE, None, None, None, None)
    initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready(
        wait_for_ready)
    augmented_metadata = _compression.augment_metadata(
        metadata, compression)
    call = self._channel.segregated_call(
        cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS, self._method,
        None, _determine_deadline(deadline), augmented_metadata,
        None if credentials is None else credentials._credentials,
        _stream_unary_invocation_operationses_and_tags(
            augmented_metadata, initial_metadata_flags), self._context)
    _consume_request_iterator(request_iterator, state, call,
                              self._request_serializer, None)
    while True:
        event = call.next_event()
        with state.condition:
            _handle_event(event, state, self._response_deserializer)
            # Wake anything waiting on the condition after each event.
            state.condition.notify_all()
            if not state.due:
                break
    return state, call
1121 
def __call__(self,
             request_iterator,
             timeout=None,
             metadata=None,
             credentials=None,
             wait_for_ready=None,
             compression=None):
    """Run the RPC via ``self._blocking`` and finish it.

    Every argument is forwarded unchanged to ``self._blocking``. The
    ``(state, call)`` pair it returns is passed on to
    ``_end_unary_response_blocking`` with ``False`` and ``None`` as the
    trailing arguments.

    Returns:
      Whatever ``_end_unary_response_blocking`` returns.
    """
    outcome = self._blocking(request_iterator, timeout, metadata,
                             credentials, wait_for_ready, compression)
    # Exactly two items are expected back; unpacking enforces that.
    rpc_state, rpc_call = outcome
    return _end_unary_response_blocking(rpc_state, rpc_call, False, None)
1132 
def with_call(self,
              request_iterator,
              timeout=None,
              metadata=None,
              credentials=None,
              wait_for_ready=None,
              compression=None):
    """Run the RPC via ``self._blocking`` and finish it with the call flag set.

    Every argument is forwarded unchanged to ``self._blocking``. The
    ``(state, call)`` pair it returns is passed on to
    ``_end_unary_response_blocking`` with ``True`` and ``None`` as the
    trailing arguments.

    Returns:
      Whatever ``_end_unary_response_blocking`` returns.
    """
    outcome = self._blocking(request_iterator, timeout, metadata,
                             credentials, wait_for_ready, compression)
    # Exactly two items are expected back; unpacking enforces that.
    rpc_state, rpc_call = outcome
    return _end_unary_response_blocking(rpc_state, rpc_call, True, None)
1143 
1144  def future(self,
1145  request_iterator,
1146  timeout=None,
1147  metadata=None,
1148  credentials=None,
1149  wait_for_ready=None,
1150  compression=None):
1151  deadline = _deadline(timeout)
1152  state = _RPCState(_STREAM_UNARY_INITIAL_DUE, None, None, None, None)
1153  event_handler = _event_handler(state, self._response_deserializer)
1154  initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready(
1155  wait_for_ready)
1156  augmented_metadata = _compression.augment_metadata(
1157  metadata, compression)
1158  call = self._managed_call(
1159  cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS, self._method,
1160  None, deadline, augmented_metadata,
1161  None if credentials is None else credentials._credentials,
1163  initial_metadata_flags),
1164  event_handler, self._context)
1165  _consume_request_iterator(request_iterator, state, call,
1166  self._request_serializer, event_handler)
1167  return _MultiThreadedRendezvous(state, call,
1168  self._response_deserializer, deadline)
1169 
1170 
1172 
1173  # pylint: disable=too-many-arguments
def __init__(self, channel, managed_call, method, request_serializer,
             response_deserializer):
    """Store the channel, managed-call factory, method and (de)serializers.

    A census context is created once here via
    ``cygrpc.build_census_context()`` and kept on the instance.
    """
    self._channel, self._managed_call = channel, managed_call
    self._method = method
    self._request_serializer = request_serializer
    self._response_deserializer = response_deserializer
    self._context = cygrpc.build_census_context()
1182 
def __call__(self,
             request_iterator,
             timeout=None,
             metadata=None,
             credentials=None,
             wait_for_ready=None,
             compression=None):
    """Start a stream-stream RPC on a managed call.

    Two operation batches are started through ``self._managed_call``, then
    the request iterator is handed to ``_consume_request_iterator`` together
    with the same event handler.

    Returns:
      A ``_MultiThreadedRendezvous`` built from the state, the call,
      ``self._response_deserializer`` and the deadline.
    """
    deadline = _deadline(timeout)
    state = _RPCState(_STREAM_STREAM_INITIAL_DUE, None, None, None, None)
    flags = _InitialMetadataFlags().with_wait_for_ready(wait_for_ready)
    outgoing_metadata = _compression.augment_metadata(metadata, compression)

    send_and_status_batch = (
        cygrpc.SendInitialMetadataOperation(outgoing_metadata, flags),
        cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),
    )
    initial_metadata_batch = (
        cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),)
    batches = (send_and_status_batch, initial_metadata_batch)

    handler = _event_handler(state, self._response_deserializer)
    call = self._managed_call(
        cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS, self._method,
        None, _determine_deadline(deadline), outgoing_metadata,
        None if credentials is None else credentials._credentials,
        batches, handler, self._context)
    _consume_request_iterator(request_iterator, state, call,
                              self._request_serializer, handler)
    return _MultiThreadedRendezvous(state, call,
                                    self._response_deserializer, deadline)
1214 
1215 
class _InitialMetadataFlags(int):
    """Stores immutable initial metadata flags"""

    def __new__(cls, value=_EMPTY_FLAGS):
        """Build the flags, keeping only bits inside ``used_mask``."""
        value &= cygrpc.InitialMetadataFlags.used_mask
        return super(_InitialMetadataFlags, cls).__new__(cls, value)

    def with_wait_for_ready(self, wait_for_ready):
        """Return flags reflecting an explicit wait-for-ready choice.

        Args:
          wait_for_ready: ``None`` to leave the flags untouched; otherwise
            its truthiness decides whether the wait-for-ready bit is set
            or cleared.

        Returns:
          ``self`` when ``wait_for_ready`` is ``None``; otherwise a new
          instance with the "explicitly set" bit added.
        """
        if wait_for_ready is None:
            return self
        ready_bit = cygrpc.InitialMetadataFlags.wait_for_ready
        explicit_bit = (
            cygrpc.InitialMetadataFlags.wait_for_ready_explicitly_set)
        if wait_for_ready:
            return self.__class__(self | ready_bit | explicit_bit)
        # Clear the ready bit but still record that a choice was made.
        return self.__class__(self & ~ready_bit | explicit_bit)
1232 
1233 
1234 class _ChannelCallState(object):
1235 
1236  def __init__(self, channel):
1237  self.lock = threading.Lock()
1238  self.channel = channel
1239  self.managed_calls = 0
1240  self.threading = False
1241 
1243  self.managed_calls = 0
1244 
1245  def __del__(self):
1246  try:
1247  self.channel.close(cygrpc.StatusCode.cancelled,
1248  'Channel deallocated!')
1249  except (TypeError, AttributeError):
1250  pass
1251 
1252 
1254 
def _run_channel_spin_thread(state):
    """Start a daemon thread that processes the channel's call events.

    Args:
      state: An object exposing ``channel``, ``lock`` and
        ``managed_calls`` (as ``_ChannelCallState`` does).
    """

    def channel_spin():
        while True:
            cygrpc.block_if_fork_in_progress(state)
            event = state.channel.next_call_event()
            if event.completion_type == cygrpc.CompletionType.queue_timeout:
                continue
            # The tag is a callable; a truthy result means a call completed.
            call_completed = event.tag(event)
            if call_completed:
                with state.lock:
                    state.managed_calls -= 1
                    # Stop spinning once no managed calls remain.
                    if state.managed_calls == 0:
                        return

    channel_spin_thread = cygrpc.ForkManagedThread(target=channel_spin)
    channel_spin_thread.setDaemon(True)
    channel_spin_thread.start()
1271 
1272 
1274 
def _channel_managed_call_management(state):
    """Return a factory that creates managed calls on ``state.channel``.

    Args:
      state: An object exposing ``channel``, ``lock`` and
        ``managed_calls`` (as ``_ChannelCallState`` does).

    Returns:
      The ``create`` function defined below.
    """

    # pylint: disable=too-many-arguments
    def create(flags, method, host, deadline, metadata, credentials,
               operationses, event_handler, context):
        """Creates a cygrpc.IntegratedCall.

        Args:
          flags: An integer bitfield of call flags.
          method: The RPC method.
          host: A host string for the created call.
          deadline: A float to be the deadline of the created call or None if
            the call is to have an infinite deadline.
          metadata: The metadata for the call or None.
          credentials: A cygrpc.CallCredentials or None.
          operationses: An iterable of iterables of cygrpc.Operations to be
            started on the call.
          event_handler: A behavior to call to handle the events resultant from
            the operations on the call.
          context: Context object for distributed tracing.
        Returns:
          A cygrpc.IntegratedCall with which to conduct an RPC.
        """
        operationses_and_tags = tuple((
            operations,
            event_handler,
        ) for operations in operationses)
        with state.lock:
            call = state.channel.integrated_call(flags, method, host, deadline,
                                                 metadata, credentials,
                                                 operationses_and_tags, context)
            if state.managed_calls == 0:
                # First outstanding call: a spin thread must be started to
                # process its events.
                state.managed_calls = 1
                _run_channel_spin_thread(state)
            else:
                state.managed_calls += 1
            return call

    return create
1312 
1313 
1315 
1316  def __init__(self, channel):
1317  self.lock = threading.RLock()
1318  self.channel = channel
1319  self.polling = False
1320  self.connectivity = None
1321  self.try_to_connect = False
1323  self.delivering = False
1324 
1326  self.polling = False
1327  self.connectivity = None
1328  self.try_to_connect = False
1330  self.delivering = False
1331 
1332 
1333 def _deliveries(state):
1334  callbacks_needing_update = []
1335  for callback_and_connectivity in state.callbacks_and_connectivities:
1336  callback, callback_connectivity, = callback_and_connectivity
1337  if callback_connectivity is not state.connectivity:
1338  callbacks_needing_update.append(callback)
1339  callback_and_connectivity[1] = state.connectivity
1340  return callbacks_needing_update
1341 
1342 
def _deliver(state, initial_connectivity, initial_callbacks):
    """Invoke callbacks with connectivity values until none are out of date.

    The initial callbacks are called with the initial connectivity. After
    each round, ``_deliveries`` is consulted under ``state.lock``; when it
    reports nothing further, ``state.delivering`` is cleared and the
    function returns. Exceptions raised by a callback are logged and do not
    stop delivery.
    """
    pending = initial_callbacks
    value = initial_connectivity
    while True:
        for notify in pending:
            cygrpc.block_if_fork_in_progress(state)
            try:
                notify(value)
            except Exception:  # pylint: disable=broad-except
                _LOGGER.exception(
                    _CHANNEL_SUBSCRIPTION_CALLBACK_ERROR_LOG_MESSAGE)
        with state.lock:
            pending = _deliveries(state)
            if not pending:
                state.delivering = False
                return
            value = state.connectivity
1361 
1362 
def _spawn_delivery(state, callbacks):
    """Start a daemon thread running ``_deliver`` and mark delivery active.

    The thread is given ``state``, the value of ``state.connectivity`` at
    the time of this call, and ``callbacks``. The callers in this file
    invoke this while holding ``state.lock``.
    """
    worker = cygrpc.ForkManagedThread(
        target=_deliver, args=(state, state.connectivity, callbacks))
    worker.setDaemon(True)
    worker.start()
    state.delivering = True
1373 
1374 
1375 # NOTE(https://github.com/grpc/grpc/issues/3064): We'd rather not poll.
def _poll_connectivity(state, channel, initial_try_to_connect):
    """Poll the channel's connectivity and fan changes out to subscribers.

    The connectivity is checked once up front and delivered to every
    current subscriber. The loop then watches for changes with a deadline
    0.2 seconds ahead, re-checking when the watch succeeds or a connection
    attempt was requested. It stops, clearing ``state.polling`` and
    ``state.connectivity``, once there are no subscribers and no pending
    connection request.
    """
    to_channel_connectivity = (
        _common.CYGRPC_CONNECTIVITY_STATE_TO_CHANNEL_CONNECTIVITY)
    try_to_connect = initial_try_to_connect
    connectivity = channel.check_connectivity_state(try_to_connect)
    with state.lock:
        state.connectivity = to_channel_connectivity[connectivity]
        subscribers = state.callbacks_and_connectivities
        callbacks = tuple(
            callback for callback, unused_but_known_to_be_none_connectivity
            in subscribers)
        for entry in subscribers:
            entry[1] = state.connectivity
        if callbacks:
            _spawn_delivery(state, callbacks)

    while True:
        event = channel.watch_connectivity_state(connectivity,
                                                 time.time() + 0.2)
        cygrpc.block_if_fork_in_progress(state)
        with state.lock:
            if not (state.callbacks_and_connectivities or
                    state.try_to_connect):
                state.polling = False
                state.connectivity = None
                return
            # Consume the pending connection request, if any.
            try_to_connect = state.try_to_connect
            state.try_to_connect = False
        if not (event.success or try_to_connect):
            continue
        connectivity = channel.check_connectivity_state(try_to_connect)
        with state.lock:
            state.connectivity = to_channel_connectivity[connectivity]
            if state.delivering:
                # A running delivery thread picks the change up itself.
                continue
            callbacks = _deliveries(state)
            if callbacks:
                _spawn_delivery(state, callbacks)
1411 
1412 
1413 def _subscribe(state, callback, try_to_connect):
1414  with state.lock:
1415  if not state.callbacks_and_connectivities and not state.polling:
1416  polling_thread = cygrpc.ForkManagedThread(
1417  target=_poll_connectivity,
1418  args=(state, state.channel, bool(try_to_connect)))
1419  polling_thread.setDaemon(True)
1420  polling_thread.start()
1421  state.polling = True
1422  state.callbacks_and_connectivities.append([callback, None])
1423  elif not state.delivering and state.connectivity is not None:
1424  _spawn_delivery(state, (callback,))
1425  state.try_to_connect |= bool(try_to_connect)
1426  state.callbacks_and_connectivities.append(
1427  [callback, state.connectivity])
1428  else:
1429  state.try_to_connect |= bool(try_to_connect)
1430  state.callbacks_and_connectivities.append([callback, None])
1431 
1432 
1433 def _unsubscribe(state, callback):
1434  with state.lock:
1435  for index, (subscribed_callback, unused_connectivity) in enumerate(
1436  state.callbacks_and_connectivities):
1437  if callback == subscribed_callback:
1438  state.callbacks_and_connectivities.pop(index)
1439  break
1440 
1441 
def _augment_options(base_options, compression):
    """Append the compression option and the user-agent option.

    Returns:
      A tuple made of ``base_options``, then whatever
      ``_compression.create_channel_option(compression)`` returns, then a
      ``(primary_user_agent_string, _USER_AGENT)`` pair.
    """
    compression_option = _compression.create_channel_option(compression)
    options = tuple(base_options)
    options = options + compression_option
    user_agent_option = (cygrpc.ChannelArgKey.primary_user_agent_string,
                         _USER_AGENT)
    return options + (user_agent_option,)
1448 
1449 
def _separate_channel_options(options):
    """Separates core channel options from Python channel options.

    Args:
      options: An iterable of pairs whose first item is the option key.

    Returns:
      A ``(python_options, core_options)`` tuple of lists. Pairs keyed by
      ``grpc.experimental.ChannelOptions.SingleThreadedUnaryStream`` go to
      the first list, all others to the second.
    """
    core_options = []
    python_options = []
    for pair in options:
        if pair[0] == grpc.experimental.ChannelOptions.SingleThreadedUnaryStream:
            python_options.append(pair)
        else:
            core_options.append(pair)
    return python_options, core_options
1460 
1461 
class Channel(grpc.Channel):
    """A cygrpc.Channel-backed implementation of grpc.Channel."""

    def __init__(self, target, options, credentials, compression):
        """Constructor.

        Args:
          target: The target to which to connect.
          options: Configuration options for the channel.
          credentials: A cygrpc.ChannelCredentials or None.
          compression: An optional value indicating the compression method to be
            used over the lifetime of the channel.
        """
        python_options, core_options = _separate_channel_options(options)
        self._single_threaded_unary_stream = _DEFAULT_SINGLE_THREADED_UNARY_STREAM
        self._process_python_options(python_options)
        self._channel = cygrpc.Channel(
            _common.encode(target), _augment_options(core_options, compression),
            credentials)
        # Both state objects wrap the same underlying cygrpc channel.
        self._call_state = _ChannelCallState(self._channel)
        self._connectivity_state = _ChannelConnectivityState(self._channel)
        cygrpc.fork_register_channel(self)
        if cygrpc.g_gevent_activated:
            cygrpc.gevent_increment_channel_count()

    def _process_python_options(self, python_options):
        """Sets channel attributes according to python-only channel options."""
        for pair in python_options:
            if pair[0] == grpc.experimental.ChannelOptions.SingleThreadedUnaryStream:
                self._single_threaded_unary_stream = True

    def subscribe(self, callback, try_to_connect=None):
        """Register a connectivity callback via ``_subscribe``."""
        _subscribe(self._connectivity_state, callback, try_to_connect)

    def unsubscribe(self, callback):
        """Remove a connectivity callback via ``_unsubscribe``."""
        _unsubscribe(self._connectivity_state, callback)

    def unary_unary(self,
                    method,
                    request_serializer=None,
                    response_deserializer=None):
        """Create a unary-unary multi-callable for ``method``."""
        return _UnaryUnaryMultiCallable(
            self._channel, _channel_managed_call_management(self._call_state),
            _common.encode(method), request_serializer, response_deserializer)

    def unary_stream(self,
                     method,
                     request_serializer=None,
                     response_deserializer=None):
        """Create a unary-stream multi-callable for ``method``.

        The single-threaded variant is used only when
        ``self._single_threaded_unary_stream`` is set.
        """
        # NOTE(rbellevi): Benchmarks have shown that running a unary-stream RPC
        # on a single Python thread results in an appreciable speed-up. However,
        # due to slight differences in capability, the multi-threaded variant
        # remains the default.
        if self._single_threaded_unary_stream:
            return _SingleThreadedUnaryStreamMultiCallable(
                self._channel, _common.encode(method), request_serializer,
                response_deserializer)
        else:
            return _UnaryStreamMultiCallable(
                self._channel,
                _channel_managed_call_management(self._call_state),
                _common.encode(method), request_serializer,
                response_deserializer)

    def stream_unary(self,
                     method,
                     request_serializer=None,
                     response_deserializer=None):
        """Create a stream-unary multi-callable for ``method``."""
        return _StreamUnaryMultiCallable(
            self._channel, _channel_managed_call_management(self._call_state),
            _common.encode(method), request_serializer, response_deserializer)

    def stream_stream(self,
                      method,
                      request_serializer=None,
                      response_deserializer=None):
        """Create a stream-stream multi-callable for ``method``."""
        return _StreamStreamMultiCallable(
            self._channel, _channel_managed_call_management(self._call_state),
            _common.encode(method), request_serializer, response_deserializer)

    def _unsubscribe_all(self):
        """Drop every connectivity subscription, if the state exists."""
        state = self._connectivity_state
        if state:
            with state.lock:
                del state.callbacks_and_connectivities[:]

    def _close(self):
        """Unsubscribe everything, close the channel and unregister it."""
        self._unsubscribe_all()
        self._channel.close(cygrpc.StatusCode.cancelled, 'Channel closed!')
        cygrpc.fork_unregister_channel(self)
        if cygrpc.g_gevent_activated:
            cygrpc.gevent_decrement_channel_count()

    def _close_on_fork(self):
        """Unsubscribe everything and close the channel for a fork."""
        self._unsubscribe_all()
        self._channel.close_on_fork(cygrpc.StatusCode.cancelled,
                                    'Channel closed due to fork')

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self._close()
        # Returning False lets any in-flight exception propagate.
        return False

    def close(self):
        """Close the channel; see ``_close``."""
        self._close()

    def __del__(self):
        # TODO(https://github.com/grpc/grpc/issues/12531): Several releases
        # after 1.12 (1.16 or thereabouts?) add a "self._channel.close" call
        # here (or more likely, call self._close() here). We don't do this today
        # because many valid use cases today allow the channel to be deleted
        # immediately after stubs are created. After a sufficient period of time
        # has passed for all users to be trusted to hold on to their channels
        # for as long as they are in use and to close them after using them,
        # then deletion of this grpc._channel.Channel instance can be made to
        # effect closure of the underlying cygrpc.Channel instance.
        try:
            self._unsubscribe_all()
        except:  # pylint: disable=bare-except
            # Exceptions in __del__ are ignored by Python anyway, but they can
            # keep spamming logs. Just silence them.
            pass
grpc._channel.Channel.close
def close(self)
Definition: grpc/_channel.py:1567
grpc._channel._RPCState.__init__
def __init__(self, due, initial_metadata, trailing_metadata, code, details)
Definition: grpc/_channel.py:95
grpc.Call
Definition: src/python/grpcio/grpc/__init__.py:359
grpc._channel.Channel._connectivity_state
_connectivity_state
Definition: grpc/_channel.py:1482
grpc._channel._InactiveRpcError._repr
def _repr(self)
Definition: grpc/_channel.py:316
grpc._channel.Channel.stream_unary
def stream_unary(self, method, request_serializer=None, response_deserializer=None)
Definition: grpc/_channel.py:1526
grpc._channel._InactiveRpcError.cancel
def cancel(self)
Definition: grpc/_channel.py:325
grpc._channel._separate_channel_options
def _separate_channel_options(options)
Definition: grpc/_channel.py:1450
grpc._channel._ChannelConnectivityState.polling
polling
Definition: grpc/_channel.py:1319
http2_test_server.format
format
Definition: http2_test_server.py:118
grpc._channel._SingleThreadedRendezvous.result
def result(self, timeout=None)
Definition: grpc/_channel.py:484
grpc._channel._SingleThreadedRendezvous._is_complete
def _is_complete(self)
Definition: grpc/_channel.py:469
grpc._channel._ChannelConnectivityState.try_to_connect
try_to_connect
Definition: grpc/_channel.py:1321
grpc._channel._RPCState.fork_epoch
fork_epoch
Definition: grpc/_channel.py:120
grpc._channel.Channel._close
def _close(self)
Definition: grpc/_channel.py:1548
grpc._channel._augment_options
def _augment_options(base_options, compression)
Definition: grpc/_channel.py:1442
grpc._channel._InactiveRpcError.exception
def exception(self, timeout=None)
Definition: grpc/_channel.py:345
grpc._channel._Rendezvous
Definition: grpc/_channel.py:361
grpc._channel._InactiveRpcError._state
_state
Definition: grpc/_channel.py:295
grpc._channel._InactiveRpcError.code
def code(self)
Definition: grpc/_channel.py:307
grpc._channel._Rendezvous.next
def next(self)
Definition: grpc/_channel.py:422
grpc._channel._InactiveRpcError.traceback
def traceback(self, timeout=None)
Definition: grpc/_channel.py:349
grpc._channel._SingleThreadedRendezvous._next
def _next(self)
Definition: grpc/_channel.py:619
grpc._channel._InactiveRpcError.__init__
def __init__(self, state)
Definition: grpc/_channel.py:293
grpc._channel._SingleThreadedUnaryStreamMultiCallable.__call__
def __call__(self, request, timeout=None, metadata=None, credentials=None, wait_for_ready=None, compression=None)
Definition: grpc/_channel.py:993
grpc._channel._UnaryUnaryMultiCallable._blocking
def _blocking(self, request, timeout, metadata, credentials, wait_for_ready, compression)
Definition: grpc/_channel.py:919
grpc._channel._ChannelCallState.reset_postfork_child
def reset_postfork_child(self)
Definition: grpc/_channel.py:1242
grpc._channel._UnaryUnaryMultiCallable._channel
_channel
Definition: grpc/_channel.py:889
grpc._channel._MultiThreadedRendezvous.code
def code(self)
Definition: grpc/_channel.py:683
grpc._channel._run_channel_spin_thread
def _run_channel_spin_thread(state)
Definition: grpc/_channel.py:1253
grpc._channel._SingleThreadedRendezvous.cancelled
def cancelled(self)
Definition: grpc/_channel.py:472
grpc._channel._poll_connectivity
def _poll_connectivity(state, channel, initial_try_to_connect)
Definition: grpc/_channel.py:1376
grpc._channel._SingleThreadedRendezvous.exception
def exception(self, timeout=None)
Definition: grpc/_channel.py:506
grpc.FutureCancelledError
Definition: src/python/grpcio/grpc/__init__.py:44
grpc._channel._StreamUnaryMultiCallable._blocking
def _blocking(self, request_iterator, timeout, metadata, credentials, wait_for_ready, compression)
Definition: grpc/_channel.py:1097
grpc._channel._Rendezvous.__repr__
def __repr__(self)
Definition: grpc/_channel.py:437
grpc._channel._UnaryStreamMultiCallable
Definition: grpc/_channel.py:1032
grpc._channel._ChannelCallState.channel
channel
Definition: grpc/_channel.py:1238
grpc._channel._Rendezvous.__iter__
def __iter__(self)
Definition: grpc/_channel.py:419
grpc._channel.Channel._process_python_options
def _process_python_options(self, python_options)
Definition: grpc/_channel.py:1487
grpc._channel._StreamStreamMultiCallable._method
_method
Definition: grpc/_channel.py:1177
grpc._channel._deliver
def _deliver(state, initial_connectivity, initial_callbacks)
Definition: grpc/_channel.py:1343
grpc._channel._UnaryUnaryMultiCallable._response_deserializer
_response_deserializer
Definition: grpc/_channel.py:893
grpc._channel._Rendezvous._next
def _next(self)
Definition: grpc/_channel.py:428
grpc._channel._event_handler
def _event_handler(state, response_deserializer)
Definition: grpc/_channel.py:170
grpc._channel._SingleThreadedUnaryStreamMultiCallable._context
_context
Definition: grpc/_channel.py:990
grpc._channel._InactiveRpcError.initial_metadata
def initial_metadata(self)
Definition: grpc/_channel.py:301
grpc.RpcError
Definition: src/python/grpcio/grpc/__init__.py:302
grpc._channel._RPCState.details
details
Definition: grpc/_channel.py:111
grpc._channel._SingleThreadedRendezvous._next_response
def _next_response(self)
Definition: grpc/_channel.py:605
grpc._channel._UnaryUnaryMultiCallable
Definition: grpc/_channel.py:885
grpc._channel._InactiveRpcError.debug_error_string
def debug_error_string(self)
Definition: grpc/_channel.py:313
grpc._channel._UnaryStreamMultiCallable._managed_call
_managed_call
Definition: grpc/_channel.py:1037
grpc._channel._UnaryStreamMultiCallable._method
_method
Definition: grpc/_channel.py:1038
grpc._channel.Channel.__init__
def __init__(self, target, options, credentials, compression)
Definition: grpc/_channel.py:1465
grpc._channel._SingleThreadedRendezvous.traceback
def traceback(self, timeout=None)
Definition: grpc/_channel.py:528
grpc._channel._StreamStreamMultiCallable.__init__
def __init__(self, channel, managed_call, method, request_serializer, response_deserializer)
Definition: grpc/_channel.py:1174
grpc._channel._RPCState.condition
condition
Definition: grpc/_channel.py:98
grpc._channel._Rendezvous.__str__
def __str__(self)
Definition: grpc/_channel.py:440
grpc._channel.Channel.__del__
def __del__(self)
Definition: grpc/_channel.py:1570
grpc::Channel
Channels represent a connection to an endpoint. Created by CreateChannel.
Definition: include/grpcpp/channel.h:54
grpc._channel._InactiveRpcError.add_done_callback
def add_done_callback(self, fn, timeout=None)
Definition: grpc/_channel.py:356
grpc._channel._start_unary_request
def _start_unary_request(request, timeout, request_serializer)
Definition: grpc/_channel.py:829
grpc._channel._SingleThreadedRendezvous.initial_metadata
def initial_metadata(self)
Definition: grpc/_channel.py:561
grpc._channel._MultiThreadedRendezvous
Definition: grpc/_channel.py:652
grpc._channel._StreamUnaryMultiCallable
Definition: grpc/_channel.py:1085
grpc._channel._RPCState.reset_postfork_child
def reset_postfork_child(self)
Definition: grpc/_channel.py:122
grpc._channel._Rendezvous.__init__
def __init__(self, state, call, response_deserializer, deadline)
Definition: grpc/_channel.py:375
grpc._channel._UnaryUnaryMultiCallable._request_serializer
_request_serializer
Definition: grpc/_channel.py:892
grpc._channel._MultiThreadedRendezvous.result
def result(self, timeout=None)
Definition: grpc/_channel.py:727
grpc._channel._InactiveRpcError.details
def details(self)
Definition: grpc/_channel.py:310
grpc._channel._UnaryUnaryMultiCallable._context
_context
Definition: grpc/_channel.py:894
grpc._channel._StreamUnaryMultiCallable._managed_call
_managed_call
Definition: grpc/_channel.py:1090
generic_client_interceptor.create
def create(intercept_call)
Definition: generic_client_interceptor.py:55
xds_interop_client.int
int
Definition: xds_interop_client.py:113
grpc._channel._Rendezvous._response_deserializer
_response_deserializer
Definition: grpc/_channel.py:379
grpc._channel._StreamUnaryMultiCallable._method
_method
Definition: grpc/_channel.py:1091
grpc._channel._determine_deadline
def _determine_deadline(user_deadline)
Definition: grpc/_channel.py:873
generate-asm-lcov.fn
fn
Definition: generate-asm-lcov.py:146
grpc::experimental
Definition: include/grpcpp/channel.h:46
grpc._channel._InactiveRpcError.cancelled
def cancelled(self)
Definition: grpc/_channel.py:329
grpc._channel._ChannelConnectivityState.callbacks_and_connectivities
callbacks_and_connectivities
Definition: grpc/_channel.py:1322
grpc._channel._MultiThreadedRendezvous.cancelled
def cancelled(self)
Definition: grpc/_channel.py:712
max
int max
Definition: bloaty/third_party/zlib/examples/enough.c:170
grpc._channel._SingleThreadedRendezvous.trailing_metadata
def trailing_metadata(self)
Definition: grpc/_channel.py:570
grpc._channel._UnaryUnaryMultiCallable._managed_call
_managed_call
Definition: grpc/_channel.py:890
grpc._channel._StreamStreamMultiCallable._context
_context
Definition: grpc/_channel.py:1180
grpc._channel._Rendezvous.time_remaining
def time_remaining(self)
Definition: grpc/_channel.py:387
grpc._channel._SingleThreadedRendezvous.code
def code(self)
Definition: grpc/_channel.py:578
grpc._channel._UnaryStreamMultiCallable._request_serializer
_request_serializer
Definition: grpc/_channel.py:1039
grpc._channel._unsubscribe
def _unsubscribe(state, callback)
Definition: grpc/_channel.py:1433
grpc._channel._InactiveRpcError.__str__
def __str__(self)
Definition: grpc/_channel.py:322
grpc._channel._SingleThreadedUnaryStreamMultiCallable._method
_method
Definition: grpc/_channel.py:987
grpc._channel._MultiThreadedRendezvous._is_complete
def _is_complete(self)
Definition: grpc/_channel.py:724
grpc._channel._stream_unary_invocation_operationses_and_tags
def _stream_unary_invocation_operationses_and_tags(metadata, initial_metadata_flags)
Definition: grpc/_channel.py:864
grpc._channel._SingleThreadedUnaryStreamMultiCallable._response_deserializer
_response_deserializer
Definition: grpc/_channel.py:989
grpc._channel._MultiThreadedRendezvous.initial_metadata
def initial_metadata(self)
Definition: grpc/_channel.py:663
grpc._channel._SingleThreadedRendezvous.add_done_callback
def add_done_callback(self, fn)
Definition: grpc/_channel.py:553
grpc._channel._StreamUnaryMultiCallable._request_serializer
_request_serializer
Definition: grpc/_channel.py:1092
grpc._channel._InactiveRpcError.result
def result(self, timeout=None)
Definition: grpc/_channel.py:341
grpc._channel._Rendezvous.cancel
def cancel(self)
Definition: grpc/_channel.py:395
grpc._channel.Channel.unary_unary
def unary_unary(self, method, request_serializer=None, response_deserializer=None)
Definition: grpc/_channel.py:1499
grpc._channel._InactiveRpcError.running
def running(self)
Definition: grpc/_channel.py:333
close
#define close
Definition: test-fs.c:48
grpc.Future
Definition: src/python/grpcio/grpc/__init__.py:48
grpc._channel._SingleThreadedUnaryStreamMultiCallable._request_serializer
_request_serializer
Definition: grpc/_channel.py:988
grpc._channel._SingleThreadedUnaryStreamMultiCallable._channel
_channel
Definition: grpc/_channel.py:986
grpc._channel.Channel.__enter__
def __enter__(self)
Definition: grpc/_channel.py:1560
grpc::experimental.UsageError
Definition: src/python/grpcio/grpc/experimental/__init__.py:41
callback
static void callback(void *arg, int status, int timeouts, struct hostent *host)
Definition: acountry.c:224
grpc._channel.Channel._single_threaded_unary_stream
_single_threaded_unary_stream
Definition: grpc/_channel.py:1476
grpc._channel._RPCState.due
due
Definition: grpc/_channel.py:106
grpc._channel.Channel.subscribe
def subscribe(self, callback, try_to_connect=None)
Definition: grpc/_channel.py:1493
min
#define min(a, b)
Definition: qsort.h:83
grpc._channel._SingleThreadedRendezvous._consume_next_event
def _consume_next_event(self)
Definition: grpc/_channel.py:594
grpc._channel._ChannelConnectivityState.connectivity
connectivity
Definition: grpc/_channel.py:1320
grpc._channel.Channel._call_state
_call_state
Definition: grpc/_channel.py:1481
grpc._channel._InitialMetadataFlags.__new__
def __new__(cls, value=_EMPTY_FLAGS)
Definition: grpc/_channel.py:1219
grpc._channel._StreamStreamMultiCallable._request_serializer
_request_serializer
Definition: grpc/_channel.py:1178
grpc.UnaryUnaryMultiCallable
Definition: src/python/grpcio/grpc/__init__.py:663
grpc._channel._Rendezvous._call
_call
Definition: grpc/_channel.py:378
grpc._channel._Rendezvous.__del__
def __del__(self)
Definition: grpc/_channel.py:443
grpc._channel._MultiThreadedRendezvous.debug_error_string
def debug_error_string(self)
Definition: grpc/_channel.py:703
grpc._channel._MultiThreadedRendezvous.trailing_metadata
def trailing_metadata(self)
Definition: grpc/_channel.py:673
grpc._channel._SingleThreadedRendezvous.debug_error_string
def debug_error_string(self)
Definition: grpc/_channel.py:644
grpc._channel._InactiveRpcError
Definition: grpc/_channel.py:283
grpc._channel._channel_managed_call_management
def _channel_managed_call_management(state)
Definition: grpc/_channel.py:1273
grpc._channel._unknown_code_details
def _unknown_code_details(unknown_cygrpc_code, details)
Definition: grpc/_channel.py:88
grpc._channel._deliveries
def _deliveries(state)
Definition: grpc/_channel.py:1333
grpc._channel._UnaryUnaryMultiCallable._method
_method
Definition: grpc/_channel.py:891
grpc._channel._InactiveRpcError.done
def done(self)
Definition: grpc/_channel.py:337
grpc._channel._RPCState.cancelled
cancelled
Definition: grpc/_channel.py:118
grpc._channel._StreamStreamMultiCallable._channel
_channel
Definition: grpc/_channel.py:1175
grpc._channel._UnaryStreamMultiCallable.__init__
def __init__(self, channel, managed_call, method, request_serializer, response_deserializer)
Definition: grpc/_channel.py:1035
grpc._channel.Channel.stream_stream
def stream_stream(self, method, request_serializer=None, response_deserializer=None)
Definition: grpc/_channel.py:1534
grpc._channel._MultiThreadedRendezvous.exception
def exception(self, timeout=None)
Definition: grpc/_channel.py:746
grpc._channel._ChannelCallState.managed_calls
managed_calls
Definition: grpc/_channel.py:1239
grpc._channel._handle_event
def _handle_event(event, state, response_deserializer)
Definition: grpc/_channel.py:135
grpc._channel._Rendezvous._state
_state
Definition: grpc/_channel.py:377
grpc._channel._MultiThreadedRendezvous.running
def running(self)
Definition: grpc/_channel.py:716
grpc._channel._ChannelConnectivityState
Definition: grpc/_channel.py:1314
grpc_testing._channel._invocation._done
def _done(handler)
Definition: _invocation.py:156
grpc._channel._RPCState.callbacks
callbacks
Definition: grpc/_channel.py:119
grpc._channel._MultiThreadedRendezvous.traceback
def traceback(self, timeout=None)
Definition: grpc/_channel.py:765
grpc._channel._SingleThreadedRendezvous.details
def details(self)
Definition: grpc/_channel.py:586
grpc._channel._SingleThreadedUnaryStreamMultiCallable.__init__
def __init__(self, channel, method, request_serializer, response_deserializer)
Definition: grpc/_channel.py:985
grpc._channel._rpc_state_string
def _rpc_state_string(class_name, rpc_state)
Definition: grpc/_channel.py:269
grpc._channel._StreamUnaryMultiCallable.with_call
def with_call(self, request_iterator, timeout=None, metadata=None, credentials=None, wait_for_ready=None, compression=None)
Definition: grpc/_channel.py:1133
grpc._channel._SingleThreadedRendezvous.running
def running(self)
Definition: grpc/_channel.py:476
grpc._channel._StreamUnaryMultiCallable._context
_context
Definition: grpc/_channel.py:1094
grpc._channel._spawn_delivery
def _spawn_delivery(state, callbacks)
Definition: grpc/_channel.py:1363
grpc._channel.Channel.unary_stream
def unary_stream(self, method, request_serializer=None, response_deserializer=None)
Definition: grpc/_channel.py:1507
grpc._channel._InactiveRpcError.trailing_metadata
def trailing_metadata(self)
Definition: grpc/_channel.py:304
grpc._channel._RPCState.trailing_metadata
trailing_metadata
Definition: grpc/_channel.py:109
grpc.FutureTimeoutError
Future Interface ###############################.
Definition: src/python/grpcio/grpc/__init__.py:40
grpc._channel._UnaryUnaryMultiCallable.future
def future(self, request, timeout=None, metadata=None, credentials=None, wait_for_ready=None, compression=None)
Definition: grpc/_channel.py:959
grpc._channel._MultiThreadedRendezvous.done
def done(self)
Definition: grpc/_channel.py:720
grpc._channel._UnaryUnaryMultiCallable.__init__
def __init__(self, channel, managed_call, method, request_serializer, response_deserializer)
Definition: grpc/_channel.py:888
grpc._channel._MultiThreadedRendezvous.details
def details(self)
Definition: grpc/_channel.py:693
grpc._channel._ChannelConnectivityState.__init__
def __init__(self, channel)
Definition: grpc/_channel.py:1316
grpc._channel._StreamUnaryMultiCallable.future
def future(self, request_iterator, timeout=None, metadata=None, credentials=None, wait_for_ready=None, compression=None)
Definition: grpc/_channel.py:1144
grpc._channel._ChannelConnectivityState.delivering
delivering
Definition: grpc/_channel.py:1323
grpc._channel._SingleThreadedRendezvous.done
def done(self)
Definition: grpc/_channel.py:480
grpc._channel._UnaryUnaryMultiCallable._prepare
def _prepare(self, request, timeout, metadata, wait_for_ready, compression)
Definition: grpc/_channel.py:897
grpc._channel._StreamUnaryMultiCallable.__init__
def __init__(self, channel, managed_call, method, request_serializer, response_deserializer)
Definition: grpc/_channel.py:1088
grpc._channel._end_unary_response_blocking
def _end_unary_response_blocking(state, call, with_call, deadline)
Definition: grpc/_channel.py:841
next
AllocList * next[kMaxLevel]
Definition: abseil-cpp/absl/base/internal/low_level_alloc.cc:100
grpc._channel._subscribe
def _subscribe(state, callback, try_to_connect)
Definition: grpc/_channel.py:1413
grpc._channel._InactiveRpcError.__repr__
def __repr__(self)
Definition: grpc/_channel.py:319
grpc._channel._StreamUnaryMultiCallable._channel
_channel
Definition: grpc/_channel.py:1089
grpc._channel._UnaryStreamMultiCallable._response_deserializer
_response_deserializer
Definition: grpc/_channel.py:1040
cpp.gmock_class.set
set
Definition: bloaty/third_party/googletest/googlemock/scripts/generator/cpp/gmock_class.py:44
grpc._channel._UnaryStreamMultiCallable._channel
_channel
Definition: grpc/_channel.py:1036
grpc._channel._MultiThreadedRendezvous._next
def _next(self)
Definition: grpc/_channel.py:795
grpc._channel._UnaryUnaryMultiCallable.with_call
def with_call(self, request, timeout=None, metadata=None, credentials=None, wait_for_ready=None, compression=None)
Definition: grpc/_channel.py:948
grpc._channel._Rendezvous.debug_error_string
def debug_error_string(self)
Definition: grpc/_channel.py:431
grpc._channel.Channel._close_on_fork
def _close_on_fork(self)
Definition: grpc/_channel.py:1555
grpc._channel._StreamUnaryMultiCallable._response_deserializer
_response_deserializer
Definition: grpc/_channel.py:1093
grpc._channel._RPCState.response
response
Definition: grpc/_channel.py:108
grpc._channel._StreamStreamMultiCallable
Definition: grpc/_channel.py:1171
grpc._channel._RPCState
Definition: grpc/_channel.py:93
grpc._channel._ChannelConnectivityState.reset_postfork_child
def reset_postfork_child(self)
Definition: grpc/_channel.py:1325
grpc._channel._Rendezvous._repr
def _repr(self)
Definition: grpc/_channel.py:434
grpc._channel._ChannelConnectivityState.lock
lock
Definition: grpc/_channel.py:1317
grpc._channel._StreamUnaryMultiCallable.__call__
def __call__(self, request_iterator, timeout=None, metadata=None, credentials=None, wait_for_ready=None, compression=None)
Definition: grpc/_channel.py:1122
grpc._channel.Channel.__exit__
def __exit__(self, exc_type, exc_val, exc_tb)
Definition: grpc/_channel.py:1563
grpc._cython
Definition: src/python/grpcio/grpc/_cython/__init__.py:1
grpc._channel._InitialMetadataFlags.with_wait_for_ready
def with_wait_for_ready(self, wait_for_ready)
Definition: grpc/_channel.py:1223
grpc._channel.Channel._channel
_channel
Definition: grpc/_channel.py:1478
grpc._channel._Rendezvous.is_active
def is_active(self)
Definition: grpc/_channel.py:382
grpc._channel._StreamStreamMultiCallable._response_deserializer
_response_deserializer
Definition: grpc/_channel.py:1179
grpc._channel._ChannelCallState.__init__
def __init__(self, channel)
Definition: grpc/_channel.py:1236
grpc.StreamUnaryMultiCallable
Definition: src/python/grpcio/grpc/__init__.py:800
grpc._channel.Channel._unsubscribe_all
def _unsubscribe_all(self)
Definition: grpc/_channel.py:1542
grpc._channel._ChannelCallState
Definition: grpc/_channel.py:1234
grpc._channel._RPCState.debug_error_string
debug_error_string
Definition: grpc/_channel.py:112
grpc._channel._UnaryStreamMultiCallable._context
_context
Definition: grpc/_channel.py:1041
grpc._channel._Rendezvous._deadline
_deadline
Definition: grpc/_channel.py:380
grpc._channel._SingleThreadedUnaryStreamMultiCallable
Definition: grpc/_channel.py:982
grpc._channel._Rendezvous.add_callback
def add_callback(self, callback)
Definition: grpc/_channel.py:410
grpc._channel._InitialMetadataFlags
Definition: grpc/_channel.py:1216
grpc._channel._RPCState.code
code
Definition: grpc/_channel.py:110
grpc._channel._stream_unary_invocation_operationses
def _stream_unary_invocation_operationses(metadata, initial_metadata_flags)
Definition: grpc/_channel.py:852
grpc._channel._SingleThreadedRendezvous
Definition: grpc/_channel.py:455
grpc._channel._Rendezvous.__next__
def __next__(self)
Definition: grpc/_channel.py:425
grpc._channel._ChannelCallState.lock
lock
Definition: grpc/_channel.py:1237
grpc._channel.Channel
Definition: grpc/_channel.py:1462
grpc._channel._ChannelConnectivityState.channel
channel
Definition: grpc/_channel.py:1318
grpc._channel._consume_request_iterator
def _consume_request_iterator(request_iterator, state, call, request_serializer, event_handler)
Definition: grpc/_channel.py:191
grpc._channel.Channel.unsubscribe
def unsubscribe(self, callback)
Definition: grpc/_channel.py:1496
grpc._channel._RPCState.initial_metadata
initial_metadata
Definition: grpc/_channel.py:107
grpc._channel._StreamStreamMultiCallable.__call__
def __call__(self, request_iterator, timeout=None, metadata=None, credentials=None, wait_for_ready=None, compression=None)
Definition: grpc/_channel.py:1183
grpc._channel._MultiThreadedRendezvous.add_done_callback
def add_done_callback(self, fn)
Definition: grpc/_channel.py:787
grpc.UnaryStreamMultiCallable
Definition: src/python/grpcio/grpc/__init__.py:765
grpc._channel._deadline
def _deadline(timeout)
Definition: grpc/_channel.py:84
grpc._channel._ChannelCallState.threading
threading
Definition: grpc/_channel.py:1240
grpc._channel._ChannelCallState.__del__
def __del__(self)
Definition: grpc/_channel.py:1245
grpc._channel._abort
def _abort(state, code, details)
Definition: grpc/_channel.py:126
grpc.RpcContext
Definition: src/python/grpcio/grpc/__init__.py:309
grpc._channel._UnaryUnaryMultiCallable.__call__
def __call__(self, request, timeout=None, metadata=None, credentials=None, wait_for_ready=None, compression=None)
Definition: grpc/_channel.py:937
grpc._channel._UnaryStreamMultiCallable.__call__
def __call__(self, request, timeout=None, metadata=None, credentials=None, wait_for_ready=None, compression=None)
Definition: grpc/_channel.py:1044
grpc._channel._StreamStreamMultiCallable._managed_call
_managed_call
Definition: grpc/_channel.py:1176
grpc.StreamStreamMultiCallable
Definition: src/python/grpcio/grpc/__init__.py:904


grpc
Author(s):
autogenerated on Fri May 16 2025 02:57:38