sr_robot_state_saver.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 # Copyright 2019 Shadow Robot Company Ltd.
3 #
4 # This program is free software: you can redistribute it and/or modify it
5 # under the terms of the GNU General Public License as published by the Free
6 # Software Foundation version 2 of the License.
7 #
8 # This program is distributed in the hope that it will be useful, but WITHOUT
9 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10 # FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
11 # more details.
12 #
13 # You should have received a copy of the GNU General Public License along
14 # with this program. If not, see <http://www.gnu.org/licenses/>.
15 
16 from sys import argv
17 
18 import rospy
19 from moveit_msgs.srv import SaveRobotStateToWarehouse as SaveState
20 from sensor_msgs.msg import JointState
21 from moveit_msgs.msg import RobotState
22 from sr_arm_commander import SrArmCommander
23 from sr_hand_commander import SrHandCommander
24 from sr_robot_commander import SrRobotCommander
25 from sr_utilities.hand_finder import HandFinder
26 from controller_manager_msgs.srv import ListControllers
27 from control_msgs.msg import JointTrajectoryControllerState
28 from threading import Lock
29 
30 
31 class SrStateSaverUnsafe(object):
32  def __init__(self, name, hand_or_arm="both", side="right", save_target=False):
33 
34  self._save = rospy.ServiceProxy(
35  'save_robot_state', SaveState)
36 
37  self._name = name
38  self._save_target = save_target
39 
40  if name is None or name == '':
41  raise ValueError("Cannot save with empty name.")
42 
43  prefix = ""
44  double_error = []
45  if hand_or_arm == "arm":
46  if side != "bimanual":
47  prefix = "ra" if side == "right" else "la"
48  self._commander = SrArmCommander(side + '_arm')
49  if not self._commander.arm_found():
50  double_error.append("Group " + side + "_arm not found.")
51  else:
52  self._commander = SrRobotCommander('two_arms')
53  elif hand_or_arm == 'hand':
54  if side != "bimanual":
55  prefix = "rh" if side == "right" else "lh"
56  self._commander = SrHandCommander(side + '_hand')
57  else:
58  self._commander = SrRobotCommander('two_hands')
59  elif hand_or_arm == 'both':
60  if side != "bimanual":
61  self._commander = SrRobotCommander(side + '_arm_and_hand')
62  else:
63  self._commander = SrRobotCommander('two_arms_and_hands')
64 
65  if len(double_error) != 0:
66  raise ValueError(" ".join(double_error))
67 
68  self._save_hand = (hand_or_arm == "hand" or hand_or_arm == "both")
69  self._save_arm = (hand_or_arm == "arm" or hand_or_arm == "both")
70  self._save_bimanual = (side == 'bimanual')
71 
72  if save_target:
73  rospy.loginfo("Saving targets instead of current values")
74  self._mutex = Lock()
75  self._target_values = dict()
76  if self._save_hand:
77  self._hand_subscriber = rospy.Subscriber("/" + prefix + "_trajectory_controller/state",
78  JointTrajectoryControllerState, self._target_cb)
79  if self._save_arm:
80  self._arm_subscriber = rospy.Subscriber("/" + prefix + "_trajectory_controller/state",
81  JointTrajectoryControllerState, self._target_cb)
82  if self._save_bimanual:
83  if self._save_hand:
84  self.r_hand_subscriber = rospy.Subscriber("/ra_trajectory_controller/state",
85  JointTrajectoryControllerState, self._target_cb)
86  self.l_hand_subscriber = rospy.Subscriber("/la_trajectory_controller/state",
87  JointTrajectoryControllerState, self._target_cb)
88  if self._save_arm:
89  self.r_arm_subscriber = rospy.Subscriber("/ra_trajectory_controller/state",
90  JointTrajectoryControllerState, self._target_cb)
91  self.l_arm_subscriber = rospy.Subscriber("/la_trajectory_controller/state",
92  JointTrajectoryControllerState, self._target_cb)
93 
94  current_dict = {}
95  rospy.loginfo("Getting current")
96  current_dict = self._commander.get_current_state()
97  robot_name = self._commander.get_robot_name()
98 
99  if self._save_target:
100  rospy.loginfo("Getting targets")
101  waiting_for_targets = True
102  while waiting_for_targets and not rospy.is_shutdown():
103  self._mutex.acquire()
104  waiting_for_targets = False
105  for joint in current_dict:
106  if joint in self._target_values:
107  current_dict[joint] = self._target_values[joint]
108  else:
109  waiting_for_targets = True
110  rospy.loginfo("Still waiting for %s target" % joint)
111  self._mutex.release()
112  if waiting_for_targets:
113  rospy.loginfo(self._target_values)
114  rospy.sleep(1)
115  if rospy.is_shutdown():
116  exit(0)
117 
118  self.save_state(current_dict, robot_name)
119 
120  def _target_cb(self, data):
121  self._mutex.acquire()
122  for n, joint in enumerate(data.joint_names):
123  self._target_values[joint] = data.desired.positions[n]
124  self._mutex.release()
125 
126  def save_state(self, current_dict, robot_name):
127  rs = RobotState()
128  rospy.loginfo(current_dict)
129  rs.joint_state = JointState()
130  rs.joint_state.name = current_dict.keys()
131  rs.joint_state.position = current_dict.values()
132  rospy.logwarn(rs)
133  self._save(self._name, robot_name, rs)
def __init__(self, name, hand_or_arm="both", side="right", save_target=False)


sr_robot_commander
Author(s): Andriy Petlovanyy
autogenerated on Mon Feb 28 2022 23:52:25