fetch_battery_states_logger.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 
3 import influxdb
4 import rospy
5 
6 from influxdb_store.utils import timestamp_to_influxdb_time
7 
8 from power_msgs.msg import BatteryState
9 
10 
def __init__(self):
    """Open the InfluxDB connection and subscribe to battery messages.

    Connection settings are read from the private ROS parameters
    ``~host`` (default ``'localhost'``), ``~port`` (default ``8086``)
    and ``~database`` (default ``'test'``). After the client is built,
    ``create_database`` is called with the configured database name and
    ``_cb`` is registered as the callback for ``BatteryState`` messages
    on the ``~input`` topic.
    """
    # Collect the settings as keyword arguments; entries are evaluated
    # top to bottom, so parameters are queried host, port, database.
    connection = {
        'host': rospy.get_param('~host', 'localhost'),
        'port': rospy.get_param('~port', 8086),
        'database': rospy.get_param('~database', 'test'),
    }
    self.client = influxdb.InfluxDBClient(**connection)
    self.client.create_database(connection['database'])

    # Keep a reference to the subscriber on the instance.
    self.sub = rospy.Subscriber(
        '~input',
        BatteryState,
        self._cb,
        queue_size=10,
    )
21 
def _cb(self, msg):
    """Write one battery reading to InfluxDB.

    Args:
        msg: Incoming message; only its ``name`` and ``charge_level``
            attributes are read.

    The point goes to the ``battery_states`` measurement, tagged with
    the battery name and carrying ``charge_level`` in the
    ``charge_percent`` field. It is stamped with ``rospy.Time.now()``
    taken when the callback runs, not with a time from the message.
    An ``InfluxDBServerError`` raised by the write is reported through
    ``rospy.logerr`` and not re-raised.
    """
    stamp = timestamp_to_influxdb_time(rospy.Time.now())
    point = {
        "measurement": "battery_states",
        "tags": {"battery_name": msg.name},
        "time": stamp,
        "fields": {"charge_percent": msg.charge_level},
    }
    try:
        # write_points takes a list of points; this one holds a single entry.
        self.client.write_points([point], time_precision='ms')
    except influxdb.exceptions.InfluxDBServerError as err:
        rospy.logerr("InfluxDB error: {}".format(err))
40 
41 
if __name__ == '__main__':
    # Register this process with ROS under the node name below.
    rospy.init_node(
        'fetch_battery_states_logger')
    # NOTE(review): the listing's own line numbering jumps from 43 to 45,
    # so a statement appears to be missing at this point -- presumably the
    # one that constructs the logger object; confirm against the real file.
    # Hand control to rospy so subscriber callbacks keep being serviced.
    rospy.spin()
def timestamp_to_influxdb_time(timestamp)
Definition: utils.py:5


influxdb_store
Author(s): Shingo Kitagawa
autogenerated on Sat Jun 24 2023 02:40:23