smoketests.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 
00003 # Copyright (c) 2013-2014, Rethink Robotics
00004 # All rights reserved.
00005 #
00006 # Redistribution and use in source and binary forms, with or without
00007 # modification, are permitted provided that the following conditions are met:
00008 #
00009 # 1. Redistributions of source code must retain the above copyright notice,
00010 #    this list of conditions and the following disclaimer.
00011 # 2. Redistributions in binary form must reproduce the above copyright
00012 #    notice, this list of conditions and the following disclaimer in the
00013 #    documentation and/or other materials provided with the distribution.
00014 # 3. Neither the name of the Rethink Robotics nor the names of its
00015 #    contributors may be used to endorse or promote products derived from
00016 #    this software without specific prior written permission.
00017 #
00018 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
00019 # AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
00020 # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
00021 # ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
00022 # LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
00023 # CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
00024 # SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
00025 # INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
00026 # CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
00027 # ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00028 # POSSIBILITY OF SUCH DAMAGE.
00029 
00030 """
00031 Baxter RSDK Smoke Tests
00032 """
00033 
00034 import traceback
00035 import threading
00036 import Queue
00037 
00038 import rospy
00039 
00040 import cv
00041 import cv_bridge
00042 import rospkg
00043 import std_msgs
00044 
00045 from geometry_msgs.msg import (
00046     Point,
00047     Pose,
00048     PoseStamped,
00049     Quaternion,
00050 )
00051 from sensor_msgs.msg import (
00052     Image,
00053     JointState,
00054 )
00055 
00056 import baxter_dataflow
00057 import baxter_interface
00058 
00059 from baxter_core_msgs.msg import (
00060     AnalogIOStates,
00061     EndEffectorState,
00062 )
00063 from baxter_core_msgs.srv import (
00064     ListCameras,
00065     SolvePositionIK,
00066     SolvePositionIKRequest,
00067 )
00068 from baxter_interface import CHECK_VERSION
00069 
00070 
00071 class SmokeTest(object):
00072     """
00073     Smoke tests base class.
00074     """
00075     def __init__(self, name):
00076         self._name = name
00077         self._rs = baxter_interface.RobotEnable()
00078         self._rp = rospkg.RosPack()
00079         self.result = [False, '']
00080 
00081     def start_test(self):
00082         """
00083         Set up pub/sub/actions and run test.
00084 
00085         Results return in form [Bool Success,'Stacktrace if Failure']
00086         """
00087         raise NotImplementedError()
00088 
00089     def finish_test(self, filename):
00090         """
00091         Store results to output file.
00092         """
00093 
00094         with open(filename, "a") as testlog:
00095             testlog.write(
00096                 "Test: %s ------ Result: %s\n" %
00097                 (self._name, "Success" if self.result[0] else "Failure",)
00098                 )
00099             if self.result[0] == False:
00100                 testlog.write("%s\n%s%s\n" %
00101                     ('*' * 40, self.result[1], '*' * 40,)
00102                     )
00103         print "Results of %s saved to file: %s" % (self._name, filename,)
00104         print ("------- Result: %s -------\n\n" %
00105                ("Success" if self.result[0] else "Failure",))
00106 
00107     def return_failure(self, trace):
00108         """Commonly used failure return
00109         """
00110         self.result[0] = False
00111         self.result[1] = trace
00112         try:
00113             self._rs.disable()
00114         except Exception:
00115             pass
00116 
00117 
00118 class Enable(SmokeTest):
00119     """
00120     Verify ability to enable, check state and disable baxter.
00121     """
00122     def __init__(self, name='Enable'):
00123         super(Enable, self).__init__(name)
00124 
00125     def start_test(self):
00126         """Runs Enable Smoke Test
00127         """
00128         try:
00129             print "Test: Enable - Zero G for 10 Seconds"
00130             self._rs.enable()
00131             # Allow User Zero G Testing
00132             rospy.sleep(10.0)
00133             print "Test: State"
00134             print self._rs.state()
00135             print "Test: Disable"
00136             self._rs.disable()
00137             self.result[0] = True
00138         except Exception:
00139             self.return_failure(traceback.format_exc())
00140 
00141 
00142 class Messages(SmokeTest):
00143     """
00144     Verify messages being published and ability to subscribe.
00145     """
00146     def __init__(self, name='Messages'):
00147         super(Messages, self).__init__(name)
00148 
00149     def start_test(self):
00150         """
00151         Runs Messages Smoke Test.
00152         """
00153         try:
00154             msg = 'robot/joint_states'
00155             print "Test: Subscribe to topic: /%s" % msg
00156             rospy.wait_for_message(msg, JointState, 5.0)
00157             msg = 'robot/analog_io_states'
00158             print "Test: Subscribe to topic: /%s" % msg
00159             rospy.wait_for_message(
00160                 msg,
00161                 AnalogIOStates,
00162                 5.0
00163             )
00164             self.result[0] = True
00165         except:
00166             self.return_failure(traceback.format_exc())
00167 
00168 
00169 class Services(SmokeTest):
00170     """
00171     Verify services available and ability to make calls as client.
00172     """
00173     def __init__(self, name='Services'):
00174         super(Services, self).__init__(name)
00175 
00176     def start_test(self):
00177         """Runs Services Smoke Test
00178         """
00179         try:
00180             srv = '/ExternalTools/left/PositionKinematicsNode/IKService'
00181             print "Test: Service availability: %s" % srv
00182             rospy.wait_for_service(srv, 5.0)
00183             print "Test: IK service call Solve IK"
00184             iksvc = rospy.ServiceProxy(srv, SolvePositionIK)
00185             ikreq = SolvePositionIKRequest()
00186             pose = PoseStamped(
00187                 header=std_msgs.msg.Header(
00188                 stamp=rospy.Time.now(), frame_id='base'),
00189                 pose=Pose(
00190                     position=Point(
00191                         x=0.657579481614,
00192                         y=0.851981417433,
00193                         z=0.0388352386502,
00194                         ),
00195                     orientation=Quaternion(
00196                         x=-0.366894936773,
00197                         y=0.885980397775,
00198                         z=0.108155782462,
00199                         w=0.262162481772,
00200                     ),
00201                 )
00202             )
00203             ikreq.pose_stamp.append(pose)
00204             iksvc(ikreq)
00205             self.result[0] = True
00206         except Exception:
00207             self.return_failure(traceback.format_exc())
00208 
00209 
00210 class Head(SmokeTest):
00211     """
00212     Move the head pan and tilt, display image to screen.
00213     """
00214     def __init__(self, name='Head'):
00215         super(Head, self).__init__(name)
00216 
00217     def start_test(self):
00218         """
00219         Runs Head Smoke Test.
00220         """
00221         try:
00222             print "Enabling robot..."
00223             self._rs.enable()
00224             print "Test: Moving Head to Neutral Location"
00225             head = baxter_interface.Head()
00226             head.set_pan(0.0, 5.0)
00227             print "Test: Pan Head"
00228             head.set_pan(1.0, 5.0)
00229             head.set_pan(-1.0, 5.0)
00230             head.set_pan(0.0, 5.0)
00231             print "Test: Nod Head"
00232             for _ in xrange(3):
00233                 head.command_nod()
00234             print "Test: Display Image on Screen - 5 seconds"
00235             image_path = (self._rp.get_path('baxter_tools') +
00236                           '/share/images')
00237             img = cv.LoadImage(image_path + '/baxterworking.png')
00238             msg = cv_bridge.CvBridge().cv_to_imgmsg(img)
00239             pub = rospy.Publisher('/robot/xdisplay', Image, latch=True)
00240             pub.publish(msg)
00241             rospy.sleep(5.0)
00242             img = cv.LoadImage(image_path + '/researchsdk.png')
00243             msg = cv_bridge.CvBridge().cv_to_imgmsg(img)
00244             pub.publish(msg)
00245             print "Disabling robot..."
00246             self._rs.disable()
00247             self.result[0] = True
00248         except Exception:
00249             self.return_failure(traceback.format_exc())
00250 
00251 
00252 class MoveArms(SmokeTest):
00253     """
00254     Move both arms to numerous joint positions.
00255     """
00256     def __init__(self, name='MoveArms'):
00257         super(MoveArms, self).__init__(name)
00258 
00259     def start_test(self):
00260         """Runs MoveArms Smoke Test.
00261         """
00262 
00263         def move_thread(limb, angle, queue, timeout=15.0):
00264             """
00265             Threaded joint movement allowing for simultaneous joint moves.
00266             """
00267             try:
00268                 limb.move_to_joint_positions(angle, timeout)
00269                 queue.put(None)
00270             except Exception, exception:
00271                 queue.put(traceback.format_exc())
00272                 queue.put(exception)
00273 
00274         try:
00275             print "Enabling robot..."
00276             self._rs.enable()
00277             print "Test: Create Limb Instances"
00278             right = baxter_interface.Limb('right')
00279             left = baxter_interface.Limb('left')
00280             left_queue = Queue.Queue()
00281             right_queue = Queue.Queue()
00282             # Max Joint Range (s0, s1, e0, e1, w0, w1, w2)
00283             #     ( 1.701,  1.047,  3.054,  2.618,  3.059,  2.094,  3.059)
00284             # Min Joint Range (e0, e1, s0, s1, w0, w1, w2)
00285             #     (-1.701, -2.147, -3.054, -0.050, -3.059, -1.571, -3.059)
00286             joint_moves = (
00287                  [0.0, -0.55, 0.0, 0.75, 0.0, 1.26, 0.0],
00288                  [0.5, -0.8, 2.8, 0.15, 0.0, 1.9, 2.8],
00289                 [-0.1, -0.8, -1.0, 2.5, 0.0, -1.4, -2.8],
00290                  [0.0, -0.55, 0.0, 0.75, 0.0, 1.26, 0.0],
00291                 )
00292             for move in joint_moves:
00293                 print "Test: Moving to Joint Positions: ",
00294                 print ", ".join("%.2f" % x for x in move)
00295                 left_thread = threading.Thread(
00296                     target=move_thread,
00297                     args=(left,
00298                           dict(zip(left.joint_names(), move)),
00299                           left_queue
00300                           )
00301                 )
00302                 right_thread = threading.Thread(
00303                     target=move_thread,
00304                     args=(right,
00305                           dict(zip(right.joint_names(), move)),
00306                           right_queue
00307                           )
00308                 )
00309                 left_thread.daemon = True
00310                 right_thread.daemon = True
00311                 left_thread.start()
00312                 right_thread.start()
00313                 baxter_dataflow.wait_for(
00314                     lambda: not (left_thread.is_alive() or
00315                                  right_thread.is_alive()),
00316                     timeout=20.0,
00317                     timeout_msg=("Timeout while waiting for arm move threads"
00318                                  " to finish"),
00319                     rate=10,
00320                 )
00321                 left_thread.join()
00322                 right_thread.join()
00323                 result = left_queue.get()
00324                 if not result is None:
00325                     raise left_queue.get()
00326                 result = right_queue.get()
00327                 if not result is None:
00328                     raise right_queue.get()
00329                 rospy.sleep(1.0)
00330             print "Disabling robot..."
00331             self._rs.disable()
00332             self.result[0] = True
00333             rospy.sleep(5.0)
00334         except Exception:
00335             self.return_failure(traceback.format_exc())
00336 
00337 
00338 class Grippers(SmokeTest):
00339     """
00340     Calibrate and move grippers using torque, position, velocity control.
00341     """
00342     def __init__(self, name='Grippers'):
00343         super(Grippers, self).__init__(name)
00344 
00345     def start_test(self):
00346         """
00347         Runs Grippers Smoke Test.
00348         """
00349         try:
00350             print "Enabling robot, Moving to Neutral Location..."
00351             self._rs.enable()
00352             for name in ['left', 'right']:
00353                 limb = baxter_interface.Limb(name)
00354                 gripper = baxter_interface.Gripper(name, CHECK_VERSION)
00355                 limb.move_to_neutral()
00356                 rospy.sleep(2.0)
00357                 print "Test: Verify %s Gripper Type" % (name.capitalize(),)
00358                 if gripper.type() is not 'electric':
00359                     raise NameError("Test Requires Two Electric Grippers")
00360                 s_topic = 'robot/end_effector/' + name + '_gripper/state'
00361                 ee_state = rospy.wait_for_message(s_topic,
00362                                                   EndEffectorState,
00363                                                   5.0
00364                                                   )
00365                 print "Test: Reboot %s Gripper" % (name.capitalize(),)
00366                 gripper.reboot()
00367                 print "Test: Calibrating %s Gripper" % (name.capitalize(),)
00368                 gripper.calibrate()
00369                 print "Test: Close %s Gripper" % (name.capitalize(),)
00370                 gripper.close(True)
00371                 print "Test: Open %s Gripper" % (name.capitalize(),)
00372                 gripper.open(True)
00373                 print "Test: Close %s Gripper" % (name.capitalize(),)
00374                 gripper.close(True)
00375                 print "Test: Open %s Gripper" % (name.capitalize(),)
00376                 gripper.open(True)
00377                 print "Test: %s Gripper Position Moves" % (name.capitalize(),)
00378                 gripper.command_position(50.0, True)
00379                 gripper.command_position(75.0, True)
00380                 gripper.command_position(0.0, True)
00381                 gripper.command_position(100.0, True)
00382                 print "Test: %s Gripper Velocity Moves" % (name.capitalize(),)
00383                 gripper.set_moving_force(100.0)
00384                 gripper.set_velocity(50.0)
00385                 gripper.close(True)
00386                 gripper.set_velocity(25.0)
00387                 gripper.open(True)
00388                 gripper.set_velocity(100.0)
00389                 gripper.close(True)
00390                 gripper.open(True)
00391                 gripper.set_velocity(50.0)
00392                 gripper.set_moving_force(30.0)
00393                 gripper.close(True)
00394                 gripper.open(True)
00395             print "Disabling robot..."
00396             self._rs.disable()
00397             self.result[0] = True
00398         except Exception:
00399             self.return_failure(traceback.format_exc())
00400 
00401 
00402 class BlinkLEDs(SmokeTest):
00403     """
00404     Blink Navigator LEDs.
00405     """
00406     def __init__(self, name='BlinkLEDs'):
00407         super(BlinkLEDs, self).__init__(name)
00408 
00409     def start_test(self):
00410         """
00411         Runs BlinkLEDs Smoke Test.
00412         """
00413 
00414         def _blink(io):
00415             """
00416             Toggle itb lights.
00417             """
00418             for i in range(9):
00419                 io.set_output(i % 2)
00420                 rospy.sleep(0.1)
00421 
00422         try:
00423             itb_names = (
00424                 'left_itb_light_outer',
00425                 'left_itb_light_inner',
00426                 'torso_left_itb_light_outer',
00427                 'torso_left_itb_light_inner',
00428                 'right_itb_light_outer',
00429                 'right_itb_light_inner',
00430                 'torso_right_itb_light_outer',
00431                 'torso_right_itb_light_inner',
00432                 )
00433 
00434             for itb in itb_names:
00435                 print "Test: Blink %s" % itb
00436                 io = baxter_interface.DigitalIO(itb)
00437                 _blink(io)
00438             self.result[0] = True
00439         except Exception:
00440             self.return_failure(traceback.format_exc())
00441 
00442 
00443 class Cameras(SmokeTest):
00444     """
00445     Verify camera publishing and visualization.
00446     """
00447     def __init__(self, name='Cameras'):
00448         super(Cameras, self).__init__(name)
00449 
00450     def start_test(self):
00451         """Runs Cameras Smoke Test
00452         """
00453 
00454         xpub_img = rospy.Publisher(
00455             '/robot/xdisplay',
00456             Image
00457         )
00458 
00459         def _display(camera, name):
00460             """
00461             Open camera and display to screen for 10 seconds
00462             """
00463             camera.close()
00464             camera.resolution = (960, 600,)
00465             print "Test: Opening %s" % name
00466             camera.open()
00467             print "Test: Display %s to Screen - 10 Seconds" % name
00468             rospy.Subscriber(
00469                 '/cameras/' + name + "/image",
00470                 Image,
00471                 _repub_cb
00472             )
00473             rospy.sleep(10.0)
00474             camera.close()
00475             _reset_screen()
00476 
00477         def _repub_cb(msg):
00478             """
00479             Camera image republish callback.
00480             """
00481             xpub_img.publish(msg)
00482 
00483         def _reset_screen():
00484             """
00485             Reset the screen to research sdk image.
00486             """
00487             image_path = self._rp.get_path('baxter_tools')
00488             image_path += '/share/images/researchsdk.png'
00489             img = cv.LoadImage(image_path)
00490             msg = cv_bridge.CvBridge().cv_to_imgmsg(img)
00491             xpub_img.publish(msg)
00492 
00493         def _reset_defaults():
00494             """
00495             Turn on the left/right_hand_cameras with default settings.
00496             """
00497             print "Restarting the Default Cameras..."
00498             for i in range(1, 3):
00499                 camera = baxter_interface.CameraController(camera_names[i])
00500                 camera.resolution = (640, 400,)
00501                 camera.fps = 25
00502                 camera.open()
00503 
00504         try:
00505             print "Enabling robot..."
00506             self._rs.enable()
00507             print ("Test: Verify Left_Hand, Right_Hand, and Head "
00508                    "Cameras Present")
00509             camera_names = (
00510                 'head_camera',
00511                 'left_hand_camera',
00512                 'right_hand_camera',
00513                 )
00514             srv = "/cameras/list"
00515             rospy.wait_for_service(srv, 5.0)
00516             camera_list_srv = rospy.ServiceProxy(srv, ListCameras)
00517             camera_list = camera_list_srv()
00518             for camera_name in camera_names:
00519                 if not camera_name in camera_list.cameras:
00520                     raise NameError("Could not find camera - %s" % camera_name)
00521                 else:
00522                     camera = baxter_interface.CameraController(camera_name)
00523                     camera.close()
00524             for camera_name in camera_names:
00525                 camera = baxter_interface.CameraController(camera_name)
00526                 _display(camera, camera_name)
00527             _reset_defaults()
00528             print "Disabling robot..."
00529             self._rs.disable()
00530             self.result[0] = True
00531         except Exception:
00532             self.return_failure(traceback.format_exc())


baxter_tools
Author(s): Rethink Robotics Inc.
autogenerated on Sun Oct 5 2014 22:29:27