republisher.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 
3 # Copyright 2021 InOrbit, Inc.
4 #
5 # Permission is hereby granted, free of charge, to any person obtaining a copy
6 # of this software and associated documentation files (the "Software"), to deal
7 # in the Software without restriction, including without limitation the rights
8 # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9 # copies of the Software, and to permit persons to whom the Software is
10 # furnished to do so, subject to the following conditions:
11 #
12 # The above copyright notice and this permission notice shall be included in
13 # all copies or substantial portions of the Software.
14 #
15 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16 # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17 # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
18 # THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19 # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20 # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
21 # THE SOFTWARE.
22 #
23 # ROS to InOrbit republisher node sample
24 #
25 # It uses a YAML-based configuration to map between arbitrary
26 # ROS topics into InOrbit key/value custom data topics.
27 
28 import json
29 import rospy
30 import genpy
31 import yaml
32 import rospkg
33 import os
34 from std_msgs.msg import String
35 from roslib.message import get_message_class
36 from operator import attrgetter
37 from rospy_message_converter import message_converter
38 
39 # Types of mappings allowed
40 MAPPING_TYPE_SINGLE_FIELD = "single_field"
41 MAPPING_TYPE_ARRAY_OF_FIELDS = "array_of_fields"
42 MAPPING_TYPE_JSON_OF_FIELDS = "json_of_fields"
43 MAPPING_TYPE_SERIALIZE = "serialize"
44 
45 # Supported static publisher value sources
46 STATIC_VALUE_FROM_PACKAGE_VERSION = "package_version"
47 STATIC_VALUE_FROM_ENVIRONMENT_VAR = "environment_variable"
48 
49 """
50 Main node entry point.
51 
52 Currently a simple function that does everything.
53 We can later turn it into a class and divide in proper methods
54 as complexity increases.
55 """
56 def main():
57  # Start the ROS node
58  rospy.init_node('inorbit_republisher', anonymous=True, log_level=rospy.INFO)
59 
60  # Read republisher configuration from the 'config_file' or 'config' parameter
61  # TODO(adamantivm) Error handling and schema checking
62  if rospy.has_param('~config_file'):
63  config_file = rospy.get_param('~config_file')
64  rospy.loginfo("Using config from config file: {}".format(config_file))
65  config_yaml = open(config_file, "r")
66  elif rospy.has_param('~config'):
67  config_yaml = rospy.get_param('~config')
68  rospy.loginfo("Using config from parameter server")
69  config = yaml.safe_load(config_yaml)
70 
71  # Go through republisher configurations
72  # For each of them: create a publisher if necessary - only one per InOrbit
73  # custom data field - and a matching subscriber to receive and republish
74  # the desired fields.
75 
76  # Dictionary of publisher instances
77  # These are keyed by *out* topic, in the case of regular input mappings
78  # or by *out*+*input* topic name in the specific case of input topics marked as latched
79  pubs = {}
80 
81  # Dictionary of subscriber instances by topic name
82  subs = {}
83 
84  # In case we want to query ROS package options
85  rospack = rospkg.RosPack()
86 
87  # Set-up ROS topic republishers
88  republishers = config.get('republishers', ())
89  for repub in republishers:
90 
91  # Load subscriber message type
92  msg_class = get_message_class(repub['msg_type'])
93  if msg_class is None:
94  rospy.logwarn('Failed to load msg class for {}'.format(repub['msg_type']))
95  continue
96 
97  # explicit handling of latched topics to overcome timing issues in early subscription phase
98  latched = repub.get("latched", False)
99  publisher_class = LatchPublisher if latched else rospy.Publisher
100 
101  in_topic = repub['topic']
102 
103  # Create publisher for each new seen outgoing topic
104  for mapping in repub['mappings']:
105  out_topic = mapping['out']['topic']
106  # If the input topic is latched, we need a separate instance per input topic
107  pub_key = "{}+{}".format(out_topic, in_topic) if latched else out_topic
108  if not pub_key in pubs:
109  pubs[pub_key] = publisher_class(out_topic, String, queue_size=100)
110  # 'field' mapping is not defined when serializing messages
111  if 'field' in mapping:
112  mapping['attrgetter'] = attrgetter(mapping['field'])
113 
114  # Prepare callback to relay messages through InOrbit custom data
115  def callback(msg, repub=repub, in_topic=in_topic, latched=latched):
116  for mapping in repub['mappings']:
117  key = mapping['out']['key']
118  val = None
119  mapping_type = mapping.get('mapping_type', MAPPING_TYPE_SINGLE_FIELD)
120  topic = mapping['out']['topic']
121 
122  if mapping_type == MAPPING_TYPE_SINGLE_FIELD:
123  # TODO(adamantivm) Exception handling
124  field = extract_value(msg, attrgetter(mapping['field']))
125  val = process_single_field(field, mapping)
126 
127  elif mapping_type == MAPPING_TYPE_ARRAY_OF_FIELDS:
128  field = extract_value(msg, attrgetter(mapping['field']))
129  val = process_array(field, mapping)
130 
131  elif mapping_type == MAPPING_TYPE_JSON_OF_FIELDS:
132  try:
133  val = extract_values_as_dict(msg, mapping)
134  # extract_values_as_dict has the ability to filter messages and
135  # returns None when an element doesn't pass the filter
136  if val:
137  val = json.dumps(val)
138  except TypeError as e:
139  rospy.logwarn("Failed to serialize message: %s", e)
140 
141  elif mapping_type == MAPPING_TYPE_SERIALIZE:
142  try:
143  val = serialize(msg, mapping)
144  if val:
145  val = json.dumps(val)
146  except TypeError as e:
147  rospy.logwarn(f"Failed to serialize message: {e}")
148 
149  if val is not None:
150  pub_key = "{}+{}".format(topic, in_topic) if latched else topic
151  pubs[pub_key].publish(String("{}={}".format(key, val)))
152 
153 
154  # subscribe
155  subs[in_topic] = rospy.Subscriber(in_topic, msg_class, callback)
156 
157  # Set-up static publishers
158  static_publishers = config.get('static_publishers', ())
159  for static_pub_config in static_publishers:
160  key = static_pub_config['out']['key']
161  topic = static_pub_config['out']['topic']
162 
163  # If a literal value is provided, it takes highest precendence
164  val = static_pub_config.get('value')
165 
166  # Otherwise, fetch the value from the specified source
167  if val is None:
168  value_from = static_pub_config.get('value_from')
169  if STATIC_VALUE_FROM_PACKAGE_VERSION in value_from:
170  pkg_name = value_from[STATIC_VALUE_FROM_PACKAGE_VERSION]
171  # TODO(adamantivm) Exception handling
172  pkg_manifest = rospack.get_manifest(pkg_name)
173  val = pkg_manifest.version
174  elif STATIC_VALUE_FROM_ENVIRONMENT_VAR in value_from:
175  var_name = value_from[STATIC_VALUE_FROM_ENVIRONMENT_VAR]
176  val = os.environ.get(var_name)
177 
178  # If there is a value to publish, publish it using once per subscriber
179  if val is not None:
180  pub = LatchPublisher(topic, String, queue_size=100)
181  pub.publish(String("{}={}".format(key, val)))
182 
183  rospy.loginfo('Republisher started')
184  rospy.spin()
185  rospy.loginfo('Republisher shutting down')
186 
187  # Disconnect subs and pubs
188  for sub in subs.values():
189  sub.unregister()
190 
191  for pub in pubs.values():
192  pub.unregister()
193 
194 """
195 Extracts a value from the given message using the provided getter function
196 """
197 def extract_value(msg, getter_fn):
198  # TODO(adamantivm) Graceful handling of missing values to extract
199  # TODO(adamantivm) Allow serialization of complex values
200  val = getter_fn(msg)
201  return val
202 
203 """
204 Extracts several values from a given nested msg field and returns a dictionary of
205 <field, value> elements
206 """
207 def extract_values_as_dict(msg, mapping):
208  values = {}
209  base_getter_fn = attrgetter(mapping['field'])
210  base_value = base_getter_fn(msg)
211  fields = mapping.get('mapping_options', {}).get('fields')
212  for field in fields:
213  getter_fn = attrgetter(field)
214  try:
215  val = getter_fn(base_value)
216  # genpy.Time values can't be serialized into JSON. convert them to seconds
217  # TODO(diegobatt): Catch other datatypes
218  if isinstance(val, genpy.Time):
219  val = val.to_sec()
220  # TODO(diegobatt): Make it possible to use a different key than the field
221  values[field] = val
222  except AttributeError as e:
223  rospy.logwarn('Couldn\'t get attribute %s: %s', field, e)
224  filter_fn = mapping.get('mapping_options', {}).get('filter')
225  return values if not filter_fn or eval(filter_fn)(values) else None
226 
227 """
228 Processes a scalar value before publishing according to mapping options
229  - If a 'filter' function is provided, it returns the value only if the
230  result of passing the field value through the filter function is True,
231  otherwise it returns None
232 """
233 def process_single_field(field_value, mapping):
234  filter_fn = mapping.get('mapping_options', {}).get('filter')
235  return field_value if not filter_fn or eval(filter_fn)(field_value) else None
236 
237 """
238 Processes a given array field from the ROS message and:
239  - Filters it using the 'filter' function (if provided)
240  - For each element, it gets the set of keys defined by array_fields
241  - Returns a key/value with the value being a json string containing
242  the resulting array of objects.
243 
244 Note that the array fields to retrieve should have a String value in order to
245 serialize them properly.
246 """
247 def process_array(field, mapping):
248  # Output array of objects
249  values = {
250  'data': []
251  }
252 
253  filter_fn = mapping.get('mapping_options', {}).get('filter')
254  if filter_fn:
255  # Apply the filter function if any
256  filtered_array = list(filter(eval(filter_fn), field))
257  else:
258  filtered_array = field
259 
260  # Get only the array_fields specified. If none was specified, return the whole array
261  # TODO(FlorGrosso): check that the array fields are Strings and discard those which
262  # are not.
263  if 'fields' in mapping.get('mapping_options', {}):
264  fields = mapping['mapping_options']['fields']
265  values['data'] = [{f: extract_value(elem, attrgetter(f)) for f in fields} for elem in filtered_array]
266  else:
267  values['data'] = filtered_array
268 
269  return json.dumps(values)
270 
271 """
272 Transforms the ROS message to json and:
273  - Filters out fields on the top level of the json only.
274 """
275 def serialize(msg, mapping):
276  # TODO: design filtering support and implement it. It should be possible
277  # to leverage jq to do so. However, this would require adding a new rosdep
278  # rule for `pyjq`: https://pypi.org/project/pyjq/.
279  # filter = mapping.get('mapping_options', {}).get('filter')
280 
281  # Get fields configuration
282  fields = mapping.get('mapping_options', {}).get('fields')
283  # Transform ROS message to dict
284  msg_dict = message_converter.convert_ros_message_to_dictionary(msg)
285 
286  # Shrink output by keeping only selected fields or keys
287  if fields:
288  msg_dict = { k: v for k, v in msg_dict.items() if k in fields}
289 
290  return msg_dict
291 
292 """
293 Wrapper class to allow publishing more than one latched message over the same topic, working
294 around a rospy limitation caused by the use of Publisher singletons.
295 For more details see: https://github.com/ros/ros_comm/issues/146#issuecomment-307507271
296 """
297 class LatchPublisher(rospy.Publisher, rospy.SubscribeListener):
298  def __init__(self, name, data_class, tcp_nodelay=False, headers=None, queue_size=None):
299  super(LatchPublisher, self).__init__(name, data_class=data_class, tcp_nodelay=tcp_nodelay, headers=headers, queue_size=queue_size, subscriber_listener=self, latch=False)
300  self.latch_publisher = super(LatchPublisher, self)
301  # Store all kv messages when publishing.
302  self.message = dict()
303 
304  def publish(self, msg):
305  # Map key to message so they can be all published when a peer subscribes.
306  self.message[msg.data.split('=')[0]] = msg
307  self.latch_publisher.publish(msg)
308 
309  def peer_subscribe(self, resolved_name, publish, publish_single):
310  # Publish all key-values when a peer subscribes.
311  for v in self.message.values():
312  self.latch_publisher.publish(v)
313 
314 if __name__ == '__main__':
315  try:
316  main()
317  except rospy.ROSInterruptException:
318  pass
republisher.LatchPublisher.peer_subscribe
def peer_subscribe(self, resolved_name, publish, publish_single)
Definition: republisher.py:309
republisher.LatchPublisher.message
message
Definition: republisher.py:302
republisher.extract_value
def extract_value(msg, getter_fn)
Definition: republisher.py:197
republisher.extract_values_as_dict
def extract_values_as_dict(msg, mapping)
Definition: republisher.py:207
republisher.process_single_field
def process_single_field(field_value, mapping)
Definition: republisher.py:233
republisher.process_array
def process_array(field, mapping)
Definition: republisher.py:247
republisher.LatchPublisher
Definition: republisher.py:297
republisher.LatchPublisher.__init__
def __init__(self, name, data_class, tcp_nodelay=False, headers=None, queue_size=None)
Definition: republisher.py:298
republisher.LatchPublisher.latch_publisher
latch_publisher
Definition: republisher.py:300
republisher.LatchPublisher.publish
def publish(self, msg)
Definition: republisher.py:304
republisher.main
def main()
Definition: republisher.py:56
republisher.serialize
def serialize(msg, mapping)
Definition: republisher.py:275


inorbit_republisher
Author(s):
autogenerated on Sat Feb 1 2025 03:43:34