00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030 """
00031 Baxter RSDK Smoke Tests
00032 """
00033
00034 import traceback
00035 import threading
00036 import Queue
00037
00038 import rospy
00039
00040 import cv
00041 import cv_bridge
00042 import rospkg
00043 import std_msgs
00044
00045 from geometry_msgs.msg import (
00046 Point,
00047 Pose,
00048 PoseStamped,
00049 Quaternion,
00050 )
00051 from sensor_msgs.msg import (
00052 Image,
00053 JointState,
00054 )
00055
00056 import baxter_dataflow
00057 import baxter_interface
00058
00059 from baxter_core_msgs.msg import (
00060 AnalogIOStates,
00061 EndEffectorState,
00062 )
00063 from baxter_core_msgs.srv import (
00064 ListCameras,
00065 SolvePositionIK,
00066 SolvePositionIKRequest,
00067 )
00068 from baxter_interface import CHECK_VERSION
00069
00070
00071 class SmokeTest(object):
00072 """
00073 Smoke tests base class.
00074 """
00075 def __init__(self, name):
00076 self._name = name
00077 self._rs = baxter_interface.RobotEnable()
00078 self._rp = rospkg.RosPack()
00079 self.result = [False, '']
00080
00081 def start_test(self):
00082 """
00083 Set up pub/sub/actions and run test.
00084
00085 Results return in form [Bool Success,'Stacktrace if Failure']
00086 """
00087 raise NotImplementedError()
00088
00089 def finish_test(self, filename):
00090 """
00091 Store results to output file.
00092 """
00093
00094 with open(filename, "a") as testlog:
00095 testlog.write(
00096 "Test: %s ------ Result: %s\n" %
00097 (self._name, "Success" if self.result[0] else "Failure",)
00098 )
00099 if self.result[0] == False:
00100 testlog.write("%s\n%s%s\n" %
00101 ('*' * 40, self.result[1], '*' * 40,)
00102 )
00103 print "Results of %s saved to file: %s" % (self._name, filename,)
00104 print ("------- Result: %s -------\n\n" %
00105 ("Success" if self.result[0] else "Failure",))
00106
00107 def return_failure(self, trace):
00108 """Commonly used failure return
00109 """
00110 self.result[0] = False
00111 self.result[1] = trace
00112 try:
00113 self._rs.disable()
00114 except Exception:
00115 pass
00116
00117
00118 class Enable(SmokeTest):
00119 """
00120 Verify ability to enable, check state and disable baxter.
00121 """
00122 def __init__(self, name='Enable'):
00123 super(Enable, self).__init__(name)
00124
00125 def start_test(self):
00126 """Runs Enable Smoke Test
00127 """
00128 try:
00129 print "Test: Enable - Zero G for 10 Seconds"
00130 self._rs.enable()
00131
00132 rospy.sleep(10.0)
00133 print "Test: State"
00134 print self._rs.state()
00135 print "Test: Disable"
00136 self._rs.disable()
00137 self.result[0] = True
00138 except Exception:
00139 self.return_failure(traceback.format_exc())
00140
00141
00142 class Messages(SmokeTest):
00143 """
00144 Verify messages being published and ability to subscribe.
00145 """
00146 def __init__(self, name='Messages'):
00147 super(Messages, self).__init__(name)
00148
00149 def start_test(self):
00150 """
00151 Runs Messages Smoke Test.
00152 """
00153 try:
00154 msg = 'robot/joint_states'
00155 print "Test: Subscribe to topic: /%s" % msg
00156 rospy.wait_for_message(msg, JointState, 5.0)
00157 msg = 'robot/analog_io_states'
00158 print "Test: Subscribe to topic: /%s" % msg
00159 rospy.wait_for_message(
00160 msg,
00161 AnalogIOStates,
00162 5.0
00163 )
00164 self.result[0] = True
00165 except:
00166 self.return_failure(traceback.format_exc())
00167
00168
00169 class Services(SmokeTest):
00170 """
00171 Verify services available and ability to make calls as client.
00172 """
00173 def __init__(self, name='Services'):
00174 super(Services, self).__init__(name)
00175
00176 def start_test(self):
00177 """Runs Services Smoke Test
00178 """
00179 try:
00180 srv = '/ExternalTools/left/PositionKinematicsNode/IKService'
00181 print "Test: Service availability: %s" % srv
00182 rospy.wait_for_service(srv, 5.0)
00183 print "Test: IK service call Solve IK"
00184 iksvc = rospy.ServiceProxy(srv, SolvePositionIK)
00185 ikreq = SolvePositionIKRequest()
00186 pose = PoseStamped(
00187 header=std_msgs.msg.Header(
00188 stamp=rospy.Time.now(), frame_id='base'),
00189 pose=Pose(
00190 position=Point(
00191 x=0.657579481614,
00192 y=0.851981417433,
00193 z=0.0388352386502,
00194 ),
00195 orientation=Quaternion(
00196 x=-0.366894936773,
00197 y=0.885980397775,
00198 z=0.108155782462,
00199 w=0.262162481772,
00200 ),
00201 )
00202 )
00203 ikreq.pose_stamp.append(pose)
00204 iksvc(ikreq)
00205 self.result[0] = True
00206 except Exception:
00207 self.return_failure(traceback.format_exc())
00208
00209
00210 class Head(SmokeTest):
00211 """
00212 Move the head pan and tilt, display image to screen.
00213 """
00214 def __init__(self, name='Head'):
00215 super(Head, self).__init__(name)
00216
00217 def start_test(self):
00218 """
00219 Runs Head Smoke Test.
00220 """
00221 try:
00222 print "Enabling robot..."
00223 self._rs.enable()
00224 print "Test: Moving Head to Neutral Location"
00225 head = baxter_interface.Head()
00226 head.set_pan(0.0, 5.0)
00227 print "Test: Pan Head"
00228 head.set_pan(1.0, 5.0)
00229 head.set_pan(-1.0, 5.0)
00230 head.set_pan(0.0, 5.0)
00231 print "Test: Nod Head"
00232 for _ in xrange(3):
00233 head.command_nod()
00234 print "Test: Display Image on Screen - 5 seconds"
00235 image_path = (self._rp.get_path('baxter_tools') +
00236 '/share/images')
00237 img = cv.LoadImage(image_path + '/baxterworking.png')
00238 msg = cv_bridge.CvBridge().cv_to_imgmsg(img)
00239 pub = rospy.Publisher('/robot/xdisplay', Image, latch=True)
00240 pub.publish(msg)
00241 rospy.sleep(5.0)
00242 img = cv.LoadImage(image_path + '/researchsdk.png')
00243 msg = cv_bridge.CvBridge().cv_to_imgmsg(img)
00244 pub.publish(msg)
00245 print "Disabling robot..."
00246 self._rs.disable()
00247 self.result[0] = True
00248 except Exception:
00249 self.return_failure(traceback.format_exc())
00250
00251
00252 class MoveArms(SmokeTest):
00253 """
00254 Move both arms to numerous joint positions.
00255 """
00256 def __init__(self, name='MoveArms'):
00257 super(MoveArms, self).__init__(name)
00258
00259 def start_test(self):
00260 """Runs MoveArms Smoke Test.
00261 """
00262
00263 def move_thread(limb, angle, queue, timeout=15.0):
00264 """
00265 Threaded joint movement allowing for simultaneous joint moves.
00266 """
00267 try:
00268 limb.move_to_joint_positions(angle, timeout)
00269 queue.put(None)
00270 except Exception, exception:
00271 queue.put(traceback.format_exc())
00272 queue.put(exception)
00273
00274 try:
00275 print "Enabling robot..."
00276 self._rs.enable()
00277 print "Test: Create Limb Instances"
00278 right = baxter_interface.Limb('right')
00279 left = baxter_interface.Limb('left')
00280 left_queue = Queue.Queue()
00281 right_queue = Queue.Queue()
00282
00283
00284
00285
00286 joint_moves = (
00287 [0.0, -0.55, 0.0, 0.75, 0.0, 1.26, 0.0],
00288 [0.5, -0.8, 2.8, 0.15, 0.0, 1.9, 2.8],
00289 [-0.1, -0.8, -1.0, 2.5, 0.0, -1.4, -2.8],
00290 [0.0, -0.55, 0.0, 0.75, 0.0, 1.26, 0.0],
00291 )
00292 for move in joint_moves:
00293 print "Test: Moving to Joint Positions: ",
00294 print ", ".join("%.2f" % x for x in move)
00295 left_thread = threading.Thread(
00296 target=move_thread,
00297 args=(left,
00298 dict(zip(left.joint_names(), move)),
00299 left_queue
00300 )
00301 )
00302 right_thread = threading.Thread(
00303 target=move_thread,
00304 args=(right,
00305 dict(zip(right.joint_names(), move)),
00306 right_queue
00307 )
00308 )
00309 left_thread.daemon = True
00310 right_thread.daemon = True
00311 left_thread.start()
00312 right_thread.start()
00313 baxter_dataflow.wait_for(
00314 lambda: not (left_thread.is_alive() or
00315 right_thread.is_alive()),
00316 timeout=20.0,
00317 timeout_msg=("Timeout while waiting for arm move threads"
00318 " to finish"),
00319 rate=10,
00320 )
00321 left_thread.join()
00322 right_thread.join()
00323 result = left_queue.get()
00324 if not result is None:
00325 raise left_queue.get()
00326 result = right_queue.get()
00327 if not result is None:
00328 raise right_queue.get()
00329 rospy.sleep(1.0)
00330 print "Disabling robot..."
00331 self._rs.disable()
00332 self.result[0] = True
00333 rospy.sleep(5.0)
00334 except Exception:
00335 self.return_failure(traceback.format_exc())
00336
00337
00338 class Grippers(SmokeTest):
00339 """
00340 Calibrate and move grippers using torque, position, velocity control.
00341 """
00342 def __init__(self, name='Grippers'):
00343 super(Grippers, self).__init__(name)
00344
00345 def start_test(self):
00346 """
00347 Runs Grippers Smoke Test.
00348 """
00349 try:
00350 print "Enabling robot, Moving to Neutral Location..."
00351 self._rs.enable()
00352 for name in ['left', 'right']:
00353 limb = baxter_interface.Limb(name)
00354 gripper = baxter_interface.Gripper(name, CHECK_VERSION)
00355 limb.move_to_neutral()
00356 rospy.sleep(2.0)
00357 print "Test: Verify %s Gripper Type" % (name.capitalize(),)
00358 if gripper.type() is not 'electric':
00359 raise NameError("Test Requires Two Electric Grippers")
00360 s_topic = 'robot/end_effector/' + name + '_gripper/state'
00361 ee_state = rospy.wait_for_message(s_topic,
00362 EndEffectorState,
00363 5.0
00364 )
00365 print "Test: Reboot %s Gripper" % (name.capitalize(),)
00366 gripper.reboot()
00367 print "Test: Calibrating %s Gripper" % (name.capitalize(),)
00368 gripper.calibrate()
00369 print "Test: Close %s Gripper" % (name.capitalize(),)
00370 gripper.close(True)
00371 print "Test: Open %s Gripper" % (name.capitalize(),)
00372 gripper.open(True)
00373 print "Test: Close %s Gripper" % (name.capitalize(),)
00374 gripper.close(True)
00375 print "Test: Open %s Gripper" % (name.capitalize(),)
00376 gripper.open(True)
00377 print "Test: %s Gripper Position Moves" % (name.capitalize(),)
00378 gripper.command_position(50.0, True)
00379 gripper.command_position(75.0, True)
00380 gripper.command_position(0.0, True)
00381 gripper.command_position(100.0, True)
00382 print "Test: %s Gripper Velocity Moves" % (name.capitalize(),)
00383 gripper.set_moving_force(100.0)
00384 gripper.set_velocity(50.0)
00385 gripper.close(True)
00386 gripper.set_velocity(25.0)
00387 gripper.open(True)
00388 gripper.set_velocity(100.0)
00389 gripper.close(True)
00390 gripper.open(True)
00391 gripper.set_velocity(50.0)
00392 gripper.set_moving_force(30.0)
00393 gripper.close(True)
00394 gripper.open(True)
00395 print "Disabling robot..."
00396 self._rs.disable()
00397 self.result[0] = True
00398 except Exception:
00399 self.return_failure(traceback.format_exc())
00400
00401
00402 class BlinkLEDs(SmokeTest):
00403 """
00404 Blink Navigator LEDs.
00405 """
00406 def __init__(self, name='BlinkLEDs'):
00407 super(BlinkLEDs, self).__init__(name)
00408
00409 def start_test(self):
00410 """
00411 Runs BlinkLEDs Smoke Test.
00412 """
00413
00414 def _blink(io):
00415 """
00416 Toggle itb lights.
00417 """
00418 for i in range(9):
00419 io.set_output(i % 2)
00420 rospy.sleep(0.1)
00421
00422 try:
00423 itb_names = (
00424 'left_itb_light_outer',
00425 'left_itb_light_inner',
00426 'torso_left_itb_light_outer',
00427 'torso_left_itb_light_inner',
00428 'right_itb_light_outer',
00429 'right_itb_light_inner',
00430 'torso_right_itb_light_outer',
00431 'torso_right_itb_light_inner',
00432 )
00433
00434 for itb in itb_names:
00435 print "Test: Blink %s" % itb
00436 io = baxter_interface.DigitalIO(itb)
00437 _blink(io)
00438 self.result[0] = True
00439 except Exception:
00440 self.return_failure(traceback.format_exc())
00441
00442
00443 class Cameras(SmokeTest):
00444 """
00445 Verify camera publishing and visualization.
00446 """
00447 def __init__(self, name='Cameras'):
00448 super(Cameras, self).__init__(name)
00449
00450 def start_test(self):
00451 """Runs Cameras Smoke Test
00452 """
00453
00454 xpub_img = rospy.Publisher(
00455 '/robot/xdisplay',
00456 Image
00457 )
00458
00459 def _display(camera, name):
00460 """
00461 Open camera and display to screen for 10 seconds
00462 """
00463 camera.close()
00464 camera.resolution = (960, 600,)
00465 print "Test: Opening %s" % name
00466 camera.open()
00467 print "Test: Display %s to Screen - 10 Seconds" % name
00468 rospy.Subscriber(
00469 '/cameras/' + name + "/image",
00470 Image,
00471 _repub_cb
00472 )
00473 rospy.sleep(10.0)
00474 camera.close()
00475 _reset_screen()
00476
00477 def _repub_cb(msg):
00478 """
00479 Camera image republish callback.
00480 """
00481 xpub_img.publish(msg)
00482
00483 def _reset_screen():
00484 """
00485 Reset the screen to research sdk image.
00486 """
00487 image_path = self._rp.get_path('baxter_tools')
00488 image_path += '/share/images/researchsdk.png'
00489 img = cv.LoadImage(image_path)
00490 msg = cv_bridge.CvBridge().cv_to_imgmsg(img)
00491 xpub_img.publish(msg)
00492
00493 def _reset_defaults():
00494 """
00495 Turn on the left/right_hand_cameras with default settings.
00496 """
00497 print "Restarting the Default Cameras..."
00498 for i in range(1, 3):
00499 camera = baxter_interface.CameraController(camera_names[i])
00500 camera.resolution = (640, 400,)
00501 camera.fps = 25
00502 camera.open()
00503
00504 try:
00505 print "Enabling robot..."
00506 self._rs.enable()
00507 print ("Test: Verify Left_Hand, Right_Hand, and Head "
00508 "Cameras Present")
00509 camera_names = (
00510 'head_camera',
00511 'left_hand_camera',
00512 'right_hand_camera',
00513 )
00514 srv = "/cameras/list"
00515 rospy.wait_for_service(srv, 5.0)
00516 camera_list_srv = rospy.ServiceProxy(srv, ListCameras)
00517 camera_list = camera_list_srv()
00518 for camera_name in camera_names:
00519 if not camera_name in camera_list.cameras:
00520 raise NameError("Could not find camera - %s" % camera_name)
00521 else:
00522 camera = baxter_interface.CameraController(camera_name)
00523 camera.close()
00524 for camera_name in camera_names:
00525 camera = baxter_interface.CameraController(camera_name)
00526 _display(camera, camera_name)
00527 _reset_defaults()
00528 print "Disabling robot..."
00529 self._rs.disable()
00530 self.result[0] = True
00531 except Exception:
00532 self.return_failure(traceback.format_exc())