41 from bondpy
import bondpy
42 from diagnostic_msgs.srv
import AddDiagnostics
43 from diagnostic_msgs.msg
import DiagnosticArray, DiagnosticStatus
45 PKG =
'diagnostic_aggregator'
49 super(TestAddAnalyzer, self).
__init__(*args)
50 rospy.init_node(
'test_add_analyzer')
52 paramlist = rosparam.load_file(rospy.myargv()[1])
54 self.
expected = [paramlist[0][1] + analyzer[
'path']
for name, analyzer
in paramlist[0][0][
'analyzers'].items()]
60 for params, ns
in paramlist:
61 rosparam.upload_params(rospy.get_name() +
'/' + ns, params)
63 rospy.Subscriber(
'/diagnostics_agg', DiagnosticArray, self.
agg_cb)
64 self.
pub = rospy.Publisher(
'/diagnostics', DiagnosticArray, queue_size=1)
68 for stat
in msg.status:
72 """Start a bond to the aggregator
74 namespace = rospy.resolve_name(rospy.get_name())
75 self.
bond = bondpy.Bond(
"/diagnostics_agg/bond" + namespace, namespace)
77 rospy.wait_for_service(
'/diagnostics_agg/add_diagnostics', timeout=10)
78 add_diagnostics = rospy.ServiceProxy(
'/diagnostics_agg/add_diagnostics', AddDiagnostics)
80 resp = add_diagnostics(load_namespace=self.
namespace)
81 self.assert_(resp.success,
'Service call was unsuccessful: {0}'.format(resp.message))
85 while not self.
agg_msgs and not rospy.is_shutdown():
86 rospy.sleep(rospy.Duration(3))
93 agg_paths = [msg.name
for name, msg
in self.
agg_msgs.items()]
94 self.assert_(
not any(expected
in agg_paths
for expected
in self.
expected))
99 arr = DiagnosticArray()
100 arr.header.stamp = rospy.get_rostime()
102 DiagnosticStatus(name=
'primary', message=
'hello-primary'),
103 DiagnosticStatus(name=
'secondary', message=
'hello-secondary')
105 self.
pub.publish(arr)
111 agg_paths = [msg.name
for name, msg
in self.
agg_msgs.items()]
112 self.assert_(all(expected
in agg_paths
for expected
in self.
expected))
114 rospy.sleep(rospy.Duration(5))
115 arr.header.stamp = rospy.get_rostime()
116 self.
pub.publish(arr)
119 for name, msg
in self.
agg_msgs.items():
121 self.assert_(msg.message ==
'OK')
123 agg_paths = [msg.name
for name, msg
in self.
agg_msgs.items()]
124 self.assert_(all(expected
in agg_paths
for expected
in self.
expected))
128 rospy.sleep(rospy.Duration(5))
132 agg_paths = [msg.name
for name, msg
in self.
agg_msgs.items()]
133 self.assert_(
not any(expected
in agg_paths
for expected
in self.
expected))
135 if __name__ ==
'__main__':
136 print(
'SYS ARGS:', sys.argv)
137 rostest.run(PKG, sys.argv[0], TestAddAnalyzer, sys.argv)