detection_component_testing.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 #
3 # Copyright 2017 Fraunhofer Institute for Manufacturing Engineering and Automation (IPA)
4 #
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 
17 
18 import os
19 import sys
20 import time
21 import signal
22 import unittest
23 import yaml
24 import subprocess
25 from multiprocessing import Process
26 
27 import rospy
28 import roslib
29 import rostest
30 import rosbag
31 
32 from cob_object_detection_msgs.srv import DetectObjects, DetectObjectsRequest
33 from std_msgs.msg import String
34 
35 class TestObjectDetection(unittest.TestCase):
36 
37  def tearDown(self):
38  os.system("killall play")
39 
40  def setUp(self):
41 
42  # positions and tolerance for comparing with the acquired data from the simulation or from the rosbag
43 
44  self.bagfiles = rospy.get_param("test_bag")
45  self.PKG = rospy.get_param("PKG")
46  self.mode = rospy.get_param("mode")
47 
48  self.bags = self.get_bags(self.bagfiles)
49  self.tolerance = 0.5
50  self.objID = None
51 
52  def updateTolerance(self, bag=0, chunk=0):
53 
54  if("tolerance" in chunk):
55  self.tolerance = chunk['tolerance']
56 
57  elif("tolerance" in bag):
58  self.tolerance = bag['tolerance']
59 
60  elif rospy.has_param('tolerance'):
61  self.tolerance = rospy.get_param('tolerance')
62 
63 ###############################################################################
64  ## Alternative function for launching the bagfiles
65  def playback_bag(self, bagPath):
66  os.system('rosbag play -d 5 -k --clock %s' % bagPath)
67 
68  def get_bags(self, bagfiles):
69 
70  bags = []
71  for item in bagfiles:
72  bags.append(item)
73  return bags
74 
75  def getNumberofObjects(self,inBag):
76 
77  self.objects = inBag['objects']
78  if isinstance(self.objects, type(None)):
79  objectsQTY = 0
80  else:
81  objectsQTY = len(self.objects)
82 
83  return objectsQTY
84 
85  def getLabel(self, index):
86 
87  self.objID = self.objects[index]['label']
88 
89  def process_objects(self, index):
90 
91  posX = (float)(self.objects[index]['position'][0])
92  posY = (float)(self.objects[index]['position'][1])
93  posZ = (float)(self.objects[index]['position'][2])
94 
95  return posX, posY, posZ
96 
97  def object_detector(self):
98 
99  for i in range(len(self.bags)):
100 
101  bagPath = roslib.packages.get_pkg_dir(self.PKG) + self.bags[i]['bag_path']
102  yamlPath = roslib.packages.get_pkg_dir(self.PKG) + self.bags[i]['yaml_path']
103  #if(i==1):
104  # raise rospy.exceptions.ROSException("COB3 - has found no object!!%s"%bagPath, yamlPath)
105  inBag = yaml.load(open(yamlPath).read())
106 
107  # Wait the initialization of the Object Detection Service
108  if(self.mode == "delay"):
109  rosbag_process = subprocess.Popen("rosbag play -d 2 -k --clock %s" % bagPath, shell=True)
110  else:
111  rosbag_process = subprocess.Popen("rosbag play --clock %s" % bagPath, shell=True)
112  ##out = rosbag_process.communicate()
113 
114  rospy.wait_for_service('/object_detection/detect_object', 10)
115  # Alternative bagfile launching
116  #bag_playback = Process(target=self.playback_bag, args=(bagPath,))
117  #bag_playback.start()
118 
119  try:
120  objQTY = self.getNumberofObjects(inBag)
121 
122  for t in range(objQTY):
123 
124  recognition_service = rospy.ServiceProxy('/object_detection/detect_object', DetectObjects)
125 
126  req = DetectObjectsRequest()
127 
128  self.getLabel(t)
129  req.object_name = String(self.objID)
130 
131  # Definition of the Region of Interest
132  req.roi.x_offset = 0
133  req.roi.y_offset = 0
134  req.roi.width = 0
135  req.roi.height = 0
136 
137  res = recognition_service(req)
138 
139  [posX, posY, posZ] = self.process_objects(t)
140 
141  self.updateTolerance(bag = self.bags[i], chunk = self.objects[t])
142 
143  addInfo = "Object type: " + self.objID + ", path: " + bagPath + ", index: " + (str)(t) + " " + (str)(posX)
144 
145  if(len(res.object_list.detections) > 0):
146  # Get the Cartesian Coordinates positions for the detected objects
147  for i in range(len(res.object_list.detections)):
148 
149  positionX = res.object_list.detections[i].pose.pose.position.x
150  positionY = res.object_list.detections[i].pose.pose.position.y
151  positionZ = res.object_list.detections[i].pose.pose.position.z
152 
153  # Test assertions for guaranteeing the correct loading of the Object Detection System Component
154  self.assertTrue(abs(positionX - posX) <= self.tolerance, "Failed on the x axis comparison%s"%addInfo)
155  self.assertTrue(abs(positionY - posY) <= self.tolerance, "Failed on the y axis comparison%s"%addInfo)
156  self.assertTrue(abs(positionZ - posZ) <= self.tolerance, "Failed on the z axis comparison%s"%addInfo)
157 
158  self.assertTrue(objQTY == len(res.object_list.detections), "Number of objects in the Bagfiles are not equal to number of objects found%s"%addInfo)
159 
160  except rospy.ServiceException as e:
161  raise rospy.exceptions.ROSException("Service not available!!%s"%e)
162 
163  rosbag_process.send_signal(signal.SIGINT)
164  os.kill(rosbag_process.pid, signal.SIGKILL)
165  os.system("killall play")
166 
167  # Alternative bagfile launching
168 
169  #if bag_playback.is_alive():
170  # rospy.loginfo('terminating playback process')
171  # bag_playback.terminate()
172  # time.sleep(1)
173  # rospy.loginfo('playback process terminated? %s' % str(not bag_playback.is_alive()))
174 
176  self.object_detector()
177 
178 
179 # Main Function for the tests run
180 if __name__ == '__main__':
181 
182  rospy.init_node('test', anonymous=True)
183  rostest.rosrun('cob_object_detection_msgs', 'Diagnostics', TestObjectDetection, sys.argv)
def playback_bag(self, bagPath)
Alternative function for launching the bagfiles.


cob_object_detection_msgs
Author(s): Florian Weisshardt
autogenerated on Sun Oct 18 2020 13:13:08