00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022 import roslib.message;roslib.load_manifest('tug_ist_diagnosis_generator')
00023 import rospy
00024 import sys
00025 import xmlrpclib
00026 import os
00027 import subprocess
00028 import time
00029 import shlex
00030 import thread
00031 from math import sqrt
00032 import signal
00033 interrupted = False
00034 import numpy
00035 import matplotlib.pyplot as plt
00036
00037 class node_data_structure(object):
00038 def __init__(self,node_name):
00039 self.node_name = node_name
00040 self.pub_topic_list = []
00041 self.sub_topic_list = []
00042 self.cpu_list = []
00043 self.mem_list = []
00044 self.max_cpu = 0
00045 self.max_mem = 0
00046 self.node_pid = 0
00047 def get_node_name(self):
00048 return self.node_name
00049 def add_pub_topic(self,topic):
00050 self.pub_topic_list.append(topic)
00051 def add_sub_topic(self,topic):
00052 self.sub_topic_list.append(topic)
00053 def get_pub_topics(self):
00054 return self.pub_topic_list
00055 def get_sub_topics(self):
00056 return self.sub_topic_list
00057 def set_cpu(self,cpu):
00058 self.cpu_list.append(cpu)
00059 def get_cpu(self):
00060 n = len(self.cpu_list)
00061 print 'cpu list len'+str(n)
00062 print self.cpu_list
00063 if n== 0:
00064 mean = 0
00065 sd = 0
00066 else:
00067 mean = sum(self.cpu_list)/n
00068 dev = [x - mean for x in self.cpu_list]
00069 dev2 = [x*x for x in dev]
00070 sd = sqrt( sum(dev2) / n)
00071 return mean+2*sd
00072 def set_mem(self,mem):
00073 self.mem_list.append(mem)
00074 def get_mem(self):
00075 n = len(self.mem_list)
00076 mean = sum(self.mem_list)/n
00077 dev = [x - mean for x in self.mem_list]
00078 dev2 = [x*x for x in dev]
00079 sd = sqrt( sum(dev2) / n)
00080 return mean+2*sd
00081 def set_node_pid(self,pid):
00082 self.node_pid = pid
00083 def get_node_pid(self):
00084 return self.node_pid
00085
00086
00087 class topic_data_structure(object):
00088 def __init__(self,topic_name,ws):
00089 self.topic_name = topic_name
00090 self.ws = ws
00091 self.ws1 = ws/1000
00092 self.circular_queu = [0 for i in xrange(self.ws)]
00093
00094 self.frq_list = []
00095 self.occur_list = []
00096 self.calculated_frq = 0
00097 self.calculated_frq_dev = 0
00098 self.prev_t = time.time()
00099 def get_topic_name(self):
00100 return self.topic_name
00101 def set_prev_t(self,prev_t):
00102 self.prev_t = prev_t
00103 def get_prev_t(self):
00104 return self.prev_t
00105 def set_occur(self,val):
00106 self.occur_list.append(val)
00107 def calc_frq(self,delta_t):
00108 self.circular_queu.pop(0)
00109 self.circular_queu.append(delta_t)
00110 sm = numpy.sum(self.circular_queu)
00111
00112
00113
00114 frq = 1/(float(sm)/self.ws)
00115 self.frq_list.append(delta_t)
00116 self.frq_list.append(delta_t+self.ws)
00117
00118
00119
00120 def calculate_frq_dev(self):
00121 frq_list = self.frq_list[self.ws:]
00122 n = len(frq_list)
00123 if n!=0:
00124
00125 mean = numpy.mean(frq_list)
00126
00127 self.calculated_frq_dev = numpy.std(frq_list)
00128 self.calculated_frq = mean
00129 else:
00130 mean = 0
00131 self.calculated_frq_dev = 0
00132 self.calculated_frq = 0
00133 def calculate_mean(self,par_list):
00134 n = len(par_list)
00135 s = sum(par_list)
00136 return float(s)/n
00137 def calculate_standard_deviation(self,par_list):
00138 n = len(par_list)
00139 mean = self.calculate_mean(par_list)
00140 return float(sqrt(sum((x-mean)**2 for x in par_list))/n)
00141 def get_calculated_frq(self):
00142 return self.calculated_frq
00143 def get_calculated_frq_dev(self):
00144 return self.calculated_frq_dev
00145 def get_signal_nature(self):
00146
00147 signal_mean = numpy.mean(self.occur_list)
00148
00149 signal_dev = numpy.std(self.occur_list)
00150 print self.topic_name, signal_mean, signal_dev
00151 def print_frq_list(self):
00152 print 'FILE: for'+self.topic_name
00153 fl = open(self.topic_name[1:len(self.topic_name)]+".txt", "w")
00154 for f in self.frq_list:
00155 fl.write(str(f)+"\n")
00156 print f
00157 fl.close()
00158
00159
00160 class Generator(object):
00161 def __init__(self):
00162 rospy.init_node('observers_generator', anonymous=False)
00163 self.m = xmlrpclib.ServerProxy(os.environ['ROS_MASTER_URI'])
00164 self.caller_id = '/script'
00165 self.param_gen_ws = rospy.get_param('~gen_ws', 100)
00166 self.param_g_ws = rospy.get_param('~gob_ws', 10)
00167 self.param_p_ws = rospy.get_param('~pob_ws', 10)
00168 self.p_mismatch_th = rospy.get_param('~pob_mismatch_th', 5)
00169 self.brd_topic = rospy.get_param('~board_topic', '/board_measurments')
00170
00171 self.nodes_list = []
00172 self.topics_list = []
00173 self.zero_frq_topic_list = []
00174 self.topic_data_structure = []
00175 self.node_data_structure = []
00176 self.node_topic_list = []
00177 self.topics_type_list = []
00178 self.counter = 0
00179 self.global_list = []
00180 self.param_frq = 0
00181 self.param_dev = 0
00182 self.maxCpu = 0
00183 self.maxMem = 0
00184
00185
00186 def start(self):
00187 signal.signal(signal.SIGINT, self.signal_handler)
00188 self.extract_nodes_topics()
00189 thread.start_new_thread(self.threaded_extract_nodes_topics, ())
00190 thread.start_new_thread(self.threaded_extract_mem_cpu, ())
00191 thread.start_new_thread(self.spin_thread, ())
00192 while True:
00193 if interrupted:
00194 break;
00195
00196
00197
00198
00199
00200
00201
00202
00203
00204
00205
00206
00207
00208
00209
00210
00211
00212
00213
00214
00215
00216
00217
00218
00219
00220
00221
00222
00223
00224
00225
00226
00227 def auto_correlate(self,tpc_ds):
00228 lst = tpc_ds.occur_list
00229 mean = numpy.mean(lst)
00230 lst = lst-mean
00231 autocorr = numpy.correlate(lst, lst, mode='full')
00232
00233 return autocorr
00234
00235
00236 def signal_handler(self,signum, frame):
00237 global interrupted
00238 START_TIME = time.time()
00239 print 'START Time='+str(START_TIME)
00240 interrupted = True
00241 time.sleep(1)
00242 print 'making observers started....'
00243 self.make_obs_launch()
00244 print 'making observers finished....'
00245 print 'making model started....'
00246 self.make_mdl_yaml()
00247 print 'making model finished....'
00248 END_TIME = time.time()
00249 print 'END Time='+str(END_TIME)
00250 print 'Total Calculated Time='+str(END_TIME - START_TIME)
00251
00252
00253
00254 def spin_thread(self):
00255 rospy.spin()
00256
00257 def topic_signal_nature(self):
00258 for tpc_ds in self.topic_data_structure:
00259 tpc_ds.get_signal_nature()
00260
00261 def callback(self,data,topic):
00262 curr_t = time.time()
00263 for tpc_ds in self.topic_data_structure:
00264 occurance = 0
00265 if topic == tpc_ds.get_topic_name():
00266 delta_t = curr_t - tpc_ds.get_prev_t()
00267
00268 tpc_ds.calc_frq(curr_t)
00269 tpc_ds.set_prev_t(curr_t)
00270 occurance = 1
00271 tpc_ds.set_occur(occurance)
00272
00273
00274
00275 def threaded_extract_mem_cpu(self):
00276 while True :
00277 for node_ds in self.node_data_structure:
00278 node_name = node_ds.get_node_name()
00279 node_pid = node_ds.get_node_pid()
00280 p = subprocess.Popen("top -b -n 1 | grep -i %s" %node_pid, shell=True,stdout=subprocess.PIPE)
00281 self.out = p.communicate()[0]
00282 self.out1 = shlex.split(self.out)
00283 cpu = self.out1[8]
00284 mem = self.out1[9]
00285
00286
00287
00288
00289 node_ds.set_cpu(float(cpu))
00290 node_ds.set_mem(float(mem))
00291
00292 time.sleep(0.25)
00293
00294 def extract_nodes_topics(self):
00295 pubcode, statusMessage, topicList = self.m.getPublishedTopics(self.caller_id, "")
00296
00297 for topic in topicList:
00298
00299
00300 if topic[0] not in self.topics_list:
00301 self.topics_list.append(topic[0])
00302 msg_class = roslib.message.get_message_class(topic[1])
00303 self.topic_data_structure.append(topic_data_structure(topic[0],self.param_gen_ws))
00304 rospy.Subscriber(topic[0], msg_class, self.callback, topic[0])
00305 for n in self.nodes_list:
00306 print n
00307 for t in self.topics_list:
00308 print t
00309 print '==================='
00310
00311 code, statusMessage, sysState = self.m.getSystemState(self.caller_id)
00312
00313 Pubs = sysState[0]
00314 Subs = sysState[1]
00315 for lst in sysState:
00316 for row in lst:
00317 for node in row[1]:
00318
00319 if (node == '/observers_generator') | ('rostopic' in node) :
00320 continue
00321 if node not in self.nodes_list:
00322 self.nodes_list.append(node)
00323 a = subprocess.Popen("rosnode info " + node , shell=True,stdout=subprocess.PIPE)
00324 parts = shlex.split(a.communicate()[0])
00325 indx = parts.index("Pid:")
00326 pid = parts[indx+1]
00327 nd_ds = node_data_structure(node)
00328 nd_ds.set_node_pid(pid)
00329 for item in Pubs:
00330 if (node in item[1]) & (item[0] in self.topics_list):
00331 nd_ds.add_pub_topic(item[0])
00332 for item in Subs:
00333 if (node in item[1]) & (item[0] in self.topics_list):
00334 nd_ds.add_sub_topic(item[0])
00335 self.node_data_structure.append(nd_ds)
00336 print 'node=',node,' pid=',pid
00337
00338 def threaded_extract_nodes_topics(self):
00339 while True :
00340 pubcode, statusMessage, topicList = self.m.getPublishedTopics(self.caller_id, "")
00341
00342 for topic in topicList:
00343
00344
00345 if topic[0] not in self.topics_list:
00346 self.topics_list.append(topic[0])
00347 msg_class = roslib.message.get_message_class(topic[1])
00348 self.topic_data_structure.append(topic_data_structure(topic[0],self.param_gen_ws))
00349 rospy.Subscriber(topic[0], msg_class, self.callback, topic[0])
00350
00351
00352
00353 code, statusMessage, sysState = self.m.getSystemState(self.caller_id)
00354
00355 Pubs = sysState[0]
00356 Subs = sysState[1]
00357 for lst in sysState:
00358 for row in lst:
00359 for node in row[1]:
00360
00361 if (node == '/observers_generator') | ('rostopic' in node) :
00362 continue
00363 if node not in self.nodes_list:
00364 self.nodes_list.append(node)
00365
00366 a = subprocess.Popen("rosnode info " + node , shell=True,stdout=subprocess.PIPE)
00367 parts = shlex.split(a.communicate()[0])
00368 indx = parts.index("Pid:")
00369 pid = parts[indx+1]
00370 nd_ds = node_data_structure(node)
00371 nd_ds.set_node_pid(pid)
00372 for item in Pubs:
00373 if (node in item[1]) & (item[0] in self.topics_list):
00374 nd_ds.add_pub_topic(item[0])
00375 for item in Subs:
00376 if (node in item[1]) & (item[0] in self.topics_list):
00377 nd_ds.add_sub_topic(item[0])
00378 self.node_data_structure.append(nd_ds)
00379
00380
00381 time.sleep(0.5)
00382
00383 def make_obs_launch(self):
00384 temp_path = "/home/safdar/my_workspace/model_based_diagnosis/tug_ist_diagnosis_launch/launch/obs_auto.launch"
00385 file = open(temp_path, 'wb')
00386 file.write('<?xml version="1.0"?>\n<launch>\n')
00387 if self.brd_topic in self.topics_list:
00388 file.write('<node pkg="tug_ist_diagnosis_observers" type="HObs.py" name="BoardHObs" >\n')
00389 file.write('</node>\n')
00390 for nd_ds in self.node_data_structure:
00391 node_name = nd_ds.get_node_name()
00392 node_name = node_name[1:len(node_name)]
00393 node_mxCpu = nd_ds.get_cpu()
00394 node_mxMem = nd_ds.get_mem()
00395 nobs_name = node_name+'NObs'
00396 pubs = nd_ds.get_pub_topics()
00397
00398
00399
00400
00401
00402 file.write('<node pkg="tug_ist_diagnosis_observers" type="NObs.py" name="'+nobs_name+'" >\n')
00403 file.write('\t<param name="node" value="'+node_name+'" />\n')
00404 file.write('</node>\n')
00405 pobs_name = node_name+'CpuPObs'
00406 file.write('<node pkg="tug_ist_diagnosis_observers" type="PObs.py" name="'+pobs_name+'" >\n')
00407 file.write('\t<param name="node" value="'+node_name+'" />\n')
00408 file.write('\t<param name="property" value="cpu" />\n')
00409 file.write('\t<param name="max_val" value="'+str(node_mxCpu)+'" />\n')
00410 file.write('\t<param name="mismatch_th" value="'+str(self.p_mismatch_th)+'" />\n')
00411 file.write('\t<param name="ws" value="'+str(self.param_p_ws)+'" />\n')
00412 file.write('</node>\n')
00413 pobs_name = node_name+'MemPObs'
00414 file.write('<node pkg="tug_ist_diagnosis_observers" type="PObs.py" name="'+pobs_name+'" >\n')
00415 file.write('\t<param name="node" value="'+node_name+'" />\n')
00416 file.write('\t<param name="property" value="mem" />\n')
00417 file.write('\t<param name="max_val" value="'+str(node_mxMem)+'" />\n')
00418 file.write('\t<param name="mismatch_th" value="'+str(self.p_mismatch_th)+'" />\n')
00419 file.write('\t<param name="ws" value="'+str(self.param_p_ws)+'" />\n')
00420 file.write('</node>\n')
00421 for t in self.topics_list:
00422 for tpc_ds in self.topic_data_structure:
00423 if t == tpc_ds.get_topic_name():
00424 tpc_ds.calculate_frq_dev()
00425 y = 'imu'
00426 if y in t:
00427 tpc_ds.print_frq_list()
00428 self.param_frq = tpc_ds.get_calculated_frq()
00429 self.param_dev = tpc_ds.get_calculated_frq_dev()
00430 break
00431 if self.param_frq == 0:
00432 self.zero_frq_topic_list.append(t)
00433 continue
00434 topic_name = t[1:len(t)]
00435 gobs_name = topic_name+'GObs'
00436 file.write('<node pkg="tug_ist_diagnosis_observers" type="GObs.py" name="'+gobs_name+'" >\n')
00437 file.write('\t<param name="topic" value="'+topic_name+'" />\n')
00438 file.write('\t<param name="frq" value="'+str(self.param_frq)+'" />\n')
00439 file.write('\t<param name="dev" value="'+str(2*self.param_dev)+'" />\n')
00440 file.write('\t<param name="ws" value="'+str(self.param_g_ws)+'" />\n')
00441 file.write('</node>\n')
00442 file.write('</launch>')
00443 file.close()
00444
00445 def make_mdl_yaml(self):
00446 temp_path = "/home/safdar/my_workspace/model_based_diagnosis/tug_ist_diagnosis_model/diagnosis_model.yaml"
00447 file = open(temp_path, 'wb')
00448 file.write('ab: "AB"\nnab: "NAB"\nneg_prefix: "not_"\n\nprops:\n')
00449 for n in self.nodes_list:
00450 node_name = n[1:len(n)]
00451 file.write(' - running('+node_name+')\n')
00452 file.write(' - ok('+node_name+',cpu)\n')
00453 file.write(' - ok('+node_name+',mem)\n')
00454 for t in self.topics_list:
00455 if t in self.zero_frq_topic_list:
00456 continue
00457 topic_name = t[1:len(t)]
00458 file.write(' - ok('+topic_name+')\n')
00459 file.write('rules:\n')
00460 for nd_ds in self.node_data_structure:
00461 node_name = nd_ds.get_node_name()
00462 node_name = node_name[1:len(node_name)]
00463 file.write(' - NAB('+node_name+') -> running('+node_name+')\n')
00464 file.write(' - NAB('+node_name+'), running('+node_name+') -> ok('+node_name+',cpu)\n')
00465 file.write(' - NAB('+node_name+'), running('+node_name+') -> ok('+node_name+',mem)\n')
00466 pubs = nd_ds.get_pub_topics()
00467 subs = nd_ds.get_sub_topics()
00468 for p in pubs:
00469 if p in self.zero_frq_topic_list:
00470 continue
00471 str_subs = ''
00472 for s in subs:
00473 if s in self.zero_frq_topic_list:
00474 continue
00475 str_subs = str_subs + ', ok('+ s[1:len(s)] +')'
00476 file.write(' - NAB('+node_name+')'+str_subs+' -> ok('+p[1:len(p)]+')\n')
00477 file.close()
00478 def extract_nodes(self):
00479 code, statusMessage, sysState = self.m.getSystemState(self.caller_id)
00480 for lst in sysState:
00481 for row in lst:
00482 for node in row[1]:
00483
00484 if (node == '/observers_generator'):
00485 continue
00486 if node not in self.nodes_list:
00487 self.nodes_list.append(node)
00488
00489 i = 0
00490 for node in self.nodes_list:
00491 self.node_data_structure.append(node_data_structure(node))
00492 node_ds = self.node_data_structure[i]
00493 self.extract_mem_cpu(node,node_ds)
00494 for item in sysState[0]:
00495 if (node in item[1]) & (item[0] in self.topics_list):
00496 node_ds.add_pub_topic(item[0])
00497 for item in sysState[1]:
00498 if (node in item[1]) & (item[0] in self.topics_list):
00499 node_ds.add_sub_topic(item[0])
00500 i = i + 1
00501
00502
00503 def extract_mem_cpu(self, node, node_ds):
00504 a = subprocess.Popen("rosnode info " + node , shell=True,stdout=subprocess.PIPE)
00505 parts = shlex.split(a.communicate()[0])
00506 indx = parts.index("Pid:")
00507 pid = parts[indx+1]
00508 t = 0
00509 maxMem = 0
00510 maxCpu = 0
00511 while t<=6:
00512 p = subprocess.Popen("top -b -n 1 | grep -i %s" %pid, shell=True,stdout=subprocess.PIPE)
00513 self.out = p.communicate()[0]
00514 self.out1 = shlex.split(self.out)
00515 cpu = self.out1[8]
00516 mem = self.out1[9]
00517
00518
00519
00520
00521 t = t + 1
00522 node_ds.set_cpu(float(cpu))
00523 node_ds.set_mem(float(mem))
00524
00525 def extract_topics(self):
00526 pubcode, statusMessage, topicList = self.m.getPublishedTopics(self.caller_id, "")
00527 i = 0
00528 for topic in topicList:
00529
00530
00531 if topic[0] not in self.topics_list:
00532 self.topics_list.append(topic[0])
00533 print 'topic 0', topic[0],' topic 1', topic[1]
00534 msg_class = roslib.message.get_message_class(topic[1])
00535 self.topic_data_structure.append(topic_data_structure(topic[0],self.param_gen_ws))
00536 sb=rospy.Subscriber(topic[0], msg_class, self.callback, topic[0])
00537 i = i + 1
00538 thread.start_new_thread(self.spin_thread,('thread', 1))
00539 time.sleep(1)
00540
00541
00542 def average_delta_t(self):
00543 s = 0
00544 for val in self.circular_queu:
00545 s = s + val
00546 return s/self.param_ws
00547
00548 def run_observers(self):
00549 output = subprocess.Popen("roslaunch tug_ist_diagnosis_launch obs_auto.launch" , shell=True,stdout=subprocess.PIPE)
00550
00551 def run_model_server(self):
00552 command = "rosrun tug_ist_diagnosis_model diagnosis_model_server.py"
00553 param = "_model:=/home/safdar/my_workspace/model_based_diagnosis/tug_ist_diagnosis_model/diagnosis_model.yaml"
00554 output = subprocess.Popen(command+param , shell=True,stdout=subprocess.PIPE)
00555
00556
00557 def report_error(self):
00558 print '\n No Running System!'
00559 sys.exit(os.EX_USAGE)
00560
00561 if __name__ == '__main__':
00562 generator = Generator()
00563 generator.start()