google/protobuf/internal/service_reflection_test.py
Go to the documentation of this file.
1 #! /usr/bin/env python
2 #
3 # Protocol Buffers - Google's data interchange format
4 # Copyright 2008 Google Inc. All rights reserved.
5 # https://developers.google.com/protocol-buffers/
6 #
7 # Redistribution and use in source and binary forms, with or without
8 # modification, are permitted provided that the following conditions are
9 # met:
10 #
11 # * Redistributions of source code must retain the above copyright
12 # notice, this list of conditions and the following disclaimer.
13 # * Redistributions in binary form must reproduce the above
14 # copyright notice, this list of conditions and the following disclaimer
15 # in the documentation and/or other materials provided with the
16 # distribution.
17 # * Neither the name of Google Inc. nor the names of its
18 # contributors may be used to endorse or promote products derived from
19 # this software without specific prior written permission.
20 #
21 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24 # A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25 # OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28 # DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29 # THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30 # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31 # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32 
33 """Tests for google.protobuf.internal.service_reflection."""
34 
35 __author__ = 'petar@google.com (Petar Petrov)'
36 
37 
38 try:
39  import unittest2 as unittest #PY26
40 except ImportError:
41  import unittest
42 
43 from google.protobuf import unittest_pb2
44 from google.protobuf import service_reflection
45 from google.protobuf import service
46 
47 
class FooUnitTest(unittest.TestCase):
    """Exercises the generated ``TestService`` and ``TestService_Stub`` classes."""

    def testService(self):
        """Check service dispatch for unimplemented and implemented methods.

        An unimplemented method must report 'Method <name> not implemented.'
        through the controller's SetFailed() and must not run the callback.
        An overriding implementation must be reached both by a direct call
        and through CallMethod() with a method descriptor.
        """

        class MockRpcController(service.RpcController):
            # Records the failure message so the test can inspect it.
            def SetFailed(self, msg):
                self.failure_message = msg

        class MyService(unittest_pb2.TestService):
            # Deliberately overrides nothing: every method stays unimplemented.
            pass

        self.callback_response = None

        def MyCallback(response):
            self.callback_response = response

        rpc_controller = MockRpcController()
        service_descriptor = unittest_pb2.TestService.GetDescriptor()
        # The second method of the service; the assertions below establish
        # that it is Bar (BarRequest -> BarResponse).
        bar_method = service_descriptor.methods[1]

        # --- Unimplemented service: direct call. ---
        srvc = MyService()
        srvc.Foo(rpc_controller, unittest_pb2.FooRequest(), MyCallback)
        self.assertEqual('Method Foo not implemented.',
                         rpc_controller.failure_message)
        self.assertIsNone(self.callback_response)

        # --- Unimplemented service: dispatch through CallMethod(). ---
        rpc_controller.failure_message = None
        srvc.CallMethod(bar_method, rpc_controller,
                        unittest_pb2.BarRequest(), MyCallback)
        self.assertIs(srvc.GetRequestClass(bar_method),
                      unittest_pb2.BarRequest)
        self.assertIs(srvc.GetResponseClass(bar_method),
                      unittest_pb2.BarResponse)
        self.assertEqual('Method Bar not implemented.',
                         rpc_controller.failure_message)
        self.assertIsNone(self.callback_response)

        class MyServiceImpl(unittest_pb2.TestService):
            # Each override only records that it was reached.
            def Foo(self, rpc_controller, request, done):
                self.foo_called = True

            def Bar(self, rpc_controller, request, done):
                self.bar_called = True

        # --- Implemented service: direct call. ---
        srvc = MyServiceImpl()
        rpc_controller.failure_message = None
        srvc.Foo(rpc_controller, unittest_pb2.FooRequest(), MyCallback)
        self.assertIsNone(rpc_controller.failure_message)
        self.assertTrue(srvc.foo_called)

        # --- Implemented service: dispatch through CallMethod(). ---
        rpc_controller.failure_message = None
        srvc.CallMethod(bar_method, rpc_controller,
                        unittest_pb2.BarRequest(), MyCallback)
        self.assertIsNone(rpc_controller.failure_message)
        self.assertTrue(srvc.bar_called)

    def testServiceStub(self):
        """Check that the generated stub forwards calls to its RpcChannel.

        Calling ``stub.Foo`` must hand the controller, the request and the
        Foo method descriptor to the channel, and the callback must receive
        an instance of the response class supplied by the stub.
        """

        class MockRpcChannel(service.RpcChannel):
            # Captures the arguments of the call and immediately completes it
            # with a default-constructed response.
            def CallMethod(self, method, controller, request,
                           response_class, callback):
                self.method = method
                self.controller = controller
                self.request = request
                callback(response_class())

        self.callback_response = None

        def MyCallback(response):
            self.callback_response = response

        channel = MockRpcChannel()
        stub = unittest_pb2.TestService_Stub(channel)
        # Plain strings stand in for the controller and the request; the test
        # only checks that they reach the channel unchanged.
        rpc_controller = 'controller'
        request = 'request'

        # GetDescriptor() can be called on the class as well as on an
        # instance; both must return the same descriptor.
        self.assertEqual(unittest_pb2.TestService_Stub.GetDescriptor(),
                         stub.GetDescriptor())

        # Invoke method.
        stub.Foo(rpc_controller, request, MyCallback)

        self.assertIsInstance(self.callback_response,
                              unittest_pb2.FooResponse)
        self.assertEqual(request, channel.request)
        self.assertEqual(rpc_controller, channel.controller)
        self.assertEqual(stub.GetDescriptor().methods[0], channel.method)
141 
142 
# Script entry point: run every test case defined in this module.
if __name__ == "__main__":
    unittest.main()
google::protobuf.service.RpcController
Definition: service.py:118
google::protobuf
Definition: data_proto2_to_proto3_util.h:12
google::protobuf.internal.service_reflection_test.FooUnitTest.controller
controller
Definition: google/protobuf/internal/service_reflection_test.py:54
google::protobuf.internal.service_reflection_test.FooUnitTest.testService
def testService(self)
Definition: google/protobuf/internal/service_reflection_test.py:50
google::protobuf.internal.service_reflection_test.FooUnitTest.request
request
Definition: google/protobuf/internal/service_reflection_test.py:55
google::protobuf.internal.service_reflection_test.FooUnitTest.failure_message
failure_message
Definition: google/protobuf/internal/service_reflection_test.py:60
google::protobuf.internal.service_reflection_test.FooUnitTest.foo_called
foo_called
Definition: google/protobuf/internal/service_reflection_test.py:95
google::protobuf.service.RpcChannel
Definition: service.py:201
google::protobuf.internal.service_reflection_test.FooUnitTest.testServiceStub
def testServiceStub(self)
Definition: google/protobuf/internal/service_reflection_test.py:111
google::protobuf.internal.service_reflection_test.FooUnitTest.bar_called
bar_called
Definition: google/protobuf/internal/service_reflection_test.py:97
google::protobuf.internal.service_reflection_test.FooUnitTest
Definition: google/protobuf/internal/service_reflection_test.py:48
google::protobuf.internal.service_reflection_test.FooUnitTest.method
method
Definition: google/protobuf/internal/service_reflection_test.py:53
google::protobuf.internal.service_reflection_test.FooUnitTest.callback_response
callback_response
Definition: google/protobuf/internal/service_reflection_test.py:62


libaditof
Author(s):
autogenerated on Wed May 21 2025 02:06:58