Go to the documentation of this file.00001
00002 import sensor_msgs.msg as sm
00003 import pr2_arm_navigation_perception.srv as snp
00004
00005
00006 import rospy
00007 import numpy as np
00008 import hrl_lib.rutils as ru
00009
00010
00011
00012
00013
00014
00015 def pressure_state_to_mat(contact_msgs):
00016 times = np.array([c.header.stamp.to_time() for c in contact_msgs])
00017 left, right = zip(*[[list(c.l_finger_tip), list(c.r_finger_tip)] for c in contact_msgs])
00018
00019 left = np.matrix(left).T
00020 right = np.matrix(right).T
00021 return left, right, times
00022
00023 class LaserScanner:
00024 def __init__(self, service):
00025 srv_name = '/%s/single_sweep_cloud' % service
00026 self.sp = rospy.ServiceProxy(srv_name, snp.BuildCloudAngle)
00027
00028 def scan_np(self, start, end, duration):
00029 resp = self.sp(start, end, duration)
00030 return ru.pointcloud_to_np(resp.cloud)
00031
00032 def scan(self, start, end, duration):
00033 resp = self.sp(start, end, duration)
00034 return resp.cloud
00035
00036 def sweep_param_to_tilt_param(angle_begin, angle_end, duration):
00037 amplitude = abs(angle_end - angle_begin)/2.0
00038 offset = (angle_end + angle_begin)/2.0
00039 period = duration*2.0
00040 return {'amplitude': amplitude, 'offset':offset, 'period': period}
00041
00042 class PointCloudReceiver:
00043 def __init__(self, topic):
00044 self.listener = ru.GenericListener('point_cloud_receiver', sm.PointCloud, topic, .2)
00045 self.topic = topic
00046
00047 def read(self):
00048 cur_time = rospy.Time.now().to_sec()
00049 not_fresh = True
00050
00051 while not_fresh:
00052 pcmsg = self.listener.read(allow_duplication=False, willing_to_wait=True, warn=False, quiet=True)
00053 if not (pcmsg.header.stamp.to_sec() < cur_time):
00054 not_fresh = False
00055 tdif = rospy.Time.now().to_sec() - cur_time
00056 if tdif > 10.:
00057 rospy.logerr('Have not heard from %s for %.2f seconds.' % (self.topic, tdif))
00058 rospy.sleep(.1)
00059
00060 return pcmsg
00061
00062
00063