grpc/_server.py
Go to the documentation of this file.
1 # Copyright 2016 gRPC authors.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 """Service-side implementation of gRPC Python."""
15 
16 import collections
17 from concurrent import futures
18 import enum
19 import logging
20 import threading
21 import time
22 
23 import grpc
24 from grpc import _common
25 from grpc import _compression
26 from grpc import _interceptor
27 from grpc._cython import cygrpc
28 import six
29 
30 _LOGGER = logging.getLogger(__name__)
31 
32 _SHUTDOWN_TAG = 'shutdown'
33 _REQUEST_CALL_TAG = 'request_call'
34 
35 _RECEIVE_CLOSE_ON_SERVER_TOKEN = 'receive_close_on_server'
36 _SEND_INITIAL_METADATA_TOKEN = 'send_initial_metadata'
37 _RECEIVE_MESSAGE_TOKEN = 'receive_message'
38 _SEND_MESSAGE_TOKEN = 'send_message'
39 _SEND_INITIAL_METADATA_AND_SEND_MESSAGE_TOKEN = (
40  'send_initial_metadata * send_message')
41 _SEND_STATUS_FROM_SERVER_TOKEN = 'send_status_from_server'
42 _SEND_INITIAL_METADATA_AND_SEND_STATUS_FROM_SERVER_TOKEN = (
43  'send_initial_metadata * send_status_from_server')
44 
45 _OPEN = 'open'
46 _CLOSED = 'closed'
47 _CANCELLED = 'cancelled'
48 
49 _EMPTY_FLAGS = 0
50 
51 _DEALLOCATED_SERVER_CHECK_PERIOD_S = 1.0
52 _INF_TIMEOUT = 1e9
53 
54 
def _serialized_request(request_event):
    """Return the message of the first operation in ``request_event``'s batch."""
    first_operation = request_event.batch_operations[0]
    return first_operation.message()
57 
58 
60  cygrpc_code = _common.STATUS_CODE_TO_CYGRPC_STATUS_CODE.get(code)
61  return cygrpc.StatusCode.unknown if cygrpc_code is None else cygrpc_code
62 
63 
def _completion_code(state):
    """Choose the status code used when an RPC completes normally.

    Returns ``cygrpc.StatusCode.ok`` when no code was recorded on ``state``;
    otherwise the recorded code translated through ``_application_code``.
    """
    recorded_code = state.code
    if recorded_code is not None:
        return _application_code(recorded_code)
    return cygrpc.StatusCode.ok
69 
70 
def _abortion_code(state, code):
    """Choose the status code used when an RPC is aborted.

    A code already recorded on ``state`` wins (translated through
    ``_application_code``); otherwise the supplied ``code`` is returned as-is.
    """
    recorded_code = state.code
    if recorded_code is not None:
        return _application_code(recorded_code)
    return code
76 
77 
def _details(state):
    """Return ``state.details``, substituting ``b''`` when it is ``None``."""
    recorded_details = state.details
    if recorded_details is None:
        return b''
    return recorded_details
80 
81 
83  collections.namedtuple('_HandlerCallDetails', (
84  'method',
85  'invocation_metadata',
87  pass
88 
89 
90 class _RPCState(object):
91 
92  def __init__(self):
93  self.condition = threading.Condition()
94  self.due = set()
95  self.request = None
96  self.client = _OPEN
100  self.trailing_metadata = None
101  self.code = None
102  self.details = None
103  self.statused = False
104  self.rpc_errors = []
105  self.callbacks = []
106  self.aborted = False
107 
108 
def _raise_rpc_error(state):
    """Raise a new ``grpc.RpcError`` after recording it in ``state.rpc_errors``.

    The recorded instance lets exception handlers elsewhere in this module
    recognise errors that originated here (they test membership in
    ``state.rpc_errors``).
    """
    error = grpc.RpcError()
    state.rpc_errors.append(error)
    raise error
113 
114 
def _possibly_finish_call(state, token):
    """Mark ``token`` as no longer due and report whether the RPC is finished.

    Returns ``(state, callbacks)`` when the RPC is inactive and nothing else is
    due, clearing ``state.callbacks`` to ``None`` in the process; otherwise
    returns ``(None, ())``.
    """
    state.due.remove(token)
    if _is_rpc_state_active(state) or state.due:
        return None, ()
    finished_callbacks, state.callbacks = state.callbacks, None
    return state, finished_callbacks
123 
124 
def _send_status_from_server(state, token):
    """Build the completion callback for a batch that sends the final status.

    The returned callable ignores its event argument and, under
    ``state.condition``, returns the result of ``_possibly_finish_call``.
    """

    def on_status_sent(unused_send_status_from_server_event):
        with state.condition:
            return _possibly_finish_call(state, token)

    return on_status_sent
132 
133 
def _get_initial_metadata(state, metadata):
    """Return ``metadata`` with compression metadata prepended when configured.

    When ``state.compression_algorithm`` is falsy, ``metadata`` is returned
    unchanged (possibly ``None``). Otherwise the result is a tuple that starts
    with the compression entry, followed by the entries of ``metadata`` if any.
    """
    with state.condition:
        algorithm = state.compression_algorithm
        if not algorithm:
            return metadata
        compression_metadata = (
            _compression.compression_algorithm_to_metadata(algorithm),)
        if metadata is None:
            return compression_metadata
        return compression_metadata + tuple(metadata)
146 
147 
def _get_initial_metadata_operation(state, metadata):
    """Wrap the effective initial metadata in a ``SendInitialMetadataOperation``."""
    return cygrpc.SendInitialMetadataOperation(
        _get_initial_metadata(state, metadata), _EMPTY_FLAGS)
152 
153 
def _abort(state, call, code, details):
    """Send a terminal status for an aborted RPC unless the client cancelled.

    A code or details already recorded on ``state`` take precedence over the
    ``code``/``details`` arguments. Initial metadata is sent in the same batch
    when ``state.initial_metadata_allowed`` is still set. On sending, the RPC
    is marked ``statused`` and the batch token is added to ``state.due``.
    """
    if state.client is _CANCELLED:
        return

    effective_code = _abortion_code(state, code)
    effective_details = state.details if state.details is not None else details

    if state.initial_metadata_allowed:
        leading_operations = (_get_initial_metadata_operation(state, None),)
        token = _SEND_INITIAL_METADATA_AND_SEND_STATUS_FROM_SERVER_TOKEN
    else:
        leading_operations = ()
        token = _SEND_STATUS_FROM_SERVER_TOKEN

    operations = leading_operations + (cygrpc.SendStatusFromServerOperation(
        state.trailing_metadata, effective_code, effective_details,
        _EMPTY_FLAGS),)
    call.start_server_batch(operations, _send_status_from_server(state, token))
    state.statused = True
    state.due.add(token)
176 
177 
179 
180  def receive_close_on_server(receive_close_on_server_event):
181  with state.condition:
182  if receive_close_on_server_event.batch_operations[0].cancelled():
183  state.client = _CANCELLED
184  elif state.client is _OPEN:
185  state.client = _CLOSED
186  state.condition.notify_all()
187  return _possibly_finish_call(state, _RECEIVE_CLOSE_ON_SERVER_TOKEN)
188 
189  return receive_close_on_server
190 
191 
def _receive_message(state, call, request_deserializer):
    """Build the completion callback for a receive-message batch.

    The callback handles three outcomes: no message (an ``_OPEN`` client
    becomes ``_CLOSED``), a message that fails to deserialize (the RPC is
    aborted with an internal status), or a request stored on
    ``state.request``. Waiters on ``state.condition`` are notified in every
    case, and the callback returns the result of ``_possibly_finish_call``.
    """

    def on_message_received(receive_message_event):
        serialized_request = _serialized_request(receive_message_event)
        request = None
        if serialized_request is not None:
            # Deserialization runs before the condition is acquired.
            request = _common.deserialize(serialized_request,
                                          request_deserializer)
        with state.condition:
            if serialized_request is None:
                if state.client is _OPEN:
                    state.client = _CLOSED
            elif request is None:
                _abort(state, call, cygrpc.StatusCode.internal,
                       b'Exception deserializing request!')
            else:
                state.request = request
            state.condition.notify_all()
            return _possibly_finish_call(state, _RECEIVE_MESSAGE_TOKEN)

    return on_message_received
215 
216 
218 
219  def send_initial_metadata(unused_send_initial_metadata_event):
220  with state.condition:
221  return _possibly_finish_call(state, _SEND_INITIAL_METADATA_TOKEN)
222 
223  return send_initial_metadata
224 
225 
def _send_message(state, token):
    """Build the completion callback for a batch that sends a message.

    The returned callable ignores its event argument, wakes every waiter on
    ``state.condition`` and returns the result of ``_possibly_finish_call``.
    """

    def on_message_sent(unused_send_message_event):
        with state.condition:
            state.condition.notify_all()
            return _possibly_finish_call(state, token)

    return on_message_sent
234 
235 
237 
238  def __init__(self, rpc_event, state, request_deserializer):
239  self._rpc_event = rpc_event
240  self._state = state
241  self._request_deserializer = request_deserializer
242 
243  def is_active(self):
244  with self._state.condition:
245  return _is_rpc_state_active(self._state)
246 
247  def time_remaining(self):
248  return max(self._rpc_event.call_details.deadline - time.time(), 0)
249 
250  def cancel(self):
251  self._rpc_event.call.cancel()
252 
253  def add_callback(self, callback):
254  with self._state.condition:
255  if self._state.callbacks is None:
256  return False
257  else:
258  self._state.callbacks.append(callback)
259  return True
260 
262  with self._state.condition:
263  self._state.disable_next_compression = True
264 
266  return self._rpc_event.invocation_metadata
267 
268  def peer(self):
269  return _common.decode(self._rpc_event.call.peer())
270 
271  def peer_identities(self):
272  return cygrpc.peer_identities(self._rpc_event.call)
273 
274  def peer_identity_key(self):
275  id_key = cygrpc.peer_identity_key(self._rpc_event.call)
276  return id_key if id_key is None else _common.decode(id_key)
277 
278  def auth_context(self):
279  return {
280  _common.decode(key): value for key, value in six.iteritems(
281  cygrpc.auth_context(self._rpc_event.call))
282  }
283 
284  def set_compression(self, compression):
285  with self._state.condition:
286  self._state.compression_algorithm = compression
287 
288  def send_initial_metadata(self, initial_metadata):
289  with self._state.condition:
290  if self._state.client is _CANCELLED:
292  else:
293  if self._state.initial_metadata_allowed:
295  self._state, initial_metadata)
296  self._rpc_event.call.start_server_batch(
297  (operation,), _send_initial_metadata(self._state))
298  self._state.initial_metadata_allowed = False
299  self._state.due.add(_SEND_INITIAL_METADATA_TOKEN)
300  else:
301  raise ValueError('Initial metadata no longer allowed!')
302 
303  def set_trailing_metadata(self, trailing_metadata):
304  with self._state.condition:
305  self._state.trailing_metadata = trailing_metadata
306 
307  def trailing_metadata(self):
308  return self._state.trailing_metadata
309 
310  def abort(self, code, details):
311  # treat OK like other invalid arguments: fail the RPC
312  if code == grpc.StatusCode.OK:
313  _LOGGER.error(
314  'abort() called with StatusCode.OK; returning UNKNOWN')
315  code = grpc.StatusCode.UNKNOWN
316  details = ''
317  with self._state.condition:
318  self._state.code = code
319  self._state.details = _common.encode(details)
320  self._state.aborted = True
321  raise Exception()
322 
323  def abort_with_status(self, status):
324  self._state.trailing_metadata = status.trailing_metadata
325  self.abort(status.code, status.details)
326 
327  def set_code(self, code):
328  with self._state.condition:
329  self._state.code = code
330 
331  def code(self):
332  return self._state.code
333 
334  def set_details(self, details):
335  with self._state.condition:
336  self._state.details = _common.encode(details)
337 
338  def details(self):
339  return self._state.details
340 
341  def _finalize_state(self):
342  pass
343 
344 
345 class _RequestIterator(object):
346 
347  def __init__(self, state, call, request_deserializer):
348  self._state = state
349  self._call = call
350  self._request_deserializer = request_deserializer
351 
353  if self._state.client is _CANCELLED:
355  elif not _is_rpc_state_active(self._state):
356  raise StopIteration()
357  else:
358  self._call.start_server_batch(
359  (cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),),
360  _receive_message(self._state, self._call,
361  self._request_deserializer))
362  self._state.due.add(_RECEIVE_MESSAGE_TOKEN)
363 
364  def _look_for_request(self):
365  if self._state.client is _CANCELLED:
367  elif (self._state.request is None and
368  _RECEIVE_MESSAGE_TOKEN not in self._state.due):
369  raise StopIteration()
370  else:
371  request = self._state.request
372  self._state.request = None
373  return request
374 
375  raise AssertionError() # should never run
376 
377  def _next(self):
378  with self._state.condition:
380  while True:
381  self._state.condition.wait()
382  request = self._look_for_request()
383  if request is not None:
384  return request
385 
386  def __iter__(self):
387  return self
388 
389  def __next__(self):
390  return self._next()
391 
392  def next(self):
393  return self._next()
394 
395 
def _unary_request(rpc_event, state, request_deserializer):
    """Build a thunk that blocks until the single request of an RPC arrives.

    The thunk returns ``None`` when the RPC is already inactive, when the
    client cancels, or when the client closes without sending a message (the
    RPC is then aborted with an ``unimplemented`` status). Otherwise it
    returns the request and clears ``state.request``.
    """

    def receive_single_request():
        with state.condition:
            if not _is_rpc_state_active(state):
                return None

            call = rpc_event.call
            call.start_server_batch(
                (cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),),
                _receive_message(state, call, request_deserializer))
            state.due.add(_RECEIVE_MESSAGE_TOKEN)

            while True:
                state.condition.wait()
                request = state.request
                if request is not None:
                    state.request = None
                    return request
                if state.client is _CLOSED:
                    details = '"{}" requires exactly one request message.'.format(
                        rpc_event.call_details.method)
                    _abort(state, call, cygrpc.StatusCode.unimplemented,
                           _common.encode(details))
                    return None
                if state.client is _CANCELLED:
                    return None
                # Woken with nothing to act on: keep waiting.

    return receive_single_request
426 
427 
def _call_behavior(rpc_event,
                   state,
                   behavior,
                   argument,
                   request_deserializer,
                   send_response_callback=None):
    """Invoke the application ``behavior`` inside a servicer context.

    ``send_response_callback`` is passed as a third positional argument only
    when it is not ``None``.

    Returns:
      ``(result, True)`` when the behavior returns normally, or
      ``(None, False)`` when it raises. On a raise the RPC is aborted, except
      for exceptions already recorded in ``state.rpc_errors`` (and only when
      ``state.aborted`` is not set).
    """
    from grpc import _create_servicer_context
    with _create_servicer_context(rpc_event, state,
                                  request_deserializer) as context:
        behavior_arguments = (argument, context)
        if send_response_callback is not None:
            behavior_arguments += (send_response_callback,)
        try:
            return behavior(*behavior_arguments), True
        except Exception as exception:  # pylint: disable=broad-except
            with state.condition:
                if state.aborted:
                    _abort(state, rpc_event.call, cygrpc.StatusCode.unknown,
                           b'RPC Aborted')
                elif exception not in state.rpc_errors:
                    details = 'Exception calling application: {}'.format(
                        exception)
                    _LOGGER.exception(details)
                    _abort(state, rpc_event.call, cygrpc.StatusCode.unknown,
                           _common.encode(details))
            return None, False
457 
458 
def _take_response_from_response_iterator(rpc_event, state, response_iterator):
    """Pull one response from the application's response iterator.

    Returns:
      ``(response, True)`` for a produced response, ``(None, True)`` when the
      iterator is exhausted, or ``(None, False)`` when iterating raised. On a
      raise the RPC is aborted, except for exceptions already recorded in
      ``state.rpc_errors`` (and only when ``state.aborted`` is not set).
    """
    try:
        response = next(response_iterator)
    except StopIteration:
        return None, True
    except Exception as exception:  # pylint: disable=broad-except
        call = rpc_event.call
        with state.condition:
            if state.aborted:
                _abort(state, call, cygrpc.StatusCode.unknown, b'RPC Aborted')
            elif exception not in state.rpc_errors:
                details = 'Exception iterating responses: {}'.format(exception)
                _LOGGER.exception(details)
                _abort(state, call, cygrpc.StatusCode.unknown,
                       _common.encode(details))
        return None, False
    return response, True
475 
476 
def _serialize_response(rpc_event, state, response, response_serializer):
    """Serialize ``response``, aborting the RPC when serialization yields None.

    Returns the serialized bytes, or ``None`` after aborting with an
    ``internal`` status.
    """
    serialized_response = _common.serialize(response, response_serializer)
    if serialized_response is not None:
        return serialized_response
    with state.condition:
        _abort(state, rpc_event.call, cygrpc.StatusCode.internal,
               b'Failed to serialize response!')
    return None
486 
487 
489  if state.disable_next_compression:
490  return cygrpc.WriteFlag.no_compress
491  else:
492  return _EMPTY_FLAGS
493 
494 
496  with state.condition:
497  state.disable_next_compression = False
498 
499 
500 def _send_response(rpc_event, state, serialized_response):
501  with state.condition:
502  if not _is_rpc_state_active(state):
503  return False
504  else:
505  if state.initial_metadata_allowed:
506  operations = (
507  _get_initial_metadata_operation(state, None),
508  cygrpc.SendMessageOperation(
509  serialized_response,
511  )
512  state.initial_metadata_allowed = False
513  token = _SEND_INITIAL_METADATA_AND_SEND_MESSAGE_TOKEN
514  else:
515  operations = (cygrpc.SendMessageOperation(
516  serialized_response,
518  token = _SEND_MESSAGE_TOKEN
519  rpc_event.call.start_server_batch(operations,
520  _send_message(state, token))
521  state.due.add(token)
523  while True:
524  state.condition.wait()
525  if token not in state.due:
526  return _is_rpc_state_active(state)
527 
528 
529 def _status(rpc_event, state, serialized_response):
530  with state.condition:
531  if state.client is not _CANCELLED:
532  code = _completion_code(state)
533  details = _details(state)
534  operations = [
535  cygrpc.SendStatusFromServerOperation(state.trailing_metadata,
536  code, details,
537  _EMPTY_FLAGS),
538  ]
539  if state.initial_metadata_allowed:
540  operations.append(_get_initial_metadata_operation(state, None))
541  if serialized_response is not None:
542  operations.append(
543  cygrpc.SendMessageOperation(
544  serialized_response,
546  rpc_event.call.start_server_batch(
547  operations,
548  _send_status_from_server(state, _SEND_STATUS_FROM_SERVER_TOKEN))
549  state.statused = True
551  state.due.add(_SEND_STATUS_FROM_SERVER_TOKEN)
552 
553 
def _unary_response_in_pool(rpc_event, state, behavior, argument_thunk,
                            request_deserializer, response_serializer):
    """Run a single-response behavior and send its response with the status.

    Each stage short-circuits: no argument, a failed behavior call, or a
    failed serialization ends the work without calling ``_status``. The
    installed cygrpc context is always uninstalled on the way out.
    """
    cygrpc.install_context_from_request_call_event(rpc_event)
    try:
        argument = argument_thunk()
        if argument is None:
            return
        response, proceed = _call_behavior(rpc_event, state, behavior,
                                           argument, request_deserializer)
        if not proceed:
            return
        serialized_response = _serialize_response(rpc_event, state, response,
                                                  response_serializer)
        if serialized_response is not None:
            _status(rpc_event, state, serialized_response)
    finally:
        cygrpc.uninstall_context()
569 
570 
571 def _stream_response_in_pool(rpc_event, state, behavior, argument_thunk,
572  request_deserializer, response_serializer):
573  cygrpc.install_context_from_request_call_event(rpc_event)
574 
575  def send_response(response):
576  if response is None:
577  _status(rpc_event, state, None)
578  else:
579  serialized_response = _serialize_response(rpc_event, state,
580  response,
581  response_serializer)
582  if serialized_response is not None:
583  _send_response(rpc_event, state, serialized_response)
584 
585  try:
586  argument = argument_thunk()
587  if argument is not None:
588  if hasattr(behavior, 'experimental_non_blocking'
589  ) and behavior.experimental_non_blocking:
590  _call_behavior(rpc_event,
591  state,
592  behavior,
593  argument,
594  request_deserializer,
595  send_response_callback=send_response)
596  else:
597  response_iterator, proceed = _call_behavior(
598  rpc_event, state, behavior, argument, request_deserializer)
599  if proceed:
601  rpc_event, state, send_response, response_iterator)
602  finally:
603  cygrpc.uninstall_context()
604 
605 
607  return state.client is not _CANCELLED and not state.statused
608 
609 
611  send_response_callback,
612  response_iterator):
613  while True:
614  response, proceed = _take_response_from_response_iterator(
615  rpc_event, state, response_iterator)
616  if proceed:
617  send_response_callback(response)
618  if not _is_rpc_state_active(state):
619  break
620  else:
621  break
622 
623 
def _select_thread_pool_for_behavior(behavior, default_thread_pool):
    """Return the pool that should run ``behavior``.

    A behavior may carry its own ``experimental_thread_pool``; it is used only
    when it is a ``futures.ThreadPoolExecutor``. In every other case
    ``default_thread_pool`` is returned.
    """
    if not hasattr(behavior, 'experimental_thread_pool'):
        return default_thread_pool
    candidate_pool = behavior.experimental_thread_pool
    if isinstance(candidate_pool, futures.ThreadPoolExecutor):
        return candidate_pool
    return default_thread_pool
630 
631 
def _handle_unary_unary(rpc_event, state, method_handler, default_thread_pool):
    """Submit a unary-request / unary-response RPC; return its future."""
    behavior = method_handler.unary_unary
    deserializer = method_handler.request_deserializer
    request_thunk = _unary_request(rpc_event, state, deserializer)
    pool = _select_thread_pool_for_behavior(behavior, default_thread_pool)
    return pool.submit(_unary_response_in_pool, rpc_event, state, behavior,
                       request_thunk, deserializer,
                       method_handler.response_serializer)
641 
642 
def _handle_unary_stream(rpc_event, state, method_handler, default_thread_pool):
    """Submit a unary-request / streaming-response RPC; return its future."""
    behavior = method_handler.unary_stream
    deserializer = method_handler.request_deserializer
    request_thunk = _unary_request(rpc_event, state, deserializer)
    pool = _select_thread_pool_for_behavior(behavior, default_thread_pool)
    return pool.submit(_stream_response_in_pool, rpc_event, state, behavior,
                       request_thunk, deserializer,
                       method_handler.response_serializer)
652 
653 
def _handle_stream_unary(rpc_event, state, method_handler, default_thread_pool):
    """Submit a streaming-request / unary-response RPC; return its future.

    The behavior's argument is a ``_RequestIterator`` over the incoming
    messages, handed over through a thunk.
    """
    behavior = method_handler.stream_unary
    deserializer = method_handler.request_deserializer
    request_iterator = _RequestIterator(state, rpc_event.call, deserializer)
    pool = _select_thread_pool_for_behavior(behavior, default_thread_pool)
    return pool.submit(_unary_response_in_pool, rpc_event, state, behavior,
                       lambda: request_iterator, deserializer,
                       method_handler.response_serializer)
664 
665 
def _handle_stream_stream(rpc_event, state, method_handler,
                          default_thread_pool):
    """Submit a streaming-request / streaming-response RPC; return its future.

    The behavior's argument is a ``_RequestIterator`` over the incoming
    messages, handed over through a thunk.
    """
    behavior = method_handler.stream_stream
    deserializer = method_handler.request_deserializer
    request_iterator = _RequestIterator(state, rpc_event.call, deserializer)
    pool = _select_thread_pool_for_behavior(behavior, default_thread_pool)
    return pool.submit(_stream_response_in_pool, rpc_event, state, behavior,
                       lambda: request_iterator, deserializer,
                       method_handler.response_serializer)
677 
678 
def _find_method_handler(rpc_event, generic_handlers, interceptor_pipeline):
    """Look up the method handler for the RPC described by ``rpc_event``.

    Generic handlers are asked in order and the first non-``None`` answer
    wins. When an interceptor pipeline is supplied, the lookup runs through
    ``interceptor_pipeline.execute``. Returns ``None`` when no handler
    services the method.
    """

    def query_handlers(handler_call_details):
        for candidate in generic_handlers:
            found = candidate.service(handler_call_details)
            if found is not None:
                return found
        return None

    method_name = _common.decode(rpc_event.call_details.method)
    handler_call_details = _HandlerCallDetails(method_name,
                                               rpc_event.invocation_metadata)

    if interceptor_pipeline is None:
        return query_handlers(handler_call_details)
    return interceptor_pipeline.execute(query_handlers, handler_call_details)
697 
698 
def _reject_rpc(rpc_event, status, details):
    """Answer an RPC with ``status``/``details`` without running a handler.

    A single batch sends initial metadata, receives the client's close and
    sends the status. The batch's completion callback returns
    ``(rpc_state, ())``. Returns the fresh ``_RPCState`` created for the call.
    """
    rpc_state = _RPCState()

    def on_rejection_sent(ignored_event):
        return rpc_state, ()

    operations = (
        _get_initial_metadata_operation(rpc_state, None),
        cygrpc.ReceiveCloseOnServerOperation(_EMPTY_FLAGS),
        cygrpc.SendStatusFromServerOperation(None, status, details,
                                             _EMPTY_FLAGS),
    )
    rpc_event.call.start_server_batch(operations, on_rejection_sent)
    return rpc_state
712 
713 
714 def _handle_with_method_handler(rpc_event, method_handler, thread_pool):
715  state = _RPCState()
716  with state.condition:
717  rpc_event.call.start_server_batch(
718  (cygrpc.ReceiveCloseOnServerOperation(_EMPTY_FLAGS),),
720  state.due.add(_RECEIVE_CLOSE_ON_SERVER_TOKEN)
721  if method_handler.request_streaming:
722  if method_handler.response_streaming:
723  return state, _handle_stream_stream(rpc_event, state,
724  method_handler, thread_pool)
725  else:
726  return state, _handle_stream_unary(rpc_event, state,
727  method_handler, thread_pool)
728  else:
729  if method_handler.response_streaming:
730  return state, _handle_unary_stream(rpc_event, state,
731  method_handler, thread_pool)
732  else:
733  return state, _handle_unary_unary(rpc_event, state,
734  method_handler, thread_pool)
735 
736 
def _handle_call(rpc_event, generic_handlers, interceptor_pipeline, thread_pool,
                 concurrency_exceeded):
    """Dispatch one incoming call event.

    Returns:
      ``(rpc_state, rpc_future)``. Both are ``None`` for an unsuccessful
      event or one without a method. ``rpc_future`` is ``None`` when the RPC
      is rejected: handler lookup raised, no handler was found, or
      ``concurrency_exceeded`` is set.
    """
    if not rpc_event.success:
        return None, None
    if rpc_event.call_details.method is None:
        return None, None

    try:
        method_handler = _find_method_handler(rpc_event, generic_handlers,
                                              interceptor_pipeline)
    except Exception as exception:  # pylint: disable=broad-except
        details = 'Exception servicing handler: {}'.format(exception)
        _LOGGER.exception(details)
        return _reject_rpc(rpc_event, cygrpc.StatusCode.unknown,
                           b'Error in service handler!'), None

    if method_handler is None:
        return _reject_rpc(rpc_event, cygrpc.StatusCode.unimplemented,
                           b'Method not found!'), None
    if concurrency_exceeded:
        return _reject_rpc(rpc_event, cygrpc.StatusCode.resource_exhausted,
                           b'Concurrent RPC limit exceeded!'), None
    return _handle_with_method_handler(rpc_event, method_handler, thread_pool)
761 
762 
@enum.unique
class _ServerStage(enum.Enum):
    """Lifecycle stage of a server."""

    # Initial stage, and the stage reached again once serving has stopped.
    STOPPED = 'stopped'
    # Set by _start() after the underlying server has been started.
    STARTED = 'started'
    # Shutdown has begun but serving has not yet stopped.
    GRACE = 'grace'
768 
769 
770 class _ServerState(object):
771 
772  # pylint: disable=too-many-arguments
773  def __init__(self, completion_queue, server, generic_handlers,
774  interceptor_pipeline, thread_pool, maximum_concurrent_rpcs):
775  self.lock = threading.RLock()
776  self.completion_queue = completion_queue
777  self.server = server
778  self.generic_handlers = list(generic_handlers)
779  self.interceptor_pipeline = interceptor_pipeline
780  self.thread_pool = thread_pool
781  self.stage = _ServerStage.STOPPED
782  self.termination_event = threading.Event()
784  self.maximum_concurrent_rpcs = maximum_concurrent_rpcs
785  self.active_rpc_count = 0
786 
787  # TODO(https://github.com/grpc/grpc/issues/6597): eliminate these fields.
788  self.rpc_states = set()
789  self.due = set()
790 
791  # A "volatile" flag to interrupt the daemon serving thread
792  self.server_deallocated = False
793 
794 
def _add_generic_handlers(state, generic_handlers):
    """Append ``generic_handlers`` to the server's handler list under its lock."""
    with state.lock:
        registered_handlers = state.generic_handlers
        registered_handlers.extend(generic_handlers)
798 
799 
def _add_insecure_port(state, address):
    """Add ``address`` as an HTTP/2 port on the server; return the result."""
    with state.lock:
        server = state.server
        return server.add_http2_port(address)
803 
804 
def _add_secure_port(state, address, server_credentials):
    """Add ``address`` as an HTTP/2 port using the given server credentials.

    The wrapped ``server_credentials._credentials`` object is what gets
    passed to the server. Returns the server's result.
    """
    with state.lock:
        wrapped_credentials = server_credentials._credentials
        return state.server.add_http2_port(address, wrapped_credentials)
809 
810 
def _request_call(state):
    """Ask the server for the next incoming call and mark the request as due.

    The same completion queue is passed for both queue arguments.
    """
    queue = state.completion_queue
    state.server.request_call(queue, queue, _REQUEST_CALL_TAG)
    state.due.add(_REQUEST_CALL_TAG)
815 
816 
817 # TODO(https://github.com/grpc/grpc/issues/6597): delete this function.
def _stop_serving(state):
    """Destroy the server once no RPC state and no due tag remain.

    Returns ``False`` and changes nothing while work is outstanding.
    Otherwise destroys the server, sets every registered shutdown event,
    moves the stage to ``STOPPED`` and returns ``True``.
    """
    if state.rpc_states or state.due:
        return False
    state.server.destroy()
    for pending_event in state.shutdown_events:
        pending_event.set()
    state.stage = _ServerStage.STOPPED
    return True
827 
828 
830  with state.lock:
831  state.active_rpc_count -= 1
832 
833 
def _process_event_and_continue(state, event):
    """Handle one completion-queue event; return whether serving continues.

    Three kinds of tag are recognised: the shutdown tag, the request-call tag
    (a new incoming RPC), and any other tag, which is called with the event
    and yields ``(rpc_state, callbacks)``. ``False`` is returned only when
    ``_stop_serving`` reports that the server has been torn down.
    """
    tag = event.tag

    if tag is _SHUTDOWN_TAG:
        with state.lock:
            state.due.remove(_SHUTDOWN_TAG)
            return not _stop_serving(state)

    if tag is _REQUEST_CALL_TAG:
        with state.lock:
            state.due.remove(_REQUEST_CALL_TAG)
            limit = state.maximum_concurrent_rpcs
            concurrency_exceeded = (limit is not None and
                                    state.active_rpc_count >= limit)
            rpc_state, rpc_future = _handle_call(event, state.generic_handlers,
                                                 state.interceptor_pipeline,
                                                 state.thread_pool,
                                                 concurrency_exceeded)
            if rpc_state is not None:
                state.rpc_states.add(rpc_state)
            if rpc_future is not None:
                state.active_rpc_count += 1
                rpc_future.add_done_callback(
                    lambda unused_future: _on_call_completed(state))
            if state.stage is _ServerStage.STARTED:
                # Still serving: request the next incoming call.
                _request_call(state)
                return True
            return not _stop_serving(state)

    # Any other tag is a per-batch completion callback.
    rpc_state, callbacks = tag(event)
    for callback in callbacks:
        try:
            callback()
        except Exception:  # pylint: disable=broad-except
            _LOGGER.exception('Exception calling callback!')
    if rpc_state is None:
        return True
    with state.lock:
        state.rpc_states.remove(rpc_state)
        return not _stop_serving(state)
874 
875 
def _serve(state):
    """Poll the completion queue and process events until serving should stop.

    Each poll is bounded by ``_DEALLOCATED_SERVER_CHECK_PERIOD_S`` so that the
    ``server_deallocated`` flag is checked regularly; once the flag is set,
    shutdown is begun.
    """
    while True:
        poll_deadline = time.time() + _DEALLOCATED_SERVER_CHECK_PERIOD_S
        event = state.completion_queue.poll(poll_deadline)
        if state.server_deallocated:
            _begin_shutdown_once(state)
        is_timeout = (
            event.completion_type == cygrpc.CompletionType.queue_timeout)
        if not is_timeout and not _process_event_and_continue(state, event):
            return
        # We want to force the deletion of the previous event
        # ~before~ we poll again; if the event has a reference
        # to a shutdown Call object, this can induce spinlock.
        event = None
889 
890 
892  with state.lock:
893  if state.stage is _ServerStage.STARTED:
894  state.server.shutdown(state.completion_queue, _SHUTDOWN_TAG)
895  state.stage = _ServerStage.GRACE
896  state.due.add(_SHUTDOWN_TAG)
897 
898 
def _stop(state, grace):
    """Begin shutting the server down; return an event set once it has stopped.

    * Already stopped: returns an event that is already set.
    * ``grace`` is ``None``: cancels all calls immediately, then blocks
      (outside the lock) until the shutdown event is set.
    * Otherwise: starts a thread that waits up to ``grace`` seconds for the
      shutdown event and then cancels all calls; returns without blocking.
    """
    with state.lock:
        shutdown_event = threading.Event()
        if state.stage is _ServerStage.STOPPED:
            shutdown_event.set()
            return shutdown_event

        _begin_shutdown_once(state)
        state.shutdown_events.append(shutdown_event)

        if grace is not None:

            def cancel_all_calls_after_grace():
                shutdown_event.wait(timeout=grace)
                with state.lock:
                    state.server.cancel_all_calls()

            canceller = threading.Thread(target=cancel_all_calls_after_grace)
            canceller.start()
            return shutdown_event

        state.server.cancel_all_calls()
    # The lock is released before blocking on the shutdown event.
    shutdown_event.wait()
    return shutdown_event
923 
924 
def _start(state):
    """Start the server and launch the daemon thread that runs ``_serve``.

    Raises:
      ValueError: if the server is not in the ``STOPPED`` stage.
    """
    with state.lock:
        if state.stage is not _ServerStage.STOPPED:
            raise ValueError('Cannot start already-started server!')
        state.server.start()
        state.stage = _ServerStage.STARTED
        _request_call(state)

        serving_thread = threading.Thread(target=_serve, args=(state,))
        serving_thread.daemon = True
        serving_thread.start()
936 
937 
def _validate_generic_rpc_handlers(generic_rpc_handlers):
    """Check that every handler exposes a non-``None`` ``service`` attribute.

    Raises:
      AttributeError: for the first handler whose ``service`` attribute is
        missing or ``None``.
    """
    for handler in generic_rpc_handlers:
        if getattr(handler, 'service', None) is not None:
            continue
        raise AttributeError(
            '"{}" must conform to grpc.GenericRpcHandler type but does '
            'not have "service" method!'.format(handler))
945 
946 
def _augment_options(base_options, compression):
    """Return ``base_options`` as a tuple, followed by the compression option."""
    options = tuple(base_options)
    return options + _compression.create_channel_option(compression)
950 
951 
953 
954  # pylint: disable=too-many-arguments
955  def __init__(self, thread_pool, generic_handlers, interceptors, options,
956  maximum_concurrent_rpcs, compression, xds):
957  completion_queue = cygrpc.CompletionQueue()
958  server = cygrpc.Server(_augment_options(options, compression), xds)
959  server.register_completion_queue(completion_queue)
960  self._state = _ServerState(completion_queue, server, generic_handlers,
961  _interceptor.service_pipeline(interceptors),
962  thread_pool, maximum_concurrent_rpcs)
963 
964  def add_generic_rpc_handlers(self, generic_rpc_handlers):
965  _validate_generic_rpc_handlers(generic_rpc_handlers)
966  _add_generic_handlers(self._state, generic_rpc_handlers)
967 
968  def add_insecure_port(self, address):
969  return _common.validate_port_binding_result(
970  address, _add_insecure_port(self._state, _common.encode(address)))
971 
972  def add_secure_port(self, address, server_credentials):
973  return _common.validate_port_binding_result(
974  address,
975  _add_secure_port(self._state, _common.encode(address),
976  server_credentials))
977 
978  def start(self):
979  _start(self._state)
980 
981  def wait_for_termination(self, timeout=None):
982  # NOTE(https://bugs.python.org/issue35935)
983  # Remove this workaround once threading.Event.wait() is working with
984  # CTRL+C across platforms.
985  return _common.wait(self._state.termination_event.wait,
986  self._state.termination_event.is_set,
987  timeout=timeout)
988 
989  def stop(self, grace):
990  return _stop(self._state, grace)
991 
992  def __del__(self):
993  if hasattr(self, '_state'):
994  # We can not grab a lock in __del__(), so set a flag to signal the
995  # serving daemon thread (if it exists) to initiate shutdown.
996  self._state.server_deallocated = True
997 
998 
def create_server(thread_pool, generic_rpc_handlers, interceptors, options,
                  maximum_concurrent_rpcs, compression, xds):
    """Validate the generic handlers and build a ``_Server`` from the arguments.

    Raises:
      AttributeError: propagated from ``_validate_generic_rpc_handlers`` when
        a handler has no ``service`` attribute.
    """
    _validate_generic_rpc_handlers(generic_rpc_handlers)
    server = _Server(thread_pool, generic_rpc_handlers, interceptors, options,
                     maximum_concurrent_rpcs, compression, xds)
    return server
grpc._server._handle_with_method_handler
def _handle_with_method_handler(rpc_event, method_handler, thread_pool)
Definition: grpc/_server.py:714
grpc._server._handle_stream_unary
def _handle_stream_unary(rpc_event, state, method_handler, default_thread_pool)
Definition: grpc/_server.py:654
grpc._server._Context.set_trailing_metadata
def set_trailing_metadata(self, trailing_metadata)
Definition: grpc/_server.py:303
grpc._server._ServerState.due
due
Definition: grpc/_server.py:788
grpc._create_servicer_context
def _create_servicer_context(rpc_event, state, request_deserializer)
Definition: src/python/grpcio/grpc/__init__.py:2076
grpc._server._receive_message
def _receive_message(state, call, request_deserializer)
Definition: grpc/_server.py:192
grpc._server._handle_unary_stream
def _handle_unary_stream(rpc_event, state, method_handler, default_thread_pool)
Definition: grpc/_server.py:643
grpc._server._ServerState.generic_handlers
generic_handlers
Definition: grpc/_server.py:777
http2_test_server.format
format
Definition: http2_test_server.py:118
grpc._server._RequestIterator._request_deserializer
_request_deserializer
Definition: grpc/_server.py:350
grpc._server._start
def _start(state)
Definition: grpc/_server.py:925
grpc._server._Context.set_code
def set_code(self, code)
Definition: grpc/_server.py:327
grpc._server._handle_unary_unary
def _handle_unary_unary(rpc_event, state, method_handler, default_thread_pool)
Definition: grpc/_server.py:632
grpc._server._RPCState.condition
condition
Definition: grpc/_server.py:93
grpc._server._HandlerCallDetails
Definition: grpc/_server.py:86
grpc._server._RPCState.rpc_errors
rpc_errors
Definition: grpc/_server.py:104
grpc._server._Context.send_initial_metadata
def send_initial_metadata(self, initial_metadata)
Definition: grpc/_server.py:288
grpc._server._receive_close_on_server
def _receive_close_on_server(state)
Definition: grpc/_server.py:178
grpc._server._Context.peer
def peer(self)
Definition: grpc/_server.py:268
grpc._server._find_method_handler
def _find_method_handler(rpc_event, generic_handlers, interceptor_pipeline)
Definition: grpc/_server.py:679
grpc._server._application_code
def _application_code(code)
Definition: grpc/_server.py:59
grpc._server._ServerStage
Definition: grpc/_server.py:764
grpc._server._serve
def _serve(state)
Definition: grpc/_server.py:876
grpc._server._send_message_callback_to_blocking_iterator_adapter
def _send_message_callback_to_blocking_iterator_adapter(rpc_event, state, send_response_callback, response_iterator)
Definition: grpc/_server.py:610
grpc._server._ServerState.shutdown_events
shutdown_events
Definition: grpc/_server.py:782
grpc._server._unary_request
def _unary_request(rpc_event, state, request_deserializer)
Definition: grpc/_server.py:396
grpc._server._RPCState.disable_next_compression
disable_next_compression
Definition: grpc/_server.py:99
grpc._server._Context._request_deserializer
_request_deserializer
Definition: grpc/_server.py:241
grpc._server._RequestIterator._call
_call
Definition: grpc/_server.py:349
grpc._server._ServerState.active_rpc_count
active_rpc_count
Definition: grpc/_server.py:784
grpc._server._add_insecure_port
def _add_insecure_port(state, address)
Definition: grpc/_server.py:800
grpc._server._Context._finalize_state
def _finalize_state(self)
Definition: grpc/_server.py:341
grpc._server._ServerState.interceptor_pipeline
interceptor_pipeline
Definition: grpc/_server.py:778
grpc._server._select_thread_pool_for_behavior
def _select_thread_pool_for_behavior(behavior, default_thread_pool)
Definition: grpc/_server.py:624
grpc._server._reject_rpc
def _reject_rpc(rpc_event, status, details)
Definition: grpc/_server.py:699
grpc.ServicerContext.abort
def abort(self, code, details)
Definition: src/python/grpcio/grpc/__init__.py:1192
grpc.RpcError
Definition: src/python/grpcio/grpc/__init__.py:302
grpc._server._Server.__del__
def __del__(self)
Definition: grpc/_server.py:992
grpc._server._ServerState.stage
stage
Definition: grpc/_server.py:780
message
char * message
Definition: libuv/docs/code/tty-gravity/main.c:12
grpc._server._Context.set_details
def set_details(self, details)
Definition: grpc/_server.py:334
grpc._server._abort
def _abort(state, call, code, details)
Definition: grpc/_server.py:154
grpc._server._stream_response_in_pool
def _stream_response_in_pool(rpc_event, state, behavior, argument_thunk, request_deserializer, response_serializer)
Definition: grpc/_server.py:571
grpc._server._Server.start
def start(self)
Definition: grpc/_server.py:978
grpc._server._take_response_from_response_iterator
def _take_response_from_response_iterator(rpc_event, state, response_iterator)
Definition: grpc/_server.py:459
grpc._server._RPCState.due
due
Definition: grpc/_server.py:94
grpc._server._Context.abort
def abort(self, code, details)
Definition: grpc/_server.py:310
grpc._server._Context.disable_next_message_compression
def disable_next_message_compression(self)
Definition: grpc/_server.py:261
max
int max
Definition: bloaty/third_party/zlib/examples/enough.c:170
send_message
Definition: send_message.py:1
grpc._server._details
def _details(state)
Definition: grpc/_server.py:78
grpc._server._handle_call
def _handle_call(rpc_event, generic_handlers, interceptor_pipeline, thread_pool, concurrency_exceeded)
Definition: grpc/_server.py:737
grpc._server._stop_serving
def _stop_serving(state)
Definition: grpc/_server.py:818
grpc._server._Context.__init__
def __init__(self, rpc_event, state, request_deserializer)
Definition: grpc/_server.py:238
grpc._server._unary_response_in_pool
def _unary_response_in_pool(rpc_event, state, behavior, argument_thunk, request_deserializer, response_serializer)
Definition: grpc/_server.py:554
grpc._server._Context._rpc_event
_rpc_event
Definition: grpc/_server.py:239
grpc._server._RPCState.request
request
Definition: grpc/_server.py:95
grpc._server._RPCState.callbacks
callbacks
Definition: grpc/_server.py:105
grpc._server._Server.wait_for_termination
def wait_for_termination(self, timeout=None)
Definition: grpc/_server.py:981
grpc._server._status
def _status(rpc_event, state, serialized_response)
Definition: grpc/_server.py:529
grpc._server._RequestIterator._next
def _next(self)
Definition: grpc/_server.py:377
grpc._server._Context.trailing_metadata
def trailing_metadata(self)
Definition: grpc/_server.py:307
grpc._server._raise_rpc_error
def _raise_rpc_error(state)
Definition: grpc/_server.py:109
grpc._server._ServerState.rpc_states
rpc_states
Definition: grpc/_server.py:787
grpc._server._Context.time_remaining
def time_remaining(self)
Definition: grpc/_server.py:247
grpc._server._add_secure_port
def _add_secure_port(state, address, server_credentials)
Definition: grpc/_server.py:805
grpc._server.create_server
def create_server(thread_pool, generic_rpc_handlers, interceptors, options, maximum_concurrent_rpcs, compression, xds)
Definition: grpc/_server.py:999
grpc._server._ServerState.maximum_concurrent_rpcs
maximum_concurrent_rpcs
Definition: grpc/_server.py:783
grpc._server._possibly_finish_call
def _possibly_finish_call(state, token)
Definition: grpc/_server.py:115
grpc._server._Context.is_active
def is_active(self)
Definition: grpc/_server.py:243
grpc._server._RPCState.client
client
Definition: grpc/_server.py:96
grpc._server._get_initial_metadata_operation
def _get_initial_metadata_operation(state, metadata)
Definition: grpc/_server.py:148
grpc._server._augment_options
def _augment_options(base_options, compression)
Definition: grpc/_server.py:947
callback
static void callback(void *arg, int status, int timeouts, struct hostent *host)
Definition: acountry.c:224
grpc._server._Context._state
_state
Definition: grpc/_server.py:240
grpc._server._request_call
def _request_call(state)
Definition: grpc/_server.py:811
grpc._server._begin_shutdown_once
def _begin_shutdown_once(state)
Definition: grpc/_server.py:891
grpc._server._ServerState.thread_pool
thread_pool
Definition: grpc/_server.py:779
grpc._server._Server._state
_state
Definition: grpc/_server.py:959
grpc._server._RPCState.initial_metadata_allowed
initial_metadata_allowed
Definition: grpc/_server.py:97
grpc._server._send_status_from_server
def _send_status_from_server(state, token)
Definition: grpc/_server.py:125
grpc._server._get_send_message_op_flags_from_state
def _get_send_message_op_flags_from_state(state)
Definition: grpc/_server.py:488
grpc._server._ServerState
Definition: grpc/_server.py:770
grpc._server._serialized_request
def _serialized_request(request_event)
Definition: grpc/_server.py:55
grpc._server._process_event_and_continue
def _process_event_and_continue(state, event)
Definition: grpc/_server.py:834
grpc._server._Server.__init__
def __init__(self, thread_pool, generic_handlers, interceptors, options, maximum_concurrent_rpcs, compression, xds)
Definition: grpc/_server.py:955
grpc._server._Context.add_callback
def add_callback(self, callback)
Definition: grpc/_server.py:253
grpc._server._ServerState.termination_event
termination_event
Definition: grpc/_server.py:781
grpc._server._Server.stop
def stop(self, grace)
Definition: grpc/_server.py:989
grpc._server._Server.add_secure_port
def add_secure_port(self, address, server_credentials)
Definition: grpc/_server.py:972
grpc._server._add_generic_handlers
def _add_generic_handlers(state, generic_handlers)
Definition: grpc/_server.py:795
grpc._server._ServerState.completion_queue
completion_queue
Definition: grpc/_server.py:775
grpc.ServicerContext
Definition: src/python/grpcio/grpc/__init__.py:1083
grpc._server._ServerState.server
server
Definition: grpc/_server.py:776
grpc._server._RPCState.statused
statused
Definition: grpc/_server.py:103
grpc._server._abortion_code
def _abortion_code(state, code)
Definition: grpc/_server.py:71
grpc._server._on_call_completed
def _on_call_completed(state)
Definition: grpc/_server.py:829
grpc._server._RequestIterator.__next__
def __next__(self)
Definition: grpc/_server.py:389
grpc._server._send_initial_metadata
def _send_initial_metadata(state)
Definition: grpc/_server.py:217
grpc._server._handle_stream_stream
def _handle_stream_stream(rpc_event, state, method_handler, default_thread_pool)
Definition: grpc/_server.py:666
grpc._server._send_response
def _send_response(rpc_event, state, serialized_response)
Definition: grpc/_server.py:500
grpc::Server
Definition: include/grpcpp/server.h:59
grpc._server._stop
def _stop(state, grace)
Definition: grpc/_server.py:899
grpc._server._ServerState.server_deallocated
server_deallocated
Definition: grpc/_server.py:791
grpc._server._is_rpc_state_active
def _is_rpc_state_active(state)
Definition: grpc/_server.py:606
grpc._server._RPCState.details
details
Definition: grpc/_server.py:102
grpc._server._RequestIterator._look_for_request
def _look_for_request(self)
Definition: grpc/_server.py:364
next
AllocList * next[kMaxLevel]
Definition: abseil-cpp/absl/base/internal/low_level_alloc.cc:100
grpc._server._RequestIterator.next
def next(self)
Definition: grpc/_server.py:392
grpc._server._RequestIterator
Definition: grpc/_server.py:345
grpc._server._call_behavior
def _call_behavior(rpc_event, state, behavior, argument, request_deserializer, send_response_callback=None)
Definition: grpc/_server.py:428
cpp.gmock_class.set
set
Definition: bloaty/third_party/googletest/googlemock/scripts/generator/cpp/gmock_class.py:44
grpc._server._RPCState.aborted
aborted
Definition: grpc/_server.py:106
grpc._server._Context.cancel
def cancel(self)
Definition: grpc/_server.py:250
grpc._server._RequestIterator.__iter__
def __iter__(self)
Definition: grpc/_server.py:386
grpc._server._ServerState.lock
lock
Definition: grpc/_server.py:774
grpc._server._ServerState.__init__
def __init__(self, completion_queue, server, generic_handlers, interceptor_pipeline, thread_pool, maximum_concurrent_rpcs)
Definition: grpc/_server.py:773
grpc._server._RequestIterator._raise_or_start_receive_message
def _raise_or_start_receive_message(self)
Definition: grpc/_server.py:352
grpc._server._get_initial_metadata
def _get_initial_metadata(state, metadata)
Definition: grpc/_server.py:134
grpc._server._Server
Definition: grpc/_server.py:952
grpc._server._Context.invocation_metadata
def invocation_metadata(self)
Definition: grpc/_server.py:265
grpc._server._reset_per_message_state
def _reset_per_message_state(state)
Definition: grpc/_server.py:495
grpc._cython
Definition: src/python/grpcio/grpc/_cython/__init__.py:1
grpc._server._RPCState.trailing_metadata
trailing_metadata
Definition: grpc/_server.py:100
grpc._server._Context.peer_identities
def peer_identities(self)
Definition: grpc/_server.py:271
grpc._server._serialize_response
def _serialize_response(rpc_event, state, response, response_serializer)
Definition: grpc/_server.py:477
grpc._server._send_message
def _send_message(state, token)
Definition: grpc/_server.py:226
grpc.HandlerCallDetails
Definition: src/python/grpcio/grpc/__init__.py:1324
grpc._server._RequestIterator.__init__
def __init__(self, state, call, request_deserializer)
Definition: grpc/_server.py:347
grpc._server._Context.auth_context
def auth_context(self)
Definition: grpc/_server.py:278
grpc._server._completion_code
def _completion_code(state)
Definition: grpc/_server.py:64
grpc._server._validate_generic_rpc_handlers
def _validate_generic_rpc_handlers(generic_rpc_handlers)
Definition: grpc/_server.py:938
grpc._server._Server.add_generic_rpc_handlers
def add_generic_rpc_handlers(self, generic_rpc_handlers)
Definition: grpc/_server.py:964
grpc._server._Context.peer_identity_key
def peer_identity_key(self)
Definition: grpc/_server.py:274
grpc._server._Context.abort_with_status
def abort_with_status(self, status)
Definition: grpc/_server.py:323
grpc._server._Context.details
def details(self)
Definition: grpc/_server.py:338
grpc._server._RPCState.code
code
Definition: grpc/_server.py:101
grpc._server._RPCState.compression_algorithm
compression_algorithm
Definition: grpc/_server.py:98
grpc._server._RPCState.__init__
def __init__(self)
Definition: grpc/_server.py:92
grpc._server._Context.set_compression
def set_compression(self, compression)
Definition: grpc/_server.py:284
grpc._server._Context.code
def code(self)
Definition: grpc/_server.py:331
grpc._server._RPCState
Definition: grpc/_server.py:90
grpc._server._Context
Definition: grpc/_server.py:236
grpc._server._RequestIterator._state
_state
Definition: grpc/_server.py:348
send_initial_metadata
static void send_initial_metadata(void)
Definition: test/core/fling/server.cc:121
grpc._server._Server.add_insecure_port
def add_insecure_port(self, address)
Definition: grpc/_server.py:968


grpc
Author(s):
autogenerated on Thu Mar 13 2025 02:58:27