_channel_args_test.py
Go to the documentation of this file.
1 # Copyright 2016 gRPC authors.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 """Tests of channel arguments on client/server side."""
15 
16 from concurrent import futures
17 import logging
18 import unittest
19 
20 import grpc
21 
22 
class TestPointerWrapper(object):
    """Object whose integer conversion yields a fixed value.

    An instance is used as the value of 'arg6' in TEST_CHANNEL_ARGS.
    NOTE(review): presumably this exercises gRPC's handling of
    pointer-like channel-argument values via int() -- confirm against
    the grpc runtime.
    """

    def __int__(self):
        """Return the fixed integer 123456."""
        return 123456
27 
28 
# Channel options passed to both grpc.insecure_channel and grpc.server by
# ChannelArgsTest: a tuple of (key, value) pairs mixing str and bytes keys
# with bytes, str, int and int()-convertible object values.
TEST_CHANNEL_ARGS = (
    ('arg1', b'bytes_val'),
    ('arg2', 'str_val'),
    ('arg3', 1),
    (b'arg4', 'str_val'),
    ('arg6', TestPointerWrapper()),
)

# Malformed `options` values (a dict, a 1-element pair, a bare string).
# ChannelArgsTest asserts that grpc.insecure_channel raises ValueError for
# each entry.
INVALID_TEST_CHANNEL_ARGS = [
    {
        'foo': 'bar'
    },
    (('key',),),
    'str',
]
44 
45 
class ChannelArgsTest(unittest.TestCase):
    """Checks that channel arguments are accepted or rejected on creation."""

    def test_client(self):
        """Create an insecure channel with TEST_CHANNEL_ARGS as options.

        The test passes when the call completes without raising.
        """
        grpc.insecure_channel('localhost:8080', options=TEST_CHANNEL_ARGS)

    def test_server(self):
        """Create a server with TEST_CHANNEL_ARGS as options.

        The test passes when the call completes without raising.
        """
        grpc.server(futures.ThreadPoolExecutor(max_workers=1),
                    options=TEST_CHANNEL_ARGS)

    def test_invalid_client_args(self):
        """Assert ValueError for every entry of INVALID_TEST_CHANNEL_ARGS."""
        for invalid_arg in INVALID_TEST_CHANNEL_ARGS:
            self.assertRaises(ValueError,
                              grpc.insecure_channel,
                              'localhost:8080',
                              options=invalid_arg)
61 
62 
# Script entry point: configure default logging, then run this module's
# tests with verbose output.
if __name__ == '__main__':
    logging.basicConfig()
    unittest.main(verbosity=2)
tests.unit._channel_args_test.ChannelArgsTest.test_server
def test_server(self)
Definition: _channel_args_test.py:51
grpc.insecure_channel
def insecure_channel(target, options=None, compression=None)
Definition: src/python/grpcio/grpc/__init__.py:1962
tests.unit._channel_args_test.ChannelArgsTest.test_client
def test_client(self)
Definition: _channel_args_test.py:48
tests.unit._channel_args_test.ChannelArgsTest
Definition: _channel_args_test.py:46
grpc.server
def server(thread_pool, handlers=None, interceptors=None, options=None, maximum_concurrent_rpcs=None, compression=None, xds=False)
Definition: src/python/grpcio/grpc/__init__.py:2034
tests.unit._channel_args_test.ChannelArgsTest.test_invalid_client_args
def test_invalid_client_args(self)
Definition: _channel_args_test.py:55
tests.unit._channel_args_test.TestPointerWrapper.__int__
def __int__(self)
Definition: _channel_args_test.py:25
tests.unit._channel_args_test.TestPointerWrapper
Definition: _channel_args_test.py:23


grpc
Author(s):
autogenerated on Thu Mar 13 2025 02:58:27