lower_step_detector.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 # coding: utf-8
00003 
00004 """
00005 fix original laser scan data to detect down step
00006 """
00007 __author__  = "MasaruMorita <p595201m@mail.kyutech.jp>"
00008 __version__ = "1.01"
00009 __date__    = "18 Oct 2015"
00010 
00011 import rospy
00012 import copy
00013 import math
00014 import sys
00015 import numpy as np
00016 from sensor_msgs.msg import LaserScan
00017 
00018 NAME_NODE = 'lower_step_detector'
00019 
00020 class LaserScanEx():
00021     threshold_intensity = 0
00022     angle_laser_rad = 0
00023     angle_laser_deg = 0
00024     angle_laser_to_calc_intensity = 0
00025     virtual_intensity_to_ground = 0
00026     is_beyond_threshold_before = False
00027     is_beyond_threshold_after = False
00028 
00029     def __init__(self):
00030         self.threshold = 0
00031         self.angle_laser_rad = 0
00032         self.angle_laser_deg = 0
00033         self.angle_laser_to_calc_intensity = 0
00034         self.is_beyond_threshold_before = False
00035         self.is_beyond_threshold_after = False
00036         self.intensity_to_ground = 0
00037 
00038 class ChunkAngle():
00039     index_start_laser_range = 0
00040     index_end_laser_range = 0
00041     count_elem = 0
00042     count_beyond_thresh = 0
00043     thresh_ratio_detect = 0
00044     ratio_detect = 0
00045     is_beyond_threshold = False
00046 
00047     def __init__(self, i_count_elem, i_index_start, i_thresh_ratio_detect):
00048         self.count_elem = i_count_elem
00049         self.index_start_laser_range = i_index_start
00050         self.index_end_laser_range = self.index_start_laser_range + self.count_elem
00051         self.thresh_ratio_detect = i_thresh_ratio_detect
00052 
00053     def judge_beyond_thresh(self, i_laser_scan_exs):
00054         self.count_beyond_thresh = 0
00055         for i in range(int(self.index_start_laser_range), int(self.index_end_laser_range)):
00056             if i_laser_scan_exs[i].is_beyond_threshold_before == True:
00057                 self.count_beyond_thresh += 1
00058 
00059         self.ratio_detect = self.count_beyond_thresh / self.count_elem
00060         if self.ratio_detect > self.thresh_ratio_detect:
00061             self.is_beyond_threshold = True
00062         else:
00063             self.is_beyond_threshold = False
00064 
00065     def update_intensity(self, i_tmp_fix_data, i_laser_scan_exs):
00066         if self.is_beyond_threshold == True:
00067             for i in range(int(self.index_start_laser_range), int(self.index_end_laser_range)):
00068                 i_tmp_fix_data[i] = copy.deepcopy(i_laser_scan_exs[i].virtual_intensity_to_ground)
00069                 #i_tmp_fix_data[i] = 1.0#i_laser_scan_exs[i].virtual_intensity_to_ground
00070 
00071 class LowerStepDetector():
00072     ## public constants
00073     # topic names
00074     NAME_TOPIC_SUFFIX_TO_PUBLISH = '_fix'
00075     NAME_TOPIC_LASER_ORI = 'base_scan1'
00076     NAME_TOPIC_LASER_FIX = NAME_TOPIC_LASER_ORI + NAME_TOPIC_SUFFIX_TO_PUBLISH
00077     # parameter names
00078     NAME_PARAM_LASER_INTENSITY = NAME_NODE + '/laser_intensity_max'
00079     NAME_PARAM_VIRTUAL_LASER_INTENSITY = NAME_NODE + '/virtual_laser_intensity'
00080     NAME_PARAM_LASER_SCAN_RANGE_DEG = NAME_NODE + '/laser_scan_range_deg'
00081     NAME_PARAM_DETECT_STEP_ANGLE_MIN_DEG = NAME_NODE + '/detect_step_angle_min_deg'
00082     NAME_PARAM_MARGIN_BETWEEN_PLANE_AND_DOWN_STEP = NAME_NODE + '/margin_between_plane_and_down_step'
00083     NAME_PARAM_CHUNK_ANGLE_FOR_NOISE_DEG = NAME_NODE + '/chunk_angle_for_noise_deg'
00084     NAME_PARAM_RATIO_DETECT_IN_CHUNK = NAME_NODE + '/ratio_detect_in_chunk'
00085 
00086     # default parameter values
00087     DEFAULT_LASER_INTENSITY_MAX = 1.5
00088     DEFAULT_MARGIN_BETWEEN_PLANE_AND_DOWN_STEP = 1.0
00089     DEFAULT_VIRTUAL_LASER_INTENSITY = 1.0
00090     DEFAULT_LASER_SCAN_RANGE_DEG = 180.0
00091     DEFAULT_DETECT_STEP_ANGLE_MIN_DEG = 10.0
00092     DEFAULT_CHUNK_ANGLE_FOR_NOISE_DEG = 5.0
00093     DEFAULT_RATIO_DETECT_IN_CHUNK = 0.8
00094 
00095     ## private member variables
00096     # original laser scan subscriber
00097     _laser_ori_sub = 0
00098     # fixed laser scan publisher
00099     _laser_fix_pub = 0
00100     _laser_intensity_max = 0
00101     _virtual_laser_intensity = 0
00102     _detect_index_min = 0
00103     _detect_index_max = 0
00104     #
00105     _laser_scan_range_deg = 0
00106     _detect_step_angle_min_deg = 0
00107     _detect_step_angle_max_deg = 0
00108     _detect_angle_center_deg = 0
00109     _margin_between_plane_and_down_step = 0
00110     _chunk_angle_for_noise_deg = 0
00111     _ratio_detect_in_chunk = 0
00112     #
00113     _is_init = True
00114     _laser_scan_exs = []
00115 
00116     _count_chunk_angle = 0
00117     _chunks_to_detect_noise = []
00118 
00119     ## constructor
00120     def __init__(self):
00121         rospy.init_node(NAME_NODE, anonymous=False)
00122         self.load_topic_name_to_sub()
00123         self.load_rosparam()
00124         self._laser_ori_sub = rospy.Subscriber(self.NAME_TOPIC_LASER_ORI, LaserScan, self.on_subscribe_laser_scan)
00125         self._laser_fix_pub = rospy.Publisher(self.NAME_TOPIC_LASER_FIX, LaserScan, queue_size = 1)
00126         # start callback
00127         rospy.spin()
00128 
00129     ## methods
00130     def load_topic_name_to_sub(self):
00131         argvs = sys.argv
00132         count_arg = len(argvs)
00133         if count_arg > 1:
00134             self.NAME_TOPIC_LASER_ORI = argvs[1]
00135             self.NAME_TOPIC_LASER_FIX = self.NAME_TOPIC_LASER_ORI + self.NAME_TOPIC_SUFFIX_TO_PUBLISH
00136 
00137     def load_rosparam(self):
00138         self._laser_intensity_max = rospy.get_param(self.NAME_PARAM_LASER_INTENSITY, self.DEFAULT_LASER_INTENSITY_MAX)
00139         self._margin_between_plane_and_down_step = rospy.get_param(self.NAME_PARAM_MARGIN_BETWEEN_PLANE_AND_DOWN_STEP, self.DEFAULT_MARGIN_BETWEEN_PLANE_AND_DOWN_STEP)
00140         self._virtual_laser_intensity = rospy.get_param(self.NAME_PARAM_VIRTUAL_LASER_INTENSITY, self.DEFAULT_VIRTUAL_LASER_INTENSITY)
00141         self._laser_scan_range_deg = rospy.get_param(self.NAME_PARAM_LASER_SCAN_RANGE_DEG, self.DEFAULT_LASER_SCAN_RANGE_DEG)
00142         self._detect_step_angle_min_deg = rospy.get_param(self.NAME_PARAM_DETECT_STEP_ANGLE_MIN_DEG, self.DEFAULT_DETECT_STEP_ANGLE_MIN_DEG)
00143         self._detect_step_angle_max_deg = self._laser_scan_range_deg - self._detect_step_angle_min_deg
00144         self._detect_angle_center_deg = self._laser_scan_range_deg / 2.0
00145         self._chunk_angle_for_noise_deg = rospy.get_param(self.NAME_PARAM_CHUNK_ANGLE_FOR_NOISE_DEG, self.DEFAULT_CHUNK_ANGLE_FOR_NOISE_DEG)
00146         self._ratio_detect_in_chunk = rospy.get_param(self.NAME_PARAM_RATIO_DETECT_IN_CHUNK, self.DEFAULT_RATIO_DETECT_IN_CHUNK)
00147 
00148     def calculate_threshold_intensity(self, i_laser_sensor_msg_ori):
00149          # calculate parameters
00150          angle_increment = i_laser_sensor_msg_ori.angle_increment
00151          self._detect_index_min = math.radians(self._detect_step_angle_min_deg) / angle_increment
00152          self._detect_index_max = math.radians(self._detect_step_angle_max_deg) / angle_increment
00153          detect_index_mid = math.radians(self._detect_angle_center_deg) / angle_increment
00154 
00155          for i in range(len(i_laser_sensor_msg_ori.ranges)):
00156              laser_scan_ex = LaserScanEx()
00157 
00158              angle_curr_rad = i * angle_increment
00159              angle_curr_deg = math.degrees(angle_curr_rad)
00160 
00161              if i < detect_index_mid:
00162                  theta = angle_curr_rad
00163              else:
00164                  theta = math.pi - angle_curr_rad
00165 
00166              # to avoid zero division
00167              if theta != 0:
00168                  virtual_intensity_to_ground = self._virtual_laser_intensity / math.sin(theta)
00169                  laser_intensity_thresh = (self._laser_intensity_max  + self._margin_between_plane_and_down_step) / math.sin(theta)
00170              else:
00171                  virtual_intensity_to_ground = self._virtual_laser_intensity
00172                  laser_intensity_thresh = float("inf")
00173 
00174              laser_scan_ex.virtual_intensity_to_ground = virtual_intensity_to_ground
00175              laser_scan_ex.angle_laser_rad = angle_curr_rad
00176              laser_scan_ex.angle_laser_deg = angle_curr_deg
00177              laser_scan_ex.angle_laser_to_calc_intensity = theta
00178 
00179              # beyond the extend of step scan
00180              if i < self._detect_index_min or i > self._detect_index_max:
00181                  laser_scan_ex.threshold_intensity = float("inf")
00182              else:
00183                  laser_scan_ex.threshold_intensity = laser_intensity_thresh
00184              self._laser_scan_exs.append(laser_scan_ex)
00185 
00186     def create_angle_chunk(self, i_laser_sensor_msg_ori):
00187         self.calculate_threshold_intensity(i_laser_sensor_msg_ori)
00188 
00189         if (self._chunk_angle_for_noise_deg != 0):
00190             count_chunk = round(self._laser_scan_range_deg / self._chunk_angle_for_noise_deg)
00191         else:
00192             count_chunk = 6
00193 
00194         count_elem_chunk = len(i_laser_sensor_msg_ori.ranges) / count_chunk
00195 
00196         for j in range(int(count_chunk)):
00197             index_start = count_elem_chunk * j
00198             chunk_angle = ChunkAngle(count_elem_chunk, index_start, self._ratio_detect_in_chunk)
00199             self._chunks_to_detect_noise.append(chunk_angle)
00200 
00201     # first intensity judge. before analyzing intensity
00202     def judge_intensity(self, i_laser_sensor_msg_ori):
00203         for i in range(len(i_laser_sensor_msg_ori.ranges)):
00204             # skip when a range cannot detect down step
00205             if i < self._detect_index_min or i > self._detect_index_max:
00206                 continue
00207 
00208             # overwrite only when range can detect down step
00209             if i_laser_sensor_msg_ori.ranges[i] > self._laser_scan_exs[i].threshold_intensity:
00210                 #print 'detected lower step at %f[degree]!' % self._laser_scan_exs[i].angle_laser_deg
00211                 self._laser_scan_exs[i].is_beyond_threshold_before = True
00212             else:
00213                 self._laser_scan_exs[i].is_beyond_threshold_before = False
00214 
00215     # second intensity judge. analyzing intensity for noise detect
00216     def analyze_intensity_noise_detect(self, i_tmp_fix_data):
00217         for j in range(len(self._chunks_to_detect_noise)):
00218             self._chunks_to_detect_noise[j].judge_beyond_thresh(self._laser_scan_exs)
00219             self._chunks_to_detect_noise[j].update_intensity(i_tmp_fix_data, self._laser_scan_exs)
00220 
00221     def on_subscribe_laser_scan(self, i_laser_sensor_msg_ori):
00222         laser_sensor_msg_fix = copy.deepcopy(i_laser_sensor_msg_ori)
00223         # temporary buffer for publish. This is because tuple(type of LaserScan.ranges) type can't be overwritten.
00224         tmp_fix_data = len(i_laser_sensor_msg_ori.ranges)*[0]
00225 
00226         # calculate threshold only when it's the first time
00227         if self._is_init == True:
00228             # reset initial flag
00229             self._is_init = False
00230             self.create_angle_chunk(i_laser_sensor_msg_ori)
00231 
00232         # judge laser scan data
00233         for i in range(len(i_laser_sensor_msg_ori.ranges)):
00234             # copy original data
00235             tmp_fix_data[i] = copy.deepcopy(i_laser_sensor_msg_ori.ranges[i])
00236 
00237         # judge laser scan data before noise detect
00238         self.judge_intensity(i_laser_sensor_msg_ori)
00239 
00240         # analyze laser scan data to consider noise
00241         self.analyze_intensity_noise_detect(tmp_fix_data)
00242 
00243         laser_sensor_msg_fix.ranges = tuple(tmp_fix_data)
00244 
00245         # publish fixed laser scan topic
00246         self._laser_fix_pub.publish(laser_sensor_msg_fix)
00247         #print 'succeed publish'
00248 
00249 if __name__ == '__main__':
00250     try:
00251         print 'start program'
00252         LowerStepDetector()
00253 
00254     except:
00255         rospy.loginfo("lower_step_detector finished.")


lower_step_detector
Author(s):
autogenerated on Thu Jun 6 2019 19:31:41