add_analyzers_test.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 # Software License Agreement (BSD License)
00003 #
00004 # Copyright (c) 2009, Willow Garage, Inc.
00005 # All rights reserved.
00006 #
00007 # Redistribution and use in source and binary forms, with or without
00008 # modification, are permitted provided that the following conditions
00009 # are met:
00010 #
00011 #  * Redistributions of source code must retain the above copyright
00012 #    notice, this list of conditions and the following disclaimer.
00013 #  * Redistributions in binary form must reproduce the above
00014 #    copyright notice, this list of conditions and the following
00015 #    disclaimer in the documentation and/or other materials provided
00016 #    with the distribution.
00017 #  * Neither the name of the Willow Garage nor the names of its
00018 #    contributors may be used to endorse or promote products derived
00019 #    from this software without specific prior written permission.
00020 #
00021 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
00022 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
00023 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
00024 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
00025 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
00026 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
00027 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
00028 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
00029 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
00030 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
00031 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00032 # POSSIBILITY OF SUCH DAMAGE.
00033 #
00034 
00035 import unittest
00036 import rospy, rostest
00037 import rosparam
00038 import optparse
00039 import sys
00040 import threading
00041 from bondpy import bondpy
00042 from diagnostic_msgs.srv import AddDiagnostics
00043 from diagnostic_msgs.msg import DiagnosticArray, DiagnosticStatus
00044 
00045 PKG = 'diagnostic_aggregator'
00046 
00047 class TestAddAnalyzer(unittest.TestCase):
00048     def __init__(self, *args):
00049         super(TestAddAnalyzer, self).__init__(*args)
00050         rospy.init_node('test_add_analyzer')
00051         self.namespace = rospy.get_name()
00052         paramlist = rosparam.load_file(rospy.myargv()[1])
00053         # expect to receive these paths in the added analyzers
00054         self.expected = [paramlist[0][1] + analyzer['path'] for name, analyzer in paramlist[0][0]['analyzers'].iteritems()]
00055 
00056         self._mutex = threading.Lock()
00057         self.agg_msgs = {}
00058 
00059         # put parameters in the node namespace so they can be read by the aggregator
00060         for params, ns in paramlist:
00061             rosparam.upload_params(rospy.get_name() + '/' + ns, params)
00062 
00063         rospy.Subscriber('/diagnostics_agg', DiagnosticArray, self.agg_cb)
00064         self.pub = rospy.Publisher('/diagnostics', DiagnosticArray, queue_size=1)
00065 
00066     def agg_cb(self, msg):
00067         with self._mutex:
00068             for stat in msg.status:
00069                 self.agg_msgs[stat.name] = stat
00070 
00071     def add_analyzer(self):
00072         """Start a bond to the aggregator
00073         """
00074         namespace = rospy.resolve_name(rospy.get_name())
00075         self.bond = bondpy.Bond("/diagnostics_agg/bond" + namespace, namespace)
00076         self.bond.start()
00077         rospy.wait_for_service('/diagnostics_agg/add_diagnostics', timeout=10)
00078         add_diagnostics = rospy.ServiceProxy('/diagnostics_agg/add_diagnostics', AddDiagnostics)
00079         print(self.namespace)
00080         resp = add_diagnostics(load_namespace=self.namespace)
00081         self.assert_(resp.success, 'Service call was unsuccessful: {0}'.format(resp.message))
00082 
00083     def wait_for_agg(self):
00084         self.agg_msgs = {}
00085         while not self.agg_msgs and not rospy.is_shutdown():
00086             rospy.sleep(rospy.Duration(3))
00087 
00088     def test_add_agg(self):
00089         self.wait_for_agg()
00090 
00091         # confirm that the things we're going to add aren't there already
00092         with self._mutex:
00093             agg_paths = [msg.name for name, msg in self.agg_msgs.iteritems()]
00094             self.assert_(not any(expected in agg_paths for expected in self.expected))
00095             
00096         # add the new groups
00097         self.add_analyzer()
00098 
00099         arr = DiagnosticArray()
00100         arr.header.stamp = rospy.get_rostime()
00101         arr.status = [
00102             DiagnosticStatus(name='primary', message='hello-primary'),
00103             DiagnosticStatus(name='secondary', message='hello-secondary')
00104         ]
00105         self.pub.publish(arr)
00106         self.wait_for_agg()
00107         # the new aggregator data should contain the extra paths. At this point
00108         # the paths are probably still in the 'Other' group because the bond
00109         # hasn't been fully formed
00110         with self._mutex:
00111             agg_paths = [msg.name for name, msg in self.agg_msgs.iteritems()]
00112             self.assert_(all(expected in agg_paths for expected in self.expected))
00113 
00114         rospy.sleep(rospy.Duration(5)) # wait a bit for the new items to move to the right group
00115         arr.header.stamp = rospy.get_rostime()
00116         self.pub.publish(arr) # publish again to get the correct groups to show OK
00117         self.wait_for_agg()
00118 
00119         for name, msg in self.agg_msgs.iteritems():
00120             if name in self.expected: # should have just received messages on the analyzer
00121                 self.assert_(msg.message == 'OK')
00122                 
00123             agg_paths = [msg.name for name, msg in self.agg_msgs.iteritems()]
00124             self.assert_(all(expected in agg_paths for expected in self.expected))
00125                 
00126 
00127         self.bond.shutdown()
00128         rospy.sleep(rospy.Duration(5)) # wait a bit for the analyzers to unload
00129         self.wait_for_agg()
00130         # the aggregator data should no longer contain the paths once the bond is shut down
00131         with self._mutex:
00132             agg_paths = [msg.name for name, msg in self.agg_msgs.iteritems()]
00133             self.assert_(not any(expected in agg_paths for expected in self.expected))
00134         
00135 if __name__ == '__main__':
00136     print 'SYS ARGS:', sys.argv
00137     rostest.run(PKG, sys.argv[0], TestAddAnalyzer, sys.argv)


diagnostic_aggregator
Author(s): Kevin Watts, Brice Rebsamen
autogenerated on Mon Aug 14 2017 02:51:51