protobuf/python/google/protobuf/internal/service_reflection_test.py
Go to the documentation of this file.
1 # Protocol Buffers - Google's data interchange format
2 # Copyright 2008 Google Inc. All rights reserved.
3 # https://developers.google.com/protocol-buffers/
4 #
5 # Redistribution and use in source and binary forms, with or without
6 # modification, are permitted provided that the following conditions are
7 # met:
8 #
9 # * Redistributions of source code must retain the above copyright
10 # notice, this list of conditions and the following disclaimer.
11 # * Redistributions in binary form must reproduce the above
12 # copyright notice, this list of conditions and the following disclaimer
13 # in the documentation and/or other materials provided with the
14 # distribution.
15 # * Neither the name of Google Inc. nor the names of its
16 # contributors may be used to endorse or promote products derived from
17 # this software without specific prior written permission.
18 #
19 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
20 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
21 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
22 # A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
23 # OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
24 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
25 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
26 # DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
27 # THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
28 # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
29 # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
30 
31 """Tests for google.protobuf.internal.service_reflection."""
32 
33 __author__ = 'petar@google.com (Petar Petrov)'
34 
35 
36 import unittest
37 
38 from google.protobuf import unittest_pb2
39 from google.protobuf import service_reflection
40 from google.protobuf import service
41 
42 
43 class FooUnitTest(unittest.TestCase):
44 
45  def testService(self):
46  class MockRpcChannel(service.RpcChannel):
47  def CallMethod(self, method, controller, request, response, callback):
48  self.method = method
49  self.controller = controller
50  self.request = request
51  callback(response)
52 
53  class MockRpcController(service.RpcController):
54  def SetFailed(self, msg):
55  self.failure_message = msg
56 
57  self.callback_response = None
58 
59  class MyService(unittest_pb2.TestService):
60  pass
61 
62  self.callback_response = None
63 
64  def MyCallback(response):
65  self.callback_response = response
66 
67  rpc_controller = MockRpcController()
68  channel = MockRpcChannel()
69  srvc = MyService()
70  srvc.Foo(rpc_controller, unittest_pb2.FooRequest(), MyCallback)
71  self.assertEqual('Method Foo not implemented.',
72  rpc_controller.failure_message)
73  self.assertEqual(None, self.callback_response)
74 
75  rpc_controller.failure_message = None
76 
77  service_descriptor = unittest_pb2.TestService.GetDescriptor()
78  srvc.CallMethod(service_descriptor.methods[1], rpc_controller,
79  unittest_pb2.BarRequest(), MyCallback)
80  self.assertTrue(srvc.GetRequestClass(service_descriptor.methods[1]) is
81  unittest_pb2.BarRequest)
82  self.assertTrue(srvc.GetResponseClass(service_descriptor.methods[1]) is
83  unittest_pb2.BarResponse)
84  self.assertEqual('Method Bar not implemented.',
85  rpc_controller.failure_message)
86  self.assertEqual(None, self.callback_response)
87 
88  class MyServiceImpl(unittest_pb2.TestService):
89  def Foo(self, rpc_controller, request, done):
90  self.foo_called = True
91  def Bar(self, rpc_controller, request, done):
92  self.bar_called = True
93 
94  srvc = MyServiceImpl()
95  rpc_controller.failure_message = None
96  srvc.Foo(rpc_controller, unittest_pb2.FooRequest(), MyCallback)
97  self.assertEqual(None, rpc_controller.failure_message)
98  self.assertEqual(True, srvc.foo_called)
99 
100  rpc_controller.failure_message = None
101  srvc.CallMethod(service_descriptor.methods[1], rpc_controller,
102  unittest_pb2.BarRequest(), MyCallback)
103  self.assertEqual(None, rpc_controller.failure_message)
104  self.assertEqual(True, srvc.bar_called)
105 
106  def testServiceStub(self):
107  class MockRpcChannel(service.RpcChannel):
108  def CallMethod(self, method, controller, request,
109  response_class, callback):
110  self.method = method
111  self.controller = controller
112  self.request = request
113  callback(response_class())
114 
115  self.callback_response = None
116 
117  def MyCallback(response):
118  self.callback_response = response
119 
120  channel = MockRpcChannel()
121  stub = unittest_pb2.TestService_Stub(channel)
122  rpc_controller = 'controller'
123  request = 'request'
124 
125  # GetDescriptor now static, still works as instance method for compatibility
126  self.assertEqual(unittest_pb2.TestService_Stub.GetDescriptor(),
127  stub.GetDescriptor())
128 
129  # Invoke method.
130  stub.Foo(rpc_controller, request, MyCallback)
131 
132  self.assertIsInstance(self.callback_response, unittest_pb2.FooResponse)
133  self.assertEqual(request, channel.request)
134  self.assertEqual(rpc_controller, channel.controller)
135  self.assertEqual(stub.GetDescriptor().methods[0], channel.method)
136 
137 
138 if __name__ == '__main__':
139  unittest.main()
google::protobuf.service.RpcController
Definition: third_party/bloaty/third_party/protobuf/python/google/protobuf/service.py:118
google::protobuf
Definition: bloaty/third_party/protobuf/benchmarks/util/data_proto2_to_proto3_util.h:12
google::protobuf.internal.service_reflection_test.FooUnitTest.controller
controller
Definition: bloaty/third_party/protobuf/python/google/protobuf/internal/service_reflection_test.py:54
google::protobuf.internal.service_reflection_test.FooUnitTest.testService
def testService(self)
Definition: bloaty/third_party/protobuf/python/google/protobuf/internal/service_reflection_test.py:50
Foo
Definition: abseil-cpp/absl/debugging/symbolize_test.cc:65
google::protobuf.internal.service_reflection_test.FooUnitTest.request
request
Definition: bloaty/third_party/protobuf/python/google/protobuf/internal/service_reflection_test.py:55
callback
static void callback(void *arg, int status, int timeouts, struct hostent *host)
Definition: acountry.c:224
google::protobuf.internal.service_reflection_test.FooUnitTest.failure_message
failure_message
Definition: bloaty/third_party/protobuf/python/google/protobuf/internal/service_reflection_test.py:60
google::protobuf.service.RpcChannel
Definition: third_party/bloaty/third_party/protobuf/python/google/protobuf/service.py:201
google::protobuf.internal.service_reflection_test.FooUnitTest.testServiceStub
def testServiceStub(self)
Definition: bloaty/third_party/protobuf/python/google/protobuf/internal/service_reflection_test.py:111
google::protobuf.internal.service_reflection_test.FooUnitTest.method
method
Definition: bloaty/third_party/protobuf/python/google/protobuf/internal/service_reflection_test.py:53
google::protobuf.internal.service_reflection_test.FooUnitTest.callback_response
callback_response
Definition: bloaty/third_party/protobuf/python/google/protobuf/internal/service_reflection_test.py:62


grpc
Author(s):
autogenerated on Fri May 16 2025 03:00:12