smoketests.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 
00003 # Copyright (c) 2013-2015, Rethink Robotics
00004 # All rights reserved.
00005 #
00006 # Redistribution and use in source and binary forms, with or without
00007 # modification, are permitted provided that the following conditions are met:
00008 #
00009 # 1. Redistributions of source code must retain the above copyright notice,
00010 #    this list of conditions and the following disclaimer.
00011 # 2. Redistributions in binary form must reproduce the above copyright
00012 #    notice, this list of conditions and the following disclaimer in the
00013 #    documentation and/or other materials provided with the distribution.
00014 # 3. Neither the name of the Rethink Robotics nor the names of its
00015 #    contributors may be used to endorse or promote products derived from
00016 #    this software without specific prior written permission.
00017 #
00018 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
00019 # AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
00020 # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
00021 # ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
00022 # LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
00023 # CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
00024 # SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
00025 # INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
00026 # CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
00027 # ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00028 # POSSIBILITY OF SUCH DAMAGE.
00029 
00030 """
00031 Baxter RSDK Smoke Tests
00032 """
00033 
00034 import traceback
00035 import threading
00036 import Queue
00037 
00038 import rospy
00039 
00040 import cv2
00041 import cv_bridge
00042 import rospkg
00043 import std_msgs
00044 
00045 from geometry_msgs.msg import (
00046     Point,
00047     Pose,
00048     PoseStamped,
00049     Quaternion,
00050 )
00051 from sensor_msgs.msg import (
00052     Image,
00053     JointState,
00054 )
00055 
00056 import baxter_dataflow
00057 import baxter_interface
00058 
00059 from baxter_core_msgs.msg import (
00060     AnalogIOStates,
00061     EndEffectorState,
00062 )
00063 from baxter_core_msgs.srv import (
00064     ListCameras,
00065     SolvePositionIK,
00066     SolvePositionIKRequest,
00067 )
00068 from baxter_interface import CHECK_VERSION
00069 
00070 
00071 class SmokeTest(object):
00072     """
00073     Smoke tests base class.
00074     """
00075     def __init__(self, name):
00076         self._name = name
00077         self._rs = baxter_interface.RobotEnable()
00078         self._rp = rospkg.RosPack()
00079         self.result = [False, '']
00080 
00081     def start_test(self):
00082         """
00083         Set up pub/sub/actions and run test.
00084 
00085         Results return in form [Bool Success,'Stacktrace if Failure']
00086         """
00087         raise NotImplementedError()
00088 
00089     def finish_test(self, filename):
00090         """
00091         Store results to output file.
00092         """
00093 
00094         with open(filename, "a") as testlog:
00095             testlog.write(
00096                 "Test: %s ------ Result: %s\n" %
00097                 (self._name, "Success" if self.result[0] else "Failure",)
00098                 )
00099             if self.result[0] == False:
00100                 testlog.write("%s\n%s%s\n" %
00101                     ('*' * 40, self.result[1], '*' * 40,)
00102                     )
00103         print "Results of %s saved to file: %s" % (self._name, filename,)
00104         print ("------- Result: %s -------\n\n" %
00105                ("Success" if self.result[0] else "Failure",))
00106 
00107     def return_failure(self, trace):
00108         """Commonly used failure return
00109         """
00110         self.result[0] = False
00111         self.result[1] = trace
00112         try:
00113             self._rs.disable()
00114         except Exception:
00115             pass
00116 
00117 
00118 class Enable(SmokeTest):
00119     """
00120     Verify ability to enable, check state and disable baxter.
00121     """
00122     def __init__(self, name='Enable'):
00123         super(Enable, self).__init__(name)
00124 
00125     def start_test(self):
00126         """Runs Enable Smoke Test
00127         """
00128         try:
00129             print "Test: Enable - Zero G for 10 Seconds"
00130             self._rs.enable()
00131             # Allow User Zero G Testing
00132             rospy.sleep(10.0)
00133             print "Test: State"
00134             print self._rs.state()
00135             print "Test: Disable"
00136             self._rs.disable()
00137             self.result[0] = True
00138         except Exception:
00139             self.return_failure(traceback.format_exc())
00140 
00141 
00142 class Messages(SmokeTest):
00143     """
00144     Verify messages being published and ability to subscribe.
00145     """
00146     def __init__(self, name='Messages'):
00147         super(Messages, self).__init__(name)
00148 
00149     def start_test(self):
00150         """
00151         Runs Messages Smoke Test.
00152         """
00153         try:
00154             msg = 'robot/joint_states'
00155             print "Test: Subscribe to topic: /%s" % msg
00156             rospy.wait_for_message(msg, JointState, 5.0)
00157             msg = 'robot/analog_io_states'
00158             print "Test: Subscribe to topic: /%s" % msg
00159             rospy.wait_for_message(
00160                 msg,
00161                 AnalogIOStates,
00162                 5.0
00163             )
00164             self.result[0] = True
00165         except:
00166             self.return_failure(traceback.format_exc())
00167 
00168 
00169 class Services(SmokeTest):
00170     """
00171     Verify services available and ability to make calls as client.
00172     """
00173     def __init__(self, name='Services'):
00174         super(Services, self).__init__(name)
00175 
00176     def start_test(self):
00177         """Runs Services Smoke Test
00178         """
00179         try:
00180             srv = '/ExternalTools/left/PositionKinematicsNode/IKService'
00181             print "Test: Service availability: %s" % srv
00182             rospy.wait_for_service(srv, 5.0)
00183             print "Test: IK service call Solve IK"
00184             iksvc = rospy.ServiceProxy(srv, SolvePositionIK)
00185             ikreq = SolvePositionIKRequest()
00186             pose = PoseStamped(
00187                 header=std_msgs.msg.Header(
00188                 stamp=rospy.Time.now(), frame_id='base'),
00189                 pose=Pose(
00190                     position=Point(
00191                         x=0.657579481614,
00192                         y=0.851981417433,
00193                         z=0.0388352386502,
00194                         ),
00195                     orientation=Quaternion(
00196                         x=-0.366894936773,
00197                         y=0.885980397775,
00198                         z=0.108155782462,
00199                         w=0.262162481772,
00200                     ),
00201                 )
00202             )
00203             ikreq.pose_stamp.append(pose)
00204             iksvc(ikreq)
00205             self.result[0] = True
00206         except Exception:
00207             self.return_failure(traceback.format_exc())
00208 
00209 
00210 class Head(SmokeTest):
00211     """
00212     Move the head pan and tilt, display image to screen.
00213     """
00214     def __init__(self, name='Head'):
00215         super(Head, self).__init__(name)
00216 
00217     def start_test(self):
00218         """
00219         Runs Head Smoke Test.
00220         """
00221         try:
00222             print "Enabling robot..."
00223             self._rs.enable()
00224             print "Test: Moving Head to Neutral Location"
00225             head = baxter_interface.Head()
00226             head.set_pan(0.0, 5.0)
00227             print "Test: Pan Head"
00228             head.set_pan(1.0, 5.0)
00229             head.set_pan(-1.0, 5.0)
00230             head.set_pan(0.0, 5.0)
00231             print "Test: Nod Head"
00232             for _ in xrange(3):
00233                 head.command_nod()
00234             print "Test: Display Image on Screen - 5 seconds"
00235             image_path = (self._rp.get_path('baxter_tools') +
00236                           '/share/images')
00237             img = cv2.imread(image_path + '/baxterworking.png')
00238             msg = cv_bridge.CvBridge().cv2_to_imgmsg(img)
00239             pub = rospy.Publisher('/robot/xdisplay', Image,
00240                                   latch=True, queue_size=10)
00241             pub.publish(msg)
00242             rospy.sleep(5.0)
00243             img = cv2.imread(image_path + '/researchsdk.png')
00244             msg = cv_bridge.CvBridge().cv2_to_imgmsg(img)
00245             pub.publish(msg)
00246             print "Disabling robot..."
00247             self._rs.disable()
00248             self.result[0] = True
00249         except Exception:
00250             self.return_failure(traceback.format_exc())
00251 
00252 
00253 class MoveArms(SmokeTest):
00254     """
00255     Move both arms to numerous joint positions.
00256     """
00257     def __init__(self, name='MoveArms'):
00258         super(MoveArms, self).__init__(name)
00259 
00260     def start_test(self):
00261         """Runs MoveArms Smoke Test.
00262         """
00263 
00264         def move_thread(limb, angle, queue, timeout=15.0):
00265             """
00266             Threaded joint movement allowing for simultaneous joint moves.
00267             """
00268             try:
00269                 limb.move_to_joint_positions(angle, timeout)
00270                 queue.put(None)
00271             except Exception, exception:
00272                 queue.put(traceback.format_exc())
00273                 queue.put(exception)
00274 
00275         try:
00276             print "Enabling robot..."
00277             self._rs.enable()
00278             print "Test: Create Limb Instances"
00279             right = baxter_interface.Limb('right')
00280             left = baxter_interface.Limb('left')
00281             left_queue = Queue.Queue()
00282             right_queue = Queue.Queue()
00283             # Max Joint Range (s0, s1, e0, e1, w0, w1, w2)
00284             #     ( 1.701,  1.047,  3.054,  2.618,  3.059,  2.094,  3.059)
00285             # Min Joint Range (e0, e1, s0, s1, w0, w1, w2)
00286             #     (-1.701, -2.147, -3.054, -0.050, -3.059, -1.571, -3.059)
00287             joint_moves = (
00288                  [0.0, -0.55, 0.0, 0.75, 0.0, 1.26, 0.0],
00289                  [0.5, -0.8, 2.8, 0.15, 0.0, 1.9, 2.8],
00290                 [-0.1, -0.8, -1.0, 2.5, 0.0, -1.4, -2.8],
00291                  [0.0, -0.55, 0.0, 0.75, 0.0, 1.26, 0.0],
00292                 )
00293             for move in joint_moves:
00294                 print "Test: Moving to Joint Positions: ",
00295                 print ", ".join("%.2f" % x for x in move)
00296                 left_thread = threading.Thread(
00297                     target=move_thread,
00298                     args=(left,
00299                           dict(zip(left.joint_names(), move)),
00300                           left_queue
00301                           )
00302                 )
00303                 right_thread = threading.Thread(
00304                     target=move_thread,
00305                     args=(right,
00306                           dict(zip(right.joint_names(), move)),
00307                           right_queue
00308                           )
00309                 )
00310                 left_thread.daemon = True
00311                 right_thread.daemon = True
00312                 left_thread.start()
00313                 right_thread.start()
00314                 baxter_dataflow.wait_for(
00315                     lambda: not (left_thread.is_alive() or
00316                                  right_thread.is_alive()),
00317                     timeout=20.0,
00318                     timeout_msg=("Timeout while waiting for arm move threads"
00319                                  " to finish"),
00320                     rate=10,
00321                 )
00322                 left_thread.join()
00323                 right_thread.join()
00324                 result = left_queue.get()
00325                 if not result is None:
00326                     raise left_queue.get()
00327                 result = right_queue.get()
00328                 if not result is None:
00329                     raise right_queue.get()
00330                 rospy.sleep(1.0)
00331             print "Disabling robot..."
00332             self._rs.disable()
00333             self.result[0] = True
00334             rospy.sleep(5.0)
00335         except Exception:
00336             self.return_failure(traceback.format_exc())
00337 
00338 
00339 class Grippers(SmokeTest):
00340     """
00341     Calibrate and move grippers using torque, position, velocity control.
00342     """
00343     def __init__(self, name='Grippers'):
00344         super(Grippers, self).__init__(name)
00345 
00346     def start_test(self):
00347         """
00348         Runs Grippers Smoke Test.
00349         """
00350         try:
00351             print "Enabling robot, Moving to Neutral Location..."
00352             self._rs.enable()
00353             for name in ['left', 'right']:
00354                 limb = baxter_interface.Limb(name)
00355                 gripper = baxter_interface.Gripper(name, CHECK_VERSION)
00356                 limb.move_to_neutral()
00357                 rospy.sleep(2.0)
00358                 print "Test: Verify %s Gripper Type" % (name.capitalize(),)
00359                 if gripper.type() is not 'electric':
00360                     raise NameError("Test Requires Two Electric Grippers")
00361                 s_topic = 'robot/end_effector/' + name + '_gripper/state'
00362                 ee_state = rospy.wait_for_message(s_topic,
00363                                                   EndEffectorState,
00364                                                   5.0
00365                                                   )
00366                 print "Test: Reboot %s Gripper" % (name.capitalize(),)
00367                 gripper.reboot()
00368                 print "Test: Calibrating %s Gripper" % (name.capitalize(),)
00369                 gripper.calibrate()
00370                 print "Test: Close %s Gripper" % (name.capitalize(),)
00371                 gripper.close(True)
00372                 print "Test: Open %s Gripper" % (name.capitalize(),)
00373                 gripper.open(True)
00374                 print "Test: Close %s Gripper" % (name.capitalize(),)
00375                 gripper.close(True)
00376                 print "Test: Open %s Gripper" % (name.capitalize(),)
00377                 gripper.open(True)
00378                 print "Test: %s Gripper Position Moves" % (name.capitalize(),)
00379                 gripper.command_position(50.0, True)
00380                 gripper.command_position(75.0, True)
00381                 gripper.command_position(0.0, True)
00382                 gripper.command_position(100.0, True)
00383                 print "Test: %s Gripper Velocity Moves" % (name.capitalize(),)
00384                 gripper.set_moving_force(100.0)
00385                 gripper.set_velocity(50.0)
00386                 gripper.close(True)
00387                 gripper.set_velocity(25.0)
00388                 gripper.open(True)
00389                 gripper.set_velocity(100.0)
00390                 gripper.close(True)
00391                 gripper.open(True)
00392                 gripper.set_velocity(50.0)
00393                 gripper.set_moving_force(30.0)
00394                 gripper.close(True)
00395                 gripper.open(True)
00396             print "Disabling robot..."
00397             self._rs.disable()
00398             self.result[0] = True
00399         except Exception:
00400             self.return_failure(traceback.format_exc())
00401 
00402 
00403 class BlinkLEDs(SmokeTest):
00404     """
00405     Blink Navigator LEDs.
00406     """
00407     def __init__(self, name='BlinkLEDs'):
00408         super(BlinkLEDs, self).__init__(name)
00409 
00410     def start_test(self):
00411         """
00412         Runs BlinkLEDs Smoke Test.
00413         """
00414 
00415         def _blink(io):
00416             """
00417             Toggle itb lights.
00418             """
00419             for i in range(9):
00420                 io.set_output(i % 2)
00421                 rospy.sleep(0.1)
00422 
00423         try:
00424             itb_names = (
00425                 'left_itb_light_outer',
00426                 'left_itb_light_inner',
00427                 'torso_left_itb_light_outer',
00428                 'torso_left_itb_light_inner',
00429                 'right_itb_light_outer',
00430                 'right_itb_light_inner',
00431                 'torso_right_itb_light_outer',
00432                 'torso_right_itb_light_inner',
00433                 )
00434 
00435             for itb in itb_names:
00436                 print "Test: Blink %s" % itb
00437                 io = baxter_interface.DigitalIO(itb)
00438                 _blink(io)
00439             self.result[0] = True
00440         except Exception:
00441             self.return_failure(traceback.format_exc())
00442 
00443 
00444 class Cameras(SmokeTest):
00445     """
00446     Verify camera publishing and visualization.
00447     """
00448     def __init__(self, name='Cameras'):
00449         super(Cameras, self).__init__(name)
00450 
00451     def start_test(self):
00452         """Runs Cameras Smoke Test
00453         """
00454 
00455         xpub_img = rospy.Publisher(
00456             '/robot/xdisplay',
00457             Image,
00458             queue_size=10
00459             )
00460 
00461         def _display(camera, name):
00462             """
00463             Open camera and display to screen for 10 seconds
00464             """
00465             camera.resolution = (960, 600,)
00466             print "Test: Opening %s" % name
00467             camera.open()
00468             print "Test: Display %s to Screen - 10 Seconds" % name
00469             rospy.Subscriber(
00470                 '/cameras/' + name + "/image",
00471                 Image,
00472                 _repub_cb
00473             )
00474             rospy.sleep(10.0)
00475             camera.close()
00476             _reset_screen()
00477 
00478         def _repub_cb(msg):
00479             """
00480             Camera image republish callback.
00481             """
00482             xpub_img.publish(msg)
00483 
00484         def _reset_screen():
00485             """
00486             Reset the screen to research sdk image.
00487             """
00488             image_path = self._rp.get_path('baxter_tools')
00489             image_path += '/share/images/researchsdk.png'
00490             img = cv2.imread(image_path)
00491             msg = cv_bridge.CvBridge().cv2_to_imgmsg(img)
00492             xpub_img.publish(msg)
00493 
00494         def _list_cameras():
00495             """
00496             List the cameras with power from ROS Service call.
00497             """
00498             srv = "/cameras/list"
00499             rospy.wait_for_service(srv, 5.0)
00500             camera_list_srv = rospy.ServiceProxy(srv, ListCameras)
00501             return camera_list_srv()
00502 
00503         def _reset_defaults():
00504             """
00505             Turn on the left/right_hand_cameras with default settings.
00506             """
00507             print "Restarting the Default Cameras..."
00508             camera_list = _list_cameras()
00509             for camera_name in camera_names:
00510                 print "Restarting {0}".format(camera_name)
00511                 if not camera_name in camera_list.cameras:
00512                     # Attempt to close another camera
00513                     # and list services again
00514                     # (power to this camera was off)
00515                     baxter_interface.CameraController(camera_list.cameras[0]).close()
00516                     camera_list = _list_cameras()
00517                 camera = baxter_interface.CameraController(camera_name)
00518                 camera.resolution = (640, 400,)
00519                 camera.fps = 25
00520                 camera.open()
00521                 camera.close()
00522 
00523         try:
00524             print "Enabling robot..."
00525             self._rs.enable()
00526             print ("Test: Verify Left_Hand, Right_Hand, and Head "
00527                    "Cameras Present")
00528             camera_names = (
00529                 'left_hand_camera',
00530                 'right_hand_camera',
00531                 'head_camera',
00532                 )
00533             camera_list = _list_cameras()
00534             for camera_name in camera_names:
00535                 if not camera_name in camera_list.cameras:
00536                     baxter_interface.CameraController(camera_list.cameras[0]).close()
00537                     camera_list = _list_cameras()
00538                     if not camera_name in camera_list.cameras:
00539                         raise NameError("Could not find camera - %s" % camera_name)
00540                 camera = baxter_interface.CameraController(camera_name)
00541                 _display(camera, camera_name)
00542                 camera_list = _list_cameras()
00543             _reset_defaults()
00544             print "Disabling robot..."
00545             self._rs.disable()
00546             self.result[0] = True
00547         except Exception:
00548             self.return_failure(traceback.format_exc())


baxter_tools
Author(s): Rethink Robotics Inc.
autogenerated on Wed Aug 26 2015 10:50:53