14 """Invocation-side implementation of gRPC Python.""" 
   25 from grpc 
import _common
 
   26 from grpc 
import _compression
 
   27 from grpc 
import _grpcio_metadata
 
   31 _LOGGER = logging.getLogger(__name__)
 
   33 _USER_AGENT = 
'grpc-python/{}'.
format(_grpcio_metadata.__version__)
 
   39 _DEFAULT_SINGLE_THREADED_UNARY_STREAM = os.getenv(
 
   40     "GRPC_SINGLE_THREADED_UNARY_STREAM") 
is not None 
   42 _UNARY_UNARY_INITIAL_DUE = (
 
   43     cygrpc.OperationType.send_initial_metadata,
 
   44     cygrpc.OperationType.send_message,
 
   45     cygrpc.OperationType.send_close_from_client,
 
   46     cygrpc.OperationType.receive_initial_metadata,
 
   47     cygrpc.OperationType.receive_message,
 
   48     cygrpc.OperationType.receive_status_on_client,
 
   50 _UNARY_STREAM_INITIAL_DUE = (
 
   51     cygrpc.OperationType.send_initial_metadata,
 
   52     cygrpc.OperationType.send_message,
 
   53     cygrpc.OperationType.send_close_from_client,
 
   54     cygrpc.OperationType.receive_initial_metadata,
 
   55     cygrpc.OperationType.receive_status_on_client,
 
   57 _STREAM_UNARY_INITIAL_DUE = (
 
   58     cygrpc.OperationType.send_initial_metadata,
 
   59     cygrpc.OperationType.receive_initial_metadata,
 
   60     cygrpc.OperationType.receive_message,
 
   61     cygrpc.OperationType.receive_status_on_client,
 
   63 _STREAM_STREAM_INITIAL_DUE = (
 
   64     cygrpc.OperationType.send_initial_metadata,
 
   65     cygrpc.OperationType.receive_initial_metadata,
 
   66     cygrpc.OperationType.receive_status_on_client,
 
   69 _CHANNEL_SUBSCRIPTION_CALLBACK_ERROR_LOG_MESSAGE = (
 
   70     'Exception calling channel subscription callback!')
 
   72 _OK_RENDEZVOUS_REPR_FORMAT = (
'<{} of RPC that terminated with:\n' 
   77 _NON_OK_RENDEZVOUS_REPR_FORMAT = (
'<{} of RPC that terminated with:\n' 
   80                                   '\tdebug_error_string = "{}"\n' 
   85     return None if timeout 
is None else time.time() + timeout
 
   89     return 'Server sent unknown code {} and details "{}"'.
format(
 
   90         unknown_cygrpc_code, details)
 
   95     def __init__(self, due, initial_metadata, trailing_metadata, code, details):
 
  127     if state.code 
is None:
 
  129         state.details = details
 
  130         if state.initial_metadata 
is None:
 
  131             state.initial_metadata = ()
 
  132         state.trailing_metadata = ()
 
  137     for batch_operation 
in event.batch_operations:
 
  138         operation_type = batch_operation.type()
 
  139         state.due.remove(operation_type)
 
  140         if operation_type == cygrpc.OperationType.receive_initial_metadata:
 
  141             state.initial_metadata = batch_operation.initial_metadata()
 
  142         elif operation_type == cygrpc.OperationType.receive_message:
 
  143             serialized_response = batch_operation.message()
 
  144             if serialized_response 
is not None:
 
  145                 response = _common.deserialize(serialized_response,
 
  146                                                response_deserializer)
 
  148                     details = 
'Exception deserializing response!' 
  149                     _abort(state, grpc.StatusCode.INTERNAL, details)
 
  151                     state.response = response
 
  152         elif operation_type == cygrpc.OperationType.receive_status_on_client:
 
  153             state.trailing_metadata = batch_operation.trailing_metadata()
 
  154             if state.code 
is None:
 
  155                 code = _common.CYGRPC_STATUS_CODE_TO_STATUS_CODE.get(
 
  156                     batch_operation.code())
 
  158                     state.code = grpc.StatusCode.UNKNOWN
 
  160                         code, batch_operation.details())
 
  163                     state.details = batch_operation.details()
 
  164                     state.debug_error_string = batch_operation.error_string()
 
  165             callbacks.extend(state.callbacks)
 
  166             state.callbacks = 
None 
  172     def handle_event(event):
 
  173         with state.condition:
 
  174             callbacks = 
_handle_event(event, state, response_deserializer)
 
  175             state.condition.notify_all()
 
  177         for callback 
in callbacks:
 
  180             except Exception 
as e:  
 
  183                 logging.error(
'Exception in callback %s: %s',
 
  184                               repr(callback.func), repr(e))
 
  185         return done 
and state.fork_epoch >= cygrpc.get_fork_epoch()
 
  193     """Consume a request iterator supplied by the user.""" 
  195     def consume_request_iterator():  
 
  199             return_from_user_request_generator_invoked = 
False 
  202                 cygrpc.enter_user_request_generator()
 
  203                 request = 
next(request_iterator)
 
  204             except StopIteration:
 
  207                 cygrpc.return_from_user_request_generator()
 
  208                 return_from_user_request_generator_invoked = 
True 
  209                 code = grpc.StatusCode.UNKNOWN
 
  210                 details = 
'Exception iterating requests!' 
  211                 _LOGGER.exception(details)
 
  212                 call.cancel(_common.STATUS_CODE_TO_CYGRPC_STATUS_CODE[code],
 
  214                 _abort(state, code, details)
 
  217                 if not return_from_user_request_generator_invoked:
 
  218                     cygrpc.return_from_user_request_generator()
 
  219             serialized_request = _common.serialize(request, request_serializer)
 
  220             with state.condition:
 
  221                 if state.code 
is None and not state.cancelled:
 
  222                     if serialized_request 
is None:
 
  223                         code = grpc.StatusCode.INTERNAL
 
  224                         details = 
'Exception serializing request!' 
  226                             _common.STATUS_CODE_TO_CYGRPC_STATUS_CODE[code],
 
  228                         _abort(state, code, details)
 
  231                         state.due.add(cygrpc.OperationType.send_message)
 
  232                         operations = (cygrpc.SendMessageOperation(
 
  233                             serialized_request, _EMPTY_FLAGS),)
 
  234                         operating = call.operate(operations, event_handler)
 
  236                             state.due.remove(cygrpc.OperationType.send_message)
 
  240                             return (state.code 
is not None or 
  241                                     cygrpc.OperationType.send_message
 
  244                         _common.wait(state.condition.wait,
 
  246                                      spin_cb=functools.partial(
 
  247                                          cygrpc.block_if_fork_in_progress,
 
  249                         if state.code 
is not None:
 
  253         with state.condition:
 
  254             if state.code 
is None:
 
  255                 state.due.add(cygrpc.OperationType.send_close_from_client)
 
  257                     cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS),)
 
  258                 operating = call.operate(operations, event_handler)
 
  261                         cygrpc.OperationType.send_close_from_client)
 
  263     consumption_thread = cygrpc.ForkManagedThread(
 
  264         target=consume_request_iterator)
 
  265     consumption_thread.setDaemon(
True)
 
  266     consumption_thread.start()
 
  270     """Calculates error string for RPC.""" 
  271     with rpc_state.condition:
 
  272         if rpc_state.code 
is None:
 
  273             return '<{} object>'.
format(class_name)
 
  274         elif rpc_state.code 
is grpc.StatusCode.OK:
 
  275             return _OK_RENDEZVOUS_REPR_FORMAT.format(class_name, rpc_state.code,
 
  278             return _NON_OK_RENDEZVOUS_REPR_FORMAT.format(
 
  279                 class_name, rpc_state.code, rpc_state.details,
 
  280                 rpc_state.debug_error_string)
 
  284     """An RPC error not tied to the execution of a particular RPC. 
  286     The RPC represented by the state object must not be in-progress or 
  290       _state: An instance of _RPCState. 
  294         with state.condition:
 
  296                                     copy.deepcopy(state.trailing_metadata),
 
  297                                     state.code, copy.deepcopy(state.details))
 
  298             self.
_state.response = copy.copy(state.response)
 
  299             self.
_state.debug_error_string = copy.copy(state.debug_error_string)
 
  302         return self.
_state.initial_metadata
 
  305         return self.
_state.trailing_metadata
 
  311         return _common.decode(self.
_state.details)
 
  314         return _common.decode(self.
_state.debug_error_string)
 
  326         """See grpc.Future.cancel.""" 
  330         """See grpc.Future.cancelled.""" 
  334         """See grpc.Future.running.""" 
  338         """See grpc.Future.done.""" 
  342         """See grpc.Future.result.""" 
  346         """See grpc.Future.exception.""" 
  350         """See grpc.Future.traceback.""" 
  354             return sys.exc_info()[2]
 
  357         """See grpc.Future.add_done_callback.""" 
  365       _state: An instance of _RPCState. 
  366       _call: An instance of SegregatedCall or IntegratedCall. 
  367         In either case, the _call object is expected to have operate, cancel, 
  368         and next_event methods. 
  369       _response_deserializer: A callable taking bytes and returning a Python 
  371       _deadline: A float representing the deadline of the RPC in seconds. Or 
  372         possibly None, to represent an RPC with no deadline at all. 
  375     def __init__(self, state, call, response_deserializer, deadline):
 
  383         """See grpc.RpcContext.is_active""" 
  384         with self.
_state.condition:
 
  385             return self.
_state.code 
is None 
  388         """See grpc.RpcContext.time_remaining""" 
  389         with self.
_state.condition:
 
  396         """See grpc.RpcContext.cancel""" 
  397         with self.
_state.condition:
 
  398             if self.
_state.code 
is None:
 
  399                 code = grpc.StatusCode.CANCELLED
 
  400                 details = 
'Locally cancelled by application!' 
  402                     _common.STATUS_CODE_TO_CYGRPC_STATUS_CODE[code], details)
 
  403                 self.
_state.cancelled = 
True 
  405                 self.
_state.condition.notify_all()
 
  411         """See grpc.RpcContext.add_callback""" 
  412         with self.
_state.condition:
 
  413             if self.
_state.callbacks 
is None:
 
  416                 self.
_state.callbacks.append(callback)
 
  429         raise NotImplementedError()
 
  432         raise NotImplementedError()
 
  444         with self.
_state.condition:
 
  445             if self.
_state.code 
is None:
 
  446                 self.
_state.code = grpc.StatusCode.CANCELLED
 
  447                 self.
_state.details = 
'Cancelled upon garbage collection!' 
  448                 self.
_state.cancelled = 
True 
  450                     _common.STATUS_CODE_TO_CYGRPC_STATUS_CODE[self.
_state.code],
 
  452                 self.
_state.condition.notify_all()
 
  456     """An RPC iterator operating entirely on a single thread. 
  458     The __next__ method of _SingleThreadedRendezvous does not depend on the 
  459     existence of any other thread, including the "channel spin thread". 
  460     However, this means that its interface is entirely synchronous. So this 
  461     class cannot completely fulfill the grpc.Future interface. The result, 
  462     exception, and traceback methods will never block and will instead raise 
  463     an exception if calling the method would result in blocking. 
  465     This means that these methods are safe to call from add_done_callback 
  470         return self.
_state.code 
is not None 
  473         with self.
_state.condition:
 
  474             return self.
_state.cancelled
 
  477         with self.
_state.condition:
 
  478             return self.
_state.code 
is None 
  481         with self.
_state.condition:
 
  482             return self.
_state.code 
is not None 
  485         """Returns the result of the computation or raises its exception. 
  487         This method will never block. Instead, it will raise an exception 
  488         if calling this method would otherwise result in blocking. 
  490         Since this method will never block, any `timeout` argument passed will 
  494         with self.
_state.condition:
 
  497                     "_SingleThreadedRendezvous only supports result() when the RPC is complete." 
  499             if self.
_state.code 
is grpc.StatusCode.OK:
 
  500                 return self.
_state.response
 
  501             elif self.
_state.cancelled:
 
  507         """Return the exception raised by the computation. 
  509         This method will never block. Instead, it will raise an exception 
  510         if calling this method would otherwise result in blocking. 
  512         Since this method will never block, any `timeout` argument passed will 
  516         with self.
_state.condition:
 
  519                     "_SingleThreadedRendezvous only supports exception() when the RPC is complete." 
  521             if self.
_state.code 
is grpc.StatusCode.OK:
 
  523             elif self.
_state.cancelled:
 
  529         """Access the traceback of the exception raised by the computation. 
  531         This method will never block. Instead, it will raise an exception 
  532         if calling this method would otherwise result in blocking. 
  534         Since this method will never block, any `timeout` argument passed will 
  538         with self.
_state.condition:
 
  541                     "_SingleThreadedRendezvous only supports traceback() when the RPC is complete." 
  543             if self.
_state.code 
is grpc.StatusCode.OK:
 
  545             elif self.
_state.cancelled:
 
  551                     return sys.exc_info()[2]
 
  554         with self.
_state.condition:
 
  555             if self.
_state.code 
is None:
 
  556                 self.
_state.callbacks.append(functools.partial(fn, self))
 
  562         """See grpc.Call.initial_metadata""" 
  563         with self.
_state.condition:
 
  566             while self.
_state.initial_metadata 
is None:
 
  568             return self.
_state.initial_metadata
 
  571         """See grpc.Call.trailing_metadata""" 
  572         with self.
_state.condition:
 
  573             if self.
_state.trailing_metadata 
is None:
 
  575                     "Cannot get trailing metadata until RPC is completed.")
 
  576             return self.
_state.trailing_metadata
 
  579         """See grpc.Call.code""" 
  580         with self.
_state.condition:
 
  581             if self.
_state.code 
is None:
 
  583                     "Cannot get code until RPC is completed.")
 
  587         """See grpc.Call.details""" 
  588         with self.
_state.condition:
 
  589             if self.
_state.details 
is None:
 
  591                     "Cannot get details until RPC is completed.")
 
  592             return _common.decode(self.
_state.details)
 
  595         event = self.
_call.next_event()
 
  596         with self.
_state.condition:
 
  599             for callback 
in callbacks:
 
  608             with self.
_state.condition:
 
  609                 if self.
_state.response 
is not None:
 
  610                     response = self.
_state.response
 
  611                     self.
_state.response = 
None 
  613                 elif cygrpc.OperationType.receive_message 
not in self.
_state.due:
 
  614                     if self.
_state.code 
is grpc.StatusCode.OK:
 
  615                         raise StopIteration()
 
  616                     elif self.
_state.code 
is not None:
 
  620         with self.
_state.condition:
 
  621             if self.
_state.code 
is None:
 
  633                 self.
_state.due.add(cygrpc.OperationType.receive_message)
 
  634                 operating = self.
_call.operate(
 
  635                     (cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),), 
None)
 
  637                     self.
_state.due.remove(cygrpc.OperationType.receive_message)
 
  638             elif self.
_state.code 
is grpc.StatusCode.OK:
 
  639                 raise StopIteration()
 
  645         with self.
_state.condition:
 
  646             if self.
_state.debug_error_string 
is None:
 
  648                     "Cannot get debug error string until RPC is completed.")
 
  649             return _common.decode(self.
_state.debug_error_string)
 
  653     """An RPC iterator that depends on a channel spin thread. 
  655     This iterator relies upon a per-channel thread running in the background, 
  656     dequeueing events from the completion queue, and notifying threads waiting 
  657     on the threading.Condition object in the _RPCState object. 
  659     This extra thread allows _MultiThreadedRendezvous to fulfill the grpc.Future interface 
  660     and to mediate a bidirectional streaming RPC. 
  664         """See grpc.Call.initial_metadata""" 
  665         with self.
_state.condition:
 
  668                 return self.
_state.initial_metadata 
is not None 
  670             _common.wait(self.
_state.condition.wait, _done)
 
  671             return self.
_state.initial_metadata
 
  674         """See grpc.Call.trailing_metadata""" 
  675         with self.
_state.condition:
 
  678                 return self.
_state.trailing_metadata 
is not None 
  680             _common.wait(self.
_state.condition.wait, _done)
 
  681             return self.
_state.trailing_metadata
 
  684         """See grpc.Call.code""" 
  685         with self.
_state.condition:
 
  688                 return self.
_state.code 
is not None 
  690             _common.wait(self.
_state.condition.wait, _done)
 
  694         """See grpc.Call.details""" 
  695         with self.
_state.condition:
 
  698                 return self.
_state.details 
is not None 
  700             _common.wait(self.
_state.condition.wait, _done)
 
  701             return _common.decode(self.
_state.details)
 
  704         with self.
_state.condition:
 
  707                 return self.
_state.debug_error_string 
is not None 
  709             _common.wait(self.
_state.condition.wait, _done)
 
  710             return _common.decode(self.
_state.debug_error_string)
 
  713         with self.
_state.condition:
 
  714             return self.
_state.cancelled
 
  717         with self.
_state.condition:
 
  718             return self.
_state.code 
is None 
  721         with self.
_state.condition:
 
  722             return self.
_state.code 
is not None 
  725         return self.
_state.code 
is not None 
  728         """Returns the result of the computation or raises its exception. 
  730         See grpc.Future.result for the full API contract. 
  732         with self.
_state.condition:
 
  733             timed_out = _common.wait(self.
_state.condition.wait,
 
  739                 if self.
_state.code 
is grpc.StatusCode.OK:
 
  740                     return self.
_state.response
 
  741                 elif self.
_state.cancelled:
 
  747         """Return the exception raised by the computation. 
  749         See grpc.Future.exception for the full API contract. 
  751         with self.
_state.condition:
 
  752             timed_out = _common.wait(self.
_state.condition.wait,
 
  758                 if self.
_state.code 
is grpc.StatusCode.OK:
 
  760                 elif self.
_state.cancelled:
 
  766         """Access the traceback of the exception raised by the computation. 
  768         See grpc.Future.traceback for the full API contract. 
  770         with self.
_state.condition:
 
  771             timed_out = _common.wait(self.
_state.condition.wait,
 
  777                 if self.
_state.code 
is grpc.StatusCode.OK:
 
  779                 elif self.
_state.cancelled:
 
  785                         return sys.exc_info()[2]
 
  788         with self.
_state.condition:
 
  789             if self.
_state.code 
is None:
 
  790                 self.
_state.callbacks.append(functools.partial(fn, self))
 
  796         with self.
_state.condition:
 
  797             if self.
_state.code 
is None:
 
  800                 self.
_state.due.add(cygrpc.OperationType.receive_message)
 
  801                 operating = self.
_call.operate(
 
  802                     (cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),),
 
  805                     self.
_state.due.remove(cygrpc.OperationType.receive_message)
 
  806             elif self.
_state.code 
is grpc.StatusCode.OK:
 
  807                 raise StopIteration()
 
  811             def _response_ready():
 
  812                 return (self.
_state.response 
is not None or 
  813                         (cygrpc.OperationType.receive_message
 
  814                          not in self.
_state.due 
and 
  815                          self.
_state.code 
is not None))
 
  817             _common.wait(self.
_state.condition.wait, _response_ready)
 
  818             if self.
_state.response 
is not None:
 
  819                 response = self.
_state.response
 
  820                 self.
_state.response = 
None 
  822             elif cygrpc.OperationType.receive_message 
not in self.
_state.due:
 
  823                 if self.
_state.code 
is grpc.StatusCode.OK:
 
  824                     raise StopIteration()
 
  825                 elif self.
_state.code 
is not None:
 
  831     serialized_request = _common.serialize(request, request_serializer)
 
  832     if serialized_request 
is None:
 
  833         state = 
_RPCState((), (), (), grpc.StatusCode.INTERNAL,
 
  834                           'Exception serializing request!')
 
  836         return deadline, 
None, error
 
  838         return deadline, serialized_request, 
None 
  842     if state.code 
is grpc.StatusCode.OK:
 
  845             return state.response, rendezvous
 
  847             return state.response
 
  855             cygrpc.SendInitialMetadataOperation(metadata,
 
  856                                                 initial_metadata_flags),
 
  857             cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),
 
  858             cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),
 
  860         (cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),),
 
  865                                                    initial_metadata_flags):
 
  870         metadata, initial_metadata_flags))
 
  874     parent_deadline = cygrpc.get_deadline_from_context()
 
  875     if parent_deadline 
is None and user_deadline 
is None:
 
  877     elif parent_deadline 
is not None and user_deadline 
is None:
 
  878         return parent_deadline
 
  879     elif user_deadline 
is not None and parent_deadline 
is None:
 
  882         return min(parent_deadline, user_deadline)
 
  888     def __init__(self, channel, managed_call, method, request_serializer,
 
  889                  response_deserializer):
 
  895         self.
_context = cygrpc.build_census_context()
 
  897     def _prepare(self, request, timeout, metadata, wait_for_ready, compression):
 
  902         augmented_metadata = _compression.augment_metadata(
 
  903             metadata, compression)
 
  904         if serialized_request 
is None:
 
  905             return None, 
None, 
None, rendezvous
 
  907             state = 
_RPCState(_UNARY_UNARY_INITIAL_DUE, 
None, 
None, 
None, 
None)
 
  909                 cygrpc.SendInitialMetadataOperation(augmented_metadata,
 
  910                                                     initial_metadata_flags),
 
  911                 cygrpc.SendMessageOperation(serialized_request, _EMPTY_FLAGS),
 
  912                 cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS),
 
  913                 cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),
 
  914                 cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),
 
  915                 cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),
 
  917             return state, operations, deadline, 
None 
  919     def _blocking(self, request, timeout, metadata, credentials, wait_for_ready,
 
  921         state, operations, deadline, rendezvous = self.
_prepare(
 
  922             request, timeout, metadata, wait_for_ready, compression)
 
  926             call = self.
_channel.segregated_call(
 
  927                 cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS,
 
  929                 None if credentials 
is None else credentials._credentials, ((
 
  933             event = call.next_event()
 
  944         state, call, = self.
_blocking(request, timeout, metadata, credentials,
 
  945                                       wait_for_ready, compression)
 
  955         state, call, = self.
_blocking(request, timeout, metadata, credentials,
 
  956                                       wait_for_ready, compression)
 
  966         state, operations, deadline, rendezvous = self.
_prepare(
 
  967             request, timeout, metadata, wait_for_ready, compression)
 
  973                 cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS,
 
  974                 self.
_method, 
None, deadline, metadata,
 
  975                 None if credentials 
is None else credentials._credentials,
 
  976                 (operations,), event_handler, self.
_context)
 
  985     def __init__(self, channel, method, request_serializer,
 
  986                  response_deserializer):
 
  991         self.
_context = cygrpc.build_census_context()
 
 1002         serialized_request = _common.serialize(request,
 
 1004         if serialized_request 
is None:
 
 1005             state = 
_RPCState((), (), (), grpc.StatusCode.INTERNAL,
 
 1006                               'Exception serializing request!')
 
 1009         state = 
_RPCState(_UNARY_STREAM_INITIAL_DUE, 
None, 
None, 
None, 
None)
 
 1010         call_credentials = 
None if credentials 
is None else credentials._credentials
 
 1013         augmented_metadata = _compression.augment_metadata(
 
 1014             metadata, compression)
 
 1016             (cygrpc.SendInitialMetadataOperation(augmented_metadata,
 
 1017                                                  initial_metadata_flags),
 
 1018              cygrpc.SendMessageOperation(serialized_request, _EMPTY_FLAGS),
 
 1019              cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS)),
 
 1020             (cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),),
 
 1021             (cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),),
 
 1023         operations_and_tags = tuple((ops, 
None) 
for ops 
in operations)
 
 1024         call = self.
_channel.segregated_call(
 
 1025             cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS, self.
_method,
 
 1027             operations_and_tags, self.
_context)
 
 1035     def __init__(self, channel, managed_call, method, request_serializer,
 
 1036                  response_deserializer):
 
 1042         self.
_context = cygrpc.build_census_context()
 
 1050             wait_for_ready=None,
 
 1056         if serialized_request 
is None:
 
 1059             augmented_metadata = _compression.augment_metadata(
 
 1060                 metadata, compression)
 
 1061             state = 
_RPCState(_UNARY_STREAM_INITIAL_DUE, 
None, 
None, 
None, 
None)
 
 1064                     cygrpc.SendInitialMetadataOperation(augmented_metadata,
 
 1065                                                         initial_metadata_flags),
 
 1066                     cygrpc.SendMessageOperation(serialized_request,
 
 1068                     cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS),
 
 1069                     cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),
 
 1071                 (cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),),
 
 1074                 cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS,
 
 1076                 None if credentials 
is None else credentials._credentials,
 
 1088     def __init__(self, channel, managed_call, method, request_serializer,
 
 1089                  response_deserializer):
 
 1095         self.
_context = cygrpc.build_census_context()
 
 1097     def _blocking(self, request_iterator, timeout, metadata, credentials,
 
 1098                   wait_for_ready, compression):
 
 1100         state = 
_RPCState(_STREAM_UNARY_INITIAL_DUE, 
None, 
None, 
None, 
None)
 
 1103         augmented_metadata = _compression.augment_metadata(
 
 1104             metadata, compression)
 
 1105         call = self.
_channel.segregated_call(
 
 1106             cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS, self.
_method,
 
 1108             None if credentials 
is None else credentials._credentials,
 
 1110                 augmented_metadata, initial_metadata_flags), self.
_context)
 
 1114             event = call.next_event()
 
 1115             with state.condition:
 
 1117                 state.condition.notify_all()
 
 1127                  wait_for_ready=None,
 
 1129         state, call, = self.
_blocking(request_iterator, timeout, metadata,
 
 1130                                       credentials, wait_for_ready, compression)
 
 1138                   wait_for_ready=None,
 
 1140         state, call, = self.
_blocking(request_iterator, timeout, metadata,
 
 1141                                       credentials, wait_for_ready, compression)
 
 1149                wait_for_ready=None,
 
 1152         state = 
_RPCState(_STREAM_UNARY_INITIAL_DUE, 
None, 
None, 
None, 
None)
 
 1156         augmented_metadata = _compression.augment_metadata(
 
 1157             metadata, compression)
 
 1159             cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS, self.
_method,
 
 1160             None, deadline, augmented_metadata,
 
 1161             None if credentials 
is None else credentials._credentials,
 
 1163                                                   initial_metadata_flags),
 
 1174     def __init__(self, channel, managed_call, method, request_serializer,
 
 1175                  response_deserializer):
 
 1181         self.
_context = cygrpc.build_census_context()
 
 1188                  wait_for_ready=None,
 
 1191         state = 
_RPCState(_STREAM_STREAM_INITIAL_DUE, 
None, 
None, 
None, 
None)
 
 1194         augmented_metadata = _compression.augment_metadata(
 
 1195             metadata, compression)
 
 1198                 cygrpc.SendInitialMetadataOperation(augmented_metadata,
 
 1199                                                     initial_metadata_flags),
 
 1200                 cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),
 
 1202             (cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),),
 
 1206             cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS, self.
_method,
 
 1208             None if credentials 
is None else credentials._credentials,
 
 1209             operationses, event_handler, self.
_context)
 
 1217     """Stores immutable initial metadata flags""" 
 1220         value &= cygrpc.InitialMetadataFlags.used_mask
 
 1221         return super(_InitialMetadataFlags, cls).
__new__(cls, value)
 
 1224         if wait_for_ready 
is not None:
 
 1226                 return self.__class__(self | cygrpc.InitialMetadataFlags.wait_for_ready | \
 
 1227                     cygrpc.InitialMetadataFlags.wait_for_ready_explicitly_set)
 
 1228             elif not wait_for_ready:
 
 1229                 return self.__class__(self & ~cygrpc.InitialMetadataFlags.wait_for_ready | \
 
 1230                     cygrpc.InitialMetadataFlags.wait_for_ready_explicitly_set)
 
 1248                                'Channel deallocated!')
 
 1249         except (TypeError, AttributeError):
 
 1257             cygrpc.block_if_fork_in_progress(state)
 
 1258             event = state.channel.next_call_event()
 
 1259             if event.completion_type == cygrpc.CompletionType.queue_timeout:
 
 1261             call_completed = event.tag(event)
 
 1264                     state.managed_calls -= 1
 
 1265                     if state.managed_calls == 0:
 
 1268     channel_spin_thread = cygrpc.ForkManagedThread(target=channel_spin)
 
 1269     channel_spin_thread.setDaemon(
True)
 
 1270     channel_spin_thread.start()
 
 1276     def create(flags, method, host, deadline, metadata, credentials,
 
 1277                operationses, event_handler, context):
 
 1278         """Creates a cygrpc.IntegratedCall. 
 1281           flags: An integer bitfield of call flags. 
 1282           method: The RPC method. 
 1283           host: A host string for the created call. 
 1284           deadline: A float to be the deadline of the created call or None if 
 1285             the call is to have an infinite deadline. 
 1286           metadata: The metadata for the call or None. 
 1287           credentials: A cygrpc.CallCredentials or None. 
 1288           operationses: An iterable of iterables of cygrpc.Operations to be 
 1289             started on the call. 
 1290           event_handler: A behavior to call to handle the events resultant from 
 1291             the operations on the call. 
 1292           context: Context object for distributed tracing. 
 1294           A cygrpc.IntegratedCall with which to conduct an RPC. 
 1296         operationses_and_tags = tuple((
 
 1299         ) 
for operations 
in operationses)
 
 1301             call = state.channel.integrated_call(flags, method, host, deadline,
 
 1302                                                  metadata, credentials,
 
 1303                                                  operationses_and_tags, context)
 
 1304             if state.managed_calls == 0:
 
 1305                 state.managed_calls = 1
 
 1308                 state.managed_calls += 1
 
 1334     callbacks_needing_update = []
 
 1335     for callback_and_connectivity 
in state.callbacks_and_connectivities:
 
 1336         callback, callback_connectivity, = callback_and_connectivity
 
 1337         if callback_connectivity 
is not state.connectivity:
 
 1338             callbacks_needing_update.append(callback)
 
 1339             callback_and_connectivity[1] = state.connectivity
 
 1340     return callbacks_needing_update
 
 1343 def _deliver(state, initial_connectivity, initial_callbacks):
 
 1344     connectivity = initial_connectivity
 
 1345     callbacks = initial_callbacks
 
 1347         for callback 
in callbacks:
 
 1348             cygrpc.block_if_fork_in_progress(state)
 
 1353                     _CHANNEL_SUBSCRIPTION_CALLBACK_ERROR_LOG_MESSAGE)
 
 1357                 connectivity = state.connectivity
 
 1359                 state.delivering = 
False 
 1364     delivering_thread = cygrpc.ForkManagedThread(target=_deliver,
 
 1370     delivering_thread.setDaemon(
True)
 
 1371     delivering_thread.start()
 
 1372     state.delivering = 
True 
 1377     try_to_connect = initial_try_to_connect
 
 1378     connectivity = channel.check_connectivity_state(try_to_connect)
 
 1380         state.connectivity = (
 
 1382             CYGRPC_CONNECTIVITY_STATE_TO_CHANNEL_CONNECTIVITY[connectivity])
 
 1384             callback 
for callback, unused_but_known_to_be_none_connectivity 
in 
 1385             state.callbacks_and_connectivities)
 
 1386         for callback_and_connectivity 
in state.callbacks_and_connectivities:
 
 1387             callback_and_connectivity[1] = state.connectivity
 
 1391         event = channel.watch_connectivity_state(connectivity,
 
 1393         cygrpc.block_if_fork_in_progress(state)
 
 1395             if not state.callbacks_and_connectivities 
and not state.try_to_connect:
 
 1396                 state.polling = 
False 
 1397                 state.connectivity = 
None 
 1399             try_to_connect = state.try_to_connect
 
 1400             state.try_to_connect = 
False 
 1401         if event.success 
or try_to_connect:
 
 1402             connectivity = channel.check_connectivity_state(try_to_connect)
 
 1404                 state.connectivity = (
 
 1405                     _common.CYGRPC_CONNECTIVITY_STATE_TO_CHANNEL_CONNECTIVITY[
 
 1407                 if not state.delivering:
 
 1415         if not state.callbacks_and_connectivities 
and not state.polling:
 
 1416             polling_thread = cygrpc.ForkManagedThread(
 
 1417                 target=_poll_connectivity,
 
 1418                 args=(state, state.channel, bool(try_to_connect)))
 
 1419             polling_thread.setDaemon(
True)
 
 1420             polling_thread.start()
 
 1421             state.polling = 
True 
 1422             state.callbacks_and_connectivities.append([callback, 
None])
 
 1423         elif not state.delivering 
and state.connectivity 
is not None:
 
 1425             state.try_to_connect |= bool(try_to_connect)
 
 1426             state.callbacks_and_connectivities.append(
 
 1427                 [callback, state.connectivity])
 
 1429             state.try_to_connect |= bool(try_to_connect)
 
 1430             state.callbacks_and_connectivities.append([callback, 
None])
 
 1435         for index, (subscribed_callback, unused_connectivity) 
in enumerate(
 
 1436                 state.callbacks_and_connectivities):
 
 1437             if callback == subscribed_callback:
 
 1438                 state.callbacks_and_connectivities.pop(index)
 
 1443     compression_option = _compression.create_channel_option(compression)
 
 1444     return tuple(base_options) + compression_option + ((
 
 1445         cygrpc.ChannelArgKey.primary_user_agent_string,
 
 1451     """Separates core channel options from Python channel options.""" 
 1454     for pair 
in options:
 
 1455         if pair[0] == grpc.experimental.ChannelOptions.SingleThreadedUnaryStream:
 
 1456             python_options.append(pair)
 
 1458             core_options.append(pair)
 
 1459     return python_options, core_options
 
 1463     """A cygrpc.Channel-backed implementation of grpc.Channel.""" 
 1465     def __init__(self, target, options, credentials, compression):
 
 1469           target: The target to which to connect. 
 1470           options: Configuration options for the channel. 
 1471           credentials: A cygrpc.ChannelCredentials or None. 
 1472           compression: An optional value indicating the compression method to be 
 1473             used over the lifetime of the channel. 
 1483         cygrpc.fork_register_channel(self)
 
 1484         if cygrpc.g_gevent_activated:
 
 1485             cygrpc.gevent_increment_channel_count()
 
 1488         """Sets channel attributes according to python-only channel options.""" 
 1489         for pair 
in python_options:
 
 1490             if pair[0] == grpc.experimental.ChannelOptions.SingleThreadedUnaryStream:
 
 1501                     request_serializer=None,
 
 1502                     response_deserializer=None):
 
 1505             _common.encode(method), request_serializer, response_deserializer)
 
 1509                      request_serializer=None,
 
 1510                      response_deserializer=None):
 
 1517                 self.
_channel, _common.encode(method), request_serializer,
 
 1518                 response_deserializer)
 
 1523                 _common.encode(method), request_serializer,
 
 1524                 response_deserializer)
 
 1528                      request_serializer=None,
 
 1529                      response_deserializer=None):
 
 1532             _common.encode(method), request_serializer, response_deserializer)
 
 1536                       request_serializer=None,
 
 1537                       response_deserializer=None):
 
 1540             _common.encode(method), request_serializer, response_deserializer)
 
 1546                 del state.callbacks_and_connectivities[:]
 
 1550         self.
_channel.
close(cygrpc.StatusCode.cancelled, 
'Channel closed!')
 
 1551         cygrpc.fork_unregister_channel(self)
 
 1552         if cygrpc.g_gevent_activated:
 
 1553             cygrpc.gevent_decrement_channel_count()
 
 1557         self.
_channel.close_on_fork(cygrpc.StatusCode.cancelled,
 
 1558                                     'Channel closed due to fork')