Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039
00040
00041
00042
00043
00044
00045
00046
00047
00048
00049
00050
00051
00052
00053
00054
00055
00056
00057
00058
00059 import os
00060 import sys, subprocess
00061 import time
00062 import math
00063 from math import fmod, pi
00064 import signal
00065 import types
00066 import unittest
00067
00068 import roslib
00069 roslib.load_manifest("cob_object_detection_msgs")
00070
00071 import rospy
00072
00073 import rostest
00074 import rosparam
00075
00076 from cob_object_detection_msgs.msg import *
00077 from cob_object_detection_msgs.srv import *
00078 from std_msgs.msg import String
00079
00080 from std_srvs.srv import Empty
00081
00082 import rosbag
00083
00084 from multiprocessing import Process
00085
00086 import yaml
00087
00088 class TestObjectDetection(unittest.TestCase):
00089
00090
00091 def tearDown(self):
00092
00093 os.system("killall play")
00094
00095 def setUp(self):
00096
00097
00098
00099 self.bagfiles = rospy.get_param("test_bag")
00100 self.PKG = rospy.get_param("PKG")
00101 self.mode = rospy.get_param("mode")
00102
00103 self.bags = self.get_bags(self.bagfiles)
00104 self.tolerance = 0.5
00105 self.objID = None
00106
00107 def updateTolerance(self, bag=0, chunk=0):
00108
00109 if("tolerance" in chunk):
00110 self.tolerance = chunk['tolerance']
00111
00112 elif("tolerance" in bag):
00113 self.tolerance = bag['tolerance']
00114
00115 elif rospy.has_param('tolerance'):
00116
00117 self.tolerance = rospy.get_param('tolerance')
00118
00119
00120
00121 def playback_bag(self, bagPath):
00122 os.system('rosbag play -d 5 -k --clock %s' % bagPath)
00123
00124 def get_bags(self, bagfiles):
00125
00126 bags = []
00127
00128 for item in bagfiles:
00129 bags.append(item)
00130 return bags
00131
00132 def getNumberofObjects(self,inBag):
00133
00134 self.objects = inBag['objects']
00135 if isinstance(self.objects, types.NoneType):
00136 objectsQTY = 0
00137 else:
00138 objectsQTY = len(self.objects)
00139
00140 return objectsQTY
00141
00142 def getLabel(self, index):
00143
00144 self.objID = self.objects[index]['label']
00145
00146 def process_objects(self, index):
00147
00148 posX = (float)(self.objects[index]['position'][0])
00149 posY = (float)(self.objects[index]['position'][1])
00150 posZ = (float)(self.objects[index]['position'][2])
00151
00152 return posX, posY, posZ
00153
00154 def object_detector(self):
00155
00156 for i in range(len(self.bags)):
00157
00158 bagPath = roslib.packages.get_pkg_dir(self.PKG) + self.bags[i]['bag_path']
00159 yamlPath = roslib.packages.get_pkg_dir(self.PKG) + self.bags[i]['yaml_path']
00160
00161
00162 inBag = yaml.load(open(yamlPath).read())
00163
00164
00165
00166 if(self.mode == "delay"):
00167 rosbag_process = subprocess.Popen("rosbag play -d 2 -k --clock %s" % bagPath, shell=True)
00168 else:
00169 rosbag_process = subprocess.Popen("rosbag play --clock %s" % bagPath, shell=True)
00170
00171
00172 rospy.wait_for_service('/object_detection/detect_object', 10)
00173
00174
00175
00176
00177
00178
00179 try:
00180 objQTY = self.getNumberofObjects(inBag)
00181
00182 for t in range(objQTY):
00183
00184 recognition_service = rospy.ServiceProxy('/object_detection/detect_object', DetectObjects)
00185
00186 req = DetectObjectsRequest()
00187
00188 self.getLabel(t)
00189 req.object_name = String(self.objID)
00190
00191
00192 req.roi.x_offset = 0;
00193 req.roi.y_offset = 0;
00194 req.roi.width = 0;
00195 req.roi.height = 0;
00196
00197 res = recognition_service(req)
00198
00199 [posX, posY, posZ] = self.process_objects(t)
00200
00201 self.updateTolerance(bag = self.bags[i], chunk = self.objects[t])
00202
00203 addInfo = "Object type: " + self.objID + ", path: " + bagPath + ", index: " + (str)(t) + " " + (str)(posX)
00204
00205 if(len(res.object_list.detections) > 0):
00206
00207
00208
00209 for i in range(len(res.object_list.detections)):
00210
00211 positionX = res.object_list.detections[i].pose.pose.position.x
00212 positionY = res.object_list.detections[i].pose.pose.position.y
00213 positionZ = res.object_list.detections[i].pose.pose.position.z
00214
00215
00216 self.assertTrue(abs(positionX - posX) <= self.tolerance, "Failed on the x axis comparison%s"%addInfo)
00217 self.assertTrue(abs(positionY - posY) <= self.tolerance, "Failed on the y axis comparison%s"%addInfo)
00218 self.assertTrue(abs(positionZ - posZ) <= self.tolerance, "Failed on the z axis comparison%s"%addInfo)
00219
00220 self.assertTrue(objQTY == len(res.object_list.detections), "Number of objects in the Bagfiles are not equal to number of objects found%s"%addInfo)
00221
00222
00223 except rospy.ServiceException, e:
00224 raise rospy.exceptions.ROSException("Service not available!!%s"%e)
00225
00226 rosbag_process.send_signal(signal.SIGINT)
00227 os.kill(rosbag_process.pid, signal.SIGKILL)
00228 os.system("killall play")
00229
00230
00231
00232
00233
00234
00235
00236
00237
00238 def test_object_detection(self):
00239
00240 self.object_detector()
00241
00242
00243
00244 if __name__ == '__main__':
00245
00246 rospy.init_node('test', anonymous=True)
00247 rostest.rosrun('cob_object_detection_msgs', 'Diagnostics',
00248 TestObjectDetection, sys.argv)