22 """Perform Prometheus range queries to obtain cpu and memory data.
24 This module performs range queries through Prometheus API to obtain
25 total cpu seconds and memory during a test run for given container
26 in given pods. The cpu data obtained is total cpu second used within
27 given period of time. The memory data was instant memory usage at
34 from typing
import Any, Dict, List
36 from dateutil
import parser
41 """Objects which holds the start time, end time and query URL."""
54 """Fetches the given query with time range.
56 Fetch the given query within a time range. The pulling
57 interval is every 5s, the actual data from the query is
61 self.
url +
'/api/v1/query_range',
69 resp.raise_for_status()
73 pod_name: str) -> Dict[str, List[float]]:
74 """Fetches the cpu data for each pod.
76 Fetch total cpu seconds during the time range specified in the Prometheus instance
77 for a pod. After obtain the cpu seconds, the data are trimmed from time series to
78 a data list and saved in the Dict that keyed by the container names.
81 container_matcher: A string consist one or more container name separated by |.
84 'container_cpu_usage_seconds_total{job="kubernetes-cadvisor",pod="'
85 + pod_name +
'",container=' + container_matcher +
'}')
86 logging.debug(
'running prometheus query for cpu: %s', query)
88 logging.debug(
'raw cpu data: %s',
str(cpu_data))
91 return cpu_container_name_to_data_list
94 pod_name: str) -> Dict[str, List[float]]:
95 """Fetches memory data for each pod.
97 Fetch total memory data during the time range specified in the Prometheus instance
98 for a pod. After obtain the memory data, the data are trimmed from time series to
99 a data list and saved in the Dict that keyed by the container names.
102 container_matcher: A string consist one or more container name separated by |.
105 'container_memory_usage_bytes{job="kubernetes-cadvisor",pod="' +
106 pod_name +
'",container=' + container_matcher +
"}")
108 logging.debug(
'running prometheus query for memory: %s', query)
111 logging.debug(
'raw memory data: %s',
str(memory_data))
115 return memory_container_name_to_data_list
118 self, container_list: List[str],
119 pod_dict: Dict[str, List[str]]) -> Dict[str, Any]:
120 """Fetch total cpu seconds and memory data for multiple pods.
123 container_list: A list of container names to fetch the data for.
124 pod_dict: the pods to fetch data for, the pod_dict is keyed by
125 role of the pod: clients, driver and servers. The values
126 for the pod_dict are the list of pod names that consist
127 the same role specified in the key.
131 for role, pod_names
in pod_dict.items():
133 for pod
in pod_names:
136 container_matcher, pod).
items():
137 container_data[container] = {}
138 container_data[container][
142 container_matcher, pod).
items():
143 container_data[container][
146 pod_data[pod] = container_data
147 processed_data[role] = pod_data
148 return processed_data
152 """Constructs the container matching string used in the
154 if len(container_list) == 0:
155 raise Exception(
'no container name provided')
157 containers_to_fetch =
'"'
158 if len(container_list) == 1:
159 containers_to_fetch = container_list[0]
161 containers_to_fetch =
'~"' + container_list[0]
162 for container
in container_list[1:]:
163 containers_to_fetch = containers_to_fetch +
'|' + container
164 containers_to_fetch = containers_to_fetch +
'"'
165 return containers_to_fetch
169 """Constructs a Dict as keys are the container names and
170 values are a list of data taken from given timeseries data."""
171 if data[
'status'] !=
'success':
172 raise Exception(
'command failed: ' + data[
'status'] +
str(data))
173 if data[
'data'][
'resultType'] !=
'matrix':
174 raise Exception(
'resultType is not matrix: ' +
175 data[
'data'][
'resultType'])
177 container_name_to_data_list = {}
178 for res
in data[
"data"][
"result"]:
179 container_name = res[
"metric"][
"container"]
180 container_data_timeseries = res[
"values"]
183 for d
in container_data_timeseries:
184 container_data.append(float(d[1]))
185 container_name_to_data_list[container_name] = container_data
186 return container_name_to_data_list
190 """Computes the total cpu seconds by CPUs[end]-CPUs[start]."""
191 return cpu_data_list[
len(cpu_data_list) - 1] - cpu_data_list[0]
195 """Computes the mean and for a given list of data."""
196 return statistics.mean(memory_data_list)
def construct_pod_dict(node_info_file: str,
                       pod_types: List[str]) -> Dict[str, List[str]]:
    """Constructs a dict of pod names to be queried.

    Args:
        node_info_file: The file path containing the pod names to query.
            The pods' names are put into a Dict of lists keyed by the
            role name: clients, servers and driver.
        pod_types: the role names to keep in the returned dict.

    Returns:
        A dict mapping each requested role to its list of pod names.
    """
    # Close the file as soon as the JSON is parsed instead of holding
    # it open while building the result.
    with open(node_info_file, 'r') as f:
        pod_names = json.load(f)

    pod_type_to_name = {
        'clients': [client['Name'] for client in pod_names['Clients']],
        'servers': [server['Name'] for server in pod_names['Servers']],
        'driver': [pod_names['Driver']['Name']],
    }
    return {pod_type: pod_type_to_name[pod_type] for pod_type in pod_types}
226 """Converts a utc timestamp string to epoch time string."""
227 parsed_time = parser.parse(utc_timestamp)
228 epoch = parsed_time.strftime(
'%s')
def main() -> None:
    """Parses CLI flags, queries Prometheus and exports the results as JSON."""
    argp = argparse.ArgumentParser(
        description='Fetch cpu and memory stats from prometheus')
    argp.add_argument('--url', help='Prometheus base url', required=True)
    argp.add_argument(
        '--scenario_result_file',
        default='scenario_result.json',
        help='File contains epoch seconds for start and end time')
    argp.add_argument(
        '--node_info_file',
        default='/var/data/qps_workers/node_info.json',
        help='File contains pod name to query the metrics for')
    argp.add_argument(
        '--pod_type',
        action='append',
        help='Pod type to query the metrics for, the options are driver, client and server',
        choices=['driver', 'clients', 'servers'],
        required=True)
    argp.add_argument(
        '--container_name',
        action='append',
        help='The container names to query the metrics for',
        required=True)
    argp.add_argument(
        '--export_file_name',
        default='prometheus_query_result.json',
        help='Name of exported JSON file.')
    argp.add_argument(
        '--quiet',
        default=False,
        help='Suppress informative output')
    args = argp.parse_args()

    if not args.quiet:
        logging.getLogger().setLevel(logging.DEBUG)

    # The scenario result file supplies the test run's time window.
    with open(args.scenario_result_file, 'r') as q:
        scenario_result = json.load(q)
    start_time = convert_UTC_to_epoch(scenario_result['summary']['startTime'])
    end_time = convert_UTC_to_epoch(scenario_result['summary']['endTime'])

    p = Prometheus(url=args.url, start=start_time, end=end_time)
    pod_dict = construct_pod_dict(args.node_info_file, args.pod_type)
    processed_data = p.fetch_cpu_and_memory_data(
        container_list=args.container_name, pod_dict=pod_dict)
    processed_data[
        'testDurationSeconds'] = float(end_time) - float(start_time)

    logging.debug(json.dumps(processed_data, sort_keys=True, indent=4))

    with open(args.export_file_name, 'w', encoding='utf8') as export_file:
        json.dump(processed_data, export_file, sort_keys=True, indent=4)


if __name__ == '__main__':
    main()