25 from multiprocessing
import Process
32 from cob_object_detection_msgs.srv
import DetectObjects, DetectObjectsRequest
33 from std_msgs.msg
import String
38 os.system(
"killall play")
45 self.
PKG = rospy.get_param(
"PKG")
46 self.
mode = rospy.get_param(
"mode")
54 if(
"tolerance" in chunk):
57 elif(
"tolerance" in bag):
60 elif rospy.has_param(
'tolerance'):
61 self.
tolerance = rospy.get_param(
'tolerance')
66 os.system(
'rosbag play -d 5 -k --clock %s' % bagPath)
78 if isinstance(self.
objects, type(
None)):
91 posX = (float)(self.
objects[index][
'position'][0])
92 posY = (float)(self.
objects[index][
'position'][1])
93 posZ = (float)(self.
objects[index][
'position'][2])
95 return posX, posY, posZ
99 for i
in range(len(self.
bags)):
101 bagPath = roslib.packages.get_pkg_dir(self.
PKG) + self.
bags[i][
'bag_path']
102 yamlPath = roslib.packages.get_pkg_dir(self.
PKG) + self.
bags[i][
'yaml_path']
105 inBag = yaml.load(open(yamlPath).read())
108 if(self.
mode ==
"delay"):
109 rosbag_process = subprocess.Popen(
"rosbag play -d 2 -k --clock %s" % bagPath, shell=
True)
111 rosbag_process = subprocess.Popen(
"rosbag play --clock %s" % bagPath, shell=
True)
114 rospy.wait_for_service(
'/object_detection/detect_object', 10)
122 for t
in range(objQTY):
124 recognition_service = rospy.ServiceProxy(
'/object_detection/detect_object', DetectObjects)
126 req = DetectObjectsRequest()
129 req.object_name = String(self.
objID)
137 res = recognition_service(req)
143 addInfo =
"Object type: " + self.
objID +
", path: " + bagPath +
", index: " + (str)(t) +
" " + (str)(posX)
145 if(len(res.object_list.detections) > 0):
147 for i
in range(len(res.object_list.detections)):
149 positionX = res.object_list.detections[i].pose.pose.position.x
150 positionY = res.object_list.detections[i].pose.pose.position.y
151 positionZ = res.object_list.detections[i].pose.pose.position.z
154 self.assertTrue(abs(positionX - posX) <= self.
tolerance,
"Failed on the x axis comparison%s"%addInfo)
155 self.assertTrue(abs(positionY - posY) <= self.
tolerance,
"Failed on the y axis comparison%s"%addInfo)
156 self.assertTrue(abs(positionZ - posZ) <= self.
tolerance,
"Failed on the z axis comparison%s"%addInfo)
158 self.assertTrue(objQTY == len(res.object_list.detections),
"Number of objects in the Bagfiles are not equal to number of objects found%s"%addInfo)
160 except rospy.ServiceException
as e:
161 raise rospy.exceptions.ROSException(
"Service not available!!%s"%e)
163 rosbag_process.send_signal(signal.SIGINT)
164 os.kill(rosbag_process.pid, signal.SIGKILL)
165 os.system(
"killall play")
180 if __name__ ==
'__main__':
182 rospy.init_node(
'test', anonymous=
True)
183 rostest.rosrun(
'cob_object_detection_msgs',
'Diagnostics', TestObjectDetection, sys.argv)
def getLabel(self, index)
def test_object_detection(self)
def process_objects(self, index)
def get_bags(self, bagfiles)
def playback_bag(self, bagPath)
Alternative function for launching the bagfiles.
def object_detector(self)
def getNumberofObjects(self, inBag)
def updateTolerance(self, bag=0, chunk=0)