Package node_manager_fkie :: Module common
[frames | no frames]

Source Code for Module node_manager_fkie.common

  1  import os 
  2  import rospy 
  3   
  4  MANIFEST_FILE = 'manifest.xml' 
  5  PACKAGE_FILE = 'package.xml' 
  6   
  7  try: 
  8      from catkin_pkg.package import parse_package 
  9      CATKIN_SUPPORTED = True 
 10  except ImportError: 
 11      CATKIN_SUPPORTED = False 
 12   
 13  PACKAGE_CACHE = {} 
 14   
 15   
def utf8(s, errors='replace'):
    '''
    Converts the given value to a unicode text string.

    Byte strings and buffers are decoded as UTF-8, text strings are returned
    unchanged and every other object is converted through C{str()} first.
    @param s: the value to convert
    @param errors: error handling scheme used while decoding bytes
    @type errors: C{str}
    @return: the text representation of C{s}
    @rtype: C{unicode} (C{str} on Python 3)
    '''
    try:
        text_type = unicode
        byte_types = (str, buffer)
    except NameError:
        # Python 3 has neither `unicode` nor `buffer`
        text_type = str
        byte_types = (bytes, bytearray, memoryview)
    if isinstance(s, text_type):
        return s
    if not isinstance(s, byte_types):
        s = str(s)
        if isinstance(s, text_type):
            return s
        # Python 2: str() produced bytes; fall through and decode them with
        # the requested error handling instead of the strict ASCII default.
    return text_type(s, "utf-8", errors)
22 23
def get_ros_home():
    '''
    Returns the ROS HOME depending on ROS distribution API.

    The distributions 'electric', 'diamondback' and 'cturtle' are served by
    C{roslib.rosenv}, all others by C{rospkg}. If that lookup fails for any
    reason C{roslib.rosenv} is used as fallback; errors of the fallback are
    not handled here and reach the caller.
    @return: ROS HOME path
    @rtype: C{str}
    '''
    try:
        import rospkg.distro
        distro = rospkg.distro.current_distro_codename()
        if distro in ['electric', 'diamondback', 'cturtle']:
            import roslib.rosenv
            return roslib.rosenv.get_ros_home()
        # aliased so the import does not shadow this function's own name
        from rospkg import get_ros_home as rospkg_get_ros_home
        return rospkg_get_ros_home()
    except Exception:
        # Exception instead of a bare except: KeyboardInterrupt and
        # SystemExit must not be swallowed by the fallback.
        from roslib import rosenv
        return rosenv.get_ros_home()
42 43
def masteruri_from_ros():
    '''
    Returns the master URI depending on ROS distribution API.

    The distributions 'electric', 'diamondback' and 'cturtle' are served by
    C{roslib.rosenv}, all others by C{rosgraph}. If that lookup fails for any
    reason the environment variable ROS_MASTER_URI is read directly.
    @return: ROS master URI
    @rtype: C{str}
    @raise KeyError: if the fallback is taken and ROS_MASTER_URI is not set
    '''
    try:
        import rospkg.distro
        distro = rospkg.distro.current_distro_codename()
        if distro in ['electric', 'diamondback', 'cturtle']:
            import roslib.rosenv
            return roslib.rosenv.get_master_uri()
        import rosgraph
        return rosgraph.rosenv.get_master_uri()
    except Exception:
        # Exception instead of a bare except: KeyboardInterrupt and
        # SystemExit must not be swallowed by the fallback.
        return os.environ['ROS_MASTER_URI']
61 62
def lnamespace(name):
    '''
    Splits the leftmost namespace off the given name.

    - A name starting with the separator yields the separator itself and the
      name without its leading separators.
    - A name without any separator yields the name and an empty string.
    - Otherwise the part before the first separator and everything after it
      are returned.
    @param name: the name to split
    @type name: C{str}
    @return: the leading namespace and the remaining name
    @rtype: C{(str, str)}
    '''
    sep = rospy.names.SEP
    head, found, tail = name.partition(sep)
    if not head:
        return sep, name.lstrip(sep)
    if not found:
        return head, ''
    # Only the leading "<head><sep>" is cut off; a str.replace() without a
    # count would also remove later occurrences, e.g. 'a/b/a/c' -> 'b/c'.
    return head, tail
70 71
def namespace(name):
    '''
    Returns the parent part of the given name, as computed by
    C{os.path.dirname}, always terminated by the ROS name separator.
    @param name: the name to get the namespace for
    @type name: C{str}
    @return: the namespace ending with C{rospy.names.SEP}
    @rtype: C{str}
    '''
    sep = rospy.names.SEP
    parent = os.path.dirname(name)
    return parent if parent.endswith(sep) else parent + sep
77 78
def normns(name):
    '''
    Replaces each doubled ROS name separator in the given name by a single
    one. The replacement is done in one pass, so a run of three separators
    is reduced to two.
    @param name: the name to normalize
    @type name: C{str}
    @return: the name with doubled separators replaced
    @rtype: C{str}
    '''
    sep = rospy.names.SEP
    return name.replace(sep * 2, sep)
83 84
def get_rosparam(param, masteruri):
    '''
    Reads a parameter through a C{rospy.msproxy.MasterProxy} created for the
    given master URI.
    @param param: the name of the parameter to read
    @param masteruri: the URI of the ROS master
    @return: the parameter value, an empty dictionary if the lookup raises
    C{KeyError}, or C{None} if no master URI is given
    '''
    if not masteruri:
        return None
    try:
        # MasterProxy does all the magic for us
        return rospy.msproxy.MasterProxy(masteruri)[param]
    except KeyError:
        return {}
92 93
def delete_rosparam(param, masteruri):
    '''
    Deletes a parameter through a C{rospy.msproxy.MasterProxy} created for
    the given master URI. Nothing is done if no master URI is given; any
    C{Exception} raised while deleting is ignored.
    @param param: the name of the parameter to delete
    @param masteruri: the URI of the ROS master
    '''
    if not masteruri:
        return
    try:
        proxy = rospy.msproxy.MasterProxy(masteruri)
        # MasterProxy does all the magic for us
        del proxy[param]
    except Exception:
        pass
101 102
def get_packages(path):
    '''
    Recursively collects the packages located in or below the given path.

    A directory containing MANIFEST_FILE is reported under its directory
    name. A directory containing PACKAGE_FILE is, if catkin_pkg is available,
    reported under the name parsed from it; if parsing fails nothing is
    reported for it. The search does not descend into such directories.
    @param path: the directory to search in
    @type path: C{str}
    @return: dictionary mapping package names to package paths
    @rtype: C{dict(str: str)}
    '''
    result = {}
    if not os.path.isdir(path):
        return result
    entries = os.listdir(path)
    if MANIFEST_FILE in entries:
        # strip trailing separators, otherwise basename() would be empty
        return {os.path.basename(path.rstrip(os.path.sep)): path}
    if CATKIN_SUPPORTED and PACKAGE_FILE in entries:
        try:
            return {parse_package(path).name: path}
        except Exception:
            # unreadable package description: no package, no descent
            return {}
    for entry in entries:
        found = get_packages(os.path.join(path, entry))
        for pkg_name, pkg_path in found.items():
            # packages found earlier take precedence over later duplicates
            result.setdefault(pkg_name, pkg_path)
    return result
120 121
def _get_pkg_path(pkg_name):
    '''
    Returns the path of the given package using rospkg, or roslib if the
    rospkg lookup fails. Errors of the roslib lookup reach the caller.
    '''
    try:
        from rospkg import RosPack
        return RosPack().get_path(pkg_name)
    except Exception:
        import roslib.packages
        return roslib.packages.get_pkg_dir(pkg_name)


def resolve_paths(text):
    '''
    Searches in text for $(find ...) statements and replaces them by the
    package path. Other $(...) statements and a statement without closing
    parenthesis are left untouched.
    @param text: the text to search in
    @type text: C{str}
    @return: text with replaced statements.
    @rtype: C{str}
    '''
    result = text
    start = result.find('$(')
    while start > -1:
        end = result.find(')', start + 2)
        if end == -1:
            # unterminated statement: nothing left that could be replaced
            break
        script = result[start + 2:end].split()
        if len(script) == 2 and script[0] == 'find':
            pkg_path = _get_pkg_path(script[1])
            result = result[:start] + pkg_path + result[end + 1:]
            # continue behind the inserted path
            start = result.find('$(', start + len(pkg_path))
        else:
            # not a find statement; the next '$(' may be nested in this one
            start = result.find('$(', start + 2)
    return result
143 144
def to_url(path):
    '''
    Searches the package name for given path and create an URL starting with pkg://
    The path is returned unchanged if no package is found for its directory.
    @param path: the path of a file
    @type path: C{str}
    @return: the URL C{pkg://<package name><path inside the package>} or the
    given path
    @rtype: C{str}
    '''
    result = path
    pkg, pth = package_name(os.path.dirname(path))
    if pkg is not None:
        # Remove the package path once only: a later occurrence of the same
        # string inside the path belongs to the file location.
        result = "pkg://%s%s" % (pkg, path.replace(pth, '', 1))
    return result
154 155
def package_name(path):
    '''
    Returns for given directory a tuple of package name and package path or None values.
    The directory itself is tested first, then its parent directories.
    The results are cached!
    @param path: the directory to start the search in
    @type path: C{str}
    @return: package name and package path, or C{(None, None)}
    @rtype: C{(name, path)}
    '''
    if not path or path == os.path.sep or not os.path.isdir(path):
        return (None, None)
    if path in PACKAGE_CACHE:
        return PACKAGE_CACHE[path]
    for entry in os.listdir(path):
        if entry == MANIFEST_FILE:
            PACKAGE_CACHE[path] = (os.path.basename(path), path)
            return PACKAGE_CACHE[path]
        if CATKIN_SUPPORTED and entry == PACKAGE_FILE:
            try:
                pkg_name = parse_package(os.path.join(path, entry)).name
            except Exception:
                # unreadable package description; the failure is not cached
                return (None, None)
            PACKAGE_CACHE[path] = (pkg_name, path)
            return PACKAGE_CACHE[path]
    parent = os.path.dirname(path)
    if parent == path:
        # dirname() made no progress (e.g. '//'): stop instead of recursing
        # endlessly
        return (None, None)
    PACKAGE_CACHE[path] = package_name(parent)
    return PACKAGE_CACHE[path]
181 182
def is_package(file_list):
    '''
    Tests whether the given file names mark a package directory: the list
    contains MANIFEST_FILE or, if catkin_pkg is available, PACKAGE_FILE.
    @param file_list: the names of the files in a directory
    @return: C{True} if one of the package files is in the list
    @rtype: C{bool}
    '''
    if MANIFEST_FILE in file_list:
        return True
    return CATKIN_SUPPORTED and PACKAGE_FILE in file_list
185