topics.py
Go to the documentation of this file.
1 #! /usr/bin/env python
2 
import logging
import subprocess
import threading
import time

import roslib
from roslib import scriptutil
9 
class Topics(roslib.message.Message):
    """Message holding one snapshot of the ROS graph.

    Attributes (declared through ``__slots__``):
        pubtopics: first element of the ``topics`` argument.
        subtopics: second element of the ``topics`` argument.
        nodes: third element of the ``topics`` argument.
    """

    __slots__ = ('pubtopics', 'subtopics', 'nodes')

    def __init__(self, topics):
        """Populate the message from a 3-item iterable.

        Args:
            topics: iterable of exactly three items, in the order
                (pubtopics, subtopics, nodes).

        Raises:
            ValueError: if ``topics`` does not unpack into three items.
        """
        # NOTE: the base-class initializer is intentionally not invoked here,
        # matching the original behaviour of this class.
        published, subscribed, node_names = topics
        self.pubtopics = published
        self.subtopics = subscribed
        self.nodes = node_names
14 
class RosTopicException(Exception):
    """Raised when a remote call reports a status code other than 1."""


def succeed(args):
    """Unwrap a ``(code, msg, val)`` reply, raising if the call failed.

    Args:
        args: 3-item sequence ``(code, msg, val)`` as returned by the master
            calls made in this module.

    Returns:
        ``val``, the payload of the reply, when ``code`` equals 1.

    Raises:
        RosTopicException: if ``code`` is anything other than 1; the message
            embeds ``msg``.
        ValueError: if ``args`` does not unpack into three items.
    """
    code, msg, val = args
    if code != 1:
        raise RosTopicException("remote call failed: %s" % msg)
    return val
20 
21 
class TopicThread(threading.Thread):
    """Thread that polls the ROS master and reports graph changes.

    Every ``POLL_INTERVAL`` seconds the thread fetches the system state from
    the master, reduces it to sorted tuples of published topics, subscribed
    topics and node names, and passes a ``Topics`` object to the registered
    callback whenever that summary differs from the last one delivered.
    """

    # Seconds to sleep between two polls of the master.
    POLL_INTERVAL = 3

    def __init__(self):
        threading.Thread.__init__(self)

        # Callable invoked with a Topics instance on every change; stays
        # None until setCallback() is called.
        self.callback = None

    def setCallback(self, callback):
        """Register the callable that receives a ``Topics`` object on change."""
        self.callback = callback

    @staticmethod
    def _summarize_state(state):
        """Reduce a master system state to sorted, hashable tuples.

        Args:
            state: 3-item sequence ``(publishers, subscribers, services)``;
                each item is a sequence of ``(name, [node, ...])`` pairs.

        Returns:
            ``(pubtopics, subtopics, nodes)`` where each element is a sorted
            tuple of names. ``nodes`` is the de-duplicated union of the node
            lists found in all three sections of ``state``.
        """
        pubs, subs, _services = state
        pubtopics = sorted(name for name, _members in pubs)
        subtopics = sorted(name for name, _members in subs)

        nodes = set()
        for section in state:
            for _name, members in section:
                nodes.update(members)

        return (tuple(pubtopics), tuple(subtopics), tuple(sorted(nodes)))

    def getTopics(self):
        """Query the master once and return the summarized graph.

        Returns:
            ``(pubtopics, subtopics, nodes)`` as produced by
            ``_summarize_state``.

        Raises:
            Whatever ``succeed`` or the master proxy raise when the remote
            call fails.
        """
        master = scriptutil.get_master()
        state = succeed(master.getSystemState('/rostopic'))
        return self._summarize_state(state)

    def run(self):
        """Poll forever, invoking the callback each time the graph changes.

        Errors from the master or from the callback are logged at debug
        level and otherwise ignored so that polling continues; the loop
        never terminates on its own.
        """
        lasttopics = ()
        while True:
            try:
                topics = self.getTopics()
                callback = self.callback
                # Only record a change when somebody can receive it, so a
                # callback registered later still gets the current state.
                if topics != lasttopics and callback is not None:
                    lasttopics = topics
                    callback(Topics(topics))
            except Exception:
                # Best effort: the master may be unreachable or the callback
                # may fail; keep polling instead of killing the thread.
                logging.getLogger(__name__).debug(
                    "topic poll failed", exc_info=True)
            time.sleep(self.POLL_INTERVAL)
76 
77 
def succeed(args)
Definition: topics.py:15
def getTopics(self)
Definition: topics.py:31
def setCallback(self, callback)
Definition: topics.py:28
def __init__(self)
Definition: topics.py:23
def run(self)
Definition: topics.py:65
def __init__(self, topics)
Definition: topics.py:12


rosweb
Author(s): Scott Noob Hassan, Rob Wheeler, Nate Koenig
autogenerated on Mon Jun 10 2019 15:51:19