port_server.py
Go to the documentation of this file.
1 #!/usr/bin/env python3
2 # Copyright 2015 gRPC authors.
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 # http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 """Manage TCP ports for unit tests; started by run_tests.py"""
16 
17 from __future__ import print_function
18 
19 import argparse
20 import hashlib
21 import os
22 import platform
23 import random
24 import socket
25 import sys
26 import threading
27 import time
28 
29 from six.moves.BaseHTTPServer import BaseHTTPRequestHandler
30 from six.moves.BaseHTTPServer import HTTPServer
31 from six.moves.socketserver import ThreadingMixIn
32 
# Bump this number on every change to this script so that the change is
# picked up by CI servers that are already running.
# Every change must stay backwards compatible.
_MY_VERSION = 21

# `dump_version` as the sole command-line argument: print the version number
# and exit before any argument parsing or server start-up happens.
if sys.argv[1:] == ['dump_version']:
    print(_MY_VERSION)
    sys.exit(0)
41 
# Command-line interface:
#   -p/--port     TCP port this port server listens on (default 12345)
#   -l/--logfile  optional file that receives everything written to
#                 stdout/stderr from here on
argp = argparse.ArgumentParser(description='Manage TCP ports for unit tests')
argp.add_argument('-p', '--port', default=12345, type=int)
argp.add_argument('-l', '--logfile', default=None, type=str)
args = argp.parse_args()

if args.logfile is not None:
    # Open the log file *before* closing the standard streams: if the open
    # fails, the resulting traceback can still be written to the original
    # stderr instead of being lost on an already-closed stream.
    _logfile = open(args.logfile, 'w')
    sys.stdin.close()
    sys.stderr.close()
    sys.stdout.close()
    # From here on both stdout and stderr go to the log file.
    sys.stderr = _logfile
    sys.stdout = sys.stderr

print('port server running on port %d' % args.port)
55 
# Allocator state shared by all request-handling threads. The code in this
# file only touches `pool` and `in_use` while holding `mu`.
#   pool   - list of ports found to be free and ready to be handed out
#   in_use - maps an allocated port to the time.time() at which it was handed out
mu = threading.Lock()
in_use = {}
pool = []
59 
# Ports that Cronet refuses to use (see
# https://cs.chromium.org/chromium/src/net/base/port_util.cc). A Cronet test
# that is handed one of these ports fails (see issue #12149), so they must
# never be placed in the pool.
cronet_restricted_ports = [
    1, 7, 9, 11, 13, 15, 17, 19, 20, 21,
    22, 23, 25, 37, 42, 43, 53, 77, 79, 87,
    95, 101, 102, 103, 104, 109, 110, 111, 113, 115,
    117, 119, 123, 135, 139, 143, 179, 389, 465, 512,
    513, 514, 515, 526, 530, 531, 532, 540, 556, 563,
    587, 601, 636, 993, 995, 2049, 3659, 4045, 6000, 6665,
    6666, 6667, 6668, 6669, 6697
]
71 
72 
def can_connect(port):
    """Return True if a TCP connection to localhost:port succeeds.

    A successful connection means something is already listening on the
    port. On Windows the probe is skipped and False is always returned.

    Args:
        port: TCP port number to probe.

    Returns:
        True if the connection attempt succeeded, False if it failed with a
        socket error or the probe was skipped.
    """
    # this test is only really useful on unices where SO_REUSEPORT is available
    # so on Windows, where this test is expensive, skip it
    if platform.system() == 'Windows':
        return False
    s = socket.socket()
    try:
        s.connect(('localhost', port))
    except socket.error:
        return False
    finally:
        # Runs on both the success and the failure path.
        s.close()
    return True
86 
87 
def can_bind(port, proto):
    """Return True if a stream socket can be bound to localhost:port.

    Args:
        port: TCP port number to try.
        proto: address family passed to socket.socket(); refill_pool calls
            this with socket.AF_INET and socket.AF_INET6.

    Returns:
        True if bind() succeeded, False if it failed with a socket error.
        The probe socket is always closed before returning.
    """
    s = socket.socket(proto, socket.SOCK_STREAM)
    try:
        # Inside the try block so the socket is closed even if setting the
        # option fails; such a failure still propagates to the caller.
        s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        try:
            s.bind(('localhost', port))
        except socket.error:
            return False
        return True
    finally:
        s.close()
98 
99 
def refill_pool(max_timeout, req):
    """Scan for ports not marked for being in use

    Walks ports 1025..32765 (minus the Cronet-restricted ones) in random
    order and appends every port that looks free to the module-level `pool`,
    stopping once the pool holds more than 100 ports.

    A port recorded in `in_use` is skipped unless it was handed out at least
    `max_timeout` seconds ago; such a stale entry is removed from `in_use`
    and the port is probed like any other.

    This function does no locking of its own; allocate_port calls it while
    holding `mu`.

    Args:
        max_timeout: age in seconds after which an `in_use` entry is
            considered stale and may be reclaimed.
        req: object providing log_message(str), used for progress logging.
    """
    # Build the exclusion set once so each membership test is O(1) rather
    # than a linear scan of the list for every candidate port.
    restricted = frozenset(cronet_restricted_ports)
    chk = [port for port in range(1025, 32766) if port not in restricted]
    random.shuffle(chk)
    for i in chk:
        if len(pool) > 100:
            break
        if i in in_use:
            age = time.time() - in_use[i]
            if age < max_timeout:
                continue
            req.log_message("kill old request %d" % i)
            del in_use[i]
        # A port is considered available when it can be bound over both IPv4
        # and IPv6 and nothing accepts connections on it.
        if can_bind(i, socket.AF_INET) and can_bind(
                i, socket.AF_INET6) and not can_connect(i):
            req.log_message("found available port %d" % i)
            pool.append(i)
120 
121 
def allocate_port(req):
    """Take a free port out of the pool and record it as in use.

    If the pool is empty it is refilled via refill_pool. When no port can be
    found, the call sleeps for one second (without holding the lock) and
    retries with the stale-entry timeout halved, so it blocks until a port
    becomes available.

    Args:
        req: object providing log_message(str); also forwarded to
            refill_pool for logging.

    Returns:
        The allocated port number. Its allocation time is stored in `in_use`.
    """
    max_timeout = 600
    while True:
        # `with` guarantees the lock is released even if refill_pool or
        # log_message raises; a leaked lock would block every later request.
        with mu:
            if not pool:
                refill_pool(max_timeout, req)
            if pool:
                port = pool.pop(0)
                in_use[port] = time.time()
                return port
            req.log_message("failed to find ports: retrying soon")
        # Sleep outside the lock so /drop requests can return ports meanwhile.
        time.sleep(1)
        max_timeout /= 2
141 
142 
# Module-level flag; nothing in this file reads or updates it. Kept so the
# name continues to exist.
keep_running = True


class Handler(BaseHTTPRequestHandler):
    """HTTP request handler implementing the port server endpoints.

    GET endpoints:
        /get             allocate a port and return it as plain text
        /drop/<port>     return a previously allocated port to the pool
        /version_number  return the script version as plain text
        /dump            return the pool and in-use table as YAML (debugging)
        /quitquitquit    shut the server down
    Any other path is answered with 404.
    """

    def setup(self):
        # If the client is unreachable for 5 seconds, close the connection
        self.timeout = 5
        BaseHTTPRequestHandler.setup(self)

    def _send_plain_text_ok(self):
        """Send a 200 status line plus a text/plain Content-Type header."""
        self.send_response(200)
        self.send_header('Content-Type', 'text/plain')
        self.end_headers()

    def do_GET(self):
        """Dispatch a GET request on self.path to the matching endpoint."""
        if self.path == '/get':
            # allocate a new port, it will stay bound for ten minutes and until
            # it's unused
            self._send_plain_text_ok()
            p = allocate_port(self)
            self.log_message('allocated port %d' % p)
            self.wfile.write(str(p).encode('ascii'))
        elif self.path.startswith('/drop/'):
            # Parse before any header is sent so that a malformed port number
            # yields a 400 rather than a 200 followed by an exception.
            try:
                p = int(self.path[6:])
            except ValueError:
                self.send_error(400, 'invalid port')
                return
            self._send_plain_text_ok()
            with mu:
                if p in in_use:
                    del in_use[p]
                    pool.append(p)
                    k = 'known'
                else:
                    k = 'unknown'
            self.log_message('drop %s port %d' % (k, p))
        elif self.path == '/version_number':
            # fetch the version number of this script
            self._send_plain_text_ok()
            self.wfile.write(str(_MY_VERSION).encode('ascii'))
        elif self.path == '/dump':
            # yaml module is not installed on Macs and Windows machines by default
            # so we import it lazily (/dump action is only used for debugging)
            import yaml
            self._send_plain_text_ok()
            # `with` releases the lock even if serialization raises.
            with mu:
                now = time.time()
                out = yaml.dump({
                    'pool': pool,
                    # report the age in seconds of each allocation
                    'in_use': {k: now - v for k, v in in_use.items()}
                })
            self.wfile.write(out.encode('ascii'))
        elif self.path == '/quitquitquit':
            self.send_response(200)
            self.end_headers()
            self.server.shutdown()
        else:
            # Unknown endpoint: answer explicitly instead of closing the
            # connection without any response.
            self.send_error(404)
204 
205 
class ThreadedHTTPServer(ThreadingMixIn, HTTPServer):
    """HTTPServer that processes each request in its own thread."""
208 
209 
# Listen on all interfaces on the configured port and serve requests until
# serve_forever() returns.
server = ThreadedHTTPServer(('', args.port), Handler)
server.serve_forever()
xds_interop_client.str
str
Definition: xds_interop_client.py:487
write
#define write
Definition: test-fs.c:47
capstone.range
range
Definition: third_party/bloaty/third_party/capstone/bindings/python/capstone/__init__.py:6
python_utils.port_server.Handler.setup
def setup(self)
Definition: port_server.py:148
python_utils.port_server.ThreadedHTTPServer
Definition: port_server.py:206
grpc._common.encode
def encode(s)
Definition: grpc/_common.py:68
python_utils.port_server.Handler
Definition: port_server.py:146
python_utils.port_server.Handler.timeout
timeout
Definition: port_server.py:150
xds_interop_client.int
int
Definition: xds_interop_client.py:113
python_utils.port_server.can_bind
def can_bind(port, proto)
Definition: port_server.py:88
python_utils.port_server.refill_pool
def refill_pool(max_timeout, req)
Definition: port_server.py:100
python_utils.port_server.Handler.do_GET
def do_GET(self)
Definition: port_server.py:153
python_utils.port_server.Handler.path
path
Definition: port_server.py:156
python_utils.port_server.allocate_port
def allocate_port(req)
Definition: port_server.py:122
open
#define open
Definition: test-fs.c:46
python_utils.port_server.can_connect
def can_connect(port)
Definition: port_server.py:73
len
int len
Definition: abseil-cpp/absl/base/internal/low_level_alloc_test.cc:46


grpc
Author(s):
autogenerated on Thu Mar 13 2025 03:00:54