12 from sensor_msgs.msg
import JointState
17 host = rospy.get_param(
'~host',
'localhost')
18 port = rospy.get_param(
'~port', 8086)
19 database = rospy.get_param(
'~database',
'test')
20 self.
duration = rospy.get_param(
'~duration', 3.0)
21 self.
client = influxdb.InfluxDBClient(
22 host=host, port=port, database=database)
23 self.
client.create_database(database)
24 self.
sub = rospy.Subscriber(
25 '~input', JointState, self.
_cb, queue_size=300)
27 self.
lock = threading.Lock()
32 joint_names = msg.name
33 position = msg.position
34 velocity = msg.velocity
38 "measurement":
"joint_states",
43 "fields": dict(list(zip(joint_names, position)))
46 "measurement":
"joint_states",
51 "fields": dict(list(zip(joint_names, velocity)))
54 "measurement":
"joint_states",
59 "fields": dict(list(zip(joint_names, effort)))
63 start_time = time.time() * 1000
66 if len(self.
query) == 0:
68 query = copy.deepcopy(self.
query)
70 end_time = time.time() * 1000
71 rospy.logdebug(
"copy time: {}ms".format(end_time - start_time))
72 rospy.logdebug(
"data length: {}".format(len(query)))
74 self.
client.write_points(query, time_precision=
'ms')
75 except influxdb.exceptions.InfluxDBServerError
as e:
76 rospy.logerr(
"InfluxDB error: {}".format(e))
77 end_time = time.time() * 1000
78 rospy.logdebug(
"timer cb time: {}ms".format(end_time - start_time))
79 if ((end_time - start_time) > (self.
duration * 1000)):
80 rospy.logerr(
"timer cb time exceeds: {} > {}".format(
81 end_time - start_time, self.
duration * 1000))
84 if __name__ ==
'__main__':
85 rospy.init_node(
'joint_states_logger')
def timestamp_to_influxdb_time(timestamp)
def _timer_cb(self, event)