custom_mappings.py
Go to the documentation of this file.
1 # Copyright 2020 Roboception GmbH
2 #
3 # Redistribution and use in source and binary forms, with or without
4 # modification, are permitted provided that the following conditions are met:
5 #
6 # * Redistributions of source code must retain the above copyright
7 # notice, this list of conditions and the following disclaimer.
8 #
9 # * Redistributions in binary form must reproduce the above copyright
10 # notice, this list of conditions and the following disclaimer in the
11 # documentation and/or other materials provided with the distribution.
12 #
13 # * Neither the name of the {copyright_holder} nor the names of its
14 # contributors may be used to endorse or promote products derived from
15 # this software without specific prior written permission.
16 #
17 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 # AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 # ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
21 # LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22 # CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 # SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24 # INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25 # CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26 # ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 # POSSIBILITY OF SUCH DAMAGE.
28 
29 import copy
30 
31 
32 def map_ros2api(msg, rostype):
33  """ Map a ROS msg dict to API """
34  if not isinstance(msg, dict):
35  raise ValueError("msg is not a dict: {}".format(msg))
36  if rostype == 'shape_msgs/Plane':
37  c = msg['coef']
38  return {'normal': {'x': c[0], 'y': c[1], 'z': c[2]}, 'distance': c[3]}
39  elif rostype == 'rc_reason_msgs/CalibrateBasePlaneRequest':
40  new_msg = copy.deepcopy(msg)
41  if msg['plane_estimation_method'] == 'STEREO':
42  new_msg['stereo'] = {'plane_preference': msg['stereo_plane_preference']}
43  del new_msg['stereo_plane_preference']
44  return new_msg
45  elif rostype == 'rc_reason_msgs/LoadCarrierModel':
46  new_msg = copy.deepcopy(msg)
47  # map PoseStamped to pose and pose_frame
48  new_msg['pose'] = msg['pose']['pose']
49  new_msg['pose_frame'] = msg['pose']['header']['frame_id']
50  return new_msg
51  elif rostype in ['rc_reason_msgs/DetectLoadCarriersRequest',
52  'rc_reason_msgs/DetectFillingLevelRequest',
53  'rc_reason_msgs/DetectTagsRequest',
54  'rc_reason_msgs/SilhouetteMatchDetectObjectRequest',
55  'rc_reason_msgs/CadMatchDetectObjectRequest']:
56  new_msg = copy.deepcopy(msg)
57  # don't send robot pose if not external
58  if msg['pose_frame'] != 'external':
59  del new_msg['robot_pose']
60  return new_msg
61  elif rostype in ['rc_reason_msgs/ComputeGraspsRequest']:
62  new_msg = {k: msg[k] for k in ['pose_frame', 'item_models', 'suction_surface_length', 'suction_surface_width']}
63  # only send robot pose if external
64  if msg['pose_frame'] == 'external':
65  new_msg['robot_pose'] = msg['robot_pose']
66  if msg['region_of_interest_id']:
67  new_msg['region_of_interest_id'] = msg['region_of_interest_id']
68  if msg['load_carrier_id']:
69  new_msg['load_carrier_id'] = msg['load_carrier_id']
70  if all([msg['load_carrier_compartment']['box'][k] > 0 for k in ['x', 'y', 'z']]):
71  new_msg['load_carrier_compartment'] = msg['load_carrier_compartment']
72  if msg['collision_detection']['gripper_id']:
73  new_msg['collision_detection'] = msg['collision_detection']
74  return new_msg
75  elif rostype in ['rc_reason_msgs/DetectItemsRequest']:
76  new_msg = {k: msg[k] for k in ['pose_frame', 'item_models']}
77  # only send robot pose if external
78  if msg['pose_frame'] == 'external':
79  new_msg['robot_pose'] = msg['robot_pose']
80  if msg['region_of_interest_id']:
81  new_msg['region_of_interest_id'] = msg['region_of_interest_id']
82  if msg['load_carrier_id']:
83  new_msg['load_carrier_id'] = msg['load_carrier_id']
84  if all([msg['load_carrier_compartment']['box'][k] > 0 for k in ['x', 'y', 'z']]):
85  new_msg['load_carrier_compartment'] = msg['load_carrier_compartment']
86  return new_msg
87  elif rostype == 'rc_reason_msgs/RegionOfInterest3D':
88  new_msg = {'id': msg['id'], 'pose': msg['pose']['pose'], 'pose_frame': msg['pose']['header']['frame_id']}
89  d = msg['primitive']['dimensions']
90  if msg['primitive']['type'] == 1:
91  new_msg['type'] = 'BOX'
92  new_msg['box'] = {'x': d[0], 'y': d[1], 'z': d[2]}
93  elif msg['primitive']['type'] == 2:
94  new_msg['type'] = 'SPHERE'
95  new_msg['sphere'] = {'radius': d[0]}
96  return new_msg
97  elif rostype in ['rc_reason_msgs/SetLoadCarrierRequest']:
98  new_msg = copy.deepcopy(msg)
99  # don't send pose (as prior) if pose_frame is not set
100  if not msg['load_carrier']['pose_frame']:
101  del new_msg['load_carrier']['pose']
102  return new_msg
103 
104  # no mapping required, return auto-generated one
105  return msg
106 
107 
108 def _to_ros_pose_stamped(msg, timestamp=None):
109  if 'pose_frame' not in msg:
110  return msg
111  new_msg = {}
112  for key in msg:
113  if key not in ['pose', 'pose_frame', 'timestamp']:
114  new_msg[key] = msg[key]
115  header = {'frame_id': msg['pose_frame']}
116  stamp = msg.get('timestamp', timestamp)
117  if stamp is not None:
118  header['stamp'] = stamp
119  new_msg['pose'] = {'pose': msg['pose'], 'header': header}
120  return new_msg
121 
122 
123 def map_api2ros(msg, rostype):
124  """ Map an API msg to ROS """
125  if not isinstance(msg, dict):
126  raise ValueError("msg is not a dict: {}".format(msg))
127  if rostype == 'rc_reason_msgs/DetectedTag':
128  new_msg = {}
129  header = {'stamp': msg['timestamp'], 'frame_id': msg['pose_frame']}
130  new_msg['header'] = header
131  new_msg['tag'] = {'id': msg['id'], 'size': msg['size']}
132  new_msg['pose'] = {'pose': msg['pose'], 'header': header}
133  new_msg['instance_id'] = msg['instance_id']
134  return new_msg
135  elif rostype == 'shape_msgs/Plane':
136  return {'coef': [msg['normal']['x'], msg['normal']['y'], msg['normal']['z'], msg['distance']]}
137  elif rostype in ['rc_reason_msgs/GetBasePlaneCalibrationResponse', 'rc_reason_msgs/CalibrateBasePlaneResponse']:
138  new_msg = copy.deepcopy(msg)
139  new_msg['pose_frame'] = msg['plane']['pose_frame']
140  del new_msg['plane']['pose_frame']
141  return new_msg
142  elif rostype in ['rc_reason_msgs/LoadCarrier',
143  'rc_reason_msgs/LoadCarrierModel'
144  ]:
145  return _to_ros_pose_stamped(msg)
146  elif rostype in ['rc_reason_msgs/ComputeGraspsResponse',
147  'rc_reason_msgs/DetectFillingLevelResponse',
148  'rc_reason_msgs/DetectLoadCarriersResponse',
149  'rc_reason_msgs/DetectItemsResponse',
150  'rc_reason_msgs/CadMatchDetectObjectResponse'
151  ]:
152  new_msg = {k: v for k, v in msg.items() if k not in ['load_carriers']}
153  new_msg['load_carriers'] = []
154  for lc in msg['load_carriers']:
155  new_msg['load_carriers'].append(_to_ros_pose_stamped(lc, msg['timestamp']))
156  return new_msg
157  elif rostype in ['rc_reason_msgs/SuctionGrasp']:
158  new_msg = {k: v for k, v in msg.items() if k not in ['pose', 'pose_frame', 'timestamp', 'type']}
159  header = {'stamp': msg['timestamp'], 'frame_id': msg['pose_frame']}
160  new_msg['pose'] = {'pose': msg['pose'], 'header': header}
161  return new_msg
162  elif rostype in ['rc_reason_msgs/Item']:
163  return _to_ros_pose_stamped(msg)
164  elif rostype == 'rc_reason_msgs/RegionOfInterest3D':
165  new_msg = {'id': msg['id']}
166  header = {'frame_id': msg['pose_frame']}
167  new_msg['pose'] = {'pose': msg['pose'], 'header': header}
168  if msg['type'] == 'BOX':
169  new_msg['primitive'] = {'type': 1, 'dimensions': [msg['box']['x'], msg['box']['y'], msg['box']['z']]}
170  elif msg['type'] == 'SPHERE':
171  new_msg['primitive'] = {'type': 2, 'dimensions': [msg['sphere']['radius']]}
172  return new_msg
173  elif rostype == 'rc_reason_msgs/SilhouetteMatchDetectObjectResponse':
174  new_msg = {'timestamp': msg['timestamp']}
175  new_msg['matches'] = msg['instances']
176  new_msg['grasps'] = msg.get('grasps', [])
177  new_msg['load_carriers'] = []
178  for lc in msg['load_carriers']:
179  new_msg['load_carriers'].append(_to_ros_pose_stamped(lc, msg['timestamp']))
180  new_msg['object_id'] = msg['object_id']
181  return new_msg
182  elif rostype == 'rc_reason_msgs/Match':
183  new_msg = _to_ros_pose_stamped(msg)
184  # rename silhouettematch API fields to common ROS Match fields
185  if 'object_id' in new_msg:
186  new_msg['template_id'] = new_msg['object_id']
187  del new_msg['object_id']
188  if 'id' in new_msg:
189  if 'uuid' not in new_msg:
190  new_msg['uuid'] = msg['id']
191  del new_msg['id']
192  # if score is not available, set to -1
193  if 'score' not in new_msg:
194  new_msg['score'] = -1.0
195  return new_msg
196  elif rostype == 'rc_reason_msgs/Grasp':
197  new_msg = _to_ros_pose_stamped(msg)
198  # rename silhouettematch API fields to common ROS Grasp fields
199  if 'instance_uuid' in new_msg:
200  new_msg['match_uuid'] = msg['instance_uuid']
201  del new_msg['instance_uuid']
202 
203  return new_msg
204 
205  # no mapping required, return auto-generated one
206  return msg
rc_reason_clients.custom_mappings.map_ros2api
def map_ros2api(msg, rostype)
Definition: custom_mappings.py:32
rc_reason_clients.custom_mappings.map_api2ros
def map_api2ros(msg, rostype)
Definition: custom_mappings.py:123
rc_reason_clients.custom_mappings._to_ros_pose_stamped
def _to_ros_pose_stamped(msg, timestamp=None)
Definition: custom_mappings.py:108


rc_reason_clients
Author(s):
autogenerated on Sat Nov 23 2024 03:19:24