34 from std_msgs.msg
import String
35 from roslib.message
import get_message_class
36 from operator
import attrgetter
37 from rospy_message_converter
import message_converter
40 MAPPING_TYPE_SINGLE_FIELD =
"single_field"
41 MAPPING_TYPE_ARRAY_OF_FIELDS =
"array_of_fields"
42 MAPPING_TYPE_JSON_OF_FIELDS =
"json_of_fields"
43 MAPPING_TYPE_SERIALIZE =
"serialize"
46 STATIC_VALUE_FROM_PACKAGE_VERSION =
"package_version"
47 STATIC_VALUE_FROM_ENVIRONMENT_VAR =
"environment_variable"
50 Main node entry point.
52 Currently a simple function that does everything.
53 We can later turn it into a class and divide in proper methods
54 as complexity increases.
58 rospy.init_node(
'inorbit_republisher', anonymous=
True, log_level=rospy.INFO)
62 if rospy.has_param(
'~config_file'):
63 config_file = rospy.get_param(
'~config_file')
64 rospy.loginfo(
"Using config from config file: {}".format(config_file))
65 config_yaml = open(config_file,
"r")
66 elif rospy.has_param(
'~config'):
67 config_yaml = rospy.get_param(
'~config')
68 rospy.loginfo(
"Using config from parameter server")
69 config = yaml.safe_load(config_yaml)
85 rospack = rospkg.RosPack()
88 republishers = config.get(
'republishers', ())
89 for repub
in republishers:
92 msg_class = get_message_class(repub[
'msg_type'])
94 rospy.logwarn(
'Failed to load msg class for {}'.format(repub[
'msg_type']))
98 latched = repub.get(
"latched",
False)
99 publisher_class = LatchPublisher
if latched
else rospy.Publisher
101 in_topic = repub[
'topic']
104 for mapping
in repub[
'mappings']:
105 out_topic = mapping[
'out'][
'topic']
107 pub_key =
"{}+{}".format(out_topic, in_topic)
if latched
else out_topic
108 if not pub_key
in pubs:
109 pubs[pub_key] = publisher_class(out_topic, String, queue_size=100)
111 if 'field' in mapping:
112 mapping[
'attrgetter'] = attrgetter(mapping[
'field'])
115 def callback(msg, repub=repub, in_topic=in_topic, latched=latched):
116 for mapping
in repub[
'mappings']:
117 key = mapping[
'out'][
'key']
119 mapping_type = mapping.get(
'mapping_type', MAPPING_TYPE_SINGLE_FIELD)
120 topic = mapping[
'out'][
'topic']
122 if mapping_type == MAPPING_TYPE_SINGLE_FIELD:
127 elif mapping_type == MAPPING_TYPE_ARRAY_OF_FIELDS:
131 elif mapping_type == MAPPING_TYPE_JSON_OF_FIELDS:
137 val = json.dumps(val)
138 except TypeError
as e:
139 rospy.logwarn(
"Failed to serialize message: %s", e)
141 elif mapping_type == MAPPING_TYPE_SERIALIZE:
145 val = json.dumps(val)
146 except TypeError
as e:
147 rospy.logwarn(f
"Failed to serialize message: {e}")
150 pub_key =
"{}+{}".format(topic, in_topic)
if latched
else topic
151 pubs[pub_key].publish(String(
"{}={}".format(key, val)))
155 subs[in_topic] = rospy.Subscriber(in_topic, msg_class, callback)
158 static_publishers = config.get(
'static_publishers', ())
159 for static_pub_config
in static_publishers:
160 key = static_pub_config[
'out'][
'key']
161 topic = static_pub_config[
'out'][
'topic']
164 val = static_pub_config.get(
'value')
168 value_from = static_pub_config.get(
'value_from')
169 if STATIC_VALUE_FROM_PACKAGE_VERSION
in value_from:
170 pkg_name = value_from[STATIC_VALUE_FROM_PACKAGE_VERSION]
172 pkg_manifest = rospack.get_manifest(pkg_name)
173 val = pkg_manifest.version
174 elif STATIC_VALUE_FROM_ENVIRONMENT_VAR
in value_from:
175 var_name = value_from[STATIC_VALUE_FROM_ENVIRONMENT_VAR]
176 val = os.environ.get(var_name)
181 pub.publish(String(
"{}={}".format(key, val)))
183 rospy.loginfo(
'Republisher started')
185 rospy.loginfo(
'Republisher shutting down')
188 for sub
in subs.values():
191 for pub
in pubs.values():
195 Extracts a value from the given message using the provided getter function
204 Extracts several values from a given nested msg field and returns a dictionary of
205 <field, value> elements
209 base_getter_fn = attrgetter(mapping[
'field'])
210 base_value = base_getter_fn(msg)
211 fields = mapping.get(
'mapping_options', {}).get(
'fields')
213 getter_fn = attrgetter(field)
215 val = getter_fn(base_value)
218 if isinstance(val, genpy.Time):
222 except AttributeError
as e:
223 rospy.logwarn(
'Couldn\'t get attribute %s: %s', field, e)
224 filter_fn = mapping.get(
'mapping_options', {}).get(
'filter')
225 return values
if not filter_fn
or eval(filter_fn)(values)
else None
228 Processes a scalar value before publishing according to mapping options
229 - If a 'filter' function is provided, it returns the value only if the
230 result of passing the field value through the filter function is True,
231 otherwise it returns None
234 filter_fn = mapping.get(
'mapping_options', {}).get(
'filter')
235 return field_value
if not filter_fn
or eval(filter_fn)(field_value)
else None
238 Processes a given array field from the ROS message and:
239 - Filters it using the 'filter' function (if provided)
240 - For each element, it gets the set of keys defined by array_fields
241 - Returns a key/value with the value being a json string containing
242 the resulting array of objects.
244 Note that the array fields to retrieve should have a String value in order to
245 serialize them properly.
253 filter_fn = mapping.get(
'mapping_options', {}).get(
'filter')
256 filtered_array = list(filter(eval(filter_fn), field))
258 filtered_array = field
263 if 'fields' in mapping.get(
'mapping_options', {}):
264 fields = mapping[
'mapping_options'][
'fields']
265 values[
'data'] = [{f:
extract_value(elem, attrgetter(f))
for f
in fields}
for elem
in filtered_array]
267 values[
'data'] = filtered_array
269 return json.dumps(values)
272 Transforms the ROS message to json and:
273 - Filters out fields on the top level of the json only.
282 fields = mapping.get(
'mapping_options', {}).get(
'fields')
284 msg_dict = message_converter.convert_ros_message_to_dictionary(msg)
288 msg_dict = { k: v
for k, v
in msg_dict.items()
if k
in fields}
293 Wrapper class to allow publishing more than one latched message over the same topic, working
294 around a rospy limitation caused by the use of Publisher singletons.
295 For more details see: https://github.com/ros/ros_comm/issues/146#issuecomment-307507271
298 def __init__(self, name, data_class, tcp_nodelay=False, headers=None, queue_size=None):
299 super(LatchPublisher, self).
__init__(name, data_class=data_class, tcp_nodelay=tcp_nodelay, headers=headers, queue_size=queue_size, subscriber_listener=self, latch=
False)
306 self.
message[msg.data.split(
'=')[0]] = msg
311 for v
in self.
message.values():
314 if __name__ ==
'__main__':
317 except rospy.ROSInterruptException: