hand_sensor_listeners.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 # Software License Agreement (BSD License)
00003 #
00004 # Copyright (c) 2009, Willow Garage, Inc.
00005 # All rights reserved.
00006 #
00007 # Redistribution and use in source and binary forms, with or without
00008 # modification, are permitted provided that the following conditions
00009 # are met:
00010 #
00011 #  * Redistributions of source code must retain the above copyright
00012 #    notice, this list of conditions and the following disclaimer.
00013 #  * Redistributions in binary form must reproduce the above
00014 #    copyright notice, this list of conditions and the following
00015 #    disclaimer in the documentation and/or other materials provided
00016 #    with the distribution.
00017 #  * Neither the name of the Willow Garage nor the names of its
00018 #    contributors may be used to endorse or promote products derived
00019 #    from this software without specific prior written permission.
00020 #
00021 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
00022 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
00023 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
00024 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
00025 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
00026 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
00027 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
00028 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
00029 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
00030 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
00031 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00032 # POSSIBILITY OF SUCH DAMAGE.
00033 #
00034 # author: Kaijen Hsiao
00035 
00036 ## @package hand_sensor_listeners
00037 #Listen to and keep track of fingertip sensor readings
00038 
00039 from __future__ import division
00040 import roslib
00041 roslib.load_manifest('pr2_gripper_reactive_approach')
00042 import rospy
00043 import time
00044 import scipy
00045 import threading
00046 import copy
00047 import pdb
00048 import math
00049 from pr2_msgs.msg import PressureState
00050 
00051 ##listens to PressureState messages 
00052 class FingertipSensorListener:
00053 
00054   def __init__(self, gripper): #gripper is 'r' or 'l'
00055     self.gripper = gripper
00056     self.lock = threading.Lock()
00057 
00058     #raw values
00059     self.l_readings = [0]*22
00060     self.r_readings = [0]*22
00061 
00062     #thresholded values
00063     self.l_thres = [0]*22
00064     self.r_thres = [0]*22
00065 
00066     #thresholds to use (updated shortly in set_thresholds)
00067     self.l_threshold = [1]*22
00068     self.r_threshold = [1]*22
00069 
00070     #threshold parameters
00071     self.side_threshold_buffer = 125
00072     self.front_threshold_buffer = 225
00073     self.threshold_buffers = [self.side_threshold_buffer]*7 + [self.front_threshold_buffer]*15
00074 
00075     #subscribe to PressureState messages for this gripper
00076     rospy.Subscriber("pressure/"+gripper+"_gripper_motor", PressureState, self.sensor_callback)
00077 
00078     #grab some readings and set the thresholds
00079     time.sleep(2.0)
00080     self.set_thresholds(short = 1)
00081 
00082     #uncomment to test how sensitive the thresholds are
00083     #self.test_sensor_thresholds()
00084 
00085 
00086   ##callback: store the PressureState messages as they come in
00087   def sensor_callback(self, pressurestate):
00088 
00089     self.lock.acquire()    
00090     self.r_readings = pressurestate.r_finger_tip[:]
00091     self.l_readings = pressurestate.l_finger_tip[:]
00092     self.lock.release()
00093 
00094   ##get copies of the thresholded readings
00095   def get_thresholded_readings(self):
00096     self.lock.acquire()
00097 
00098     #using simple threshold: zero out entries less than self.threshold
00099     l_thres = [(x-t if x >= t else 0.) for \
00100                       (x,t) in zip(self.l_readings, self.l_threshold)]
00101     r_thres = [(x-t if x >= t else 0.) for \
00102                       (x,t) in zip(self.r_readings, self.r_threshold)]
00103     self.lock.release()
00104 
00105     return (l_thres, r_thres)
00106 
00107 
00108   ##get copies of the raw readings
00109   def get_readings(self):
00110     self.lock.acquire()
00111     l_readings_copy = self.l_readings[:]
00112     r_readings_copy = self.r_readings[:]
00113     self.lock.release()
00114 
00115     return (l_readings_copy, r_readings_copy)
00116 
00117 
00118   ##get the readings relative to the thresholds (no zeroing of values that are below)
00119   def get_relative_readings(self):
00120     self.lock.acquire()
00121     l_rel = [x-t for (x,t) in zip(self.l_readings, self.l_threshold)]
00122     r_rel = [x-t for (x,t) in zip(self.r_readings, self.r_threshold)]
00123     self.lock.release()
00124     
00125     return (l_rel, r_rel)
00126 
00127 
00128   ##run this when you're sure you're not touching anything, to set the noise threshold
00129   def set_thresholds(self, short = 0):
00130 
00131     #gather data for awhile and use the max value to set the thresholds
00132     l_readings_max = [0.]*22
00133     r_readings_max = [0.]*22
00134 
00135     num_readings = 50
00136     if short:
00137       num_readings = 10
00138 
00139     for i in range(num_readings):
00140       (l_readings, r_readings) = self.get_readings()
00141       l_readings_max = [max(x,y) for (x,y) in zip(l_readings_max, l_readings)]
00142       r_readings_max = [max(x,y) for (x,y) in zip(r_readings_max, r_readings)]
00143       time.sleep(.05)
00144 
00145     #only increase the thresholds, if doing a short reset
00146     self.lock.acquire()
00147     if short:
00148       self.l_threshold = [max(new+buffer, current_thres) for (new, current_thres, buffer) in zip(l_readings_max, self.l_threshold, self.threshold_buffers)]
00149       self.r_threshold = [max(new+buffer, current_thres) for (new, current_thres, buffer) in zip(r_readings_max, self.r_threshold, self.threshold_buffers)]
00150     else:
00151       self.l_threshold = [x + buffer for (x, buffer) in zip(l_readings_max, self.threshold_buffers)]
00152       self.r_threshold = [x + buffer for (x, buffer) in zip(r_readings_max, self.threshold_buffers)]
00153     self.lock.release()
00154 
00155     rospy.loginfo("left tip thresholds:"+str(self.l_threshold))
00156     rospy.loginfo("right tip thresholds:"+str(self.r_threshold))
00157 
00158 
00159   ##test sensor sensitivity
00160   def test_sensor_thresholds(self):
00161     rospy.loginfo("testing the sensor thresholds")
00162     while(not rospy.is_shutdown()):
00163       self.print_sensor_status()
00164       time.sleep(.1)
00165 
00166 
00167   ##pretty-print list to string
00168   def pplist(self, list):
00169     return ' '.join(['%0.3f'%x for x in list])
00170 
00171 
00172   ##print the current readings, thresholded and categorized
00173   def print_sensor_status(self, l_readings = None, r_readings = None):
00174     (l_readings, r_readings, l_regions_touching, r_regions_touching, l_rows_touching, r_rows_touching, \
00175          l_columns_touching, r_columns_touching) = self.get_sensor_status(l_readings, r_readings)
00176 
00177     if any(l_readings):
00178         rospy.loginfo("left")
00179         rospy.loginfo("l_readings: " + self.pplist(l_readings))
00180         rospy.loginfo("l_regions_touching: " + str(l_regions_touching))
00181         rospy.loginfo("l_rows_touching: " + str(l_rows_touching))
00182         rospy.loginfo("l_columns_touching: " + str(l_columns_touching))
00183 
00184     if any(r_readings):
00185         rospy.loginfo("          right")
00186         rospy.loginfo("          r_readings: " + self.pplist(r_readings))
00187         rospy.loginfo("          r_regions_touching: " + str(r_regions_touching))
00188         rospy.loginfo("          r_rows_touching: " + str(r_rows_touching))
00189         rospy.loginfo("          r_columns_touching: " + str(r_columns_touching))
00190 
00191 
00192   ##get readings, thresholded and categorized
00193   def get_sensor_status(self, l_readings = None, r_readings = None):
00194     if l_readings == None or r_readings == None:
00195       (l_readings, r_readings) = self.get_thresholded_readings()
00196     (l_regions_touching, r_regions_touching) = self.regions_touching(l_readings, r_readings)
00197     (l_rows_touching, r_rows_touching) = self.front_rows_touching(l_readings, r_readings)
00198     (l_columns_touching, r_columns_touching) = self.front_columns_touching(l_readings, r_readings)
00199     return (l_readings, r_readings, l_regions_touching, r_regions_touching, l_rows_touching, r_rows_touching, \
00200               l_columns_touching, r_columns_touching)
00201 
00202 
00203   ##report whether various regions are touching (tip, plus_z_side, neg_z_side, front, back) for each sensor
00204   def regions_touching(self, l_readings = None, r_readings = None):
00205     l_region_elements = [[2,3,4,5], [1,], [6,], [7,8,9,10,11,12,13,14,15,16,17,18,19,20,21], [0]]
00206     r_region_elements = [[2,3,4,5], [6,], [1,], [7,8,9,10,11,12,13,14,15,16,17,18,19,20,21], [0]]
00207  
00208     if l_readings == None or r_readings == None:
00209       (l_readings, r_readings) = self.get_thresholded_readings()
00210     l_regions_touching = []
00211     for region in l_region_elements:
00212       l_regions_touching.append(any([l_readings[element] for element in region]))
00213 
00214     r_regions_touching = []
00215     for region in r_region_elements:
00216       r_regions_touching.append(any([r_readings[element] for element in region]))
00217  
00218     return (l_regions_touching, r_regions_touching)
00219 
00220 
00221   ##report whether each row of 3 elements on each sensor's front (from base to tip) is in contact
00222   def front_rows_touching(self, l_readings = None, r_readings = None):
00223     if l_readings == None or r_readings == None:
00224       (l_readings, r_readings) = self.get_thresholded_readings()
00225     l_rows_touching = [0]*5
00226     r_rows_touching = [0]*5
00227 
00228     for i in range(5):
00229       if any(l_readings[i*3+7:(i+1)*3+7]):
00230         l_rows_touching[i] = 1
00231       if any(r_readings[i*3+7:(i+1)*3+7]):
00232         r_rows_touching[i] = 1
00233     
00234     l_rows_touching.reverse()
00235     r_rows_touching.reverse()
00236 
00237 #     rospy.loginfo("l_rows_touching: "+str(l_rows_touching))
00238 #     rospy.loginfo("r_rows_touching: "+str(r_rows_touching))
00239 
00240     return (l_rows_touching, r_rows_touching)
00241     
00242    
00243   ##report whether each column of 5 elements on each sensor's front (from +z side to -z side) is in contact
00244   def front_columns_touching(self, l_readings = None, r_readings = None):
00245     if l_readings == None or r_readings == None:
00246       (l_readings, r_readings) = self.get_thresholded_readings()
00247     l_columns_touching = [0]*3
00248     r_columns_touching = [0]*3
00249 
00250     for i in range(3):
00251       l_column_elements = [x*3+7+i for x in range(5)]
00252       r_column_elements = [x*3+9-i for x in range(5)]
00253       if scipy.array(l_readings)[l_column_elements].any():
00254         l_columns_touching[i] = 1
00255       if scipy.array(r_readings)[r_column_elements].any():
00256         r_columns_touching[i] = 1
00257 
00258 #     rospy.loginfo("l_columns_touching: "+str(l_columns_touching))
00259 #     rospy.loginfo("r_columns_touching: "+str(r_columns_touching))
00260 
00261     return (l_columns_touching, r_columns_touching)
00262 
00263 
00264     
00265 if __name__ == "__main__":
00266   rospy.init_node('hand_sensor_listeners', anonymous=True)
00267 
00268   listener = FingertipSensorListener('r')
00269   #listener = FingertipSensorListener('l')
00270   
00271   listener.test_sensor_thresholds()


pr2_gripper_reactive_approach
Author(s): Kaijen Hsiao
autogenerated on Mon Oct 6 2014 12:27:12