generator.py
Go to the documentation of this file.
1 import errno
2 import os
3 
4 import genmsg
5 import genmsg.msgs
6 from genmsg import MsgContext, MsgSpec, SrvSpec, gentools
7 from genpy.generator import compute_outfile_name, make_python_safe
8 
9 from ._typing import TYPE_CHECKING
10 from .converter import convert_genpy_init, convert_message_class, convert_service_class
11 from .stub_element import ClassElement, EmptyLinesElement, ImportsElement, ModuleElement
12 
13 if TYPE_CHECKING:
14  from typing import Callable, Dict, Iterator, List, TypeVar
15 
16  SpecType = TypeVar("SpecType", MsgSpec, SrvSpec)
17  LoaderType = Callable[[MsgContext, str, str], SpecType]
18  GeneratorType = Callable[[str, SpecType], Iterator[str]]
19  ModuleFinderType = Callable[[str], Iterator[str]]
20 
21 
22 GENMSG_EXT_LENGTH = len(genmsg.EXT_MSG)  # character count of the genmsg message-file extension
23 
24 
def generate_message_stub(package, spec):
    # type: (str, MsgSpec) -> Iterator[str]
    """Yield, line by line, the stub module text for a single message spec.

    The spec is first passed through ``make_python_safe``; the resulting
    spec is converted into a class element. The emitted module consists of
    the imports element, an empty-lines element and the message class, in
    that order.

    :param package: name of the package the message belongs to
    :param spec: message specification to describe
    :returns: iterator over the lines produced by the module element
    """
    safe_spec = make_python_safe(spec)
    import_block = ImportsElement()
    # The imports element is handed to the converter before being emitted;
    # presumably the converter registers the imports the class needs.
    message_class = convert_message_class(package, safe_spec, import_block)

    stub = ModuleElement()
    stub.add_element(import_block)
    stub.add_element(EmptyLinesElement())
    stub.add_element(message_class)

    for stub_line in stub.generate():
        yield stub_line
38 
39 
def generate_service_stub(package, spec):
    # type: (str, SrvSpec) -> Iterator[str]
    """Yield, line by line, the stub module text for a single service spec.

    The request and response specs are each made Python-safe and converted
    into class elements that share one imports element. The emitted module
    contains, separated by empty-lines elements: the imports, the request
    class, the response class and finally the service class.

    :param package: name of the package the service belongs to
    :param spec: service specification exposing ``request`` and ``response``
    :returns: iterator over the lines produced by the module element
    """
    import_block = ImportsElement()
    request_spec, response_spec = spec.request, spec.response

    request_class = convert_message_class(
        package, make_python_safe(request_spec), import_block
    )
    response_class = convert_message_class(
        package, make_python_safe(response_spec), import_block
    )

    stub = ModuleElement()
    stub.add_element(import_block)
    stub.add_element(EmptyLinesElement())
    stub.add_element(request_class)
    stub.add_element(EmptyLinesElement())
    stub.add_element(response_class)
    stub.add_element(EmptyLinesElement())
    # The service class is built from the two message classes created above.
    stub.add_element(convert_service_class(spec, request_class, response_class))

    for stub_line in stub.generate():
        yield stub_line
62 
63 
def _make_dirs(path):
    # type: (str) -> None
    """Create ``path`` (and missing parents), tolerating an existing directory.

    NOTE: This script will be executed in parallel, so atomic operations are
    needed to avoid a race condition. Checking for existence and then calling
    ``makedirs`` would leave a window in which another process creates the
    directory; instead the creation is attempted and "already exists" is
    accepted afterwards.

    :param path: directory to create
    :raises OSError: if creation fails for any reason other than the
        directory already existing, including when ``path`` exists but is
        not a directory
    """
    try:
        os.makedirs(path)
    except OSError as e:
        # EEXIST is only harmless when the thing that exists is a directory.
        # A regular file at ``path`` would make later writes below it fail
        # with a far less obvious error, so surface it here.
        if e.errno != errno.EEXIST or not os.path.isdir(path):
            raise
74 
75 
def _compute_outfile_path(outdir, target_path):
    # type: (str, str) -> str
    """Return the ``.pyi`` output path for the spec file ``target_path``.

    The path is derived from genpy's ``compute_outfile_name`` applied to the
    base name of ``target_path``, with an ``i`` appended to its ``.py``
    suffix.

    :param outdir: directory the stub will be written into
    :param target_path: path of the source spec file
    :returns: path of the stub file to generate
    """
    source_name = os.path.basename(target_path)
    # NOTE(review): EXT_MSG is passed for every target, whatever its actual
    # extension; presumably all supported extensions have the same length.
    py_path = compute_outfile_name(
        outdir, source_name, genmsg.EXT_MSG
    )  # type: str

    # Replace suffix .py with .pyi
    assert py_path.endswith(".py")
    return py_path + "i"
86 
87 
def _load_spec(
    msg_context,  # type: MsgContext
    loader,  # type: LoaderType[SpecType]
    package,  # type: str
    target_path,  # type: str
    search_paths,  # type: Dict[str, List[str]]
):
    # type: (...) -> SpecType
    """Load the spec stored at ``target_path`` and verify its dependencies.

    :param msg_context: genmsg context passed to the loader and to the
        dependency check
    :param loader: callable invoked as ``loader(msg_context, target_path,
        full_type)`` that returns the parsed spec
    :param package: package name used to compute the full type name
    :param target_path: path of the spec file to load
    :param search_paths: search paths forwarded to ``load_depends``
    :returns: the spec returned by ``loader``
    :raises RuntimeError: if the dependency check reports an invalid spec
    """
    filename = os.path.basename(target_path)
    full_type = gentools.compute_full_type_name(package, filename)
    spec = loader(msg_context, target_path, full_type)

    # NOTE: We don't need to load dependency as we just emit the type information
    # The following lines are intended to just verify the message.
    try:
        genmsg.msg_loader.load_depends(msg_context, spec, search_paths)
    except genmsg.InvalidMsgSpec as e:
        raise RuntimeError("Failed to load message: {}".format(e))

    return spec
108 
109 
def _find_all_genpy_modules_genmsg(package_dir):
    # type: (str) -> Iterator[str]
    """Yield module names for the message/service spec files in a directory.

    Only regular files directly inside ``package_dir`` are considered. A file
    is selected when its name ends with ``genmsg.EXT_MSG`` or
    ``genmsg.EXT_SRV``; the yielded name is the file name with that
    extension removed.

    :param package_dir: directory to scan
    :returns: iterator over spec file names without their extension, in
        ``os.listdir`` order
    """
    for path in os.listdir(package_dir):
        if not os.path.isfile(os.path.join(package_dir, path)):
            continue

        # Strip exactly the suffix that matched rather than a fixed number of
        # characters, so the result does not depend on both extensions
        # having the same length.
        for ext in (genmsg.EXT_MSG, genmsg.EXT_SRV):
            if path.endswith(ext):
                yield path[: -len(ext)]
                break
118 
119 
def _find_all_genpy_modules_py(package_dir):
    # type: (str) -> Iterator[str]
    """Yield module names for the underscore-prefixed ``.py`` files in a directory.

    Only regular files directly inside ``package_dir`` are considered and
    ``__init__.py`` is always skipped. For every remaining file named
    ``_<name>.py`` the bare ``<name>`` is yielded.

    :param package_dir: directory to scan
    :returns: iterator over module names, in ``os.listdir`` order
    """
    for entry in os.listdir(package_dir):
        if entry == "__init__.py":
            continue
        if not os.path.isfile(os.path.join(package_dir, entry)):
            continue

        is_private_module = entry.startswith("_") and entry.endswith(".py")
        if is_private_module:
            # Drop the leading "_" and the trailing ".py".
            yield entry[1:-3]
128 
129 
def generate_stub(
    msg_context,  # type: MsgContext
    loader,  # type: LoaderType[SpecType]
    generator,  # type: GeneratorType[SpecType]
    package,  # type: str
    target_path,  # type: str
    outdir,  # type: str
    search_paths,  # type: Dict[str, List[str]]
):
    # type: (...) -> str
    """Load one spec file and write its stub file into ``outdir``.

    :param msg_context: genmsg context used while loading the spec
    :param loader: callable that parses ``target_path`` into a spec
    :param generator: callable invoked as ``generator(package, spec)`` that
        yields the lines of the stub
    :param package: name of the package the spec belongs to
    :param target_path: path of the spec file to process
    :param outdir: directory the stub is written into; created if missing
    :param search_paths: search paths used for the dependency check
    :returns: path of the written stub file
    """
    spec = _load_spec(msg_context, loader, package, target_path, search_paths)
    outpath = _compute_outfile_path(outdir, target_path)

    _make_dirs(outdir)
    with open(outpath, "w") as f:
        # Each generated line is written followed by a newline.
        for line in generator(package, spec):
            f.write("{}\n".format(line))

    return outpath
149 
150 
# Maps the ``module_finder`` name accepted by ``generate_module_stub`` to the
# function that lists the module names found in a package directory.
GenPyModuleFinders = {
    "py": _find_all_genpy_modules_py,
    "genmsg": _find_all_genpy_modules_genmsg,
}  # type: Dict[str, ModuleFinderType]
155 
156 
def generate_module_stub(package_dir, outdir, module_finder):
    # type: (str, str, str) -> str
    """Write the ``__init__.pyi`` stub for a package into ``outdir``.

    The module names are discovered in ``package_dir`` by the finder
    registered under ``module_finder`` in ``GenPyModuleFinders``, sorted, and
    converted into an imports element whose generated lines make up the file.

    :param package_dir: directory scanned for modules
    :param outdir: directory the stub is written into; created if missing
    :param module_finder: key selecting the finder implementation
    :returns: path of the written ``__init__.pyi``
    :raises ValueError: if ``module_finder`` is not a registered key
    """
    if module_finder not in GenPyModuleFinders:
        raise ValueError("Unknown module finder: {}".format(module_finder))
    find_modules = GenPyModuleFinders[module_finder]

    # Sorting makes the output independent of directory listing order.
    module_names = sorted(find_modules(package_dir))
    init_imports = convert_genpy_init(module_names)

    _make_dirs(outdir)
    outpath = os.path.join(outdir, "__init__.pyi")
    with open(outpath, "w") as stub_file:
        for stub_line in init_imports.generate(0):
            stub_file.write("{}\n".format(stub_line))

    return outpath
genmypy.generator.generate_service_stub
def generate_service_stub(package, spec)
Definition: generator.py:40
genmypy.converter.convert_service_class
def convert_service_class(spec, request_class, response_class)
Definition: converter.py:200
genmypy.generator._make_dirs
def _make_dirs(path)
Definition: generator.py:64
genmypy.generator.generate_module_stub
def generate_module_stub(package_dir, outdir, module_finder)
Definition: generator.py:157
genmypy.generator.generate_stub
def generate_stub(msg_context, loader, generator, package, target_path, outdir, search_paths)
Definition: generator.py:130
genmypy.generator._find_all_genpy_modules_genmsg
def _find_all_genpy_modules_genmsg(package_dir)
Definition: generator.py:110
genmypy.generator._load_spec
def _load_spec(msg_context, loader, package, target_path, search_paths)
Definition: generator.py:88
genmypy.converter.convert_message_class
def convert_message_class(first_party_package, spec, imports)
Definition: converter.py:117
genmypy.generator.generate_message_stub
def generate_message_stub(package, spec)
Definition: generator.py:25
genmypy.converter.convert_genpy_init
def convert_genpy_init(modules)
Definition: converter.py:211
genmypy.generator._find_all_genpy_modules_py
def _find_all_genpy_modules_py(package_dir)
Definition: generator.py:120
genmypy.stub_element.EmptyLinesElement
Definition: stub_element.py:98
genmypy.generator._compute_outfile_path
def _compute_outfile_path(outdir, target_path)
Definition: generator.py:76
genmypy.stub_element.ImportsElement
Definition: stub_element.py:62
genmypy.stub_element.ModuleElement
Definition: stub_element.py:22


genmypy
Author(s): Yuki Igarashi, Tamaki Nishino
autogenerated on Mon Apr 10 2023 03:01:12