helloworld.py
Go to the documentation of this file.
1 # Copyright 2019 the gRPC authors.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 """An in-process gRPC helloworld.Greeter server and a unittest client that calls it."""
15 
16 from concurrent import futures
17 import contextlib
18 import datetime
19 import logging
20 import unittest
21 
22 from google.protobuf import duration_pb2
23 from google.protobuf import timestamp_pb2
24 import grpc
25 import helloworld_pb2
26 import helloworld_pb2_grpc
27 
28 _HOST = 'localhost'
29 _SERVER_ADDRESS = '{}:0'.format(_HOST)
30 
31 
33 
class Greeter(helloworld_pb2_grpc.GreeterServicer):
    """Greeter servicer that greets the caller and reports request latency."""

    def SayHello(self, request, context):
        """Answer a HelloRequest with a greeting and the time spent in flight.

        Args:
            request: The incoming HelloRequest. Its ``name`` is echoed in the
                greeting and its ``request_initiation`` timestamp marks when
                the client created the request.
            context: The gRPC servicer context (unused).

        Returns:
            A HelloReply whose ``message`` is ``'Hello, <name>!'`` and whose
            ``request_duration`` is the elapsed time between
            ``request_initiation`` and the moment this handler ran.
        """
        # Timestamp.ToDatetime() produces a naive datetime expressed in UTC.
        # "Now" therefore has to be naive UTC as well; using local time would
        # skew the result by the host's UTC offset (and make it negative on
        # hosts west of UTC).
        now_utc = datetime.datetime.now(datetime.timezone.utc).replace(
            tzinfo=None)
        request_in_flight = now_utc - request.request_initiation.ToDatetime()
        request_duration = duration_pb2.Duration()
        request_duration.FromTimedelta(request_in_flight)
        return helloworld_pb2.HelloReply(
            message='Hello, %s!' % request.name,
            request_duration=request_duration,
        )
43 
44 
@contextlib.contextmanager
def _listening_server():
    """Run a Greeter server for the duration of a ``with`` block.

    Yields:
        The port number returned by ``add_insecure_port`` for
        ``_SERVER_ADDRESS``; clients connect to ``_HOST`` on this port.
    """
    server = grpc.server(futures.ThreadPoolExecutor())
    helloworld_pb2_grpc.add_GreeterServicer_to_server(Greeter(), server)
    port = server.add_insecure_port(_SERVER_ADDRESS)
    server.start()
    try:
        yield port
    finally:
        # stop() returns an event that is set once shutdown has finished;
        # wait on it so the server is fully torn down before we return.
        server.stop(0).wait()
55 
56 
class ImportTest(unittest.TestCase):
    """Checks that the generated helloworld modules work in a real RPC."""

    def test_import(self):
        """Call SayHello on an in-process server and check the reply."""
        with _listening_server() as port:
            target = '{}:{}'.format(_HOST, port)
            with grpc.insecure_channel(target) as channel:
                greeter = helloworld_pb2_grpc.GreeterStub(channel)

                # Stamp the request with the current time so the server can
                # compute how long it was in flight.
                sent_at = timestamp_pb2.Timestamp()
                sent_at.GetCurrentTime()
                hello = helloworld_pb2.HelloRequest(
                    name='you',
                    request_initiation=sent_at,
                )

                response = greeter.SayHello(hello, wait_for_ready=True)
                self.assertEqual(response.message, "Hello, you!")
                self.assertGreater(response.request_duration.nanos, 0)
72 
73 
# Script entry point: set up default logging, then run the tests in this module.
if __name__ == '__main__':
    logging.basicConfig()
    unittest.main()
helloworld_pb2_grpc.GreeterStub
Definition: helloworld/helloworld_pb2_grpc.py:7
grpc.insecure_channel
def insecure_channel(target, options=None, compression=None)
Definition: src/python/grpcio/grpc/__init__.py:1962
http2_test_server.format
format
Definition: http2_test_server.py:118
helloworld_pb2.HelloRequest
HelloRequest
Definition: helloworld/helloworld_pb2.py:93
helloworld.ImportTest.test_import
def test_import(self)
Definition: helloworld.py:59
helloworld.Greeter.SayHello
def SayHello(self, request, context)
Definition: helloworld.py:34
google::protobuf
Definition: bloaty/third_party/protobuf/benchmarks/util/data_proto2_to_proto3_util.h:12
helloworld._listening_server
def _listening_server()
Definition: helloworld.py:46
helloworld_pb2.HelloReply
HelloReply
Definition: helloworld/helloworld_pb2.py:100
grpc.server
def server(thread_pool, handlers=None, interceptors=None, options=None, maximum_concurrent_rpcs=None, compression=None, xds=False)
Definition: src/python/grpcio/grpc/__init__.py:2034
helloworld.Greeter
Definition: helloworld.py:32
helloworld_pb2_grpc.GreeterServicer
Definition: helloworld/helloworld_pb2_grpc.py:24
helloworld.ImportTest
Definition: helloworld.py:57
helloworld_pb2_grpc.add_GreeterServicer_to_server
def add_GreeterServicer_to_server(servicer, server)
Definition: helloworld/helloworld_pb2_grpc.py:36


grpc
Author(s):
autogenerated on Thu Mar 13 2025 03:00:12