helloworld_moved.py
Go to the documentation of this file.
1 # Copyright 2019 the gRPC authors.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 """The Python implementation of the GRPC helloworld.Greeter client."""
15 
16 from concurrent import futures
17 import contextlib
18 import datetime
19 import logging
20 import unittest
21 
22 # TODO(https://github.com/grpc/grpc/issues/29284)
23 # isort: off
24 import grpc
25 from google.protobuf import duration_pb2
26 from google.protobuf import timestamp_pb2
27 from google.cloud import helloworld_pb2
28 from google.cloud import helloworld_pb2_grpc
29 # isort: on
30 
# Host name shared by the server bind address and the client channel target.
_HOST = 'localhost'
# Bind address handed to add_insecure_port(); the port component is 0, and the
# port number actually returned by that call is what the test connects to.
_SERVER_ADDRESS = '{}:0'.format(_HOST)
33 
34 
36 
class Greeter(helloworld_pb2_grpc.GreeterServicer):
    """Greeter servicer that reports how long each request was in flight."""

    def SayHello(self, request, context):
        """Answer a HelloRequest with a greeting and the request's transit time.

        Args:
            request: HelloRequest; its ``name`` and ``request_initiation``
                (a protobuf Timestamp) fields are read.
            context: RPC context passed in by gRPC; not used here.

        Returns:
            A HelloReply whose ``message`` is ``'Hello, <name>!'`` and whose
            ``request_duration`` is the time elapsed since
            ``request.request_initiation``.
        """
        # Timestamp.ToDatetime() returns a naive datetime expressed in UTC, so
        # "now" must also be taken in UTC (and made naive) before subtracting.
        # Using local time would skew the result by the host's UTC offset.
        now_utc = datetime.datetime.now(
            datetime.timezone.utc).replace(tzinfo=None)
        request_in_flight = now_utc - request.request_initiation.ToDatetime()
        request_duration = duration_pb2.Duration()
        request_duration.FromTimedelta(request_in_flight)
        return helloworld_pb2.HelloReply(
            message='Hello, %s!' % request.name,
            request_duration=request_duration,
        )
46 
47 
@contextlib.contextmanager
def _listening_server():
    """Run an in-process Greeter server for the duration of a ``with`` block.

    Yields:
        The port number returned by ``server.add_insecure_port`` for
        ``_SERVER_ADDRESS``.
    """
    server = grpc.server(futures.ThreadPoolExecutor())
    # Register the servicer before starting so SayHello calls have a handler.
    helloworld_pb2_grpc.add_GreeterServicer_to_server(Greeter(), server)
    port = server.add_insecure_port(_SERVER_ADDRESS)
    server.start()
    try:
        yield port
    finally:
        # Always shut the server down, even if the body of the `with` raised.
        server.stop(0)
58 
59 
60 class ImportTest(unittest.TestCase):
61 
62  def test_import(self):
63  with _listening_server() as port:
64  with grpc.insecure_channel('{}:{}'.format(_HOST, port)) as channel:
65  stub = helloworld_pb2_grpc.GreeterStub(channel)
66  request_timestamp = timestamp_pb2.Timestamp()
67  request_timestamp.GetCurrentTime()
68  response = stub.SayHello(helloworld_pb2.HelloRequest(
69  name='you',
70  request_initiation=request_timestamp,
71  ),
72  wait_for_ready=True)
73  self.assertEqual(response.message, "Hello, you!")
74  self.assertGreater(response.request_duration.nanos, 0)
75 
76 
if __name__ == '__main__':
    # Configure root logging before handing control to the unittest runner.
    logging.basicConfig()
    unittest.main()
helloworld_pb2_grpc.GreeterStub
Definition: helloworld/helloworld_pb2_grpc.py:7
grpc.insecure_channel
def insecure_channel(target, options=None, compression=None)
Definition: src/python/grpcio/grpc/__init__.py:1962
http2_test_server.format
format
Definition: http2_test_server.py:118
helloworld_pb2.HelloRequest
HelloRequest
Definition: helloworld/helloworld_pb2.py:93
helloworld_moved.Greeter
Definition: helloworld_moved.py:35
google::protobuf
Definition: bloaty/third_party/protobuf/benchmarks/util/data_proto2_to_proto3_util.h:12
helloworld_moved.ImportTest.test_import
def test_import(self)
Definition: helloworld_moved.py:62
helloworld_pb2.HelloReply
HelloReply
Definition: helloworld/helloworld_pb2.py:100
grpc.server
def server(thread_pool, handlers=None, interceptors=None, options=None, maximum_concurrent_rpcs=None, compression=None, xds=False)
Definition: src/python/grpcio/grpc/__init__.py:2034
helloworld_moved.ImportTest
Definition: helloworld_moved.py:60
helloworld_pb2_grpc.GreeterServicer
Definition: helloworld/helloworld_pb2_grpc.py:24
helloworld_moved._listening_server
def _listening_server()
Definition: helloworld_moved.py:49
helloworld_pb2_grpc.add_GreeterServicer_to_server
def add_GreeterServicer_to_server(servicer, server)
Definition: helloworld/helloworld_pb2_grpc.py:36
helloworld_moved.Greeter.SayHello
def SayHello(self, request, context)
Definition: helloworld_moved.py:37


grpc
Author(s):
autogenerated on Thu Mar 13 2025 03:00:12