joint_states_logger.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 
3 import copy
4 import influxdb
5 import rospy
6 import sys
7 import threading
8 import time
9 
10 from influxdb_store.utils import timestamp_to_influxdb_time
11 
12 from sensor_msgs.msg import JointState
13 
14 
class JointStatesLogger(object):
    """Buffer incoming JointState messages and write them to InfluxDB.

    Messages received on ``~input`` are converted into InfluxDB points
    (measurement ``joint_states``, one point per non-empty value array,
    distinguished by the ``type`` tag) and kept in memory.  A periodic
    timer flushes the buffered points with a single ``write_points`` call.

    ROS parameters:
        ~host (default ``'localhost'``): InfluxDB host.
        ~port (default ``8086``): InfluxDB port.
        ~database (default ``'test'``): database to create and write to.
        ~duration (default ``3.0``): flush period in seconds.
    """

    # JointState attributes that are stored; each becomes the "type" tag
    # of its own point.
    _FIELD_TYPES = ('position', 'velocity', 'effort')

    def __init__(self):
        """Read parameters, connect to InfluxDB and start the callbacks."""
        host = rospy.get_param('~host', 'localhost')
        port = rospy.get_param('~port', 8086)
        database = rospy.get_param('~database', 'test')
        self.duration = rospy.get_param('~duration', 3.0)

        # Shared state must exist before the subscriber and the timer are
        # created: their callbacks run on other threads and may fire as
        # soon as they are registered.
        self.lock = threading.Lock()
        self.query = []

        self.client = influxdb.InfluxDBClient(
            host=host, port=port, database=database)
        self.client.create_database(database)
        self.sub = rospy.Subscriber(
            '~input', JointState, self._cb, queue_size=300)
        self.timer = rospy.Timer(
            rospy.Duration(self.duration), self._timer_cb)

    def _cb(self, msg):
        """Convert one JointState message into points and buffer them.

        Args:
            msg: the received ``sensor_msgs/JointState`` message.
        """
        influx_time = timestamp_to_influxdb_time(msg.header.stamp)
        joint_names = msg.name

        # Build the points outside the lock so it is held only briefly.
        points = []
        for field_type in self._FIELD_TYPES:
            fields = dict(zip(joint_names, getattr(msg, field_type)))
            if not fields:
                # A JointState may leave some arrays empty; InfluxDB
                # requires at least one field per point, so such a point
                # would make the whole batch write fail.
                continue
            points.append({
                "measurement": "joint_states",
                "tags": {
                    "type": field_type
                },
                "time": influx_time,
                "fields": fields
            })

        with self.lock:
            self.query.extend(points)

    def _timer_cb(self, event):
        """Flush the buffered points to InfluxDB.

        If nothing was buffered since the previous flush, the node is shut
        down.  InfluxDB errors are logged and the batch is dropped.

        Args:
            event: the ``rospy.TimerEvent`` passed by the timer (unused).
        """
        start_time = time.time() * 1000
        with self.lock:
            # Take ownership of the buffer by swapping in a fresh list;
            # no copy is needed since nothing else keeps a reference to
            # the old list.
            query, self.query = self.query, []

        if not query:
            # no joint_states coming, so stop nodes
            # sys.exit() would only raise SystemExit inside the timer
            # thread and leave the node running, so request a proper
            # shutdown instead.
            rospy.signal_shutdown(
                "no joint_states received in the last {} sec".format(
                    self.duration))
            return

        end_time = time.time() * 1000
        rospy.logdebug("copy time: {}ms".format(end_time - start_time))
        rospy.logdebug("data length: {}".format(len(query)))
        try:
            self.client.write_points(query, time_precision='ms')
        except (influxdb.exceptions.InfluxDBClientError,
                influxdb.exceptions.InfluxDBServerError) as e:
            # Keep the timer alive; the failed batch is not retried.
            rospy.logerr("InfluxDB error: {}".format(e))
        end_time = time.time() * 1000
        elapsed = end_time - start_time
        rospy.logdebug("timer cb time: {}ms".format(elapsed))
        # Warn when one flush takes longer than the flush period itself.
        if elapsed > self.duration * 1000:
            rospy.logerr("timer cb time exceeds: {} > {}".format(
                elapsed, self.duration * 1000))
82 
83 
if __name__ == '__main__':
    # Register this process as the 'joint_states_logger' ROS node.
    rospy.init_node('joint_states_logger')
    # Constructing the logger sets up its subscriber and timer.
    joint_states_logger = JointStatesLogger()
    # Block here until the node is shut down.
    rospy.spin()
def timestamp_to_influxdb_time(timestamp)
Definition: utils.py:5


influxdb_store
Author(s): Shingo Kitagawa
autogenerated on Sat Jun 24 2023 02:40:23