rviz_config_check.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 
35 
36 """
37 Integration test node that checks if the rviz are working.
38 see example below
39 
40 <test name="rviz_config_check"
41  test-name="rviz_config_check"
42  pkg="jsk_rviz_plugins" type="rviz_config_check.py">
43  <rosparam>
44  wait_time : 5
45  topics:
46  - name: another topic name
47  timeout: timeout for the topic
48  </rosparam>
49 </test>
50 
51 Author: Kei Okada <kei.okada@gmail.com>
52 """
53 
54 import os
55 import sys
56 import time
57 import unittest
58 
59 import rospy
60 import rosnode
61 import rosgraph
62 import rosservice
63 
64 
65 PKG = 'jsk_rviz_plugins'
66 NAME = 'rviz_config_check'
67 
68 
69 class RvizConfigCheck(unittest.TestCase):
70  def __init__(self, *args):
71  super(self.__class__, self).__init__(*args)
72  rospy.init_node(NAME)
73  # scrape rosparam
74  # check rviz arive at least test_duration
75  self.test_duration = float(rospy.get_param('~test_duration', 5.))
76  # topics to check
77  self.topics = {}
78  params = rospy.get_param('~topics', [])
79  for param in params:
80  if 'name' not in param:
81  self.fail("'name' field in rosparam is required but not specified.")
82  topic = {'timeout': 10, 'type': None} # this is not used
83  topic.update(param)
84  self.topics[topic['name']] = topic
85 
86  def setUp(self):
87  # warn on /use_sim_time is true
88  use_sim_time = rospy.get_param('/use_sim_time', False)
89  t_start = time.time()
90  while not rospy.is_shutdown() and \
91  use_sim_time and (rospy.Time.now() == rospy.Time(0)):
92  rospy.logwarn_throttle(
93  1, '/use_sim_time is specified and rostime is 0, /clock is published?')
94  if time.time() - t_start > 10:
95  self.fail('Timed out (10s) of /clock publication.')
96  # must use time.sleep because /clock isn't yet published, so rospy.sleep hangs.
97  time.sleep(0.1)
98 
99  # Use `rosservice find rviz/SendFilePath` to get rviz node name
100  rviz_service_names = []
101  while not rospy.is_shutdown() and len(rviz_service_names) == 0:
102  rviz_service_names = rosservice.rosservice_find('rviz/SendFilePath')
103  if len(rviz_service_names) == 0:
104  rospy.logwarn("[{}] Waiting rviz service (rviz/SendFilePath) for {:.3f} sec".format(rospy.get_name(), time.time() - t_start))
105  # rviz/SendFilePath only available >melodic
106  if os.environ['ROS_DISTRO'] < 'melodic':
107  rviz_service_names = rosservice.rosservice_find('std_srvs/Empty')
108  time.sleep(0.1)
109 
110  # check if
111  rospy.logwarn("[{}] Found rviz service names {}".format(rospy.get_name(), rviz_service_names))
112  rviz_node_names = set(map(rosservice.get_service_node, rviz_service_names))
113  rospy.logwarn("[{}] Found rviz node names {}".format(rospy.get_name(), rviz_node_names))
114  if len(rviz_node_names) == 0:
115  rospy.logerr("[{}] Could not find rviz nodes".format(rospy.get_name()))
116  raise AssertionError
117 
118  self.rviz_node_name = list(rviz_node_names)[0]
119 
120  def test_rviz_exists(self):
121  rospy.logwarn("[{}] Check rviz node exists {}".format(rospy.get_name(), self.rviz_node_name))
122  subs = []
123  t_start = time.time()
124  while not rospy.is_shutdown():
125  t_now = time.time()
126  t_elapsed = t_now - t_start
127  if t_elapsed > self.test_duration:
128  break
129 
130  # info = rosnode.rosnode_info(self.rviz_node_name) does not return values, so we need to expand functions
131  node_api = rosnode.rosnode_ping(self.rviz_node_name, max_count=1, skip_cache=True)
132  if not node_api:
133  rospy.logerr("[{}] Could not find rviz node api on rosmaster".format(rospy.get_name()))
134  raise AssertionError
135 
136  rospy.logwarn("[{}] {:.3f} Rviz node found at {}".format(rospy.get_name(), t_elapsed, node_api))
137 
138  # check if topic exists
139  master = rosgraph.Master('/rosnode')
140 
141  state = master.getSystemState()
142  subs.extend(sorted([t for t, l in state[1] if self.rviz_node_name in l]))
143  subs = list(set(subs))
144  rospy.logwarn('[{}] rviz subscribes {}'.format(rospy.get_name(), subs))
145  time.sleep(0.5)
146 
147  for topic in self.topics:
148  if topic in subs:
149  rospy.logwarn('[{}] rviz subscribes {}'.format(rospy.get_name(), topic))
150  else:
151  rospy.logerr('[{}] rviz did not subscribes {}'.format(rospy.get_name(), topic))
152  raise AssertionError
153 
154  rospy.logwarn("[{}] rviz keep alive for {}[sec] and found {}".format(rospy.get_name(), self.test_duration, self.topics))
155  self.assertTrue(True, "check {}/rviz alives".format(rospy.get_name()))
156 
157 
158 if __name__ == '__main__':
159  import rostest
160  rostest.run(PKG, NAME, RvizConfigCheck, sys.argv)
rviz_config_check.RvizConfigCheck.rviz_node_name
rviz_node_name
Definition: rviz_config_check.py:118
rviz_config_check.RvizConfigCheck.__init__
def __init__(self, *args)
Definition: rviz_config_check.py:70
rviz_config_check.RvizConfigCheck.test_duration
test_duration
Definition: rviz_config_check.py:75
rviz_config_check.RvizConfigCheck.setUp
def setUp(self)
Definition: rviz_config_check.py:86
rviz_config_check.RvizConfigCheck.test_rviz_exists
def test_rviz_exists(self)
Definition: rviz_config_check.py:120
rviz_config_check.RvizConfigCheck
Definition: rviz_config_check.py:69
rviz_config_check.RvizConfigCheck.topics
topics
Definition: rviz_config_check.py:77


jsk_rviz_plugins
Author(s): Kei Okada , Yohei Kakiuchi , Shohei Fujii , Ryohei Ueda
autogenerated on Fri Dec 13 2024 03:49:57