detection_component_testing.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 #
00003 # Copyright 2017 Fraunhofer Institute for Manufacturing Engineering and Automation (IPA)
00004 #
00005 # Licensed under the Apache License, Version 2.0 (the "License");
00006 # you may not use this file except in compliance with the License.
00007 # You may obtain a copy of the License at
00008 #
00009 #   http://www.apache.org/licenses/LICENSE-2.0
00010 #
00011 # Unless required by applicable law or agreed to in writing, software
00012 # distributed under the License is distributed on an "AS IS" BASIS,
00013 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
00014 # See the License for the specific language governing permissions and
00015 # limitations under the License.
00016 
00017 
00018 import os
00019 import sys, subprocess
00020 import time
00021 import math
00022 from math import fmod, pi
00023 import signal
00024 import types
00025 import unittest
00026 
00027 import roslib
00028 roslib.load_manifest("cob_object_detection_msgs")
00029 
00030 import rospy
00031 
00032 import rostest
00033 import rosparam
00034 
00035 from cob_object_detection_msgs.msg import *
00036 from cob_object_detection_msgs.srv import *
00037 from std_msgs.msg import String
00038 
00039 from std_srvs.srv import Empty
00040 
00041 import rosbag
00042 
00043 from multiprocessing import Process
00044 
00045 import yaml
00046 
00047 class TestObjectDetection(unittest.TestCase):
00048          
00049     
00050     def tearDown(self):
00051 
00052         os.system("killall play")
00053     
00054     def setUp(self):
00055 
00056         # positions and tolerance for comparing with the acquired data from the simulation or from the rosbag
00057 
00058                 self.bagfiles = rospy.get_param("test_bag")
00059                 self.PKG = rospy.get_param("PKG")
00060                 self.mode = rospy.get_param("mode")
00061 
00062                 self.bags = self.get_bags(self.bagfiles)
00063                 self.tolerance = 0.5
00064                 self.objID = None
00065 
00066     def updateTolerance(self, bag=0, chunk=0):
00067 
00068                 if("tolerance" in chunk):
00069                         self.tolerance = chunk['tolerance']
00070 
00071                 elif("tolerance" in bag):
00072                         self.tolerance = bag['tolerance']
00073 
00074                 elif rospy.has_param('tolerance'):
00075 
00076                         self.tolerance = rospy.get_param('tolerance')
00077 
00078 ###############################################################################
00079     ## Alternative function for launching the bagfiles
00080     def playback_bag(self, bagPath):
00081         os.system('rosbag play -d 5 -k --clock %s' % bagPath)
00082                   
00083     def get_bags(self, bagfiles):
00084  
00085                 bags = []
00086         
00087                 for item in bagfiles:
00088                         bags.append(item)
00089                 return bags
00090 
00091     def getNumberofObjects(self,inBag):
00092 
00093                 self.objects = inBag['objects']
00094                 if isinstance(self.objects, types.NoneType):
00095                         objectsQTY = 0
00096                 else:
00097                         objectsQTY = len(self.objects)
00098 
00099                 return objectsQTY
00100 
00101     def getLabel(self, index):
00102 
00103                 self.objID = self.objects[index]['label']
00104 
00105     def process_objects(self, index):
00106 
00107                 posX = (float)(self.objects[index]['position'][0])
00108                 posY = (float)(self.objects[index]['position'][1])
00109                 posZ = (float)(self.objects[index]['position'][2])
00110 
00111                 return posX, posY, posZ
00112 
00113     def object_detector(self):
00114 
00115         for i in range(len(self.bags)):
00116                         
00117                         bagPath = roslib.packages.get_pkg_dir(self.PKG) + self.bags[i]['bag_path']
00118                         yamlPath = roslib.packages.get_pkg_dir(self.PKG) + self.bags[i]['yaml_path']
00119 #                       if(i==1):
00120 #                               raise rospy.exceptions.ROSException("COB3 - has found no object!!%s"%bagPath, yamlPath)
00121                         inBag = yaml.load(open(yamlPath).read())
00122                         
00123                         
00124          # Wait the initialization of the Object Detection Service    
00125                         if(self.mode == "delay"):
00126                                 rosbag_process = subprocess.Popen("rosbag play -d 2 -k --clock %s" % bagPath, shell=True)
00127                         else:
00128                                 rosbag_process = subprocess.Popen("rosbag play --clock %s" % bagPath, shell=True)
00129                         ##out = rosbag_process.communicate()
00130 
00131                         rospy.wait_for_service('/object_detection/detect_object', 10)
00132                         
00133                 # Alternative bagfile launching
00134                         #bag_playback = Process(target=self.playback_bag, args=(bagPath,))
00135                         #bag_playback.start() 
00136                         
00137                         
00138                         try:
00139                                 objQTY = self.getNumberofObjects(inBag)
00140                                 
00141                                 for t in range(objQTY):
00142 
00143                                         recognition_service = rospy.ServiceProxy('/object_detection/detect_object', DetectObjects)
00144 
00145                                         req = DetectObjectsRequest()
00146 
00147                                         self.getLabel(t)
00148                                         req.object_name = String(self.objID)
00149                                                 
00150                         # Definition of the Region of Interest
00151                                         req.roi.x_offset = 0;
00152                                         req.roi.y_offset = 0;
00153                                         req.roi.width = 0;
00154                                         req.roi.height = 0;
00155                                         
00156                                         res = recognition_service(req)
00157                                         
00158                                         [posX, posY, posZ] =  self.process_objects(t)
00159 
00160                                         self.updateTolerance(bag = self.bags[i], chunk = self.objects[t])
00161 
00162                                         addInfo = "Object type: " + self.objID + ", path: " + bagPath + ", index: " + (str)(t) + "  " + (str)(posX)
00163 
00164                                         if(len(res.object_list.detections) > 0):
00165                                         
00166                             # Get the Cartesian Coordinates positions for the detected objects
00167                                                 
00168                                                 for i in range(len(res.object_list.detections)):
00169 
00170                                                         positionX = res.object_list.detections[i].pose.pose.position.x
00171                                                         positionY = res.object_list.detections[i].pose.pose.position.y
00172                                                         positionZ = res.object_list.detections[i].pose.pose.position.z
00173 
00174                                                 # Test assertions for guaranteeing the correct loading of the Object Detection System Component
00175                                                         self.assertTrue(abs(positionX - posX) <= self.tolerance, "Failed on the x axis comparison%s"%addInfo)
00176                                                         self.assertTrue(abs(positionY - posY) <= self.tolerance, "Failed on the y axis comparison%s"%addInfo)
00177                                                         self.assertTrue(abs(positionZ - posZ) <= self.tolerance, "Failed on the z axis comparison%s"%addInfo)
00178 
00179                                         self.assertTrue(objQTY == len(res.object_list.detections), "Number of objects in the Bagfiles are not equal to number of objects found%s"%addInfo)
00180 
00181                                 
00182                         except rospy.ServiceException, e:
00183                                 raise rospy.exceptions.ROSException("Service not available!!%s"%e)
00184        
00185                         rosbag_process.send_signal(signal.SIGINT)
00186                         os.kill(rosbag_process.pid, signal.SIGKILL)
00187                         os.system("killall play")
00188 
00189                 # Alternative bagfile launching
00190 
00191                         #if bag_playback.is_alive():
00192                         #       rospy.loginfo('terminating playback process')
00193                         #       bag_playback.terminate()
00194                         #       time.sleep(1)
00195                         #       rospy.loginfo('playback process terminated? %s' % str(not bag_playback.is_alive()))
00196 
00197     def test_object_detection(self):
00198         
00199                 self.object_detector()
00200         
00201         
00202 # Main Function for the tests run 
00203 if __name__ == '__main__':
00204 
00205         rospy.init_node('test', anonymous=True)
00206         rostest.rosrun('cob_object_detection_msgs', 'Diagnostics',
00207               TestObjectDetection, sys.argv)


cob_object_detection_msgs
Author(s): Florian Weisshardt
autogenerated on Fri Mar 15 2019 03:10:07