examples/python/async_streaming/client.py
Go to the documentation of this file.
1 # Copyright 2020 The gRPC Authors
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 
15 from concurrent.futures import ThreadPoolExecutor
16 import logging
17 import threading
18 from typing import Iterator
19 
20 import grpc
21 
22 import phone_pb2
23 import phone_pb2_grpc
24 
25 
26 class CallMaker:
27 
28  def __init__(self, executor: ThreadPoolExecutor, channel: grpc.Channel,
29  phone_number: str) -> None:
30  self._executor = executor
31  self._channel = channel
33  self._phone_number = phone_number
34  self._session_id = None
35  self._audio_session_link = None
36  self._call_state = None
37  self._peer_responded = threading.Event()
38  self._call_finished = threading.Event()
39  self._consumer_future = None
40 
42  self,
43  response_iterator: Iterator[phone_pb2.StreamCallResponse]) -> None:
44  try:
45  for response in response_iterator:
46  # NOTE: All fields in Proto3 are optional. This is the recommended way
47  # to check if a field is present or not, or to exam which one-of field is
48  # fulfilled by this message.
49  if response.HasField("call_info"):
50  self._on_call_info(response.call_info)
51  elif response.HasField("call_state"):
52  self._on_call_state(response.call_state.state)
53  else:
54  raise RuntimeError(
55  "Received StreamCallResponse without call_info and call_state"
56  )
57  except Exception as e:
58  self._peer_responded.set()
59  raise
60 
61  def _on_call_info(self, call_info: phone_pb2.CallInfo) -> None:
62  self._session_id = call_info.session_id
63  self._audio_session_link = call_info.media
64 
65  def _on_call_state(self, call_state: phone_pb2.CallState.State) -> None:
66  logging.info("Call toward [%s] enters [%s] state", self._phone_number,
67  phone_pb2.CallState.State.Name(call_state))
68  self._call_state = call_state
69  if call_state == phone_pb2.CallState.State.ACTIVE:
70  self._peer_responded.set()
71  if call_state == phone_pb2.CallState.State.ENDED:
72  self._peer_responded.set()
73  self._call_finished.set()
74 
75  def call(self) -> None:
76  request = phone_pb2.StreamCallRequest()
77  request.phone_number = self._phone_number
78  response_iterator = self._stub.StreamCall(iter((request,)))
79  # Instead of consuming the response on current thread, spawn a consumption thread.
80  self._consumer_future = self._executor.submit(self._response_watcher,
81  response_iterator)
82 
83  def wait_peer(self) -> bool:
84  logging.info("Waiting for peer to connect [%s]...", self._phone_number)
85  self._peer_responded.wait(timeout=None)
86  if self._consumer_future.done():
87  # If the future raises, forwards the exception here
89  return self._call_state == phone_pb2.CallState.State.ACTIVE
90 
91  def audio_session(self) -> None:
92  assert self._audio_session_link is not None
93  logging.info("Consuming audio resource [%s]", self._audio_session_link)
94  self._call_finished.wait(timeout=None)
95  logging.info("Audio session finished [%s]", self._audio_session_link)
96 
97 
98 def process_call(executor: ThreadPoolExecutor, channel: grpc.Channel,
99  phone_number: str) -> None:
100  call_maker = CallMaker(executor, channel, phone_number)
101  call_maker.call()
102  if call_maker.wait_peer():
103  call_maker.audio_session()
104  logging.info("Call finished!")
105  else:
106  logging.info("Call failed: peer didn't answer")
107 
108 
109 def run():
110  executor = ThreadPoolExecutor()
111  with grpc.insecure_channel("localhost:50051") as channel:
112  future = executor.submit(process_call, executor, channel,
113  "555-0100-XXXX")
114  future.result()
115 
116 
117 if __name__ == '__main__':
118  logging.basicConfig(level=logging.INFO)
119  run()
_gevent_test_main.result
result
Definition: _gevent_test_main.py:96
grpc.insecure_channel
def insecure_channel(target, options=None, compression=None)
Definition: src/python/grpcio/grpc/__init__.py:1962
client.CallMaker._channel
_channel
Definition: examples/python/async_streaming/client.py:30
phone_pb2_grpc.PhoneStub
Definition: phone_pb2_grpc.py:7
phone_pb2.StreamCallRequest
StreamCallRequest
Definition: phone_pb2.py:228
client.CallMaker._session_id
_session_id
Definition: examples/python/async_streaming/client.py:33
client.CallMaker._audio_session_link
_audio_session_link
Definition: examples/python/async_streaming/client.py:34
client.CallMaker._response_watcher
None _response_watcher(self, Iterator[phone_pb2.StreamCallResponse] response_iterator)
Definition: examples/python/async_streaming/client.py:41
grpc::Channel
Channels represent a connection to an endpoint. Created by CreateChannel.
Definition: include/grpcpp/channel.h:54
client.CallMaker.audio_session
None audio_session(self)
Definition: examples/python/async_streaming/client.py:91
client.CallMaker._consumer_future
_consumer_future
Definition: examples/python/async_streaming/client.py:38
done
struct tab * done
Definition: bloaty/third_party/zlib/examples/enough.c:176
client.CallMaker._phone_number
_phone_number
Definition: examples/python/async_streaming/client.py:32
client.CallMaker
Definition: examples/python/async_streaming/client.py:26
client.CallMaker._executor
_executor
Definition: examples/python/async_streaming/client.py:29
client.CallMaker.call
None call(self)
Definition: examples/python/async_streaming/client.py:75
client.CallMaker._peer_responded
_peer_responded
Definition: examples/python/async_streaming/client.py:36
client.CallMaker._on_call_info
None _on_call_info(self, phone_pb2.CallInfo call_info)
Definition: examples/python/async_streaming/client.py:61
client.run
def run()
Definition: examples/python/async_streaming/client.py:109
client.CallMaker.__init__
None __init__(self, ThreadPoolExecutor executor, grpc.Channel channel, str phone_number)
Definition: examples/python/async_streaming/client.py:28
client.CallMaker._call_state
_call_state
Definition: examples/python/async_streaming/client.py:35
cpp.gmock_class.set
set
Definition: bloaty/third_party/googletest/googlemock/scripts/generator/cpp/gmock_class.py:44
client.CallMaker._stub
_stub
Definition: examples/python/async_streaming/client.py:31
wait
static void wait(notification *n)
Definition: alts_tsi_handshaker_test.cc:114
iter
Definition: test_winkernel.cpp:47
client.process_call
None process_call(ThreadPoolExecutor executor, grpc.Channel channel, str phone_number)
Definition: examples/python/async_streaming/client.py:98
client.CallMaker._call_finished
_call_finished
Definition: examples/python/async_streaming/client.py:37
client.CallMaker._on_call_state
None _on_call_state(self, phone_pb2.CallState.State call_state)
Definition: examples/python/async_streaming/client.py:65
client.CallMaker.wait_peer
bool wait_peer(self)
Definition: examples/python/async_streaming/client.py:83


grpc
Author(s):
autogenerated on Thu Mar 13 2025 02:58:46