Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018 import os
00019 import sys, subprocess
00020 import time
00021 import math
00022 from math import fmod, pi
00023 import signal
00024 import types
00025 import unittest
00026
00027 import roslib
00028 roslib.load_manifest("cob_object_detection_msgs")
00029
00030 import rospy
00031
00032 import rostest
00033 import rosparam
00034
00035 from cob_object_detection_msgs.msg import *
00036 from cob_object_detection_msgs.srv import *
00037 from std_msgs.msg import String
00038
00039 from std_srvs.srv import Empty
00040
00041 import rosbag
00042
00043 from multiprocessing import Process
00044
00045 import yaml
00046
00047 class TestObjectDetection(unittest.TestCase):
00048
00049
00050 def tearDown(self):
00051
00052 os.system("killall play")
00053
00054 def setUp(self):
00055
00056
00057
00058 self.bagfiles = rospy.get_param("test_bag")
00059 self.PKG = rospy.get_param("PKG")
00060 self.mode = rospy.get_param("mode")
00061
00062 self.bags = self.get_bags(self.bagfiles)
00063 self.tolerance = 0.5
00064 self.objID = None
00065
00066 def updateTolerance(self, bag=0, chunk=0):
00067
00068 if("tolerance" in chunk):
00069 self.tolerance = chunk['tolerance']
00070
00071 elif("tolerance" in bag):
00072 self.tolerance = bag['tolerance']
00073
00074 elif rospy.has_param('tolerance'):
00075
00076 self.tolerance = rospy.get_param('tolerance')
00077
00078
00079
00080 def playback_bag(self, bagPath):
00081 os.system('rosbag play -d 5 -k --clock %s' % bagPath)
00082
00083 def get_bags(self, bagfiles):
00084
00085 bags = []
00086
00087 for item in bagfiles:
00088 bags.append(item)
00089 return bags
00090
00091 def getNumberofObjects(self,inBag):
00092
00093 self.objects = inBag['objects']
00094 if isinstance(self.objects, types.NoneType):
00095 objectsQTY = 0
00096 else:
00097 objectsQTY = len(self.objects)
00098
00099 return objectsQTY
00100
00101 def getLabel(self, index):
00102
00103 self.objID = self.objects[index]['label']
00104
00105 def process_objects(self, index):
00106
00107 posX = (float)(self.objects[index]['position'][0])
00108 posY = (float)(self.objects[index]['position'][1])
00109 posZ = (float)(self.objects[index]['position'][2])
00110
00111 return posX, posY, posZ
00112
00113 def object_detector(self):
00114
00115 for i in range(len(self.bags)):
00116
00117 bagPath = roslib.packages.get_pkg_dir(self.PKG) + self.bags[i]['bag_path']
00118 yamlPath = roslib.packages.get_pkg_dir(self.PKG) + self.bags[i]['yaml_path']
00119
00120
00121 inBag = yaml.load(open(yamlPath).read())
00122
00123
00124
00125 if(self.mode == "delay"):
00126 rosbag_process = subprocess.Popen("rosbag play -d 2 -k --clock %s" % bagPath, shell=True)
00127 else:
00128 rosbag_process = subprocess.Popen("rosbag play --clock %s" % bagPath, shell=True)
00129
00130
00131 rospy.wait_for_service('/object_detection/detect_object', 10)
00132
00133
00134
00135
00136
00137
00138 try:
00139 objQTY = self.getNumberofObjects(inBag)
00140
00141 for t in range(objQTY):
00142
00143 recognition_service = rospy.ServiceProxy('/object_detection/detect_object', DetectObjects)
00144
00145 req = DetectObjectsRequest()
00146
00147 self.getLabel(t)
00148 req.object_name = String(self.objID)
00149
00150
00151 req.roi.x_offset = 0;
00152 req.roi.y_offset = 0;
00153 req.roi.width = 0;
00154 req.roi.height = 0;
00155
00156 res = recognition_service(req)
00157
00158 [posX, posY, posZ] = self.process_objects(t)
00159
00160 self.updateTolerance(bag = self.bags[i], chunk = self.objects[t])
00161
00162 addInfo = "Object type: " + self.objID + ", path: " + bagPath + ", index: " + (str)(t) + " " + (str)(posX)
00163
00164 if(len(res.object_list.detections) > 0):
00165
00166
00167
00168 for i in range(len(res.object_list.detections)):
00169
00170 positionX = res.object_list.detections[i].pose.pose.position.x
00171 positionY = res.object_list.detections[i].pose.pose.position.y
00172 positionZ = res.object_list.detections[i].pose.pose.position.z
00173
00174
00175 self.assertTrue(abs(positionX - posX) <= self.tolerance, "Failed on the x axis comparison%s"%addInfo)
00176 self.assertTrue(abs(positionY - posY) <= self.tolerance, "Failed on the y axis comparison%s"%addInfo)
00177 self.assertTrue(abs(positionZ - posZ) <= self.tolerance, "Failed on the z axis comparison%s"%addInfo)
00178
00179 self.assertTrue(objQTY == len(res.object_list.detections), "Number of objects in the Bagfiles are not equal to number of objects found%s"%addInfo)
00180
00181
00182 except rospy.ServiceException, e:
00183 raise rospy.exceptions.ROSException("Service not available!!%s"%e)
00184
00185 rosbag_process.send_signal(signal.SIGINT)
00186 os.kill(rosbag_process.pid, signal.SIGKILL)
00187 os.system("killall play")
00188
00189
00190
00191
00192
00193
00194
00195
00196
00197 def test_object_detection(self):
00198
00199 self.object_detector()
00200
00201
00202
00203 if __name__ == '__main__':
00204
00205 rospy.init_node('test', anonymous=True)
00206 rostest.rosrun('cob_object_detection_msgs', 'Diagnostics',
00207 TestObjectDetection, sys.argv)