behavior_library.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 import os
3 from rospkg import RosPack
4 import xml.etree.ElementTree as ET
5 import zlib
6 
7 from flexbe_core.logger import Logger
8 
9 
10 class BehaviorLibrary(object):
11  '''
12  Provides access to all known behaviors.
13  '''
14 
15  def __init__(self):
16  self._rp = RosPack()
17  self._behavior_lib = dict()
18  self.parse_packages()
19 
20  def parse_packages(self):
21  """
22  Parses all ROS packages to update the internal behavior library.
23  """
24  self._behavior_lib = dict()
25  for pkg in self._rp.list():
26  for export in self._rp._load_manifest(pkg).exports:
27  if export.tag == "flexbe_behaviors":
28  self._add_behavior_manifests(self._rp.get_path(pkg), pkg)
29 
30  def _add_behavior_manifests(self, path, pkg=None):
31  """
32  Recursively add all behavior manifests in the given folder to the internal library.
33  If a package name is specified, only manifests referring to this package are added.
34 
35  @type path: string
36  @param path: Path of the folder to be traversed.
37 
38  @type pkg: string
39  @param pkg: Optional name of a package to only add manifests referring to this package.
40  """
41  for entry in os.listdir(path):
42  entry_path = os.path.join(path, entry)
43  if os.path.isdir(entry_path):
44  self._add_behavior_manifests(entry_path, pkg)
45  elif entry.endswith(".xml") and not entry.startswith("#"):
46  m = ET.parse(entry_path).getroot()
47  # structure sanity check
48  if (m.tag != "behavior"
49  or len(m.findall(".//executable")) == 0
50  or m.find("executable").get("package_path") is None
51  or len(m.find("executable").get("package_path").split(".")) < 2):
52  continue
53  e = m.find("executable")
54  if pkg is not None and e.get("package_path").split(".")[0] != pkg:
55  continue # ignore if manifest not in specified package
56  be_id = zlib.adler32(e.get("package_path").encode()) & 0x7fffffff
57  self._behavior_lib[be_id] = {
58  "name": m.get("name"),
59  "package": ".".join(e.get("package_path").split(".")[:-1]),
60  "file": e.get("package_path").split(".")[-1],
61  "class": e.get("class")
62  }
63 
64  def get_behavior(self, be_id):
65  """
66  Provides the library entry corresponding to the given ID.
67 
68  @type be_id: int
69  @param be_id: Behavior ID to look up.
70 
71  @return Corresponding library entry or None if not found.
72  """
73  try:
74  return self._behavior_lib[be_id]
75  except KeyError:
76  Logger.logwarn("Did not find ID %d in libary, updating..." % be_id)
77  self.parse_packages()
78  return self._behavior_lib.get(be_id, None)
79 
80  def find_behavior(self, be_name):
81  """
82  Searches for a behavior with the given name and returns it along with its ID.
83 
84  @type be_name: string
85  @param be_name: Behavior ID to look up.
86 
87  @return Tuple (be_id, be_entry) corresponding to the name or (None, None) if not found.
88  """
89  find = lambda: next((id, be) for (id, be) # noqa: E731 (allow lambda, only used locally here)
90  in self._behavior_lib.items()
91  if be["name"] == be_name)
92  try:
93  return find()
94  except StopIteration:
95  Logger.logwarn("Did not find behavior '%s' in libary, updating..." % be_name)
96  self.parse_packages()
97  try:
98  return find()
99  except StopIteration:
100  Logger.logerr("Still cannot find behavior '%s' in libary after update, giving up!" % be_name)
101  return None, None
102 
103  def count_behaviors(self):
104  """
105  Counts the available behaviors.
106 
107  @return Number of behaviors.
108  """
109  return len(self._behavior_lib)
110 
111  def get_sourcecode_filepath(self, be_id, add_tmp=False):
112  """
113  Constructs a file path to the source code of corresponding to the given ID.
114 
115  @type be_id: int
116  @param be_id: Behavior ID to look up.
117 
118  @type add_tmp: bool
119  @param add_tmp: Append "_tmp" to the file to consider a temporary version.
120 
121  @return String containing the absolute path to the source code file.
122  """
123  be_entry = self.get_behavior(be_id)
124  if be_entry is None:
125  # rely on get_behavior to handle/log missing package
126  return None
127  try:
128  module_path = __import__(be_entry["package"]).__path__[-1]
129  except ImportError:
130  Logger.logwarn("Cannot import behavior package '%s', using 'rospack find' instead" % be_entry["package"])
131  # rp can find it because otherwise, the above entry would not exist
132  module_path = os.path.join(self._rp.get_path(be_entry["package"]), "src", be_entry["package"])
133  filename = be_entry["file"] + '.py' if not add_tmp else '_tmp.py'
134  return os.path.join(module_path, filename)
def get_sourcecode_filepath(self, be_id, add_tmp=False)
def _add_behavior_manifests(self, path, pkg=None)


flexbe_core
Author(s): Philipp Schillinger
autogenerated on Sun Dec 13 2020 04:01:39