9 from threading
import Thread
31 Ros Sensor Main Function 36 if topic_type
is not None:
37 data_class = roslib.message.get_message_class(topic_type)
40 rospy.Subscriber(real_topic, data_class, self.
_ros_cb)
42 print(
"Can not resolve topic type of %s" % topic)
44 except ValueError
as err:
45 print(
"ROSSensor Class Error!\n")
51 Ros Sensor Callback Function 55 self.
data = msg.temperature
58 self.
data = msg.current_data
61 self.
data = msg.voltage_data
64 self.
data = msg.power_data
72 except AttributeError
as err:
73 print(
"Invalid topic spec [%s]: %s" % (self.
name, str(err)))
78 Ros Sensor Stop Thread Function 81 self.thread.should_abort_immediately =
True 86 Private Get Topic Type Function 89 val = rospy.get_published_topics()
90 except Exception
as err:
91 print(
"unable to get list of topics from master")
94 matches = [(tpc, t_type)
for tpc, t_type
in val
if tpc == topic
or topic.startswith(tpc +
'/')]
97 tpc, t_type = matches[0]
98 if t_type == roslib.names.ANYTYPE:
99 return None,
None,
None 102 return t_type, tpc, topic[len(tpc):]
104 return None,
None,
None 109 Get Topic Type Function 115 return topic_type, real_topic, rest
117 return None,
None,
None 119 except Exception
as err:
120 print(
"get_topic_type Error!")
def __init__(self, topic)
def main_func(self, topic)
def get_topic_type(self, topic)
def stop_thread_func(self)
def _get_topic_type(cls, topic)