prometheus.py
Go to the documentation of this file.
1 #!/usr/bin/env python3
2 
3 # Copyright 2022 The gRPC Authors
4 #
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 
17 # A script to fetch total cpu seconds and memory data from prometheus.
18 # example usage: python3 prometheus.py
19 # --url=http://prometheus.prometheus.svc.cluster.local:9090
20 # --pod_type=driver --pod_type=clients --container_name=main
21 # --container_name=sidecar
22 """Perform Prometheus range queries to obtain cpu and memory data.
23 
24 This module performs range queries through the Prometheus API to obtain
25 the total cpu seconds and memory usage during a test run for the given
26 containers in the given pods. The cpu data obtained is the total cpu seconds
27 used within the given period of time. The memory data is the mean memory
28 usage over the same period.
29 """
30 import argparse
31 import json
32 import logging
33 import statistics
34 from typing import Any, Dict, List
35 
36 from dateutil import parser
37 import requests
38 
39 
class Prometheus:
    """Holds the start time, end time and base URL for Prometheus queries."""

    def __init__(
        self,
        url: str,
        start: str,
        end: str,
    ):
        """Initializes the query context.

        Args:
            url: Base URL of the Prometheus server; '/api/v1/query_range'
                is appended to it for every query.
            start: Start of the query range, sent as the 'start' parameter.
            end: End of the query range, sent as the 'end' parameter.
        """
        self.url = url
        self.start = start
        self.end = end

    def _fetch_by_query(self, query: str) -> Dict[str, Any]:
        """Runs a range query over [self.start, self.end].

        Samples are requested every 5 seconds ('step': 5), so each series
        in the response is a time series.

        Args:
            query: The PromQL expression to evaluate.

        Returns:
            The decoded JSON body of the response.

        Raises:
            requests.HTTPError: if the server answers with an error status.
        """
        resp = requests.get(
            self.url + '/api/v1/query_range',
            params={
                'query': query,
                'start': self.start,
                'end': self.end,
                'step': 5
            },
        )
        resp.raise_for_status()
        return resp.json()

    def _fetch_metric_for_pod(self, metric: str, label: str,
                              container_matcher: str,
                              pod_name: str) -> Dict[str, List[float]]:
        """Fetches one cadvisor metric for the matching containers of a pod.

        Args:
            metric: Name of the metric to select.
            label: Short metric kind ('cpu' / 'memory') used in debug logs.
            container_matcher: Matcher built by construct_container_matcher,
                appended right after 'container' in the selector.
            pod_name: Name of the pod to query.

        Returns:
            A Dict keyed by container name whose values are the sample values.
        """
        query = (metric + '{job="kubernetes-cadvisor",pod="' + pod_name +
                 '",container=' + container_matcher + '}')
        logging.debug('running prometheus query for %s: %s', label, query)
        raw_data = self._fetch_by_query(query)
        logging.debug('raw %s data: %s', label, str(raw_data))
        return get_data_list_from_timeseries(raw_data)

    def _fetch_cpu_for_pod(self, container_matcher: str,
                           pod_name: str) -> Dict[str, List[float]]:
        """Fetches the container_cpu_usage_seconds_total series for a pod.

        Args:
            container_matcher: Matcher for one or more container names, as
                built by construct_container_matcher.
            pod_name: Name of the pod to query.

        Returns:
            A Dict keyed by container name whose values are the sample values.
        """
        return self._fetch_metric_for_pod('container_cpu_usage_seconds_total',
                                          'cpu', container_matcher, pod_name)

    def _fetch_memory_for_pod(self, container_matcher: str,
                              pod_name: str) -> Dict[str, List[float]]:
        """Fetches the container_memory_usage_bytes series for a pod.

        Args:
            container_matcher: Matcher for one or more container names, as
                built by construct_container_matcher.
            pod_name: Name of the pod to query.

        Returns:
            A Dict keyed by container name whose values are the sample values.
        """
        return self._fetch_metric_for_pod('container_memory_usage_bytes',
                                          'memory', container_matcher,
                                          pod_name)

    def fetch_cpu_and_memory_data(
            self, container_list: List[str],
            pod_dict: Dict[str, List[str]]) -> Dict[str, Any]:
        """Fetches total cpu seconds and mean memory data for multiple pods.

        Args:
            container_list: A list of container names to fetch the data for.
            pod_dict: The pods to fetch data for, keyed by the role of the
                pod (clients, driver and servers). Each value is the list of
                pod names that have the role given by the key.

        Returns:
            A nested Dict {role: {pod: {container: stats}}}, where stats has
            'cpuSeconds' and/or 'memoryMean' entries, depending on which
            query reported the container.
        """
        container_matcher = construct_container_matcher(container_list)
        processed_data = {}
        for role, pod_names in pod_dict.items():
            pod_data = {}
            for pod in pod_names:
                container_data = {}
                for container, data in self._fetch_cpu_for_pod(
                        container_matcher, pod).items():
                    container_data.setdefault(container, {})['cpuSeconds'] = (
                        compute_total_cpu_seconds(data))
                for container, data in self._fetch_memory_for_pod(
                        container_matcher, pod).items():
                    # The cpu and memory queries are independent, so a
                    # container may show up in the memory results only.
                    container_data.setdefault(container, {})['memoryMean'] = (
                        compute_average_memory_usage(data))
                pod_data[pod] = container_data
            processed_data[role] = pod_data
        return processed_data
149 
150 
def construct_container_matcher(container_list: List[str]) -> str:
    """Constructs the container label matcher used in the prometheus query.

    The result is appended directly after ``container=`` in a PromQL
    selector, so it carries the quoted value and, for several names, the
    regex operator suffix:

    * one name yields an exact match: ``"main"`` -> ``container="main"``
    * several names yield a regex match:
      ``~"main|sidecar"`` -> ``container=~"main|sidecar"``

    Args:
        container_list: One or more container names.

    Raises:
        ValueError: if container_list is empty.
    """
    if not container_list:
        raise ValueError('no container name provided')
    if len(container_list) == 1:
        # PromQL label values must be quoted string literals.
        return '"' + container_list[0] + '"'
    return '~"' + '|'.join(container_list) + '"'
166 
167 
def get_data_list_from_timeseries(data: Any) -> Dict[str, List[float]]:
    """Maps each container name to the sample values of its time series.

    Args:
        data: The decoded JSON body of a Prometheus range query response.

    Returns:
        A Dict keyed by the 'container' label of each result, whose values
        are that series' sample values converted to float. When several
        results share a container name, the last one wins.

    Raises:
        Exception: if the response status is not 'success' or the result
            type is not 'matrix'.
    """
    status = data['status']
    if status != 'success':
        raise Exception('command failed: ' + status + str(data))

    result_type = data['data']['resultType']
    if result_type != 'matrix':
        raise Exception('resultType is not matrix: ' + result_type)

    return {
        series["metric"]["container"]:
            [float(sample[1]) for sample in series["values"]]
        for series in data["data"]["result"]
    }
187 
188 
def compute_total_cpu_seconds(cpu_data_list: List[float]) -> float:
    """Returns the cpu seconds consumed over the series.

    The value is the last sample minus the first sample.
    """
    first_sample = cpu_data_list[0]
    last_sample = cpu_data_list[-1]
    return last_sample - first_sample
192 
193 
def compute_average_memory_usage(memory_data_list: List[float]) -> float:
    """Returns the arithmetic mean of the given memory samples.

    Delegates to statistics.mean, which raises statistics.StatisticsError
    for an empty list.
    """
    average = statistics.mean(memory_data_list)
    return average
197 
198 
def construct_pod_dict(node_info_file: str,
                       pod_types: List[str]) -> Dict[str, List[str]]:
    """Builds the pod names to query, grouped by role.

    Args:
        node_info_file: Path of a JSON file with 'Clients' and 'Servers'
            lists and a 'Driver' object, each entry carrying a 'Name'.
        pod_types: The roles to include: any of 'clients', 'driver' and
            'servers'.

    Returns:
        A Dict keyed by each requested role whose values are the pod names
        having that role.
    """
    with open(node_info_file, 'r') as f:
        node_info = json.load(f)

    names_by_role = {
        'clients': [entry['Name'] for entry in node_info['Clients']],
        'servers': [entry['Name'] for entry in node_info['Servers']],
        'driver': [node_info['Driver']['Name']],
    }
    return {role: names_by_role[role] for role in pod_types}
223 
224 
def convert_UTC_to_epoch(utc_timestamp: str) -> str:
    """Converts a UTC timestamp string to an epoch-seconds string.

    Args:
        utc_timestamp: A timestamp parseable by dateutil's parser. A
            timestamp without an explicit offset is treated as UTC.

    Returns:
        Whole seconds since the Unix epoch as a decimal string; fractional
        seconds are truncated.
    """
    from datetime import timezone

    parsed_time = parser.parse(utc_timestamp)
    if parsed_time.tzinfo is None:
        # Treat naive timestamps as UTC, as the function name promises,
        # rather than as the host's local time.
        parsed_time = parsed_time.replace(tzinfo=timezone.utc)
    # datetime.timestamp() honours tzinfo. strftime('%s') is a non-portable
    # glibc extension that ignores tzinfo and assumes host-local time.
    return str(int(parsed_time.timestamp()))
230 
231 
def main() -> None:
    """Parses flags, queries Prometheus and exports the results as JSON."""
    argp = argparse.ArgumentParser(
        description='Fetch cpu and memory stats from prometheus')
    argp.add_argument('--url', help='Prometheus base url', required=True)
    argp.add_argument(
        '--scenario_result_file',
        default='scenario_result.json',
        type=str,
        help='File contains the UTC start and end time of the test run',
    )
    argp.add_argument(
        '--node_info_file',
        default='/var/data/qps_workers/node_info.json',
        help='File contains pod name to query the metrics for',
    )
    argp.add_argument(
        '--pod_type',
        action='append',
        help=
        'Pod type to query the metrics for, the options are driver, clients and servers',
        choices=['driver', 'clients', 'servers'],
        required=True,
    )
    argp.add_argument(
        '--container_name',
        action='append',
        help='The container names to query the metrics for',
        required=True,
    )
    argp.add_argument(
        '--export_file_name',
        default='prometheus_query_result.json',
        type=str,
        help='Name of exported JSON file.',
    )
    argp.add_argument(
        '--quiet',
        # A bare `--quiet` now works as a flag. An explicit value (e.g.
        # `--quiet=1`) is still accepted for backward compatibility, and any
        # non-empty value is treated as true.
        nargs='?',
        const=True,
        default=False,
        help='Suppress informative output',
    )
    args = argp.parse_args()

    if not args.quiet:
        logging.getLogger().setLevel(logging.DEBUG)

    with open(args.scenario_result_file, 'r', encoding='utf8') as q:
        scenario_result = json.load(q)
    start_time = convert_UTC_to_epoch(scenario_result['summary']['startTime'])
    end_time = convert_UTC_to_epoch(scenario_result['summary']['endTime'])
    p = Prometheus(
        url=args.url,
        start=start_time,
        end=end_time,
    )

    pod_dict = construct_pod_dict(args.node_info_file, args.pod_type)
    processed_data = p.fetch_cpu_and_memory_data(
        container_list=args.container_name, pod_dict=pod_dict)
    processed_data['testDurationSeconds'] = float(end_time) - float(start_time)

    logging.debug(json.dumps(processed_data, sort_keys=True, indent=4))

    with open(args.export_file_name, 'w', encoding='utf8') as export_file:
        json.dump(processed_data, export_file, sort_keys=True, indent=4)


if __name__ == '__main__':
    main()
xds_interop_client.str
str
Definition: xds_interop_client.py:487
performance.prometheus.construct_container_matcher
str construct_container_matcher(List[str] container_list)
Definition: prometheus.py:151
performance.prometheus.Prometheus.url
url
Definition: prometheus.py:44
performance.prometheus.Prometheus.__init__
def __init__(self, str url, str start, str end)
Definition: prometheus.py:43
performance.prometheus.Prometheus.start
start
Definition: prometheus.py:45
performance.prometheus.Prometheus
Definition: prometheus.py:40
performance.prometheus.Prometheus.fetch_cpu_and_memory_data
Dict[str, Any] fetch_cpu_and_memory_data(self, List[str] container_list, Dict[str, List[str]] pod_dict)
Definition: prometheus.py:117
performance.prometheus.Prometheus.end
end
Definition: prometheus.py:46
performance.prometheus.main
None main()
Definition: prometheus.py:232
performance.prometheus.Prometheus._fetch_cpu_for_pod
Dict[str, List[float]] _fetch_cpu_for_pod(self, str container_matcher, str pod_name)
Definition: prometheus.py:72
performance.prometheus.Prometheus._fetch_memory_for_pod
Dict[str, List[float]] _fetch_memory_for_pod(self, str container_matcher, str pod_name)
Definition: prometheus.py:93
performance.prometheus.get_data_list_from_timeseries
Dict[str, List[float]] get_data_list_from_timeseries(Any data)
Definition: prometheus.py:168
xds_manager.items
items
Definition: xds_manager.py:55
main
Definition: main.py:1
performance.prometheus.construct_pod_dict
Dict[str, List[str]] construct_pod_dict(str node_info_file, List[str] pod_types)
Definition: prometheus.py:199
performance.prometheus.compute_average_memory_usage
float compute_average_memory_usage(List[float] memory_data_list)
Definition: prometheus.py:194
open
#define open
Definition: test-fs.c:46
len
int len
Definition: abseil-cpp/absl/base/internal/low_level_alloc_test.cc:46
performance.prometheus.compute_total_cpu_seconds
float compute_total_cpu_seconds(List[float] cpu_data_list)
Definition: prometheus.py:189
performance.prometheus.convert_UTC_to_epoch
str convert_UTC_to_epoch(str utc_timestamp)
Definition: prometheus.py:225
performance.prometheus.Prometheus._fetch_by_query
Dict[str, Any] _fetch_by_query(self, str query)
Definition: prometheus.py:53


grpc
Author(s):
autogenerated on Thu Mar 13 2025 03:00:55