14 """Translates gRPC's server-side API into gRPC's server-side Beta API."""
20 from grpc
import _common
32 _DEFAULT_POOL_SIZE = 8
59 raise NotImplementedError(
60 'add_abortion_callback no longer supported server-side!')
73 _metadata.unbeta(initial_metadata))
77 _metadata.unbeta(terminal_metadata))
88 def adaptation(request, servicer_context):
89 return unary_request_inline(request,
97 def adaptation(request_iterator, servicer_context):
98 return stream_request_inline(request_iterator,
137 raise abandonment.Abandoned()
149 raise abandonment.Abandoned()
151 all_values = tuple(self.
_values)
160 thread_joined = threading.Event()
163 for request
in request_iterator:
164 if not servicer_context.is_active()
or thread_joined.is_set():
166 request_consumer.consume(request)
167 if not servicer_context.is_active()
or thread_joined.is_set():
169 request_consumer.terminate()
171 request_pipe_thread = threading.Thread(target=pipe_requests)
172 request_pipe_thread.daemon =
True
173 request_pipe_thread.start()
178 def adaptation(request, servicer_context):
180 if not servicer_context.add_callback(callback.cancel):
181 raise abandonment.Abandoned()
184 return callback.draw_all_values()[0]
191 def adaptation(request, servicer_context):
193 if not servicer_context.add_callback(callback.cancel):
194 raise abandonment.Abandoned()
198 response = callback.draw_one_value()
209 def adaptation(request_iterator, servicer_context):
211 if not servicer_context.add_callback(callback.cancel):
212 raise abandonment.Abandoned()
214 callback.consume_and_terminate,
218 return callback.draw_all_values()[0]
225 def adaptation(request_iterator, servicer_context):
227 if not servicer_context.add_callback(callback.cancel):
228 raise abandonment.Abandoned()
234 response = callback.draw_one_value()
244 collections.namedtuple(
'_MethodHandler', (
246 'response_streaming',
247 'request_deserializer',
248 'response_serializer',
258 response_serializer):
259 if implementation.style
is style.Service.INLINE:
260 if implementation.cardinality
is cardinality.Cardinality.UNARY_UNARY:
262 False,
False, request_deserializer, response_serializer,
265 elif implementation.cardinality
is cardinality.Cardinality.UNARY_STREAM:
267 False,
True, request_deserializer, response_serializer,
None,
270 elif implementation.cardinality
is cardinality.Cardinality.STREAM_UNARY:
272 True,
False, request_deserializer, response_serializer,
None,
275 implementation.stream_unary_inline),
None)
276 elif implementation.cardinality
is cardinality.Cardinality.STREAM_STREAM:
278 True,
True, request_deserializer, response_serializer,
None,
281 implementation.stream_stream_inline))
282 elif implementation.style
is style.Service.EVENT:
283 if implementation.cardinality
is cardinality.Cardinality.UNARY_UNARY:
285 False,
False, request_deserializer, response_serializer,
288 elif implementation.cardinality
is cardinality.Cardinality.UNARY_STREAM:
290 False,
True, request_deserializer, response_serializer,
None,
293 elif implementation.cardinality
is cardinality.Cardinality.STREAM_UNARY:
295 True,
False, request_deserializer, response_serializer,
None,
299 elif implementation.cardinality
is cardinality.Cardinality.STREAM_STREAM:
301 True,
True, request_deserializer, response_serializer,
None,
308 method_pair_map = method_pair_map
or {}
310 for method_pair
in method_pair_map:
311 method = _common.fully_qualified_method(method_pair[0], method_pair[1])
312 flat_map[method] = method_pair_map[method_pair]
318 def __init__(self, method_implementations, multi_method_implementation,
319 request_deserializers, response_serializers):
321 method_implementations)
323 request_deserializers)
325 response_serializers)
330 handler_call_details.method)
331 if method_implementation
is not None:
333 method_implementation,
341 except face.NoSuchMethodError:
371 def server(service_implementations, multi_method_implementation,
372 request_deserializers, response_serializers, thread_pool,
375 multi_method_implementation,
376 request_deserializers,
377 response_serializers)
378 if thread_pool
is None:
379 effective_thread_pool = logging_pool.pool(
380 _DEFAULT_POOL_SIZE
if thread_pool_size
is None else thread_pool_size
383 effective_thread_pool = thread_pool
385 grpc.server(effective_thread_pool, handlers=(generic_rpc_handler,)))