nodeutil.py
Go to the documentation of this file.
1 #! /usr/bin/env python
2 
3 import os
4 import socket
5 import xmlrpclib
6 
7 PKG = 'webui' # this package name
8 import roslib; roslib.load_manifest(PKG)
9 import rospy
10 from roslib import scriptutil
11 from roslib.names import make_caller_id
12 
13 def _remote_call(args):
14  code, msg, val = args
15  if code != 1:
16  raise Exception("remote call failed: %s"%msg)
17  return val
18 
19 _caller_apis = {}
20 def get_api_uri(master, node_name, caller_id):
21  caller_api = _caller_apis.get(caller_id, None)
22  if not caller_api:
23  try:
24  code, msg, caller_api = master.lookupNode(caller_id, node_name)
25  if code != 1:
26  return None
27  else:
28  _caller_apis[caller_id] = caller_api
29  except socket.error:
30  raise Exception("Unable to communicate with master!")
31  return caller_api
32 
33 
34 def node_info(node):
35  caller_id = make_caller_id('nodeutil-%s'%os.getpid())
36  node_name = node #scriptutil.script_resolve_name(caller_id, node)
37  master = scriptutil.get_master()
38 
39  # go through the master system state first
40  try:
41  state = _remote_call(master.getSystemState(caller_id))
42  except socket.error:
43  raise Exception("Unable to communicate with master!")
44 
45  pubs = [t for t, l in state[0] if node_name in l]
46  subs = [t for t, l in state[1] if node_name in l]
47  srvs = [t for t, l in state[2] if node_name in l]
48 
49  results = {
50  "publications": pubs,
51  "subscriptions": subs,
52  "services": srvs
53  }
54  node_api = get_api_uri(master, node_name, caller_id)
55  if not node_api:
56  results["error"] = "cannot contact [%s]: unknown node"%node_name
57 
58  return results
59 
60 def topic_info(topic):
61  caller_id = make_caller_id('nodeutil-%s'%os.getpid())
62  master = scriptutil.get_master()
63 
64  # go through the master system state first
65  try:
66  state = _remote_call(master.getSystemState(caller_id))
67  except socket.error:
68  raise IOException("Unable to communicate with master!")
69 
70  pubs, subs, _ = state
71  publists = [publist for t, publist in pubs if t == topic]
72  sublists = [sublist for t, sublist in subs if t == topic]
73 
74  #pub_topics = _succeed(master.getPublishedTopics(caller_id, '/'))
75 
76  if publists == []:
77  return {
78  "error": "This topic does not appear to be published yet."
79  }
80  elif sublists == []:
81  return {
82  "publishers": publists[0],
83  "subscribers": []
84  }
85  else:
86  return {
87  "publishers": publists[0],
88  "subscribers": sublists[0]
89  }
90 
91 def main():
92  print node_info("/fake_wifi_ddwrt")
93  print "----"
94  print topic_info("/ddwrt/accesspoint")
95 
96 
97 if __name__ == "__main__":
98  main()
def main()
Definition: nodeutil.py:91
def get_api_uri(master, node_name, caller_id)
Definition: nodeutil.py:20
def topic_info(topic)
Definition: nodeutil.py:60
def node_info(node)
Definition: nodeutil.py:34
def _remote_call(args)
Definition: nodeutil.py:13


webui
Author(s): Scott Hassan
autogenerated on Mon Jun 10 2019 15:51:24