transform_logger.py
Go to the documentation of this file.
1 import copy
2 import influxdb
3 import numpy as np
4 import rospy
5 import sys
6 import threading
7 import time
8 import yaml
9 
10 import tf2_ros
11 
12 from influxdb_store.utils import timestamp_to_influxdb_time
13 from tf2_msgs.srv import FrameGraph
14 
15 
16 class TransformLogger(object):
17  def __init__(self):
18  self.tf_buffer = tf2_ros.Buffer()
19  self.tf_listener = tf2_ros.TransformListener(self.tf_buffer)
20 
21  rospy.wait_for_service("~tf2_frames")
22  self.get_frames = rospy.ServiceProxy("~tf2_frames", FrameGraph)
23  self.graph = {}
24 
25  trial_count = 0
26  while len(self.graph) == 0:
27  try:
28  if trial_count > 1200:
29  sys.exit(1)
30  self.graph = yaml.load(self.get_frames().frame_yaml)
31  except Exception as e:
32  rospy.logerr_throttle(
33  60.0, 'Failed to get graph: {}'.format(e))
34  rospy.sleep(0.1)
35  trial_count = trial_count + 1
36 
37  host = rospy.get_param('~host', 'localhost')
38  port = rospy.get_param('~port', 8086)
39  database = rospy.get_param('~database', 'test')
40  self.client = influxdb.InfluxDBClient(
41  host=host, port=port, database=database)
42  self.client.create_database(database)
43 
44  self.lock = threading.Lock()
45  self.graph_lock = threading.Lock()
46  self.query = []
47 
48  self.update_duration = 1.0 / rospy.get_param('~update_frequency', 30.0)
49  self.update_timer = rospy.Timer(
50  rospy.Duration(self.update_duration), self._update_cb)
51 
52  self.duration = rospy.get_param('~duration', 3.0)
53  self.write_timer = rospy.Timer(
54  rospy.Duration(self.duration), self._timer_cb)
55 
56  graph_duration = rospy.get_param('~graph_duration', 60.0)
57  self.graph_timer = rospy.Timer(
58  rospy.Duration(graph_duration), self._graph_cb)
59 
60  def _update_cb(self, event):
61  # if event.last_real:
62  # timestamp = event.last_real
63  # else:
64  # timestamp = event.current_real - self.update_rate
65  timestamp = event.current_real
66  influx_time = timestamp_to_influxdb_time(timestamp)
67  trans_x_fields = {}
68  trans_y_fields = {}
69  trans_z_fields = {}
70  rot_x_fields = {}
71  rot_y_fields = {}
72  rot_z_fields = {}
73  rot_w_fields = {}
74  rot_theta_fields = {}
75 
76  with self.graph_lock:
77  graph = copy.deepcopy(self.graph)
78 
79  for child_frame_id, _ in graph.items():
80  try:
81  transform_stamped = self.tf_buffer.lookup_transform(
82  self.frame_id, child_frame_id, rospy.Time(0))
83  except tf2_ros.ExtrapolationException as e:
84  rospy.logerr_throttle(
85  60.0, 'tf2_ros.ExtrapolationException: {}'.format(e))
86  continue
87  except tf2_ros.ConnectivityException as e:
88  rospy.logerr_throttle(
89  60.0, 'tf2_ros.ConnectivityException: {}'.format(e))
90  continue
91  except tf2_ros.LookupException as e:
92  rospy.logerr_throttle(
93  60.0, 'tf2_ros.LookupException: {}'.format(e))
94  continue
95  translation = transform_stamped.transform.translation
96  rotation = transform_stamped.transform.rotation
97  theta = 2 * np.arctan(
98  np.linalg.norm(
99  [rotation.x, rotation.y, rotation.z]) / rotation.w)
100 
101  trans_x_fields[child_frame_id] = translation.x
102  trans_y_fields[child_frame_id] = translation.y
103  trans_z_fields[child_frame_id] = translation.z
104  rot_x_fields[child_frame_id] = rotation.x
105  rot_y_fields[child_frame_id] = rotation.y
106  rot_z_fields[child_frame_id] = rotation.z
107  rot_w_fields[child_frame_id] = rotation.w
108  rot_theta_fields[child_frame_id] = theta
109 
110  with self.lock:
111  if len(trans_x_fields) > 0:
112  self.query.append({
113  "measurement": self.measurement_name,
114  "tags": {
115  "type": "translation",
116  "field": "x",
117  "frame_id": self.frame_id
118  },
119  "time": influx_time,
120  "fields": trans_x_fields
121  })
122 
123  if len(trans_y_fields) > 0:
124  self.query.append({
125  "measurement": self.measurement_name,
126  "tags": {
127  "type": "translation",
128  "field": "y",
129  "frame_id": self.frame_id
130  },
131  "time": influx_time,
132  "fields": trans_y_fields
133  })
134 
135  if len(trans_z_fields) > 0:
136  self.query.append({
137  "measurement": self.measurement_name,
138  "tags": {
139  "type": "translation",
140  "field": "z",
141  "frame_id": self.frame_id
142  },
143  "time": influx_time,
144  "fields": trans_z_fields
145  })
146 
147  if len(rot_x_fields) > 0:
148  self.query.append({
149  "measurement": self.measurement_name,
150  "tags": {
151  "type": "rotation",
152  "field": "x",
153  "frame_id": self.frame_id
154  },
155  "time": influx_time,
156  "fields": rot_x_fields
157  })
158 
159  if len(rot_y_fields) > 0:
160  self.query.append({
161  "measurement": self.measurement_name,
162  "tags": {
163  "type": "rotation",
164  "field": "y",
165  "frame_id": self.frame_id
166  },
167  "time": influx_time,
168  "fields": rot_y_fields
169  })
170 
171  if len(rot_z_fields) > 0:
172  self.query.append({
173  "measurement": self.measurement_name,
174  "tags": {
175  "type": "rotation",
176  "field": "z",
177  "frame_id": self.frame_id
178  },
179  "time": influx_time,
180  "fields": rot_z_fields
181  })
182 
183  if len(rot_w_fields) > 0:
184  self.query.append({
185  "measurement": self.measurement_name,
186  "tags": {
187  "type": "rotation",
188  "field": "w",
189  "frame_id": self.frame_id
190  },
191  "time": influx_time,
192  "fields": rot_w_fields
193  })
194 
195  if len(rot_theta_fields) > 0:
196  self.query.append({
197  "measurement": self.measurement_name,
198  "tags": {
199  "type": "rotation",
200  "field": "theta",
201  "frame_id": self.frame_id
202  },
203  "time": influx_time,
204  "fields": rot_theta_fields
205  })
206 
207  def _timer_cb(self, event):
208  start_time = time.time() * 1000
209  with self.lock:
210  if len(self.query) == 0:
211  return
212  query = copy.deepcopy(self.query)
213  self.query = []
214  end_time = time.time() * 1000
215  rospy.logdebug("copy time: {}ms".format(end_time - start_time))
216  rospy.logdebug("data length: {}".format(len(query)))
217  try:
218  self.client.write_points(query, time_precision='ms')
219  except influxdb.exceptions.InfluxDBServerError as e:
220  rospy.logerr("InfluxDB error: {}".format(e))
221  end_time = time.time() * 1000
222  rospy.logdebug("timer cb time: {}ms".format(end_time - start_time))
223  if ((end_time - start_time) > (self.duration * 1000)):
224  rospy.logerr("timer cb time exceeds: {} > {}".format(
225  end_time - start_time, self.duration * 1000))
226 
227  def _graph_cb(self, event):
228  graph = {}
229  try:
230  graph = yaml.load(self.get_frames().frame_yaml)
231  except Exception as e:
232  rospy.logerr(e)
233 
234  if len(graph) > 0:
235  with self.graph_lock:
236  self.graph = graph
def timestamp_to_influxdb_time(timestamp)
Definition: utils.py:5


influxdb_store
Author(s): Shingo Kitagawa
autogenerated on Sat Jun 24 2023 02:40:23