1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35 """
36 Interface for using rostest from other Python code as well as running
37 Python unittests with additional reporting mechanisms and rosbuild
38 (CMake) integration.
39 """
40
41 from __future__ import print_function
42
43 import sys
44 import rosunit
45
46 import rosgraph
47
# Prefix of the command-line argument that carries the path of the XML
# results file; the text after the prefix is used as the file name.
XML_OUTPUT_FLAG = '--gtest_output=xml:'

# Caller ID passed along with the Master API requests made by this module.
_GLOBAL_CALLER_ID = '/script'
def get_master():
    """
    Build an XML-RPC proxy for the ROS Master.

    The C{rosgraph.masterapi} library is the recommended way to talk to
    the Master because it offers many conveniences; this helper only
    returns the raw proxy.

    @return: XML-RPC proxy to ROS master
    @rtype: xmlrpclib.ServerProxy
    """
    # Python 3 name first, Python 2 name as the fallback.
    try:
        from xmlrpc.client import ServerProxy
    except ImportError:
        from xmlrpclib import ServerProxy
    master_uri = rosgraph.get_master_uri()
    return ServerProxy(master_uri)
67
def is_subscriber(topic, subscriber_id):
    """
    Ask the master whether it lists subscriber_id as a subscriber of topic.

    Only the first entry for ``topic`` in the master's subscriber table is
    consulted; an unknown topic yields ``False``.

    :returns: ``True`` if still registered as a subscriber, ``bool``
    :raises: IOError If communication with master fails
    """
    status, status_msg, system_state = get_master().getSystemState(_GLOBAL_CALLER_ID)
    if status != 1:
        raise IOError("Unable to retrieve master state: %s" % status_msg)

    # The system state is a (publishers, subscribers, <third>) triple; only
    # the middle element is needed here.
    _unused_pubs, subscriber_table, _unused_rest = system_state
    matches = (subscriber_id in node_ids
               for name, node_ids in subscriber_table
               if name == topic)
    return next(matches, False)
85
def is_publisher(topic, publisher_id):
    """
    Ask the master whether it lists publisher_id as a publisher of topic.

    Only the first entry for ``topic`` in the master's publisher table is
    consulted; an unknown topic yields ``False``.

    :returns: ``True`` if still registered as a publisher, ``bool``
    :raises: IOError If communication with master fails
    """
    status, status_msg, system_state = get_master().getSystemState(_GLOBAL_CALLER_ID)
    if status != 1:
        raise IOError("Unable to retrieve master state: %s" % status_msg)

    # The system state is a (publishers, <second>, <third>) triple; only
    # the first element is needed here.
    publisher_table, _unused_second, _unused_third = system_state
    matches = (publisher_id in node_ids
               for name, node_ids in publisher_table
               if name == topic)
    return next(matches, False)
103
def rosrun(package, test_name, test, sysargs=None):
    """
    Run a rostest/unittest-based integration test.

    The process exits with status 1 when the test result is not
    successful.

    @param package: name of package that test is in
    @type  package: str
    @param test_name: name of test that is being run
    @type  test_name: str
    @param test: a test case instance or a name resolving to a test case or suite
    @type  test: unittest.TestCase, or string
    @param sysargs: command-line args. If not specified, this defaults to sys.argv.
        The --text, --cov and --gtest_output=xml: arguments are inspected.
    @type  sysargs: list
    """
    argv = sys.argv if sysargs is None else sysargs

    # The last --gtest_output=xml:<path> argument wins, as with a plain scan.
    xml_targets = [a[len(XML_OUTPUT_FLAG):] for a in argv
                   if a.startswith(XML_OUTPUT_FLAG)]
    result_file = xml_targets[-1] if xml_targets else None
    use_text_runner = '--text' in argv
    measure_coverage = '--cov' in argv

    if measure_coverage:
        _start_coverage([package])

    # Imported only after coverage measurement may have been started.
    import unittest
    import rospy

    loader = unittest.TestLoader()
    if isinstance(test, str):
        suite = loader.loadTestsFromName(test)
    else:
        suite = loader.loadTestsFromTestCase(test)

    if use_text_runner:
        runner = unittest.TextTestRunner(verbosity=2)
    else:
        runner = rosunit.create_xml_runner(package, test_name, result_file)
    result = runner.run(suite)

    if measure_coverage:
        _stop_coverage([package])
    rosunit.print_unittest_summary(result)

    # Shut the node down before reporting failure through the exit status.
    rospy.signal_shutdown('test complete')
    if not result.wasSuccessful():
        sys.exit(1)


# Shorter alias for rosrun.
run = rosrun
159
160 import warnings
def deprecated(func):
    """
    Decorator which can be used to mark functions as deprecated.

    Every call to the decorated function emits a C{DeprecationWarning}
    attributed to the caller, then forwards all arguments to the wrapped
    function and returns its result.

    @param func: function to mark as deprecated
    @type  func: callable
    @return: wrapper carrying the metadata of func
    @rtype: callable
    """
    # Local import keeps this block self-contained; functools.wraps copies
    # __name__, __doc__ and __dict__ as before and also preserves
    # __module__/__qualname__ and records __wrapped__ for introspection.
    import functools

    @functools.wraps(func)
    def newFunc(*args, **kwargs):
        # stacklevel=2 points the warning at the caller, not at this wrapper.
        warnings.warn("Call to deprecated function %s." % func.__name__,
                      category=DeprecationWarning, stacklevel=2)
        return func(*args, **kwargs)
    return newFunc
173
@deprecated
def unitrun(package, test_name, test, sysargs=None, coverage_packages=None):
    """
    Deprecated wrapper that forwards to C{rosunit.unitrun}.

    Runs Python unittests with JUnit-compatible XML output so that test
    results can be reported to higher-level tools. This is meant for
    unittests that do not need a running ROS graph (i.e. offline tests
    only). All arguments are handed to C{rosunit.unitrun} unchanged.

    @param package: name of ROS package that is running the test
    @type  package: str
    @param coverage_packages: list of Python package to compute coverage results for. Defaults to package
    @type  coverage_packages: [str]
    """
    forwarded_options = dict(sysargs=sysargs,
                             coverage_packages=coverage_packages)
    rosunit.unitrun(package, test_name, test, **forwarded_options)
190
191
# Active coverage object, or None while no coverage run is in progress.
_cov = None


def _reload_module(module):
    """Reload module using whichever reload implementation this interpreter offers."""
    try:
        from importlib import reload as reload_impl  # Python 3.4+
    except ImportError:
        try:
            from imp import reload as reload_impl  # Python 3.0 - 3.3
        except ImportError:
            reload_impl = reload  # noqa: F821 - Python 2 builtin
    return reload_impl(module)


def _start_coverage(packages):
    """
    Start coverage measurement and reload the already-imported packages.

    When python-coverage is missing or too old a warning is printed to
    stderr and no coverage object is kept. Packages that are already in
    C{sys.modules} are reloaded afterwards either way.

    @param packages: list of packages to measure coverage for
    @type  packages: [str]
    """
    global _cov
    try:
        import coverage
        try:
            _cov = coverage.coverage()
            # Load previously saved data before starting this run.
            _cov.load()
            _cov.start()
        except coverage.CoverageException:
            # Do not keep a half-initialised object around for later use.
            _cov = None
            print("WARNING: you have an older version of python-coverage that is not support. Please update to the version provided by 'easy_install coverage'", file=sys.stderr)
    except ImportError:
        print("""WARNING: cannot import python-coverage, coverage tests will not run.
To install coverage, run 'easy_install coverage'""", file=sys.stderr)

    package = None
    try:
        # Modules imported before measurement started are executed again;
        # the bare ``reload`` builtin does not exist on Python 3.
        for package in packages:
            if package in sys.modules:
                _reload_module(sys.modules[package])
    except ImportError:
        print("WARNING: cannot import '%s', will not generate coverage report"%package, file=sys.stderr)
215
def _stop_coverage(packages, html=None):
    """
    Stop coverage measurement, save the data and print console reports.

    Does nothing when no coverage object is active. The package names are
    merged into the C{.coverage-modules} file in the current directory.

    @param packages: list of packages to generate coverage reports for
    @type  packages: [str]
    @param html: (optional) if not None, directory to generate html report to
    @type  html: str
    """
    if _cov is None:
        return
    import os

    try:
        _cov.stop()
        _cov.save()

        # Materialise once so any iterable (not only a list) is accepted.
        requested = list(packages)

        # Read-merge-rewrite so names are not duplicated; sorted so the
        # file content does not depend on set iteration order.
        tracked = set(requested)
        if os.path.exists('.coverage-modules'):
            with open('.coverage-modules', 'r') as f:
                tracked.update(x for x in f.read().split('\n') if x.strip())
        with open('.coverage-modules', 'w') as f:
            f.write('\n'.join(sorted(tracked)) + '\n')

        package = None
        try:
            # Every module reported on, collected for the html report.
            all_mods = []

            for package in requested:
                __import__(package)
                # Match the package itself and its submodules only, not
                # unrelated modules whose name merely shares the prefix.
                prefix = package + '.'
                mods = []
                for candidate in list(sys.modules.values()):
                    name = getattr(candidate, '__name__', None)
                    if not candidate or not isinstance(name, str):
                        continue
                    if name == package or name.startswith(prefix):
                        mods.append(candidate)
                all_mods.extend(mods)

                # Summary for this package, then the missing lines per module.
                _cov.report(mods, show_missing=0)
                for mod in mods:
                    res = _cov.analysis(mod)
                    print("\n%s:\nMissing lines: %s"%(res[0], res[3]))

            if html:
                print("="*80+"\ngenerating html coverage report to %s\n"%html+"="*80)
                _cov.html_report(all_mods, directory=html)
        except ImportError:
            print("WARNING: cannot import '%s', will not generate coverage report"%package, file=sys.stderr)
    except ImportError:
        print("""WARNING: cannot import python-coverage, coverage tests will not run.
To install coverage, run 'easy_install coverage'""", file=sys.stderr)
268
269
270
def rostestmain():
    """
    Run the C{rostestmain} function of C{rostest.rostest_main}.

    The implementation is imported only when this function is called.
    """
    from rostest.rostest_main import rostestmain as entry_point
    entry_point()
275