Go to the documentation of this file.00001
00002 import roslib; roslib.load_manifest('hrl_pr2_lib')
00003 import rospy
00004
00005 import sensor_msgs.msg as sm
00006
00007 import arm_navigation_msgs.srv as psrv
00008 import numpy as np
00009 import time
00010 import hrl_pr2_lib.msg as hmsg
00011
00012
00013 class CollisionClient:
00014 def __init__(self, arm):
00015 rospy.Subscriber('contacts_' + arm, hmsg.OnlineContact, self.collision_cb, tcp_nodelay=True)
00016 self.contacted_with_self = [False, None, None]
00017
00018 def collision_cb(self, msg):
00019 for c in msg.contacts:
00020 if c.contact_body_1 == 'points' or c.contact_body_2 == 'points' \
00021 or c.contact_body_1 == None or c.contact_body_2 == None:
00022 continue
00023 else:
00024 self.contacted_with_self = [True, c.contact_body_1, c.contact_body_2]
00025
00026 def check_self_contacts(self):
00027 r = self.contacted_with_self
00028 self.contacted_with_self = [False, None, None]
00029 if r[0] == True:
00030 rospy.loginfo('Contact made between %s and %s.' % (r[1], r[2]))
00031 return r[0]
00032
00033
00034 class CollisionMonitor:
00035 def __init__(self, arm):
00036 rospy.init_node('collision_monitor_' + arm)
00037 self.name_dict = None
00038 link_names = ['_shoulder_pan_joint', '_shoulder_lift_joint',
00039 '_upper_arm_roll_joint', '_elbow_flex_joint',
00040 '_forearm_roll_joint', '_wrist_flex_joint', '_wrist_roll_joint']
00041 self.arm_name = [arm + l for l in link_names]
00042
00043 if arm == 'l':
00044 service_name = 'environment_server_left_arm/get_state_validity'
00045 else:
00046 service_name = 'environment_server_right_arm/get_state_validity'
00047
00048
00049 rospy.loginfo('waiting for ' + service_name)
00050 rospy.wait_for_service(service_name)
00051 self.check_state_validity_client = rospy.ServiceProxy(service_name, \
00052 psrv.GetStateValidity, persistent=True)
00053 rospy.Subscriber('joint_states', sm.JointState, self.joint_state_cb, \
00054 queue_size=5, tcp_nodelay=True)
00055 self.contact_pub = rospy.Publisher('contacts_' + arm, hmsg.OnlineContact)
00056
00057 def joint_state_cb(self, msg):
00058 if self.name_dict == None:
00059 self.name_dict = {}
00060 for i, n in enumerate(msg.name):
00061 self.name_dict[n] = i
00062
00063 arm_indices = [self.name_dict[n] for n in self.arm_name]
00064 arm_list = np.array(msg.position)[arm_indices].tolist()
00065
00066 req = psrv.GetStateValidityRequest()
00067 req.robot_state.joint_state.name = self.arm_name
00068 req.robot_state.joint_state.position = arm_list
00069 req.robot_state.joint_state.header.stamp = rospy.get_rostime()
00070 req.check_collisions = True
00071 res = self.check_state_validity_client(req)
00072 if not (res.error_code.val == res.error_code.SUCCESS):
00073
00074
00075
00076
00077
00078
00079
00080 m = hmsg.OnlineContact()
00081 m.contacts = res.contacts
00082 self.contact_pub.publish(m)
00083
00084
00085
00086
00087 def call_collision_monitor(arm):
00088 a = CollisionMonitor(arm)
00089 rospy.loginfo('ready')
00090 r = rospy.Rate(10)
00091 while not rospy.is_shutdown():
00092 r.sleep()
00093
00094 def call_get_state_validity():
00095 rospy.init_node('test_get_state_validity')
00096 service_name = 'environment_server_left_arm/get_state_validity'
00097 rospy.loginfo('waiting for ' + service_name)
00098 rospy.wait_for_service(service_name)
00099 check_state_validity_client = rospy.ServiceProxy('environment_server_left_arm/get_state_validity', \
00100 psrv.GetStateValidity, persistent=True)
00101 req = psrv.GetStateValidityRequest()
00102 req.robot_state.joint_state.name = ['l_shoulder_pan_joint', 'l_shoulder_lift_joint',
00103 'l_upper_arm_roll_joint', 'l_elbow_flex_joint',
00104 'l_forearm_roll_joint', 'l_wrist_flex_joint', 'l_wrist_roll_joint']
00105 req.robot_state.joint_state.position = 7 * [0.0]
00106
00107
00108 t = time.time()
00109 for i in range(1000):
00110 req.robot_state.joint_state.header.stamp = rospy.get_rostime()
00111 req.check_collisions = True
00112 res = check_state_validity_client(req)
00113 diff = time.time() - t
00114 time_per_call = diff / 1000
00115 print time_per_call, 'rate', 1 / time_per_call
00116
00117
00118
00119
00120
00121
00122 if __name__ == '__main__':
00123 import sys
00124 call_collision_monitor(sys.argv[1])
00125
00126
00127
00128
00129
00130
00131
00132
00133
00134
00135
00136
00137
00138
00139
00140
00141
00142
00143
00144
00145
00146
00147
00148
00149
00150
00151
00152
00153
00154
00155
00156
00157
00158
00159
00160
00161
00162
00163
00164
00165
00166
00167
00168
00169
00170
00171
00172
00173
00174
00175
00176
00177
00178
00179
00180
00181
00182
00183
00184
00185
00186
00187
00188
00189