examples/python/async_streaming/server.py
Go to the documentation of this file.
1 # Copyright 2020 The gRPC Authors
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 
15 from concurrent.futures import ThreadPoolExecutor
16 import logging
17 import threading
18 import time
19 from typing import Iterable
20 
21 from google.protobuf.json_format import MessageToJson
22 import grpc
23 
24 import phone_pb2
25 import phone_pb2_grpc
26 
27 
29  call_state: phone_pb2.CallState.State) -> phone_pb2.StreamCallResponse:
30  response = phone_pb2.StreamCallResponse()
31  response.call_state.state = call_state
32  return response
33 
34 
36 
37  def __init__(self):
38  self._id_counter = 0
39  self._lock = threading.RLock()
40 
41  def _create_call_session(self) -> phone_pb2.CallInfo:
42  call_info = phone_pb2.CallInfo()
43  with self._lock:
44  call_info.session_id = str(self._id_counter)
45  self._id_counter += 1
46  call_info.media = "https://link.to.audio.resources"
47  logging.info("Created a call session [%s]", MessageToJson(call_info))
48  return call_info
49 
50  def _clean_call_session(self, call_info: phone_pb2.CallInfo) -> None:
51  logging.info("Call session cleaned [%s]", MessageToJson(call_info))
52 
54  self, request_iterator: Iterable[phone_pb2.StreamCallRequest],
55  context: grpc.ServicerContext
56  ) -> Iterable[phone_pb2.StreamCallResponse]:
57  try:
58  request = next(request_iterator)
59  logging.info("Received a phone call request for number [%s]",
60  request.phone_number)
61  except StopIteration:
62  raise RuntimeError("Failed to receive call request")
63  # Simulate the acceptance of call request
64  time.sleep(1)
65  yield create_state_response(phone_pb2.CallState.NEW)
66  # Simulate the start of the call session
67  time.sleep(1)
68  call_info = self._create_call_session()
69  context.add_callback(lambda: self._clean_call_session(call_info))
70  response = phone_pb2.StreamCallResponse()
71  response.call_info.session_id = call_info.session_id
72  response.call_info.media = call_info.media
73  yield response
74  yield create_state_response(phone_pb2.CallState.ACTIVE)
75  # Simulate the end of the call
76  time.sleep(2)
77  yield create_state_response(phone_pb2.CallState.ENDED)
78  logging.info("Call finished [%s]", request.phone_number)
79 
80 
81 def serve(address: str) -> None:
82  server = grpc.server(ThreadPoolExecutor())
84  server.add_insecure_port(address)
85  server.start()
86  logging.info("Server serving at %s", address)
87  server.wait_for_termination()
88 
89 
90 if __name__ == "__main__":
91  logging.basicConfig(level=logging.INFO)
92  serve("[::]:50051")
google::protobuf.json_format
Definition: bloaty/third_party/protobuf/python/google/protobuf/json_format.py:1
xds_interop_client.str
str
Definition: xds_interop_client.py:487
server.Phone
Definition: examples/python/async_streaming/server.py:35
phone_pb2_grpc.PhoneServicer
Definition: phone_pb2_grpc.py:23
server.Phone.StreamCall
Iterable[phone_pb2.StreamCallResponse] StreamCall(self, Iterable[phone_pb2.StreamCallRequest] request_iterator, grpc.ServicerContext context)
Definition: examples/python/async_streaming/server.py:53
server.Phone.__init__
def __init__(self)
Definition: examples/python/async_streaming/server.py:37
server.Phone._clean_call_session
None _clean_call_session(self, phone_pb2.CallInfo call_info)
Definition: examples/python/async_streaming/server.py:50
server.Phone._create_call_session
phone_pb2.CallInfo _create_call_session(self)
Definition: examples/python/async_streaming/server.py:41
server.create_state_response
phone_pb2.StreamCallResponse create_state_response(phone_pb2.CallState.State call_state)
Definition: examples/python/async_streaming/server.py:28
server.Phone._id_counter
_id_counter
Definition: examples/python/async_streaming/server.py:38
grpc.server
def server(thread_pool, handlers=None, interceptors=None, options=None, maximum_concurrent_rpcs=None, compression=None, xds=False)
Definition: src/python/grpcio/grpc/__init__.py:2034
grpc.ServicerContext
Definition: src/python/grpcio/grpc/__init__.py:1083
google::protobuf.json_format.MessageToJson
def MessageToJson(message, including_default_value_fields=False, preserving_proto_field_name=False, indent=2, sort_keys=False, use_integers_for_enums=False, descriptor_pool=None)
Definition: bloaty/third_party/protobuf/python/google/protobuf/json_format.py:99
server.serve
None serve(str address)
Definition: examples/python/async_streaming/server.py:81
phone_pb2_grpc.add_PhoneServicer_to_server
def add_PhoneServicer_to_server(servicer, server)
Definition: phone_pb2_grpc.py:34
next
AllocList * next[kMaxLevel]
Definition: abseil-cpp/absl/base/internal/low_level_alloc.cc:100
phone_pb2.CallInfo
CallInfo
Definition: phone_pb2.py:214
phone_pb2.StreamCallResponse
StreamCallResponse
Definition: phone_pb2.py:235
server.Phone._lock
_lock
Definition: examples/python/async_streaming/server.py:39


grpc
Author(s):
autogenerated on Thu Mar 13 2025 03:01:16