3 from hrpsys_ros_bridge.msg
import MotorStates
5 from sensor_msgs.msg
import JointState
7 from std_msgs.msg
import Float32
13 g_effort_publishers = {}
14 g_text_publisher =
None 15 safe_color = (52, 152, 219)
16 warning_color = (230, 126, 34)
17 fatal_color = (231, 76, 60)
20 if num > len(g_publishers):
21 for i
in range(len(g_publishers), num):
22 pub = rospy.Publisher(
'temperature_%02d' % (i), Float32, queue_size=1)
23 g_publishers.append(pub)
25 global g_effort_publishers
27 if name
not in g_effort_publishers:
28 g_effort_publishers[name] = rospy.Publisher(
'effort_%s' % (name), Float32, queue_size=1)
31 global g_publishers, g_text_publisher
33 values = msg.temperature
37 max_temparature_name =
"" 38 for name, val, pub
in zip(names, values, g_publishers):
39 pub.publish(Float32(val))
40 if max_temparature < val:
42 max_temparature_name = name
44 if max_temparature > 70:
46 elif max_temparature > 50:
51 text.fg_color.r = color[0] / 255.0
52 text.fg_color.g = color[1] / 255.0
53 text.fg_color.b = color[2] / 255.0
56 text.text =
"%dC -- (%s)" % (int(max_temparature), max_temparature_name)
57 g_text_publisher.publish(text)
60 global g_effort_publishers, robot_model
64 for val, name
in zip(values, names):
66 candidate_joints = [j
for j
in robot_model.joints
if j.name == name]
68 if candidate_joints[0].limit:
69 limit = candidate_joints[0].limit.effort
73 g_effort_publishers[name].publish(Float32(abs(val/limit)))
75 if __name__ ==
"__main__":
76 rospy.init_node(
"motor_state_temperature_decomposer")
77 robot_model = URDF.from_xml_string(rospy.get_param(
"/robot_description"))
78 g_text_publisher = rospy.Publisher(
"max_temparature_text", OverlayText, queue_size=1)
79 s = rospy.Subscriber(
"/motor_states", MotorStates, motorStatesCallback, queue_size=1)
80 s_joint_states = rospy.Subscriber(
"/joint_states", JointState, jointStatesCallback, queue_size=1)
def motorStatesCallback(msg)
def allocateEffortPublishers(names)
def jointStatesCallback(msg)
def allocatePublishers(num)