Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035 import unittest
00036 import rospy, rostest
00037 import rosparam
00038 import optparse
00039 import sys
00040 import threading
00041 from bondpy import bondpy
00042 from diagnostic_msgs.srv import AddDiagnostics
00043 from diagnostic_msgs.msg import DiagnosticArray, DiagnosticStatus
00044
00045 PKG = 'diagnostic_aggregator'
00046
00047 class TestAddAnalyzer(unittest.TestCase):
00048 def __init__(self, *args):
00049 super(TestAddAnalyzer, self).__init__(*args)
00050 rospy.init_node('test_add_analyzer')
00051 self.namespace = rospy.get_name()
00052 paramlist = rosparam.load_file(rospy.myargv()[1])
00053
00054 self.expected = [paramlist[0][1] + analyzer['path'] for name, analyzer in paramlist[0][0]['analyzers'].iteritems()]
00055
00056 self._mutex = threading.Lock()
00057 self.agg_msgs = {}
00058
00059
00060 for params, ns in paramlist:
00061 rosparam.upload_params(rospy.get_name() + '/' + ns, params)
00062
00063 rospy.Subscriber('/diagnostics_agg', DiagnosticArray, self.agg_cb)
00064 self.pub = rospy.Publisher('/diagnostics', DiagnosticArray, queue_size=1)
00065
00066 def agg_cb(self, msg):
00067 with self._mutex:
00068 for stat in msg.status:
00069 self.agg_msgs[stat.name] = stat
00070
00071 def add_analyzer(self):
00072 """Start a bond to the aggregator
00073 """
00074 namespace = rospy.resolve_name(rospy.get_name())
00075 self.bond = bondpy.Bond("/diagnostics_agg/bond" + namespace, namespace)
00076 self.bond.start()
00077 rospy.wait_for_service('/diagnostics_agg/add_diagnostics', timeout=10)
00078 add_diagnostics = rospy.ServiceProxy('/diagnostics_agg/add_diagnostics', AddDiagnostics)
00079 print(self.namespace)
00080 resp = add_diagnostics(load_namespace=self.namespace)
00081 self.assert_(resp.success, 'Service call was unsuccessful: {0}'.format(resp.message))
00082
00083 def wait_for_agg(self):
00084 self.agg_msgs = {}
00085 while not self.agg_msgs and not rospy.is_shutdown():
00086 rospy.sleep(rospy.Duration(3))
00087
00088 def test_add_agg(self):
00089 self.wait_for_agg()
00090
00091
00092 with self._mutex:
00093 agg_paths = [msg.name for name, msg in self.agg_msgs.iteritems()]
00094 self.assert_(not any(expected in agg_paths for expected in self.expected))
00095
00096
00097 self.add_analyzer()
00098
00099 arr = DiagnosticArray()
00100 arr.header.stamp = rospy.get_rostime()
00101 arr.status = [
00102 DiagnosticStatus(name='primary', message='hello-primary'),
00103 DiagnosticStatus(name='secondary', message='hello-secondary')
00104 ]
00105 self.pub.publish(arr)
00106 self.wait_for_agg()
00107
00108
00109
00110 with self._mutex:
00111 agg_paths = [msg.name for name, msg in self.agg_msgs.iteritems()]
00112 self.assert_(all(expected in agg_paths for expected in self.expected))
00113
00114 rospy.sleep(rospy.Duration(5))
00115 arr.header.stamp = rospy.get_rostime()
00116 self.pub.publish(arr)
00117 self.wait_for_agg()
00118
00119 for name, msg in self.agg_msgs.iteritems():
00120 if name in self.expected:
00121 self.assert_(msg.message == 'OK')
00122
00123 agg_paths = [msg.name for name, msg in self.agg_msgs.iteritems()]
00124 self.assert_(all(expected in agg_paths for expected in self.expected))
00125
00126
00127 self.bond.shutdown()
00128 rospy.sleep(rospy.Duration(5))
00129 self.wait_for_agg()
00130
00131 with self._mutex:
00132 agg_paths = [msg.name for name, msg in self.agg_msgs.iteritems()]
00133 self.assert_(not any(expected in agg_paths for expected in self.expected))
00134
00135 if __name__ == '__main__':
00136 print 'SYS ARGS:', sys.argv
00137 rostest.run(PKG, sys.argv[0], TestAddAnalyzer, sys.argv)