14 """Service-side implementation of gRPC Python."""
17 from concurrent
import futures
24 from grpc
import _common
25 from grpc
import _compression
26 from grpc
import _interceptor
30 _LOGGER = logging.getLogger(__name__)
32 _SHUTDOWN_TAG =
'shutdown'
33 _REQUEST_CALL_TAG =
'request_call'
35 _RECEIVE_CLOSE_ON_SERVER_TOKEN =
'receive_close_on_server'
36 _SEND_INITIAL_METADATA_TOKEN =
'send_initial_metadata'
37 _RECEIVE_MESSAGE_TOKEN =
'receive_message'
38 _SEND_MESSAGE_TOKEN =
'send_message'
39 _SEND_INITIAL_METADATA_AND_SEND_MESSAGE_TOKEN = (
40 'send_initial_metadata * send_message')
41 _SEND_STATUS_FROM_SERVER_TOKEN =
'send_status_from_server'
42 _SEND_INITIAL_METADATA_AND_SEND_STATUS_FROM_SERVER_TOKEN = (
43 'send_initial_metadata * send_status_from_server')
47 _CANCELLED =
'cancelled'
51 _DEALLOCATED_SERVER_CHECK_PERIOD_S = 1.0
56 return request_event.batch_operations[0].
message()
60 cygrpc_code = _common.STATUS_CODE_TO_CYGRPC_STATUS_CODE.get(code)
61 return cygrpc.StatusCode.unknown
if cygrpc_code
is None else cygrpc_code
65 if state.code
is None:
66 return cygrpc.StatusCode.ok
72 if state.code
is None:
79 return b
'' if state.details
is None else state.details
83 collections.namedtuple(
'_HandlerCallDetails', (
85 'invocation_metadata',
111 state.rpc_errors.append(rpc_error)
116 state.due.remove(token)
118 callbacks = state.callbacks
119 state.callbacks =
None
120 return state, callbacks
127 def send_status_from_server(unused_send_status_from_server_event):
128 with state.condition:
131 return send_status_from_server
135 with state.condition:
136 if state.compression_algorithm:
137 compression_metadata = (
138 _compression.compression_algorithm_to_metadata(
139 state.compression_algorithm),)
141 return compression_metadata
143 return compression_metadata + tuple(metadata)
149 operation = cygrpc.SendInitialMetadataOperation(
155 if state.client
is not _CANCELLED:
157 effective_details = details
if state.details
is None else state.details
158 if state.initial_metadata_allowed:
161 cygrpc.SendStatusFromServerOperation(state.trailing_metadata,
166 token = _SEND_INITIAL_METADATA_AND_SEND_STATUS_FROM_SERVER_TOKEN
168 operations = (cygrpc.SendStatusFromServerOperation(
169 state.trailing_metadata, effective_code, effective_details,
171 token = _SEND_STATUS_FROM_SERVER_TOKEN
172 call.start_server_batch(operations,
174 state.statused =
True
180 def receive_close_on_server(receive_close_on_server_event):
181 with state.condition:
182 if receive_close_on_server_event.batch_operations[0].cancelled():
183 state.client = _CANCELLED
184 elif state.client
is _OPEN:
185 state.client = _CLOSED
186 state.condition.notify_all()
189 return receive_close_on_server
194 def receive_message(receive_message_event):
196 if serialized_request
is None:
197 with state.condition:
198 if state.client
is _OPEN:
199 state.client = _CLOSED
200 state.condition.notify_all()
203 request = _common.deserialize(serialized_request,
204 request_deserializer)
205 with state.condition:
207 _abort(state, call, cygrpc.StatusCode.internal,
208 b
'Exception deserializing request!')
210 state.request = request
211 state.condition.notify_all()
214 return receive_message
220 with state.condition:
223 return send_initial_metadata
229 with state.condition:
230 state.condition.notify_all()
238 def __init__(self, rpc_event, state, request_deserializer):
244 with self.
_state.condition:
248 return max(self.
_rpc_event.call_details.deadline - time.time(), 0)
254 with self.
_state.condition:
255 if self.
_state.callbacks
is None:
258 self.
_state.callbacks.append(callback)
262 with self.
_state.condition:
263 self.
_state.disable_next_compression =
True
269 return _common.decode(self.
_rpc_event.call.peer())
272 return cygrpc.peer_identities(self.
_rpc_event.call)
275 id_key = cygrpc.peer_identity_key(self.
_rpc_event.call)
276 return id_key
if id_key
is None else _common.decode(id_key)
280 _common.decode(key): value
for key, value
in six.iteritems(
285 with self.
_state.condition:
286 self.
_state.compression_algorithm = compression
289 with self.
_state.condition:
290 if self.
_state.client
is _CANCELLED:
293 if self.
_state.initial_metadata_allowed:
295 self.
_state, initial_metadata)
298 self.
_state.initial_metadata_allowed =
False
299 self.
_state.due.add(_SEND_INITIAL_METADATA_TOKEN)
301 raise ValueError(
'Initial metadata no longer allowed!')
304 with self.
_state.condition:
305 self.
_state.trailing_metadata = trailing_metadata
308 return self.
_state.trailing_metadata
312 if code == grpc.StatusCode.OK:
314 'abort() called with StatusCode.OK; returning UNKNOWN')
315 code = grpc.StatusCode.UNKNOWN
317 with self.
_state.condition:
319 self.
_state.details = _common.encode(details)
320 self.
_state.aborted =
True
324 self.
_state.trailing_metadata = status.trailing_metadata
325 self.
abort(status.code, status.details)
328 with self.
_state.condition:
335 with self.
_state.condition:
336 self.
_state.details = _common.encode(details)
339 return self.
_state.details
345 class _RequestIterator(object):
347 def __init__(self, state, call, request_deserializer):
353 if self.
_state.client
is _CANCELLED:
356 raise StopIteration()
358 self.
_call.start_server_batch(
359 (cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),),
362 self.
_state.due.add(_RECEIVE_MESSAGE_TOKEN)
365 if self.
_state.client
is _CANCELLED:
367 elif (self.
_state.request
is None and
368 _RECEIVE_MESSAGE_TOKEN
not in self.
_state.due):
369 raise StopIteration()
371 request = self.
_state.request
372 self.
_state.request =
None
375 raise AssertionError()
378 with self.
_state.condition:
381 self.
_state.condition.wait()
383 if request
is not None:
399 with state.condition:
403 rpc_event.call.start_server_batch(
404 (cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),),
406 request_deserializer))
407 state.due.add(_RECEIVE_MESSAGE_TOKEN)
409 state.condition.wait()
410 if state.request
is None:
411 if state.client
is _CLOSED:
412 details =
'"{}" requires exactly one request message.'.
format(
413 rpc_event.call_details.method)
414 _abort(state, rpc_event.call,
415 cygrpc.StatusCode.unimplemented,
416 _common.encode(details))
418 elif state.client
is _CANCELLED:
421 request = state.request
432 request_deserializer,
433 send_response_callback=None):
434 from grpc
import _create_servicer_context
436 request_deserializer)
as context:
438 response_or_iterator =
None
439 if send_response_callback
is not None:
440 response_or_iterator = behavior(argument, context,
441 send_response_callback)
443 response_or_iterator = behavior(argument, context)
444 return response_or_iterator,
True
445 except Exception
as exception:
446 with state.condition:
448 _abort(state, rpc_event.call, cygrpc.StatusCode.unknown,
450 elif exception
not in state.rpc_errors:
451 details =
'Exception calling application: {}'.
format(
453 _LOGGER.exception(details)
454 _abort(state, rpc_event.call, cygrpc.StatusCode.unknown,
455 _common.encode(details))
461 return next(response_iterator),
True
462 except StopIteration:
464 except Exception
as exception:
465 with state.condition:
467 _abort(state, rpc_event.call, cygrpc.StatusCode.unknown,
469 elif exception
not in state.rpc_errors:
470 details =
'Exception iterating responses: {}'.
format(exception)
471 _LOGGER.exception(details)
472 _abort(state, rpc_event.call, cygrpc.StatusCode.unknown,
473 _common.encode(details))
478 serialized_response = _common.serialize(response, response_serializer)
479 if serialized_response
is None:
480 with state.condition:
481 _abort(state, rpc_event.call, cygrpc.StatusCode.internal,
482 b
'Failed to serialize response!')
485 return serialized_response
489 if state.disable_next_compression:
490 return cygrpc.WriteFlag.no_compress
496 with state.condition:
497 state.disable_next_compression =
False
501 with state.condition:
505 if state.initial_metadata_allowed:
508 cygrpc.SendMessageOperation(
512 state.initial_metadata_allowed =
False
513 token = _SEND_INITIAL_METADATA_AND_SEND_MESSAGE_TOKEN
515 operations = (cygrpc.SendMessageOperation(
518 token = _SEND_MESSAGE_TOKEN
519 rpc_event.call.start_server_batch(operations,
524 state.condition.wait()
525 if token
not in state.due:
529 def _status(rpc_event, state, serialized_response):
530 with state.condition:
531 if state.client
is not _CANCELLED:
535 cygrpc.SendStatusFromServerOperation(state.trailing_metadata,
539 if state.initial_metadata_allowed:
541 if serialized_response
is not None:
543 cygrpc.SendMessageOperation(
546 rpc_event.call.start_server_batch(
549 state.statused =
True
551 state.due.add(_SEND_STATUS_FROM_SERVER_TOKEN)
555 request_deserializer, response_serializer):
556 cygrpc.install_context_from_request_call_event(rpc_event)
558 argument = argument_thunk()
559 if argument
is not None:
561 argument, request_deserializer)
564 rpc_event, state, response, response_serializer)
565 if serialized_response
is not None:
566 _status(rpc_event, state, serialized_response)
568 cygrpc.uninstall_context()
572 request_deserializer, response_serializer):
573 cygrpc.install_context_from_request_call_event(rpc_event)
575 def send_response(response):
577 _status(rpc_event, state,
None)
582 if serialized_response
is not None:
586 argument = argument_thunk()
587 if argument
is not None:
588 if hasattr(behavior,
'experimental_non_blocking'
589 )
and behavior.experimental_non_blocking:
594 request_deserializer,
595 send_response_callback=send_response)
598 rpc_event, state, behavior, argument, request_deserializer)
601 rpc_event, state, send_response, response_iterator)
603 cygrpc.uninstall_context()
607 return state.client
is not _CANCELLED
and not state.statused
611 send_response_callback,
615 rpc_event, state, response_iterator)
617 send_response_callback(response)
625 if hasattr(behavior,
'experimental_thread_pool')
and isinstance(
626 behavior.experimental_thread_pool, futures.ThreadPoolExecutor):
627 return behavior.experimental_thread_pool
629 return default_thread_pool
634 method_handler.request_deserializer)
637 return thread_pool.submit(_unary_response_in_pool, rpc_event, state,
638 method_handler.unary_unary, unary_request,
639 method_handler.request_deserializer,
640 method_handler.response_serializer)
645 method_handler.request_deserializer)
648 return thread_pool.submit(_stream_response_in_pool, rpc_event, state,
649 method_handler.unary_stream, unary_request,
650 method_handler.request_deserializer,
651 method_handler.response_serializer)
656 method_handler.request_deserializer)
659 return thread_pool.submit(_unary_response_in_pool, rpc_event, state,
660 method_handler.stream_unary,
661 lambda: request_iterator,
662 method_handler.request_deserializer,
663 method_handler.response_serializer)
667 default_thread_pool):
669 method_handler.request_deserializer)
672 return thread_pool.submit(_stream_response_in_pool, rpc_event, state,
673 method_handler.stream_stream,
674 lambda: request_iterator,
675 method_handler.request_deserializer,
676 method_handler.response_serializer)
681 def query_handlers(handler_call_details):
682 for generic_handler
in generic_handlers:
683 method_handler = generic_handler.service(handler_call_details)
684 if method_handler
is not None:
685 return method_handler
689 _common.decode(rpc_event.call_details.method),
690 rpc_event.invocation_metadata)
692 if interceptor_pipeline
is not None:
693 return interceptor_pipeline.execute(query_handlers,
694 handler_call_details)
696 return query_handlers(handler_call_details)
703 cygrpc.ReceiveCloseOnServerOperation(_EMPTY_FLAGS),
704 cygrpc.SendStatusFromServerOperation(
None, status, details,
707 rpc_event.call.start_server_batch(operations,
lambda ignored_event: (
716 with state.condition:
717 rpc_event.call.start_server_batch(
718 (cygrpc.ReceiveCloseOnServerOperation(_EMPTY_FLAGS),),
720 state.due.add(_RECEIVE_CLOSE_ON_SERVER_TOKEN)
721 if method_handler.request_streaming:
722 if method_handler.response_streaming:
724 method_handler, thread_pool)
727 method_handler, thread_pool)
729 if method_handler.response_streaming:
731 method_handler, thread_pool)
734 method_handler, thread_pool)
737 def _handle_call(rpc_event, generic_handlers, interceptor_pipeline, thread_pool,
738 concurrency_exceeded):
739 if not rpc_event.success:
741 if rpc_event.call_details.method
is not None:
744 interceptor_pipeline)
745 except Exception
as exception:
746 details =
'Exception servicing handler: {}'.
format(exception)
747 _LOGGER.exception(details)
748 return _reject_rpc(rpc_event, cygrpc.StatusCode.unknown,
749 b
'Error in service handler!'),
None
750 if method_handler
is None:
751 return _reject_rpc(rpc_event, cygrpc.StatusCode.unimplemented,
752 b
'Method not found!'),
None
753 elif concurrency_exceeded:
754 return _reject_rpc(rpc_event, cygrpc.StatusCode.resource_exhausted,
755 b
'Concurrent RPC limit exceeded!'),
None
773 def __init__(self, completion_queue, server, generic_handlers,
774 interceptor_pipeline, thread_pool, maximum_concurrent_rpcs):
797 state.generic_handlers.extend(generic_handlers)
802 return state.server.add_http2_port(address)
807 return state.server.add_http2_port(address,
808 server_credentials._credentials)
812 state.server.request_call(state.completion_queue, state.completion_queue,
814 state.due.add(_REQUEST_CALL_TAG)
819 if not state.rpc_states
and not state.due:
820 state.server.destroy()
821 for shutdown_event
in state.shutdown_events:
823 state.stage = _ServerStage.STOPPED
831 state.active_rpc_count -= 1
835 should_continue =
True
836 if event.tag
is _SHUTDOWN_TAG:
838 state.due.remove(_SHUTDOWN_TAG)
840 should_continue =
False
841 elif event.tag
is _REQUEST_CALL_TAG:
843 state.due.remove(_REQUEST_CALL_TAG)
844 concurrency_exceeded = (
845 state.maximum_concurrent_rpcs
is not None and
846 state.active_rpc_count >= state.maximum_concurrent_rpcs)
847 rpc_state, rpc_future =
_handle_call(event, state.generic_handlers,
848 state.interceptor_pipeline,
850 concurrency_exceeded)
851 if rpc_state
is not None:
852 state.rpc_states.add(rpc_state)
853 if rpc_future
is not None:
854 state.active_rpc_count += 1
855 rpc_future.add_done_callback(
857 if state.stage
is _ServerStage.STARTED:
860 should_continue =
False
862 rpc_state, callbacks = event.tag(event)
863 for callback
in callbacks:
867 _LOGGER.exception(
'Exception calling callback!')
868 if rpc_state
is not None:
870 state.rpc_states.remove(rpc_state)
872 should_continue =
False
873 return should_continue
878 timeout = time.time() + _DEALLOCATED_SERVER_CHECK_PERIOD_S
879 event = state.completion_queue.poll(timeout)
880 if state.server_deallocated:
882 if event.completion_type != cygrpc.CompletionType.queue_timeout:
893 if state.stage
is _ServerStage.STARTED:
894 state.server.shutdown(state.completion_queue, _SHUTDOWN_TAG)
895 state.stage = _ServerStage.GRACE
896 state.due.add(_SHUTDOWN_TAG)
901 if state.stage
is _ServerStage.STOPPED:
902 shutdown_event = threading.Event()
904 return shutdown_event
907 shutdown_event = threading.Event()
908 state.shutdown_events.append(shutdown_event)
910 state.server.cancel_all_calls()
913 def cancel_all_calls_after_grace():
914 shutdown_event.wait(timeout=grace)
916 state.server.cancel_all_calls()
918 thread = threading.Thread(target=cancel_all_calls_after_grace)
920 return shutdown_event
921 shutdown_event.wait()
922 return shutdown_event
927 if state.stage
is not _ServerStage.STOPPED:
928 raise ValueError(
'Cannot start already-started server!')
930 state.stage = _ServerStage.STARTED
933 thread = threading.Thread(target=_serve, args=(state,))
939 for generic_rpc_handler
in generic_rpc_handlers:
940 service_attribute = getattr(generic_rpc_handler,
'service',
None)
941 if service_attribute
is None:
942 raise AttributeError(
943 '"{}" must conform to grpc.GenericRpcHandler type but does '
944 'not have "service" method!'.
format(generic_rpc_handler))
948 compression_option = _compression.create_channel_option(compression)
949 return tuple(base_options) + compression_option
955 def __init__(self, thread_pool, generic_handlers, interceptors, options,
956 maximum_concurrent_rpcs, compression, xds):
957 completion_queue = cygrpc.CompletionQueue()
959 server.register_completion_queue(completion_queue)
961 _interceptor.service_pipeline(interceptors),
962 thread_pool, maximum_concurrent_rpcs)
969 return _common.validate_port_binding_result(
973 return _common.validate_port_binding_result(
985 return _common.wait(self.
_state.termination_event.wait,
986 self.
_state.termination_event.is_set,
993 if hasattr(self,
'_state'):
996 self.
_state.server_deallocated =
True
1000 maximum_concurrent_rpcs, compression, xds):
1002 return _Server(thread_pool, generic_rpc_handlers, interceptors, options,
1003 maximum_concurrent_rpcs, compression, xds)