common.py
Go to the documentation of this file.
1 import os
2 import re
3 import math
4 import qt_gui.plugin
5 import python_qt_binding
6 
7 # ROS version specific imports
8 from .constants import _MICROSTRAIN_ROS_VERISON
9 if _MICROSTRAIN_ROS_VERISON == 1:
10  import rospy
11 elif _MICROSTRAIN_ROS_VERISON == 2:
12  import rclpy
13  import rclpy.exceptions
14  import rclpy.time
15  import rclpy.timer
16  from rclpy.callback_groups import ReentrantCallbackGroup
17 
18 from .constants import _PACKAGE_RESOURCE_DIR
19 from .constants import _NODE_NAME_ENV_KEY, _DEFAULT_NODE_NAME
20 from .constants import _DEFAULT_MESSAGE_TIMEOUT, _DEFAULT_POLL_INTERVAL, _DEFAULT_VAL, _DEFAULT_STR
21 from .constants import _ICON_GREY_UNCHECKED_SMALL, _ICON_GREEN_UNCHECKED_SMALL
22 
24 
25  def __init__(self, node, node_name):
26  # Save the node name to the class so we can use it later
27  self._node = node
28  self._node_name = node_name
29 
30  def call_service(self, service_path, service_type, **kwargs):
31  # Determine the service name
32  service_name = os.path.join(self._node_name, service_path)
33 
34  # Different functionality between ROS 1 and ROS 2
35  if _MICROSTRAIN_ROS_VERISON == 1:
36  # Call the service, and return the result
37  try:
38  service_func = rospy.ServiceProxy(service_name, service_type)
39  response = service_func(**kwargs)
40  if hasattr(response, 'success'):
41  if response.success:
42  return response
43  else:
44  return _DEFAULT_VAL
45  else:
46  return response
47  except Exception:
48  return _DEFAULT_VAL
49  elif _MICROSTRAIN_ROS_VERISON == 2:
50  client = self._node.create_client(service_type, service_name)
51 
52  # Copy the kwargs into the service request
53  req = service_type.Request()
54  for key, value in kwargs.items():
55  if hasattr(req, key):
56  setattr(req, key, value)
57 
58  # Call the service and return the result
59  try:
60  return client.call_async(req)
61  except Exception:
62  return _DEFAULT_VAL
63 
64 
65 class Monitor(object):
66 
67  def __init__(self, node, node_name, path, message_type, message_timeout = _DEFAULT_MESSAGE_TIMEOUT):
68  # Set up some common variables
69  self._node = node
70  self._node_name = node_name
71  self._monitor_path = path
73  self._message_timeout = message_timeout
74  self._current_message = message_type()
75 
76  def stop(self):
78 
79  def _ros_time(self):
80  if _MICROSTRAIN_ROS_VERISON == 1:
81  return rospy.Time.now().to_sec()
82  elif _MICROSTRAIN_ROS_VERISON == 2:
83  return self._node.get_clock().now().to_msg().sec
84  else:
85  return 0
86 
87  def _euler_from_quaternion(self, quaternion):
88  # Formula copied and adapted from https://en.wikipedia.org/wiki/Conversion_between_quaternions_and_Euler_angles
89  q_x = quaternion[0]
90  q_y = quaternion[1]
91  q_z = quaternion[2]
92  q_w = quaternion[3]
93 
94  sinr_cosp = 2 * (q_w * q_x + q_y * q_z)
95  cosr_cosp = 1- 2 * (q_x * q_x + q_y * q_y)
96  roll = math.atan2(sinr_cosp, cosr_cosp)
97 
98  sinp = 2 * (q_w * q_y - q_z * q_x)
99  if math.fabs(sinp) >= 1:
100  pitch = math.copysign(math.pi / 2, sinp)
101  else:
102  pitch = math.asin(sinp)
103 
104  siny_cosp = 2 * (q_w * q_z + q_x * q_y)
105  cosy_cosp = 1 - 2 * (q_y * q_y + q_z * q_z)
106  yaw = math.atan2(siny_cosp, cosy_cosp)
107 
108  return (roll, pitch, yaw)
109 
110  @property
112  time_elapsed_since_last_message = self._ros_time() - self._last_message_received_time
113  return time_elapsed_since_last_message > self._message_timeout
114 
115  def _get_val(self, val, default = _DEFAULT_VAL):
116  if self._message_timed_out:
117  return default
118  else:
119  return val
120 
121  def _get_string(self, val, default_val = _DEFAULT_VAL, default_str = _DEFAULT_STR):
122  if val is default_val or self._get_val(val, default_val) is default_val:
123  return default_str
124  else:
125  return str(val)
126 
127  def _get_string_units(self, val, units, default_val=_DEFAULT_VAL, default_str=_DEFAULT_STR):
128  if val is default_val or self._get_val(val, default_val) is default_val:
129  return default_str
130  else:
131  return "%.6f %s" % (val, units)
132 
133  def _get_string_hex(self, val, default_val = _DEFAULT_VAL, default_str = _DEFAULT_STR):
134  if (val is default_val or self._get_val(val, default_val) is default_val):
135  return default_str
136  else:
137  return hex(val)
138 
139  def _get_small_boolean_icon_string(self, val, default_val = _DEFAULT_VAL):
140  if val is default_val or self._get_val(val, default_val) is default_val:
141  return _ICON_GREY_UNCHECKED_SMALL
142  else:
143  if val:
144  return _ICON_GREEN_UNCHECKED_SMALL
145  else:
146  return _ICON_GREY_UNCHECKED_SMALL
147 
148  def _get_boolean_string(self, val, default_val = _DEFAULT_VAL):
149  if val is default_val or self._get_val(val, default_val) is default_val:
150  return "Unknown"
151  else:
152  if val:
153  return "Yes"
154  else:
155  return "No"
156 
157 
159 
160  def __init__(self, node, node_name, path, service_type, message_timeout=_DEFAULT_MESSAGE_TIMEOUT, callback=_DEFAULT_VAL, poll_interval=_DEFAULT_POLL_INTERVAL):
161  # Initialize the parent class
162  if _MICROSTRAIN_ROS_VERISON == 1:
163  super(ServiceMonitor, self).__init__(node, node_name, path, service_type._response_class, message_timeout=message_timeout)
164  self._current_response = service_type._response_class()
165  elif _MICROSTRAIN_ROS_VERISON == 2:
166  super(ServiceMonitor, self).__init__(node, node_name, path, service_type.Response, message_timeout=message_timeout)
167  self._current_future = rclpy.Future()
168  self._current_future._done = False
169  self._current_response = service_type.Response()
170 
171  # Save some important information about the services
172  self._service_type = service_type
173 
174  # Initialize a client that we will use to communicate with the services
176 
177  # Start polling the service in a timer
178  if callback is _DEFAULT_VAL:
179  callback = self._default_callback
180 
181  if _MICROSTRAIN_ROS_VERISON == 1:
182  self._poll_timer = rospy.Timer(rospy.Duration(poll_interval), callback)
183  elif _MICROSTRAIN_ROS_VERISON == 2:
184  self._poll_timer = self._node.create_timer(poll_interval, callback)
185 
186  # Apparently ros timers will wait until executing the timer for the first time, so just execute it once here
187  callback()
188 
189  def stop(self):
190  super(ServiceMonitor, self).stop()
191  if _MICROSTRAIN_ROS_VERISON == 1:
192  self._poll_timer.shutdown()
193  elif _MICROSTRAIN_ROS_VERISON == 2:
194  self._poll_timer.destroy()
195  del self._poll_timer
196 
197  def _default_callback(self, event=None):
198  if _MICROSTRAIN_ROS_VERISON == 1:
199  # Get the result of the service call
200  new_message = self._microstrain_services.call_service(self._monitor_path, self._service_type)
201 
202  # If the service returned an actual value, save the time of success
203  if new_message is not _DEFAULT_VAL:
204  self._current_response = new_message
206  elif _MICROSTRAIN_ROS_VERISON == 2:
207  self._current_future = self._microstrain_services.call_service(self._monitor_path, self._service_type)
208 
209  @property
210  def _current_message(self):
211  # In ROS2, we need to get the result of the service call here if it is present
212  if _MICROSTRAIN_ROS_VERISON == 2:
213  if self._current_future.done():
214  response = self._current_future.result()
215  if hasattr(response, 'success'):
216  if response.success:
218  self._current_response = response
219  else:
221  self._current_response = response
222 
223  # Now that the current response has been updated for ROS2, and was already updated for ROS, return it
224  return self._current_response
225 
226  @_current_message.setter
227  def _current_message(self, current_message):
228  self._current_response = current_message
229 
230 
232 
233  def __init__(self, node, node_name, path, message_type, message_timeout=_DEFAULT_MESSAGE_TIMEOUT, callback=_DEFAULT_VAL):
234  # Initialize the parent class
235  super(SubscriberMonitor, self).__init__(node, node_name, path, message_type, message_timeout=message_timeout)
236 
237  # Save th topic name
238  self._topic = os.path.join(self._node_name, self._monitor_path)
239 
240  # Set up the subscriber
241  if callback is _DEFAULT_VAL:
242  callback = self._default_callback
243 
244  if _MICROSTRAIN_ROS_VERISON == 1:
245  self._subscriber = rospy.Subscriber(self._topic, message_type, callback)
246  elif _MICROSTRAIN_ROS_VERISON == 2:
247  self._subscriber = self._node.create_subscription(message_type, self._topic, callback, 10)
248 
249  def stop(self):
250  super(SubscriberMonitor, self).stop()
251  if _MICROSTRAIN_ROS_VERISON == 1:
252  self._subscriber.unregister()
253  elif _MICROSTRAIN_ROS_VERISON == 2:
254  self._subscriber.destroy()
255  del self._subscriber
256 
257  def _default_callback(self, message):
258  self._current_message = message
microstrain_inertial_rqt.utils.common.ServiceMonitor._current_response
_current_response
Definition: common.py:164
microstrain_inertial_rqt.utils.common.SubscriberMonitor
Definition: common.py:231
microstrain_inertial_rqt.utils.common.Monitor._ros_time
def _ros_time(self)
Definition: common.py:79
microstrain_inertial_rqt.utils.common.ServiceMonitor._current_future
_current_future
Definition: common.py:167
microstrain_inertial_rqt.utils.common.Monitor
Definition: common.py:65
microstrain_inertial_rqt.utils.common.Monitor.stop
def stop(self)
Definition: common.py:76
microstrain_inertial_rqt.utils.common.ServiceMonitor
Definition: common.py:158
microstrain_inertial_rqt.utils.common.ServiceMonitor._default_callback
def _default_callback(self, event=None)
Definition: common.py:197
microstrain_inertial_rqt.utils.common.Monitor._current_message
_current_message
Definition: common.py:74
microstrain_inertial_rqt.utils.common.Monitor._monitor_path
_monitor_path
Definition: common.py:71
microstrain_inertial_rqt.utils.common.MicrostrainServices
Definition: common.py:23
microstrain_inertial_rqt.utils.common.SubscriberMonitor._topic
_topic
Definition: common.py:238
microstrain_inertial_rqt.utils.common.Monitor._last_message_received_time
_last_message_received_time
Definition: common.py:72
microstrain_inertial_rqt.utils.common.ServiceMonitor._service_type
_service_type
Definition: common.py:172
microstrain_inertial_rqt.utils.common.Monitor._get_string
def _get_string(self, val, default_val=_DEFAULT_VAL, default_str=_DEFAULT_STR)
Definition: common.py:121
microstrain_inertial_rqt.utils.common.Monitor._get_string_units
def _get_string_units(self, val, units, default_val=_DEFAULT_VAL, default_str=_DEFAULT_STR)
Definition: common.py:127
microstrain_inertial_rqt.utils.common.Monitor._message_timeout
_message_timeout
Definition: common.py:73
microstrain_inertial_rqt.utils.common.MicrostrainServices._node_name
_node_name
Definition: common.py:28
microstrain_inertial_rqt.utils.common.Monitor._message_timed_out
def _message_timed_out(self)
Definition: common.py:111
microstrain_inertial_rqt.utils.common.Monitor._euler_from_quaternion
def _euler_from_quaternion(self, quaternion)
Definition: common.py:87
microstrain_inertial_rqt.utils.common.MicrostrainServices.call_service
def call_service(self, service_path, service_type, **kwargs)
Definition: common.py:30
microstrain_inertial_rqt.utils.common.ServiceMonitor._microstrain_services
_microstrain_services
Definition: common.py:175
microstrain_inertial_rqt.utils.common.MicrostrainServices._node
_node
Definition: common.py:27
microstrain_inertial_rqt.utils.common.Monitor._get_boolean_string
def _get_boolean_string(self, val, default_val=_DEFAULT_VAL)
Definition: common.py:148
microstrain_inertial_rqt.utils.common.Monitor.__init__
def __init__(self, node, node_name, path, message_type, message_timeout=_DEFAULT_MESSAGE_TIMEOUT)
Definition: common.py:67
microstrain_inertial_rqt.utils.common.ServiceMonitor._poll_timer
_poll_timer
Definition: common.py:182
microstrain_inertial_rqt.utils.common.ServiceMonitor.stop
def stop(self)
Definition: common.py:189
microstrain_inertial_rqt.utils.common.SubscriberMonitor.stop
def stop(self)
Definition: common.py:249
microstrain_inertial_rqt.utils.common.SubscriberMonitor.__init__
def __init__(self, node, node_name, path, message_type, message_timeout=_DEFAULT_MESSAGE_TIMEOUT, callback=_DEFAULT_VAL)
Definition: common.py:233
microstrain_inertial_rqt.utils.common.Monitor._node
_node
Definition: common.py:69
microstrain_inertial_rqt.utils.common.SubscriberMonitor._subscriber
_subscriber
Definition: common.py:245
qt_gui::plugin
microstrain_inertial_rqt.utils.common.ServiceMonitor.__init__
def __init__(self, node, node_name, path, service_type, message_timeout=_DEFAULT_MESSAGE_TIMEOUT, callback=_DEFAULT_VAL, poll_interval=_DEFAULT_POLL_INTERVAL)
Definition: common.py:160
microstrain_inertial_rqt.utils.common.Monitor._node_name
_node_name
Definition: common.py:70
microstrain_inertial_rqt.utils.common.Monitor._get_string_hex
def _get_string_hex(self, val, default_val=_DEFAULT_VAL, default_str=_DEFAULT_STR)
Definition: common.py:133
microstrain_inertial_rqt.utils.common.Monitor._get_val
def _get_val(self, val, default=_DEFAULT_VAL)
Definition: common.py:115
microstrain_inertial_rqt.utils.common.Monitor._get_small_boolean_icon_string
def _get_small_boolean_icon_string(self, val, default_val=_DEFAULT_VAL)
Definition: common.py:139
microstrain_inertial_rqt.utils.common.SubscriberMonitor._default_callback
def _default_callback(self, message)
Definition: common.py:257
microstrain_inertial_rqt.utils.common.MicrostrainServices.__init__
def __init__(self, node, node_name)
Definition: common.py:25


microstrain_inertial_rqt
Author(s): Parker Hannifin Corp
autogenerated on Fri Apr 18 2025 02:52:41