class_ros_sensor.py
Go to the documentation of this file.
1 #!/usr/bin/env python3
2 # -*- coding: utf-8 -*-
3 
4 """
5  PHM Ros Sensor Class
6 """
7 
8 import rospy
9 from threading import Thread
10 
11 import roslib.message
12 import roslib.names
13 
14 
15 class ROSSensor:
16  """
17  Ros Sensor Class
18  """
19  def __init__(self, topic):
20  self.name = topic
21  self.error = None
22  self.data = None
23  self.topic_message_type = None
24 
25  self.thread = Thread(target=self.main_func, args=(self.name,))
26  self.thread.start()
27 
28 
29  def main_func(self, topic):
30  """
31  Ros Sensor Main Function
32  """
33  try:
34  topic_type, real_topic, fields = self.get_topic_type(topic)
35 
36  if topic_type is not None:
37  data_class = roslib.message.get_message_class(topic_type)
38  self.topic_message_type = str(data_class)
39 
40  rospy.Subscriber(real_topic, data_class, self._ros_cb)
41  else:
42  print("Can not resolve topic type of %s" % topic)
43 
44  except ValueError as err:
45  print("ROSSensor Class Error!\n")
46  print(err)
47 
48 
49  def _ros_cb(self, msg):
50  """
51  Ros Sensor Callback Function
52  """
53  try:
54  if "sensor_msgs.msg._Temperature.Temperature" in self.topic_message_type:
55  self.data = msg.temperature
56 
57  elif "agv_msgs.msg._CurrentData.CurrentData" in self.topic_message_type:
58  self.data = msg.current_data
59 
60  elif "agv_msgs.msg._VoltageData.VoltageData" in self.topic_message_type:
61  self.data = msg.voltage_data
62 
63  elif "agv_msgs.msg._PowerData.PowerData" in self.topic_message_type:
64  self.data = msg.power_data
65 
66  elif "std_msgs" in self.topic_message_type:
67  self.data = msg.data
68 
69  else:
70  self.data = None
71 
72  except AttributeError as err:
73  print("Invalid topic spec [%s]: %s" % (self.name, str(err)))
74 
75 
76  def stop_thread_func(self):
77  """
78  Ros Sensor Stop Thread Function
79  """
80  self.thread.join()
81  self.thread.should_abort_immediately = True
82 
83  @classmethod
84  def _get_topic_type(cls, topic):
85  """
86  Private Get Topic Type Function
87  """
88  try:
89  val = rospy.get_published_topics()
90  except Exception as err:
91  print("unable to get list of topics from master")
92  print(err)
93 
94  matches = [(tpc, t_type) for tpc, t_type in val if tpc == topic or topic.startswith(tpc + '/')]
95 
96  if matches:
97  tpc, t_type = matches[0]
98  if t_type == roslib.names.ANYTYPE:
99  return None, None, None
100  if t_type == topic:
101  return t_type, None
102  return t_type, tpc, topic[len(tpc):]
103  else:
104  return None, None, None
105 
106 
107  def get_topic_type(self, topic):
108  """
109  Get Topic Type Function
110  """
111  try:
112  topic_type, real_topic, rest = self._get_topic_type(topic)
113 
114  if topic_type:
115  return topic_type, real_topic, rest
116  else:
117  return None, None, None
118 
119  except Exception as err:
120  print("get_topic_type Error!")
121  print(err)


phm_start
Author(s):
autogenerated on Thu Aug 13 2020 16:41:50