3 from genmsg
import MsgSpec, SrvSpec
4 from genpy.generator
import compute_pkg_type
6 from ._compat
import lru_cache
7 from ._typing
import TYPE_CHECKING
8 from .stub_element
import (
20 from typing
import Dict, List, Sequence, Tuple
23 _GENMSG_PRIMITIVES = {
41 genmsg.HEADER: (
"std_msgs.msg._Header.Header",
"std_msgs.msg._Header"),
42 genmsg.TIME: (
"genpy.Time",
"genpy"),
43 genmsg.DURATION: (
"genpy.Duration",
"genpy"),
51 """Get the corresponding python type for the given ROS type. 54 As this method is decorated by lru_cache, make sure that 55 you are using another instance of ImportsElement when 56 calling this method in a different context. 57 (e.g. creating a different module) 59 primitive = _GENMSG_PRIMITIVES.get(field_type)
60 if primitive
is not None:
63 if field_type.endswith(
"]"):
64 base_field_type, is_array, array_len = genmsg.msgs.parse_type(field_type)
68 if base_field_type ==
"uint8":
73 return "typing.List[{}]".format(base_type)
75 special = _GENPY_DEFINED.get(field_type)
76 if special
is not None:
77 full_name, module = special
78 imports.add_third_party_module(module,
None)
81 package, type_ = compute_pkg_type(first_party_package, field_type)
82 module =
"{}.msg".format(package)
83 imports.add_third_party_module(module,
None)
85 return "{}.{}".format(module, type_)
91 for field
in spec.fields():
106 for constant
in spec.constants:
119 imports.add_third_party_module(
"genpy",
None)
120 imports.add_system_module(
"typing",
None)
121 imports.add_system_module(
"types",
None)
123 msgclass = ClassElement(spec.short_name,
"genpy.Message")
126 msgclass.add(FieldElement(
"_md5sum",
"str"))
127 msgclass.add(FieldElement(
"_type",
"str"))
128 msgclass.add(FieldElement(
"_has_header",
"bool"))
129 msgclass.add(FieldElement(
"_full_text",
"str"))
130 msgclass.add(FieldElement(
"__slots__",
"typing.List[str]"))
131 msgclass.add(FieldElement(
"_slot_types",
"typing.List[str]"))
134 if len(message_constants) > 0:
135 msgclass.add(EmptyLinesElement())
136 msgclass.add(CommentElement(
"Constants"))
137 for c
in message_constants:
141 if len(message_fields) > 0:
142 msgclass.add(EmptyLinesElement())
143 msgclass.add(CommentElement(
"Fields"))
144 for f
in message_fields:
147 msgclass.add(EmptyLinesElement())
150 init_method = ClassMethodElement(
"__init__",
"None")
151 for field
in message_fields:
152 init_method.add_parameter(
153 ParameterElement(field.name, field.type, has_default=
True)
157 init_method.add_parameter(ParameterElement(
"*args",
"typing.Any"))
158 init_method.add_parameter(ParameterElement(
"**kwds",
"typing.Any"))
160 msgclass.add(init_method)
163 msgclass.add(ClassMethodElement(
"_get_types",
"typing.List[str]"))
168 "serialize",
"None", [ParameterElement(
"buff",
"typing.BinaryIO")]
173 "deserialize", spec.short_name, [ParameterElement(
"str",
"bytes")]
181 ParameterElement(
"buff",
"typing.BinaryIO"),
182 ParameterElement(
"numpy",
"types.ModuleType"),
191 ParameterElement(
"str",
"bytes"),
192 ParameterElement(
"numpy",
"types.ModuleType"),
202 srvclass = ClassElement(spec.short_name,
"object")
203 srvclass.add(FieldElement(
"_type",
"str"))
204 srvclass.add(FieldElement(
"_md5sum",
"str"))
205 srvclass.add(AliasElement(
"_request_class", request_class.name))
206 srvclass.add(AliasElement(
"_response_class", response_class.name))
213 imports = ImportsElement()
215 imports.add_third_party_module(
"._{}".format(m),
"*")
def convert_message_class(first_party_package, spec, imports)
def _get_genmsg_type(first_party_package, field_type, imports)
def convert_genpy_init(modules)
def _convert_message_constants(first_party_package, spec, imports)
def convert_service_class(spec, request_class, response_class)
def _convert_message_fields(first_party_package, spec, imports)