14 """Invocation-side implementation of gRPC Python."""
25 from grpc
import _common
26 from grpc
import _compression
27 from grpc
import _grpcio_metadata
31 _LOGGER = logging.getLogger(__name__)
33 _USER_AGENT =
'grpc-python/{}'.
format(_grpcio_metadata.__version__)
39 _DEFAULT_SINGLE_THREADED_UNARY_STREAM = os.getenv(
40 "GRPC_SINGLE_THREADED_UNARY_STREAM")
is not None
42 _UNARY_UNARY_INITIAL_DUE = (
43 cygrpc.OperationType.send_initial_metadata,
44 cygrpc.OperationType.send_message,
45 cygrpc.OperationType.send_close_from_client,
46 cygrpc.OperationType.receive_initial_metadata,
47 cygrpc.OperationType.receive_message,
48 cygrpc.OperationType.receive_status_on_client,
50 _UNARY_STREAM_INITIAL_DUE = (
51 cygrpc.OperationType.send_initial_metadata,
52 cygrpc.OperationType.send_message,
53 cygrpc.OperationType.send_close_from_client,
54 cygrpc.OperationType.receive_initial_metadata,
55 cygrpc.OperationType.receive_status_on_client,
57 _STREAM_UNARY_INITIAL_DUE = (
58 cygrpc.OperationType.send_initial_metadata,
59 cygrpc.OperationType.receive_initial_metadata,
60 cygrpc.OperationType.receive_message,
61 cygrpc.OperationType.receive_status_on_client,
63 _STREAM_STREAM_INITIAL_DUE = (
64 cygrpc.OperationType.send_initial_metadata,
65 cygrpc.OperationType.receive_initial_metadata,
66 cygrpc.OperationType.receive_status_on_client,
69 _CHANNEL_SUBSCRIPTION_CALLBACK_ERROR_LOG_MESSAGE = (
70 'Exception calling channel subscription callback!')
72 _OK_RENDEZVOUS_REPR_FORMAT = (
'<{} of RPC that terminated with:\n'
77 _NON_OK_RENDEZVOUS_REPR_FORMAT = (
'<{} of RPC that terminated with:\n'
80 '\tdebug_error_string = "{}"\n'
85 return None if timeout
is None else time.time() + timeout
89 return 'Server sent unknown code {} and details "{}"'.
format(
90 unknown_cygrpc_code, details)
95 def __init__(self, due, initial_metadata, trailing_metadata, code, details):
127 if state.code
is None:
129 state.details = details
130 if state.initial_metadata
is None:
131 state.initial_metadata = ()
132 state.trailing_metadata = ()
137 for batch_operation
in event.batch_operations:
138 operation_type = batch_operation.type()
139 state.due.remove(operation_type)
140 if operation_type == cygrpc.OperationType.receive_initial_metadata:
141 state.initial_metadata = batch_operation.initial_metadata()
142 elif operation_type == cygrpc.OperationType.receive_message:
143 serialized_response = batch_operation.message()
144 if serialized_response
is not None:
145 response = _common.deserialize(serialized_response,
146 response_deserializer)
148 details =
'Exception deserializing response!'
149 _abort(state, grpc.StatusCode.INTERNAL, details)
151 state.response = response
152 elif operation_type == cygrpc.OperationType.receive_status_on_client:
153 state.trailing_metadata = batch_operation.trailing_metadata()
154 if state.code
is None:
155 code = _common.CYGRPC_STATUS_CODE_TO_STATUS_CODE.get(
156 batch_operation.code())
158 state.code = grpc.StatusCode.UNKNOWN
160 code, batch_operation.details())
163 state.details = batch_operation.details()
164 state.debug_error_string = batch_operation.error_string()
165 callbacks.extend(state.callbacks)
166 state.callbacks =
None
172 def handle_event(event):
173 with state.condition:
174 callbacks =
_handle_event(event, state, response_deserializer)
175 state.condition.notify_all()
177 for callback
in callbacks:
180 except Exception
as e:
183 logging.error(
'Exception in callback %s: %s',
184 repr(callback.func), repr(e))
185 return done
and state.fork_epoch >= cygrpc.get_fork_epoch()
193 """Consume a request iterator supplied by the user."""
195 def consume_request_iterator():
199 return_from_user_request_generator_invoked =
False
202 cygrpc.enter_user_request_generator()
203 request =
next(request_iterator)
204 except StopIteration:
207 cygrpc.return_from_user_request_generator()
208 return_from_user_request_generator_invoked =
True
209 code = grpc.StatusCode.UNKNOWN
210 details =
'Exception iterating requests!'
211 _LOGGER.exception(details)
212 call.cancel(_common.STATUS_CODE_TO_CYGRPC_STATUS_CODE[code],
214 _abort(state, code, details)
217 if not return_from_user_request_generator_invoked:
218 cygrpc.return_from_user_request_generator()
219 serialized_request = _common.serialize(request, request_serializer)
220 with state.condition:
221 if state.code
is None and not state.cancelled:
222 if serialized_request
is None:
223 code = grpc.StatusCode.INTERNAL
224 details =
'Exception serializing request!'
226 _common.STATUS_CODE_TO_CYGRPC_STATUS_CODE[code],
228 _abort(state, code, details)
231 state.due.add(cygrpc.OperationType.send_message)
232 operations = (cygrpc.SendMessageOperation(
233 serialized_request, _EMPTY_FLAGS),)
234 operating = call.operate(operations, event_handler)
236 state.due.remove(cygrpc.OperationType.send_message)
240 return (state.code
is not None or
241 cygrpc.OperationType.send_message
244 _common.wait(state.condition.wait,
246 spin_cb=functools.partial(
247 cygrpc.block_if_fork_in_progress,
249 if state.code
is not None:
253 with state.condition:
254 if state.code
is None:
255 state.due.add(cygrpc.OperationType.send_close_from_client)
257 cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS),)
258 operating = call.operate(operations, event_handler)
261 cygrpc.OperationType.send_close_from_client)
263 consumption_thread = cygrpc.ForkManagedThread(
264 target=consume_request_iterator)
265 consumption_thread.setDaemon(
True)
266 consumption_thread.start()
270 """Calculates error string for RPC."""
271 with rpc_state.condition:
272 if rpc_state.code
is None:
273 return '<{} object>'.
format(class_name)
274 elif rpc_state.code
is grpc.StatusCode.OK:
275 return _OK_RENDEZVOUS_REPR_FORMAT.format(class_name, rpc_state.code,
278 return _NON_OK_RENDEZVOUS_REPR_FORMAT.format(
279 class_name, rpc_state.code, rpc_state.details,
280 rpc_state.debug_error_string)
284 """An RPC error not tied to the execution of a particular RPC.
286 The RPC represented by the state object must not be in-progress or
290 _state: An instance of _RPCState.
294 with state.condition:
296 copy.deepcopy(state.trailing_metadata),
297 state.code, copy.deepcopy(state.details))
298 self.
_state.response = copy.copy(state.response)
299 self.
_state.debug_error_string = copy.copy(state.debug_error_string)
302 return self.
_state.initial_metadata
305 return self.
_state.trailing_metadata
311 return _common.decode(self.
_state.details)
314 return _common.decode(self.
_state.debug_error_string)
326 """See grpc.Future.cancel."""
330 """See grpc.Future.cancelled."""
334 """See grpc.Future.running."""
338 """See grpc.Future.done."""
342 """See grpc.Future.result."""
346 """See grpc.Future.exception."""
350 """See grpc.Future.traceback."""
354 return sys.exc_info()[2]
357 """See grpc.Future.add_done_callback."""
365 _state: An instance of _RPCState.
366 _call: An instance of SegregatedCall or IntegratedCall.
367 In either case, the _call object is expected to have operate, cancel,
368 and next_event methods.
369 _response_deserializer: A callable taking bytes and returning a Python
371 _deadline: A float representing the deadline of the RPC in seconds. Or
372 possibly None, to represent an RPC with no deadline at all.
375 def __init__(self, state, call, response_deserializer, deadline):
383 """See grpc.RpcContext.is_active"""
384 with self.
_state.condition:
385 return self.
_state.code
is None
388 """See grpc.RpcContext.time_remaining"""
389 with self.
_state.condition:
396 """See grpc.RpcContext.cancel"""
397 with self.
_state.condition:
398 if self.
_state.code
is None:
399 code = grpc.StatusCode.CANCELLED
400 details =
'Locally cancelled by application!'
402 _common.STATUS_CODE_TO_CYGRPC_STATUS_CODE[code], details)
403 self.
_state.cancelled =
True
405 self.
_state.condition.notify_all()
411 """See grpc.RpcContext.add_callback"""
412 with self.
_state.condition:
413 if self.
_state.callbacks
is None:
416 self.
_state.callbacks.append(callback)
429 raise NotImplementedError()
432 raise NotImplementedError()
444 with self.
_state.condition:
445 if self.
_state.code
is None:
446 self.
_state.code = grpc.StatusCode.CANCELLED
447 self.
_state.details =
'Cancelled upon garbage collection!'
448 self.
_state.cancelled =
True
450 _common.STATUS_CODE_TO_CYGRPC_STATUS_CODE[self.
_state.code],
452 self.
_state.condition.notify_all()
456 """An RPC iterator operating entirely on a single thread.
458 The __next__ method of _SingleThreadedRendezvous does not depend on the
459 existence of any other thread, including the "channel spin thread".
460 However, this means that its interface is entirely synchronous. So this
461 class cannot completely fulfill the grpc.Future interface. The result,
462 exception, and traceback methods will never block and will instead raise
463 an exception if calling the method would result in blocking.
465 This means that these methods are safe to call from add_done_callback
470 return self.
_state.code
is not None
473 with self.
_state.condition:
474 return self.
_state.cancelled
477 with self.
_state.condition:
478 return self.
_state.code
is None
481 with self.
_state.condition:
482 return self.
_state.code
is not None
485 """Returns the result of the computation or raises its exception.
487 This method will never block. Instead, it will raise an exception
488 if calling this method would otherwise result in blocking.
490 Since this method will never block, any `timeout` argument passed will
494 with self.
_state.condition:
497 "_SingleThreadedRendezvous only supports result() when the RPC is complete."
499 if self.
_state.code
is grpc.StatusCode.OK:
500 return self.
_state.response
501 elif self.
_state.cancelled:
507 """Return the exception raised by the computation.
509 This method will never block. Instead, it will raise an exception
510 if calling this method would otherwise result in blocking.
512 Since this method will never block, any `timeout` argument passed will
516 with self.
_state.condition:
519 "_SingleThreadedRendezvous only supports exception() when the RPC is complete."
521 if self.
_state.code
is grpc.StatusCode.OK:
523 elif self.
_state.cancelled:
529 """Access the traceback of the exception raised by the computation.
531 This method will never block. Instead, it will raise an exception
532 if calling this method would otherwise result in blocking.
534 Since this method will never block, any `timeout` argument passed will
538 with self.
_state.condition:
541 "_SingleThreadedRendezvous only supports traceback() when the RPC is complete."
543 if self.
_state.code
is grpc.StatusCode.OK:
545 elif self.
_state.cancelled:
551 return sys.exc_info()[2]
554 with self.
_state.condition:
555 if self.
_state.code
is None:
556 self.
_state.callbacks.append(functools.partial(fn, self))
562 """See grpc.Call.initial_metadata"""
563 with self.
_state.condition:
566 while self.
_state.initial_metadata
is None:
568 return self.
_state.initial_metadata
571 """See grpc.Call.trailing_metadata"""
572 with self.
_state.condition:
573 if self.
_state.trailing_metadata
is None:
575 "Cannot get trailing metadata until RPC is completed.")
576 return self.
_state.trailing_metadata
579 """See grpc.Call.code"""
580 with self.
_state.condition:
581 if self.
_state.code
is None:
583 "Cannot get code until RPC is completed.")
587 """See grpc.Call.details"""
588 with self.
_state.condition:
589 if self.
_state.details
is None:
591 "Cannot get details until RPC is completed.")
592 return _common.decode(self.
_state.details)
595 event = self.
_call.next_event()
596 with self.
_state.condition:
599 for callback
in callbacks:
608 with self.
_state.condition:
609 if self.
_state.response
is not None:
610 response = self.
_state.response
611 self.
_state.response =
None
613 elif cygrpc.OperationType.receive_message
not in self.
_state.due:
614 if self.
_state.code
is grpc.StatusCode.OK:
615 raise StopIteration()
616 elif self.
_state.code
is not None:
620 with self.
_state.condition:
621 if self.
_state.code
is None:
633 self.
_state.due.add(cygrpc.OperationType.receive_message)
634 operating = self.
_call.operate(
635 (cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),),
None)
637 self.
_state.due.remove(cygrpc.OperationType.receive_message)
638 elif self.
_state.code
is grpc.StatusCode.OK:
639 raise StopIteration()
645 with self.
_state.condition:
646 if self.
_state.debug_error_string
is None:
648 "Cannot get debug error string until RPC is completed.")
649 return _common.decode(self.
_state.debug_error_string)
653 """An RPC iterator that depends on a channel spin thread.
655 This iterator relies upon a per-channel thread running in the background,
656 dequeueing events from the completion queue, and notifying threads waiting
657 on the threading.Condition object in the _RPCState object.
659 This extra thread allows _MultiThreadedRendezvous to fulfill the grpc.Future interface
660 and to mediate a bidirectional streaming RPC.
664 """See grpc.Call.initial_metadata"""
665 with self.
_state.condition:
668 return self.
_state.initial_metadata
is not None
670 _common.wait(self.
_state.condition.wait, _done)
671 return self.
_state.initial_metadata
674 """See grpc.Call.trailing_metadata"""
675 with self.
_state.condition:
678 return self.
_state.trailing_metadata
is not None
680 _common.wait(self.
_state.condition.wait, _done)
681 return self.
_state.trailing_metadata
684 """See grpc.Call.code"""
685 with self.
_state.condition:
688 return self.
_state.code
is not None
690 _common.wait(self.
_state.condition.wait, _done)
694 """See grpc.Call.details"""
695 with self.
_state.condition:
698 return self.
_state.details
is not None
700 _common.wait(self.
_state.condition.wait, _done)
701 return _common.decode(self.
_state.details)
704 with self.
_state.condition:
707 return self.
_state.debug_error_string
is not None
709 _common.wait(self.
_state.condition.wait, _done)
710 return _common.decode(self.
_state.debug_error_string)
713 with self.
_state.condition:
714 return self.
_state.cancelled
717 with self.
_state.condition:
718 return self.
_state.code
is None
721 with self.
_state.condition:
722 return self.
_state.code
is not None
725 return self.
_state.code
is not None
728 """Returns the result of the computation or raises its exception.
730 See grpc.Future.result for the full API contract.
732 with self.
_state.condition:
733 timed_out = _common.wait(self.
_state.condition.wait,
739 if self.
_state.code
is grpc.StatusCode.OK:
740 return self.
_state.response
741 elif self.
_state.cancelled:
747 """Return the exception raised by the computation.
749 See grpc.Future.exception for the full API contract.
751 with self.
_state.condition:
752 timed_out = _common.wait(self.
_state.condition.wait,
758 if self.
_state.code
is grpc.StatusCode.OK:
760 elif self.
_state.cancelled:
766 """Access the traceback of the exception raised by the computation.
768 See grpc.Future.traceback for the full API contract.
770 with self.
_state.condition:
771 timed_out = _common.wait(self.
_state.condition.wait,
777 if self.
_state.code
is grpc.StatusCode.OK:
779 elif self.
_state.cancelled:
785 return sys.exc_info()[2]
788 with self.
_state.condition:
789 if self.
_state.code
is None:
790 self.
_state.callbacks.append(functools.partial(fn, self))
796 with self.
_state.condition:
797 if self.
_state.code
is None:
800 self.
_state.due.add(cygrpc.OperationType.receive_message)
801 operating = self.
_call.operate(
802 (cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),),
805 self.
_state.due.remove(cygrpc.OperationType.receive_message)
806 elif self.
_state.code
is grpc.StatusCode.OK:
807 raise StopIteration()
811 def _response_ready():
812 return (self.
_state.response
is not None or
813 (cygrpc.OperationType.receive_message
814 not in self.
_state.due
and
815 self.
_state.code
is not None))
817 _common.wait(self.
_state.condition.wait, _response_ready)
818 if self.
_state.response
is not None:
819 response = self.
_state.response
820 self.
_state.response =
None
822 elif cygrpc.OperationType.receive_message
not in self.
_state.due:
823 if self.
_state.code
is grpc.StatusCode.OK:
824 raise StopIteration()
825 elif self.
_state.code
is not None:
831 serialized_request = _common.serialize(request, request_serializer)
832 if serialized_request
is None:
833 state =
_RPCState((), (), (), grpc.StatusCode.INTERNAL,
834 'Exception serializing request!')
836 return deadline,
None, error
838 return deadline, serialized_request,
None
842 if state.code
is grpc.StatusCode.OK:
845 return state.response, rendezvous
847 return state.response
855 cygrpc.SendInitialMetadataOperation(metadata,
856 initial_metadata_flags),
857 cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),
858 cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),
860 (cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),),
865 initial_metadata_flags):
870 metadata, initial_metadata_flags))
874 parent_deadline = cygrpc.get_deadline_from_context()
875 if parent_deadline
is None and user_deadline
is None:
877 elif parent_deadline
is not None and user_deadline
is None:
878 return parent_deadline
879 elif user_deadline
is not None and parent_deadline
is None:
882 return min(parent_deadline, user_deadline)
888 def __init__(self, channel, managed_call, method, request_serializer,
889 response_deserializer):
895 self.
_context = cygrpc.build_census_context()
897 def _prepare(self, request, timeout, metadata, wait_for_ready, compression):
902 augmented_metadata = _compression.augment_metadata(
903 metadata, compression)
904 if serialized_request
is None:
905 return None,
None,
None, rendezvous
907 state =
_RPCState(_UNARY_UNARY_INITIAL_DUE,
None,
None,
None,
None)
909 cygrpc.SendInitialMetadataOperation(augmented_metadata,
910 initial_metadata_flags),
911 cygrpc.SendMessageOperation(serialized_request, _EMPTY_FLAGS),
912 cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS),
913 cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),
914 cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),
915 cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),
917 return state, operations, deadline,
None
919 def _blocking(self, request, timeout, metadata, credentials, wait_for_ready,
921 state, operations, deadline, rendezvous = self.
_prepare(
922 request, timeout, metadata, wait_for_ready, compression)
926 call = self.
_channel.segregated_call(
927 cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS,
929 None if credentials
is None else credentials._credentials, ((
933 event = call.next_event()
944 state, call, = self.
_blocking(request, timeout, metadata, credentials,
945 wait_for_ready, compression)
955 state, call, = self.
_blocking(request, timeout, metadata, credentials,
956 wait_for_ready, compression)
966 state, operations, deadline, rendezvous = self.
_prepare(
967 request, timeout, metadata, wait_for_ready, compression)
973 cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS,
974 self.
_method,
None, deadline, metadata,
975 None if credentials
is None else credentials._credentials,
976 (operations,), event_handler, self.
_context)
985 def __init__(self, channel, method, request_serializer,
986 response_deserializer):
991 self.
_context = cygrpc.build_census_context()
1002 serialized_request = _common.serialize(request,
1004 if serialized_request
is None:
1005 state =
_RPCState((), (), (), grpc.StatusCode.INTERNAL,
1006 'Exception serializing request!')
1009 state =
_RPCState(_UNARY_STREAM_INITIAL_DUE,
None,
None,
None,
None)
1010 call_credentials =
None if credentials
is None else credentials._credentials
1013 augmented_metadata = _compression.augment_metadata(
1014 metadata, compression)
1016 (cygrpc.SendInitialMetadataOperation(augmented_metadata,
1017 initial_metadata_flags),
1018 cygrpc.SendMessageOperation(serialized_request, _EMPTY_FLAGS),
1019 cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS)),
1020 (cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),),
1021 (cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),),
1023 operations_and_tags = tuple((ops,
None)
for ops
in operations)
1024 call = self.
_channel.segregated_call(
1025 cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS, self.
_method,
1027 operations_and_tags, self.
_context)
1035 def __init__(self, channel, managed_call, method, request_serializer,
1036 response_deserializer):
1042 self.
_context = cygrpc.build_census_context()
1050 wait_for_ready=None,
1056 if serialized_request
is None:
1059 augmented_metadata = _compression.augment_metadata(
1060 metadata, compression)
1061 state =
_RPCState(_UNARY_STREAM_INITIAL_DUE,
None,
None,
None,
None)
1064 cygrpc.SendInitialMetadataOperation(augmented_metadata,
1065 initial_metadata_flags),
1066 cygrpc.SendMessageOperation(serialized_request,
1068 cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS),
1069 cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),
1071 (cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),),
1074 cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS,
1076 None if credentials
is None else credentials._credentials,
1088 def __init__(self, channel, managed_call, method, request_serializer,
1089 response_deserializer):
1095 self.
_context = cygrpc.build_census_context()
1097 def _blocking(self, request_iterator, timeout, metadata, credentials,
1098 wait_for_ready, compression):
1100 state =
_RPCState(_STREAM_UNARY_INITIAL_DUE,
None,
None,
None,
None)
1103 augmented_metadata = _compression.augment_metadata(
1104 metadata, compression)
1105 call = self.
_channel.segregated_call(
1106 cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS, self.
_method,
1108 None if credentials
is None else credentials._credentials,
1110 augmented_metadata, initial_metadata_flags), self.
_context)
1114 event = call.next_event()
1115 with state.condition:
1117 state.condition.notify_all()
1127 wait_for_ready=None,
1129 state, call, = self.
_blocking(request_iterator, timeout, metadata,
1130 credentials, wait_for_ready, compression)
1138 wait_for_ready=None,
1140 state, call, = self.
_blocking(request_iterator, timeout, metadata,
1141 credentials, wait_for_ready, compression)
1149 wait_for_ready=None,
1152 state =
_RPCState(_STREAM_UNARY_INITIAL_DUE,
None,
None,
None,
None)
1156 augmented_metadata = _compression.augment_metadata(
1157 metadata, compression)
1159 cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS, self.
_method,
1160 None, deadline, augmented_metadata,
1161 None if credentials
is None else credentials._credentials,
1163 initial_metadata_flags),
1174 def __init__(self, channel, managed_call, method, request_serializer,
1175 response_deserializer):
1181 self.
_context = cygrpc.build_census_context()
1188 wait_for_ready=None,
1191 state =
_RPCState(_STREAM_STREAM_INITIAL_DUE,
None,
None,
None,
None)
1194 augmented_metadata = _compression.augment_metadata(
1195 metadata, compression)
1198 cygrpc.SendInitialMetadataOperation(augmented_metadata,
1199 initial_metadata_flags),
1200 cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),
1202 (cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),),
1206 cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS, self.
_method,
1208 None if credentials
is None else credentials._credentials,
1209 operationses, event_handler, self.
_context)
1217 """Stores immutable initial metadata flags"""
1220 value &= cygrpc.InitialMetadataFlags.used_mask
1221 return super(_InitialMetadataFlags, cls).
__new__(cls, value)
1224 if wait_for_ready
is not None:
1226 return self.__class__(self | cygrpc.InitialMetadataFlags.wait_for_ready | \
1227 cygrpc.InitialMetadataFlags.wait_for_ready_explicitly_set)
1228 elif not wait_for_ready:
1229 return self.__class__(self & ~cygrpc.InitialMetadataFlags.wait_for_ready | \
1230 cygrpc.InitialMetadataFlags.wait_for_ready_explicitly_set)
1248 'Channel deallocated!')
1249 except (TypeError, AttributeError):
1257 cygrpc.block_if_fork_in_progress(state)
1258 event = state.channel.next_call_event()
1259 if event.completion_type == cygrpc.CompletionType.queue_timeout:
1261 call_completed = event.tag(event)
1264 state.managed_calls -= 1
1265 if state.managed_calls == 0:
1268 channel_spin_thread = cygrpc.ForkManagedThread(target=channel_spin)
1269 channel_spin_thread.setDaemon(
True)
1270 channel_spin_thread.start()
1276 def create(flags, method, host, deadline, metadata, credentials,
1277 operationses, event_handler, context):
1278 """Creates a cygrpc.IntegratedCall.
1281 flags: An integer bitfield of call flags.
1282 method: The RPC method.
1283 host: A host string for the created call.
1284 deadline: A float to be the deadline of the created call or None if
1285 the call is to have an infinite deadline.
1286 metadata: The metadata for the call or None.
1287 credentials: A cygrpc.CallCredentials or None.
1288 operationses: An iterable of iterables of cygrpc.Operations to be
1289 started on the call.
1290 event_handler: A behavior to call to handle the events resultant from
1291 the operations on the call.
1292 context: Context object for distributed tracing.
1294 A cygrpc.IntegratedCall with which to conduct an RPC.
1296 operationses_and_tags = tuple((
1299 )
for operations
in operationses)
1301 call = state.channel.integrated_call(flags, method, host, deadline,
1302 metadata, credentials,
1303 operationses_and_tags, context)
1304 if state.managed_calls == 0:
1305 state.managed_calls = 1
1308 state.managed_calls += 1
1334 callbacks_needing_update = []
1335 for callback_and_connectivity
in state.callbacks_and_connectivities:
1336 callback, callback_connectivity, = callback_and_connectivity
1337 if callback_connectivity
is not state.connectivity:
1338 callbacks_needing_update.append(callback)
1339 callback_and_connectivity[1] = state.connectivity
1340 return callbacks_needing_update
1343 def _deliver(state, initial_connectivity, initial_callbacks):
1344 connectivity = initial_connectivity
1345 callbacks = initial_callbacks
1347 for callback
in callbacks:
1348 cygrpc.block_if_fork_in_progress(state)
1353 _CHANNEL_SUBSCRIPTION_CALLBACK_ERROR_LOG_MESSAGE)
1357 connectivity = state.connectivity
1359 state.delivering =
False
1364 delivering_thread = cygrpc.ForkManagedThread(target=_deliver,
1370 delivering_thread.setDaemon(
True)
1371 delivering_thread.start()
1372 state.delivering =
True
1377 try_to_connect = initial_try_to_connect
1378 connectivity = channel.check_connectivity_state(try_to_connect)
1380 state.connectivity = (
1382 CYGRPC_CONNECTIVITY_STATE_TO_CHANNEL_CONNECTIVITY[connectivity])
1384 callback
for callback, unused_but_known_to_be_none_connectivity
in
1385 state.callbacks_and_connectivities)
1386 for callback_and_connectivity
in state.callbacks_and_connectivities:
1387 callback_and_connectivity[1] = state.connectivity
1391 event = channel.watch_connectivity_state(connectivity,
1393 cygrpc.block_if_fork_in_progress(state)
1395 if not state.callbacks_and_connectivities
and not state.try_to_connect:
1396 state.polling =
False
1397 state.connectivity =
None
1399 try_to_connect = state.try_to_connect
1400 state.try_to_connect =
False
1401 if event.success
or try_to_connect:
1402 connectivity = channel.check_connectivity_state(try_to_connect)
1404 state.connectivity = (
1405 _common.CYGRPC_CONNECTIVITY_STATE_TO_CHANNEL_CONNECTIVITY[
1407 if not state.delivering:
1415 if not state.callbacks_and_connectivities
and not state.polling:
1416 polling_thread = cygrpc.ForkManagedThread(
1417 target=_poll_connectivity,
1418 args=(state, state.channel, bool(try_to_connect)))
1419 polling_thread.setDaemon(
True)
1420 polling_thread.start()
1421 state.polling =
True
1422 state.callbacks_and_connectivities.append([callback,
None])
1423 elif not state.delivering
and state.connectivity
is not None:
1425 state.try_to_connect |= bool(try_to_connect)
1426 state.callbacks_and_connectivities.append(
1427 [callback, state.connectivity])
1429 state.try_to_connect |= bool(try_to_connect)
1430 state.callbacks_and_connectivities.append([callback,
None])
1435 for index, (subscribed_callback, unused_connectivity)
in enumerate(
1436 state.callbacks_and_connectivities):
1437 if callback == subscribed_callback:
1438 state.callbacks_and_connectivities.pop(index)
1443 compression_option = _compression.create_channel_option(compression)
1444 return tuple(base_options) + compression_option + ((
1445 cygrpc.ChannelArgKey.primary_user_agent_string,
1451 """Separates core channel options from Python channel options."""
1454 for pair
in options:
1455 if pair[0] == grpc.experimental.ChannelOptions.SingleThreadedUnaryStream:
1456 python_options.append(pair)
1458 core_options.append(pair)
1459 return python_options, core_options
1463 """A cygrpc.Channel-backed implementation of grpc.Channel."""
1465 def __init__(self, target, options, credentials, compression):
1469 target: The target to which to connect.
1470 options: Configuration options for the channel.
1471 credentials: A cygrpc.ChannelCredentials or None.
1472 compression: An optional value indicating the compression method to be
1473 used over the lifetime of the channel.
1483 cygrpc.fork_register_channel(self)
1484 if cygrpc.g_gevent_activated:
1485 cygrpc.gevent_increment_channel_count()
1488 """Sets channel attributes according to python-only channel options."""
1489 for pair
in python_options:
1490 if pair[0] == grpc.experimental.ChannelOptions.SingleThreadedUnaryStream:
1501 request_serializer=None,
1502 response_deserializer=None):
1505 _common.encode(method), request_serializer, response_deserializer)
1509 request_serializer=None,
1510 response_deserializer=None):
1517 self.
_channel, _common.encode(method), request_serializer,
1518 response_deserializer)
1523 _common.encode(method), request_serializer,
1524 response_deserializer)
1528 request_serializer=None,
1529 response_deserializer=None):
1532 _common.encode(method), request_serializer, response_deserializer)
1536 request_serializer=None,
1537 response_deserializer=None):
1540 _common.encode(method), request_serializer, response_deserializer)
1546 del state.callbacks_and_connectivities[:]
1550 self.
_channel.
close(cygrpc.StatusCode.cancelled,
'Channel closed!')
1551 cygrpc.fork_unregister_channel(self)
1552 if cygrpc.g_gevent_activated:
1553 cygrpc.gevent_decrement_channel_count()
1557 self.
_channel.close_on_fork(cygrpc.StatusCode.cancelled,
1558 'Channel closed due to fork')