1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35 """
36 Interface for using rostest from other Python code as well as running
37 Python unittests with additional reporting mechanisms and rosbuild
38 (CMake) integration.
39 """
40
41 from __future__ import print_function
42
43 import sys
44 import rosunit
45
46 import rosgraph
47
48 XML_OUTPUT_FLAG = '--gtest_output=xml:'
49
50 _GLOBAL_CALLER_ID = '/script'
53 """
54 Get an XMLRPC handle to the Master. It is recommended to use the
55 `rosgraph.masterapi` library instead, as it provides many
56 conveniences.
57
58 @return: XML-RPC proxy to ROS master
59 @rtype: xmlrpclib.ServerProxy
60 """
61 try:
62 import xmlrpc.client as xmlrpcclient
63 except ImportError:
64 import xmlrpclib as xmlrpcclient
65 uri = rosgraph.get_master_uri()
66 return xmlrpcclient.ServerProxy(uri)
67
69 """
70 Check whether or not master think subscriber_id subscribes to topic
71
72 :returns: ``True`` if still register as a subscriber, ``bool``
73 :raises: IOError If communication with master fails
74 """
75 m = get_master()
76 code, msg, state = m.getSystemState(_GLOBAL_CALLER_ID)
77 if code != 1:
78 raise IOError("Unable to retrieve master state: %s"%msg)
79 _, subscribers, _ = state
80 for t, l in subscribers:
81 if t == topic:
82 return subscriber_id in l
83 else:
84 return False
85
87 """
88 Predicate to check whether or not master think publisher_id
89 publishes topic
90 :returns: ``True`` if still register as a publisher, ``bool``
91 :raises: IOError If communication with master fails
92 """
93 m = get_master()
94 code, msg, state = m.getSystemState(_GLOBAL_CALLER_ID)
95 if code != 1:
96 raise IOError("Unable to retrieve master state: %s"%msg)
97 pubs, _, _ = state
98 for t, l in pubs:
99 if t == topic:
100 return publisher_id in l
101 else:
102 return False
103
104 -def rosrun(package, test_name, test, sysargs=None):
105 """
106 Run a rostest/unittest-based integration test.
107
108 @param package: name of package that test is in
109 @type package: str
110 @param test_name: name of test that is being run
111 @type test_name: str
112 @param test: test class
113 @type test: unittest.TestCase
114 @param sysargs: command-line args. If not specified, this defaults to sys.argv. rostest
115 will look for the --text and --gtest_output parameters
116 @type sysargs: list
117 """
118 if sysargs is None:
119
120 import sys
121 sysargs = sys.argv
122
123
124 result_file = None
125 for arg in sysargs:
126 if arg.startswith(XML_OUTPUT_FLAG):
127 result_file = arg[len(XML_OUTPUT_FLAG):]
128 text_mode = '--text' in sysargs
129 coverage_mode = '--cov' in sysargs
130 if coverage_mode:
131 _start_coverage(package)
132
133 import unittest
134 import rospy
135
136 suite = unittest.TestLoader().loadTestsFromTestCase(test)
137 if text_mode:
138 result = unittest.TextTestRunner(verbosity=2).run(suite)
139 else:
140 result = rosunit.create_xml_runner(package, test_name, result_file).run(suite)
141 if coverage_mode:
142 _stop_coverage(package)
143 rosunit.print_unittest_summary(result)
144
145
146 rospy.signal_shutdown('test complete')
147 if not result.wasSuccessful():
148 import sys
149 sys.exit(1)
150
151
152 run = rosrun
153
154 import warnings
156 """This is a decorator which can be used to mark functions
157 as deprecated. It will result in a warning being emmitted
158 when the function is used."""
159 def newFunc(*args, **kwargs):
160 warnings.warn("Call to deprecated function %s." % func.__name__,
161 category=DeprecationWarning, stacklevel=2)
162 return func(*args, **kwargs)
163 newFunc.__name__ = func.__name__
164 newFunc.__doc__ = func.__doc__
165 newFunc.__dict__.update(func.__dict__)
166 return newFunc
167
168 @deprecated
169 -def unitrun(package, test_name, test, sysargs=None, coverage_packages=None):
170 """
171 Wrapper routine from running python unitttests with
172 JUnit-compatible XML output. This is meant for unittests that do
173 not not need a running ROS graph (i.e. offline tests only).
174
175 This enables JUnit-compatible test reporting so that
176 test results can be reported to higher-level tools.
177
178 @param package: name of ROS package that is running the test
179 @type package: str
180 @param coverage_packages: list of Python package to compute coverage results for. Defaults to package
181 @type coverage_packages: [str]
182 """
183 rosunit.unitrun(package, test_name, test, sysargs=sysargs, coverage_packages=coverage_packages)
184
185
186 _cov = None
188 global _cov
189 try:
190 import coverage
191 try:
192 _cov = coverage.coverage()
193
194 _cov.load()
195 _cov.start()
196 except coverage.CoverageException:
197 print("WARNING: you have an older version of python-coverage that is not support. Please update to the version provided by 'easy_install coverage'", file=sys.stderr)
198 except ImportError as e:
199 print("""WARNING: cannot import python-coverage, coverage tests will not run.
200 To install coverage, run 'easy_install coverage'""", file=sys.stderr)
201 try:
202
203 for package in packages:
204 if package in sys.modules:
205 reload(sys.modules[package])
206 except ImportError as e:
207 print("WARNING: cannot import '%s', will not generate coverage report"%package, file=sys.stderr)
208 return
209
211 """
212 @param packages: list of packages to generate coverage reports for
213 @type packages: [str]
214 @param html: (optional) if not None, directory to generate html report to
215 @type html: str
216 """
217 if _cov is None:
218 return
219 import sys, os
220 try:
221 _cov.stop()
222
223 _cov.save()
224
225
226
227
228
229 if os.path.exists('.coverage-modules'):
230 with open('.coverage-modules','r') as f:
231 all_packages = set([x for x in f.read().split('\n') if x.strip()] + packages)
232 else:
233 all_packages = set(packages)
234 with open('.coverage-modules','w') as f:
235 f.write('\n'.join(all_packages)+'\n')
236
237 try:
238
239 all_mods = []
240
241
242 for package in packages:
243 pkg = __import__(package)
244 m = [v for v in sys.modules.values() if v and v.__name__.startswith(package)]
245 all_mods.extend(m)
246
247
248 _cov.report(m, show_missing=0)
249 for mod in m:
250 res = _cov.analysis(mod)
251 print("\n%s:\nMissing lines: %s"%(res[0], res[3]))
252
253 if html:
254
255 print("="*80+"\ngenerating html coverage report to %s\n"%html+"="*80)
256 _cov.html_report(all_mods, directory=html)
257 except ImportError as e:
258 print("WARNING: cannot import '%s', will not generate coverage report"%package, file=sys.stderr)
259 except ImportError as e:
260 print("""WARNING: cannot import python-coverage, coverage tests will not run.
261 To install coverage, run 'easy_install coverage'""", file=sys.stderr)
262
263
264
265 -def rostestmain():
266
267 from rostest.rostest_main import rostestmain as _main
268 _main()
269