converter.py
Go to the documentation of this file.
1 import genmsg
2 import genmsg.msgs
3 from genmsg import MsgSpec, SrvSpec
4 from genpy.generator import compute_pkg_type
5 
6 from ._compat import lru_cache
7 from ._typing import TYPE_CHECKING
8 from .stub_element import (
9  AliasElement,
10  ClassElement,
11  ClassMethodElement,
12  CommentElement,
13  EmptyLinesElement,
14  FieldElement,
15  ImportsElement,
16  ParameterElement,
17 )
18 
19 if TYPE_CHECKING:
20  from typing import Dict, List, Sequence, Tuple
21 
22 
23 _GENMSG_PRIMITIVES = {
24  "byte": "byte",
25  "char": "str",
26  "int8": "int",
27  "int16": "int",
28  "int32": "int",
29  "int64": "int",
30  "uint8": "int",
31  "uint16": "int",
32  "uint32": "int",
33  "uint64": "int",
34  "float32": "float",
35  "float64": "float",
36  "bool": "bool",
37  "string": "str",
38 }
39 
40 _GENPY_DEFINED = {
41  genmsg.HEADER: ("std_msgs.msg._Header.Header", "std_msgs.msg._Header"),
42  genmsg.TIME: ("genpy.Time", "genpy"),
43  genmsg.DURATION: ("genpy.Duration", "genpy"),
44 } # type: Dict[str, Tuple[str, str]]
45 
46 
47 @lru_cache()
48 def _get_genmsg_type(first_party_package, field_type, imports):
49  # type: (str, str, ImportsElement) -> str
50 
51  """Get the corresponding python type for the given ROS type.
52 
53  .. warning::
54  As this method is decorated by lru_cache, make sure that
55  you are using another instance of ImportsElement when
56  calling this method in a different context.
57  (e.g. creating a different module)
58  """
59  primitive = _GENMSG_PRIMITIVES.get(field_type)
60  if primitive is not None:
61  return primitive
62 
63  if field_type.endswith("]"):
64  base_field_type, is_array, array_len = genmsg.msgs.parse_type(field_type)
65  assert is_array
66  # TODO: Handle array_len (use typing.Annotated?)
67 
68  if base_field_type == "uint8":
69  # Special case for uint8[]
70  return "bytes"
71 
72  base_type = _get_genmsg_type(first_party_package, base_field_type, imports)
73  return "typing.List[{}]".format(base_type)
74 
75  special = _GENPY_DEFINED.get(field_type)
76  if special is not None:
77  full_name, module = special
78  imports.add_third_party_module(module, None)
79  return full_name
80 
81  package, type_ = compute_pkg_type(first_party_package, field_type)
82  module = "{}.msg".format(package)
83  imports.add_third_party_module(module, None)
84 
85  return "{}.{}".format(module, type_)
86 
87 
88 def _convert_message_fields(first_party_package, spec, imports):
89  # type: (str, MsgSpec, ImportsElement) -> List[FieldElement]
90  fields = [] # type: List[FieldElement]
91  for field in spec.fields():
92  type_, name = field
93  fields.append(
95  name,
96  _get_genmsg_type(first_party_package, type_, imports),
97  )
98  )
99 
100  return fields
101 
102 
103 def _convert_message_constants(first_party_package, spec, imports):
104  # type: (str, MsgSpec, ImportsElement) -> List[FieldElement]
105  constants = [] # type: List[FieldElement]
106  for constant in spec.constants:
107  constants.append(
108  FieldElement(
109  constant.name,
110  _get_genmsg_type(first_party_package, constant.type, imports),
111  )
112  )
113 
114  return constants
115 
116 
117 def convert_message_class(first_party_package, spec, imports):
118  # type: (str, MsgSpec, ImportsElement) -> ClassElement
119  imports.add_third_party_module("genpy", None)
120  imports.add_system_module("typing", None)
121  imports.add_system_module("types", None)
122 
123  msgclass = ClassElement(spec.short_name, "genpy.Message")
124 
125  # Add private fields
126  msgclass.add(FieldElement("_md5sum", "str"))
127  msgclass.add(FieldElement("_type", "str"))
128  msgclass.add(FieldElement("_has_header", "bool"))
129  msgclass.add(FieldElement("_full_text", "str"))
130  msgclass.add(FieldElement("__slots__", "typing.List[str]"))
131  msgclass.add(FieldElement("_slot_types", "typing.List[str]"))
132 
133  message_constants = _convert_message_constants(first_party_package, spec, imports)
134  if len(message_constants) > 0:
135  msgclass.add(EmptyLinesElement())
136  msgclass.add(CommentElement("Constants"))
137  for c in message_constants:
138  msgclass.add(c)
139 
140  message_fields = _convert_message_fields(first_party_package, spec, imports)
141  if len(message_fields) > 0:
142  msgclass.add(EmptyLinesElement())
143  msgclass.add(CommentElement("Fields"))
144  for f in message_fields:
145  msgclass.add(f)
146 
147  msgclass.add(EmptyLinesElement())
148 
149  # Add __init__ method that accepts message fields as the parameter
150  init_method = ClassMethodElement("__init__", "None")
151  for field in message_fields:
152  init_method.add_parameter(
153  ParameterElement(field.name, field.type, has_default=True)
154  )
155 
156  # NOTE: Emit *args and **kwds in order to align with the actual implementation
157  init_method.add_parameter(ParameterElement("*args", "typing.Any"))
158  init_method.add_parameter(ParameterElement("**kwds", "typing.Any"))
159 
160  msgclass.add(init_method)
161 
162  # Add private methods except pattern methods like `_get_struct_I`
163  msgclass.add(ClassMethodElement("_get_types", "typing.List[str]"))
164 
165  # Add public methods
166  msgclass.add(
168  "serialize", "None", [ParameterElement("buff", "typing.BinaryIO")]
169  )
170  )
171  msgclass.add(
173  "deserialize", spec.short_name, [ParameterElement("str", "bytes")]
174  )
175  )
176  msgclass.add(
178  "serialize_numpy",
179  "None",
180  [
181  ParameterElement("buff", "typing.BinaryIO"),
182  ParameterElement("numpy", "types.ModuleType"),
183  ],
184  )
185  )
186  msgclass.add(
188  "deserialize_numpy",
189  spec.short_name,
190  [
191  ParameterElement("str", "bytes"),
192  ParameterElement("numpy", "types.ModuleType"),
193  ],
194  )
195  )
196 
197  return msgclass
198 
199 
200 def convert_service_class(spec, request_class, response_class):
201  # type: (SrvSpec, ClassElement, ClassElement) -> ClassElement
202  srvclass = ClassElement(spec.short_name, "object")
203  srvclass.add(FieldElement("_type", "str"))
204  srvclass.add(FieldElement("_md5sum", "str"))
205  srvclass.add(AliasElement("_request_class", request_class.name))
206  srvclass.add(AliasElement("_response_class", response_class.name))
207 
208  return srvclass
209 
210 
211 def convert_genpy_init(modules):
212  # type: (Sequence[str]) -> ImportsElement
213  imports = ImportsElement()
214  for m in modules:
215  imports.add_third_party_module("._{}".format(m), "*")
216 
217  return imports
genmypy.converter.convert_service_class
def convert_service_class(spec, request_class, response_class)
Definition: converter.py:200
genmypy.converter.convert_message_class
def convert_message_class(first_party_package, spec, imports)
Definition: converter.py:117
genmypy.stub_element.FieldElement
Definition: stub_element.py:144
genmypy.stub_element.ClassElement
Definition: stub_element.py:121
genmypy.converter._get_genmsg_type
def _get_genmsg_type(first_party_package, field_type, imports)
Definition: converter.py:48
genmypy.stub_element.CommentElement
Definition: stub_element.py:110
genmypy.converter._convert_message_constants
def _convert_message_constants(first_party_package, spec, imports)
Definition: converter.py:103
genmypy.stub_element.AliasElement
Definition: stub_element.py:155
genmypy.converter.convert_genpy_init
def convert_genpy_init(modules)
Definition: converter.py:211
genmypy._compat.lru_cache
def lru_cache()
Definition: _compat.py:14
genmypy.stub_element.EmptyLinesElement
Definition: stub_element.py:98
genmypy.stub_element.ParameterElement
Definition: stub_element.py:214
genmypy.stub_element.ImportsElement
Definition: stub_element.py:62
genmypy.stub_element.ClassMethodElement
Definition: stub_element.py:204
genmypy.converter._convert_message_fields
def _convert_message_fields(first_party_package, spec, imports)
Definition: converter.py:88


genmypy
Author(s): Yuki Igarashi, Tamaki Nishino
autogenerated on Mon Apr 10 2023 03:01:12