00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030 """
00031 Baxter RSDK Smoke Tests
00032 """
00033
00034 import traceback
00035 import threading
00036 import Queue
00037
00038 import rospy
00039
00040 import cv2
00041 import cv_bridge
00042 import rospkg
00043 import std_msgs
00044
00045 from geometry_msgs.msg import (
00046 Point,
00047 Pose,
00048 PoseStamped,
00049 Quaternion,
00050 )
00051 from sensor_msgs.msg import (
00052 Image,
00053 JointState,
00054 )
00055
00056 import baxter_dataflow
00057 import baxter_interface
00058
00059 from baxter_core_msgs.msg import (
00060 AnalogIOStates,
00061 EndEffectorState,
00062 )
00063 from baxter_core_msgs.srv import (
00064 ListCameras,
00065 SolvePositionIK,
00066 SolvePositionIKRequest,
00067 )
00068 from baxter_interface import CHECK_VERSION
00069
00070
00071 class SmokeTest(object):
00072 """
00073 Smoke tests base class.
00074 """
00075 def __init__(self, name):
00076 self._name = name
00077 self._rs = baxter_interface.RobotEnable()
00078 self._rp = rospkg.RosPack()
00079 self.result = [False, '']
00080
00081 def start_test(self):
00082 """
00083 Set up pub/sub/actions and run test.
00084
00085 Results return in form [Bool Success,'Stacktrace if Failure']
00086 """
00087 raise NotImplementedError()
00088
00089 def finish_test(self, filename):
00090 """
00091 Store results to output file.
00092 """
00093
00094 with open(filename, "a") as testlog:
00095 testlog.write(
00096 "Test: %s ------ Result: %s\n" %
00097 (self._name, "Success" if self.result[0] else "Failure",)
00098 )
00099 if self.result[0] == False:
00100 testlog.write("%s\n%s%s\n" %
00101 ('*' * 40, self.result[1], '*' * 40,)
00102 )
00103 print "Results of %s saved to file: %s" % (self._name, filename,)
00104 print ("------- Result: %s -------\n\n" %
00105 ("Success" if self.result[0] else "Failure",))
00106
00107 def return_failure(self, trace):
00108 """Commonly used failure return
00109 """
00110 self.result[0] = False
00111 self.result[1] = trace
00112 try:
00113 self._rs.disable()
00114 except Exception:
00115 pass
00116
00117
00118 class Enable(SmokeTest):
00119 """
00120 Verify ability to enable, check state and disable baxter.
00121 """
00122 def __init__(self, name='Enable'):
00123 super(Enable, self).__init__(name)
00124
00125 def start_test(self):
00126 """Runs Enable Smoke Test
00127 """
00128 try:
00129 print "Test: Enable - Zero G for 10 Seconds"
00130 self._rs.enable()
00131
00132 rospy.sleep(10.0)
00133 print "Test: State"
00134 print self._rs.state()
00135 print "Test: Disable"
00136 self._rs.disable()
00137 self.result[0] = True
00138 except Exception:
00139 self.return_failure(traceback.format_exc())
00140
00141
00142 class Messages(SmokeTest):
00143 """
00144 Verify messages being published and ability to subscribe.
00145 """
00146 def __init__(self, name='Messages'):
00147 super(Messages, self).__init__(name)
00148
00149 def start_test(self):
00150 """
00151 Runs Messages Smoke Test.
00152 """
00153 try:
00154 msg = 'robot/joint_states'
00155 print "Test: Subscribe to topic: /%s" % msg
00156 rospy.wait_for_message(msg, JointState, 5.0)
00157 msg = 'robot/analog_io_states'
00158 print "Test: Subscribe to topic: /%s" % msg
00159 rospy.wait_for_message(
00160 msg,
00161 AnalogIOStates,
00162 5.0
00163 )
00164 self.result[0] = True
00165 except:
00166 self.return_failure(traceback.format_exc())
00167
00168
00169 class Services(SmokeTest):
00170 """
00171 Verify services available and ability to make calls as client.
00172 """
00173 def __init__(self, name='Services'):
00174 super(Services, self).__init__(name)
00175
00176 def start_test(self):
00177 """Runs Services Smoke Test
00178 """
00179 try:
00180 srv = '/ExternalTools/left/PositionKinematicsNode/IKService'
00181 print "Test: Service availability: %s" % srv
00182 rospy.wait_for_service(srv, 5.0)
00183 print "Test: IK service call Solve IK"
00184 iksvc = rospy.ServiceProxy(srv, SolvePositionIK)
00185 ikreq = SolvePositionIKRequest()
00186 pose = PoseStamped(
00187 header=std_msgs.msg.Header(
00188 stamp=rospy.Time.now(), frame_id='base'),
00189 pose=Pose(
00190 position=Point(
00191 x=0.657579481614,
00192 y=0.851981417433,
00193 z=0.0388352386502,
00194 ),
00195 orientation=Quaternion(
00196 x=-0.366894936773,
00197 y=0.885980397775,
00198 z=0.108155782462,
00199 w=0.262162481772,
00200 ),
00201 )
00202 )
00203 ikreq.pose_stamp.append(pose)
00204 iksvc(ikreq)
00205 self.result[0] = True
00206 except Exception:
00207 self.return_failure(traceback.format_exc())
00208
00209
00210 class Head(SmokeTest):
00211 """
00212 Move the head pan and tilt, display image to screen.
00213 """
00214 def __init__(self, name='Head'):
00215 super(Head, self).__init__(name)
00216
00217 def start_test(self):
00218 """
00219 Runs Head Smoke Test.
00220 """
00221 try:
00222 print "Enabling robot..."
00223 self._rs.enable()
00224 print "Test: Moving Head to Neutral Location"
00225 head = baxter_interface.Head()
00226 head.set_pan(0.0, 5.0)
00227 print "Test: Pan Head"
00228 head.set_pan(1.0, 5.0)
00229 head.set_pan(-1.0, 5.0)
00230 head.set_pan(0.0, 5.0)
00231 print "Test: Nod Head"
00232 for _ in xrange(3):
00233 head.command_nod()
00234 print "Test: Display Image on Screen - 5 seconds"
00235 image_path = (self._rp.get_path('baxter_tools') +
00236 '/share/images')
00237 img = cv2.imread(image_path + '/baxterworking.png')
00238 msg = cv_bridge.CvBridge().cv2_to_imgmsg(img)
00239 pub = rospy.Publisher('/robot/xdisplay', Image,
00240 latch=True, queue_size=10)
00241 pub.publish(msg)
00242 rospy.sleep(5.0)
00243 img = cv2.imread(image_path + '/researchsdk.png')
00244 msg = cv_bridge.CvBridge().cv2_to_imgmsg(img)
00245 pub.publish(msg)
00246 print "Disabling robot..."
00247 self._rs.disable()
00248 self.result[0] = True
00249 except Exception:
00250 self.return_failure(traceback.format_exc())
00251
00252
00253 class MoveArms(SmokeTest):
00254 """
00255 Move both arms to numerous joint positions.
00256 """
00257 def __init__(self, name='MoveArms'):
00258 super(MoveArms, self).__init__(name)
00259
00260 def start_test(self):
00261 """Runs MoveArms Smoke Test.
00262 """
00263
00264 def move_thread(limb, angle, queue, timeout=15.0):
00265 """
00266 Threaded joint movement allowing for simultaneous joint moves.
00267 """
00268 try:
00269 limb.move_to_joint_positions(angle, timeout)
00270 queue.put(None)
00271 except Exception, exception:
00272 queue.put(traceback.format_exc())
00273 queue.put(exception)
00274
00275 try:
00276 print "Enabling robot..."
00277 self._rs.enable()
00278 print "Test: Create Limb Instances"
00279 right = baxter_interface.Limb('right')
00280 left = baxter_interface.Limb('left')
00281 left_queue = Queue.Queue()
00282 right_queue = Queue.Queue()
00283
00284
00285
00286
00287 joint_moves = (
00288 [0.0, -0.55, 0.0, 0.75, 0.0, 1.26, 0.0],
00289 [0.5, -0.8, 2.8, 0.15, 0.0, 1.9, 2.8],
00290 [-0.1, -0.8, -1.0, 2.5, 0.0, -1.4, -2.8],
00291 [0.0, -0.55, 0.0, 0.75, 0.0, 1.26, 0.0],
00292 )
00293 for move in joint_moves:
00294 print "Test: Moving to Joint Positions: ",
00295 print ", ".join("%.2f" % x for x in move)
00296 left_thread = threading.Thread(
00297 target=move_thread,
00298 args=(left,
00299 dict(zip(left.joint_names(), move)),
00300 left_queue
00301 )
00302 )
00303 right_thread = threading.Thread(
00304 target=move_thread,
00305 args=(right,
00306 dict(zip(right.joint_names(), move)),
00307 right_queue
00308 )
00309 )
00310 left_thread.daemon = True
00311 right_thread.daemon = True
00312 left_thread.start()
00313 right_thread.start()
00314 baxter_dataflow.wait_for(
00315 lambda: not (left_thread.is_alive() or
00316 right_thread.is_alive()),
00317 timeout=20.0,
00318 timeout_msg=("Timeout while waiting for arm move threads"
00319 " to finish"),
00320 rate=10,
00321 )
00322 left_thread.join()
00323 right_thread.join()
00324 result = left_queue.get()
00325 if not result is None:
00326 raise left_queue.get()
00327 result = right_queue.get()
00328 if not result is None:
00329 raise right_queue.get()
00330 rospy.sleep(1.0)
00331 print "Disabling robot..."
00332 self._rs.disable()
00333 self.result[0] = True
00334 rospy.sleep(5.0)
00335 except Exception:
00336 self.return_failure(traceback.format_exc())
00337
00338
00339 class Grippers(SmokeTest):
00340 """
00341 Calibrate and move grippers using torque, position, velocity control.
00342 """
00343 def __init__(self, name='Grippers'):
00344 super(Grippers, self).__init__(name)
00345
00346 def start_test(self):
00347 """
00348 Runs Grippers Smoke Test.
00349 """
00350 try:
00351 print "Enabling robot, Moving to Neutral Location..."
00352 self._rs.enable()
00353 for name in ['left', 'right']:
00354 limb = baxter_interface.Limb(name)
00355 gripper = baxter_interface.Gripper(name, CHECK_VERSION)
00356 limb.move_to_neutral()
00357 rospy.sleep(2.0)
00358 print "Test: Verify %s Gripper Type" % (name.capitalize(),)
00359 if gripper.type() is not 'electric':
00360 raise NameError("Test Requires Two Electric Grippers")
00361 s_topic = 'robot/end_effector/' + name + '_gripper/state'
00362 ee_state = rospy.wait_for_message(s_topic,
00363 EndEffectorState,
00364 5.0
00365 )
00366 print "Test: Reboot %s Gripper" % (name.capitalize(),)
00367 gripper.reboot()
00368 print "Test: Calibrating %s Gripper" % (name.capitalize(),)
00369 gripper.calibrate()
00370 print "Test: Close %s Gripper" % (name.capitalize(),)
00371 gripper.close(True)
00372 print "Test: Open %s Gripper" % (name.capitalize(),)
00373 gripper.open(True)
00374 print "Test: Close %s Gripper" % (name.capitalize(),)
00375 gripper.close(True)
00376 print "Test: Open %s Gripper" % (name.capitalize(),)
00377 gripper.open(True)
00378 print "Test: %s Gripper Position Moves" % (name.capitalize(),)
00379 gripper.command_position(50.0, True)
00380 gripper.command_position(75.0, True)
00381 gripper.command_position(0.0, True)
00382 gripper.command_position(100.0, True)
00383 print "Test: %s Gripper Velocity Moves" % (name.capitalize(),)
00384 gripper.set_moving_force(100.0)
00385 gripper.set_velocity(50.0)
00386 gripper.close(True)
00387 gripper.set_velocity(25.0)
00388 gripper.open(True)
00389 gripper.set_velocity(100.0)
00390 gripper.close(True)
00391 gripper.open(True)
00392 gripper.set_velocity(50.0)
00393 gripper.set_moving_force(30.0)
00394 gripper.close(True)
00395 gripper.open(True)
00396 print "Disabling robot..."
00397 self._rs.disable()
00398 self.result[0] = True
00399 except Exception:
00400 self.return_failure(traceback.format_exc())
00401
00402
00403 class BlinkLEDs(SmokeTest):
00404 """
00405 Blink Navigator LEDs.
00406 """
00407 def __init__(self, name='BlinkLEDs'):
00408 super(BlinkLEDs, self).__init__(name)
00409
00410 def start_test(self):
00411 """
00412 Runs BlinkLEDs Smoke Test.
00413 """
00414
00415 def _blink(io):
00416 """
00417 Toggle itb lights.
00418 """
00419 for i in range(9):
00420 io.set_output(i % 2)
00421 rospy.sleep(0.1)
00422
00423 try:
00424 itb_names = (
00425 'left_itb_light_outer',
00426 'left_itb_light_inner',
00427 'torso_left_itb_light_outer',
00428 'torso_left_itb_light_inner',
00429 'right_itb_light_outer',
00430 'right_itb_light_inner',
00431 'torso_right_itb_light_outer',
00432 'torso_right_itb_light_inner',
00433 )
00434
00435 for itb in itb_names:
00436 print "Test: Blink %s" % itb
00437 io = baxter_interface.DigitalIO(itb)
00438 _blink(io)
00439 self.result[0] = True
00440 except Exception:
00441 self.return_failure(traceback.format_exc())
00442
00443
00444 class Cameras(SmokeTest):
00445 """
00446 Verify camera publishing and visualization.
00447 """
00448 def __init__(self, name='Cameras'):
00449 super(Cameras, self).__init__(name)
00450
00451 def start_test(self):
00452 """Runs Cameras Smoke Test
00453 """
00454
00455 xpub_img = rospy.Publisher(
00456 '/robot/xdisplay',
00457 Image,
00458 queue_size=10
00459 )
00460
00461 def _display(camera, name):
00462 """
00463 Open camera and display to screen for 10 seconds
00464 """
00465 camera.resolution = (960, 600,)
00466 print "Test: Opening %s" % name
00467 camera.open()
00468 print "Test: Display %s to Screen - 10 Seconds" % name
00469 rospy.Subscriber(
00470 '/cameras/' + name + "/image",
00471 Image,
00472 _repub_cb
00473 )
00474 rospy.sleep(10.0)
00475 camera.close()
00476 _reset_screen()
00477
00478 def _repub_cb(msg):
00479 """
00480 Camera image republish callback.
00481 """
00482 xpub_img.publish(msg)
00483
00484 def _reset_screen():
00485 """
00486 Reset the screen to research sdk image.
00487 """
00488 image_path = self._rp.get_path('baxter_tools')
00489 image_path += '/share/images/researchsdk.png'
00490 img = cv2.imread(image_path)
00491 msg = cv_bridge.CvBridge().cv2_to_imgmsg(img)
00492 xpub_img.publish(msg)
00493
00494 def _list_cameras():
00495 """
00496 List the cameras with power from ROS Service call.
00497 """
00498 srv = "/cameras/list"
00499 rospy.wait_for_service(srv, 5.0)
00500 camera_list_srv = rospy.ServiceProxy(srv, ListCameras)
00501 return camera_list_srv()
00502
00503 def _reset_defaults():
00504 """
00505 Turn on the left/right_hand_cameras with default settings.
00506 """
00507 print "Restarting the Default Cameras..."
00508 camera_list = _list_cameras()
00509 for camera_name in camera_names:
00510 print "Restarting {0}".format(camera_name)
00511 if not camera_name in camera_list.cameras:
00512
00513
00514
00515 baxter_interface.CameraController(camera_list.cameras[0]).close()
00516 camera_list = _list_cameras()
00517 camera = baxter_interface.CameraController(camera_name)
00518 camera.resolution = (640, 400,)
00519 camera.fps = 25
00520 camera.open()
00521 camera.close()
00522
00523 try:
00524 print "Enabling robot..."
00525 self._rs.enable()
00526 print ("Test: Verify Left_Hand, Right_Hand, and Head "
00527 "Cameras Present")
00528 camera_names = (
00529 'left_hand_camera',
00530 'right_hand_camera',
00531 'head_camera',
00532 )
00533 camera_list = _list_cameras()
00534 for camera_name in camera_names:
00535 if not camera_name in camera_list.cameras:
00536 baxter_interface.CameraController(camera_list.cameras[0]).close()
00537 camera_list = _list_cameras()
00538 if not camera_name in camera_list.cameras:
00539 raise NameError("Could not find camera - %s" % camera_name)
00540 camera = baxter_interface.CameraController(camera_name)
00541 _display(camera, camera_name)
00542 camera_list = _list_cameras()
00543 _reset_defaults()
00544 print "Disabling robot..."
00545 self._rs.disable()
00546 self.result[0] = True
00547 except Exception:
00548 self.return_failure(traceback.format_exc())