power_state_aggregator.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 #
3 # Copyright 2017 Fraunhofer Institute for Manufacturing Engineering and Automation (IPA)
4 #
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 
17 
18 import rospy
19 import numpy
20 from std_msgs.msg import Float64
21 from cob_msgs.msg import PowerState
22 
24 
25  def __init__(self):
26  # get parameters
27  self.current_buffer_size = rospy.get_param('~current_buffer_size', 10)
28  self.pub_power_state = rospy.Publisher('power_state', PowerState, queue_size=1)
29  self.voltage = None
30  self.current = None
31  self.last_currents = []
32  self.last_update = rospy.Time(0)
33  self.charging = False
34  self.remaining_capacity = None
36  self.temperature = None
37  rospy.Subscriber("voltage", Float64, self.voltage_cb)
38  rospy.Subscriber("current", Float64, self.current_cb)
39  rospy.Subscriber("remaining_capacity", Float64, self.remaining_capacity_cb)
40  rospy.Subscriber("full_charge_capacity", Float64, self.full_charge_capacity_cb)
41  rospy.Subscriber("temperature", Float64, self.temperature_cb)
42 
43  def voltage_cb(self, msg):
44  self.last_update = rospy.Time.now()
45  self.voltage = msg.data
46 
47  def current_cb(self, msg):
48  self.last_update = rospy.Time.now()
49  self.current = msg.data
50 
51  # fill current into list of past currents for filtering purposes
52  if len(self.last_currents) >= self.current_buffer_size:
53  self.last_currents.pop(0)
54  self.last_currents.append(msg.data)
55 
56  if msg.data > -1: # we use a limit of -1Ampere because if the battery is 100% full and the robot is still docked, there is no more current going into the battery. -1 A is biggger than the "Ruhestrom", so this should be ok until BMS is fixed and delivers a proper flag for docked or not_docked.
57  self.charging = True
58  else:
59  self.charging = False
60 
61  def remaining_capacity_cb(self, msg):
62  self.last_update = rospy.Time.now()
63  self.remaining_capacity = msg.data
64 
65  def full_charge_capacity_cb(self, msg):
66  self.last_update = rospy.Time.now()
67  self.full_charge_capacity = msg.data
68 
69  def temperature_cb(self, msg):
70  self.last_update = rospy.Time.now()
71  self.temperature = msg.data
72 
74  if not self.charging and self.voltage != None and self.current != None:
75  return round(self.voltage * abs(self.current), 3)
76  else:
77  return 0.0
78 
80  try:
81  return round(100.0*(self.remaining_capacity/self.full_charge_capacity), 3)
82  except ZeroDivisionError as e:
83  rospy.logerr("ZeroDivisionError: full_charge_capacity is 0.0: %s" % (e))
84  except:
85  rospy.logwarn("something went wrong, cannot calculate relative remaining capacity. full_charge_capacity=%s, remaining_capacity=%s" % (self.full_charge_capacity, self.remaining_capacity))
86  return 0.0
87 
89  time_remaining_max = 10.0 # assume 10h - for cases where current approx. 0A
90  time_remaining = time_remaining_max
91  if len(self.last_currents) > 0:
92  current = numpy.mean(self.last_currents)
93 
94  if self.full_charge_capacity != None and self.remaining_capacity != None:
95  try:
96  if self.charging:
97  time_remaining = round((self.full_charge_capacity - self.remaining_capacity) / abs(current), 3)
98  else:
99  time_remaining = round(self.remaining_capacity / abs(current), 3)
100  except ZeroDivisionError as e:
101  rospy.logerr("ZeroDivisionError: current is 0.0: %s" % (e))
102  except:
103  rospy.logwarn("something went wrong, cannot calculate time_remaining. full_charge_capacity=%s, remaining_capacity=%s, current=%s" % (self.full_charge_capacity, self.remaining_capacity, current))
104  else:
105  pass
106  else:
107  pass
108  return min(time_remaining, time_remaining_max)
109 
110  def publish(self):
111  if self.voltage != None and self.current != None and self.remaining_capacity != None and self.full_charge_capacity != None and self.temperature != None and (rospy.Time.now() - self.last_update) < rospy.Duration(1):
112  ps = PowerState()
113  ps.header.stamp = self.last_update
114  ps.voltage = self.voltage
115  ps.current = self.current
116  ps.power_consumption = self.calculate_power_consumption()
117  ps.remaining_capacity = self.remaining_capacity
118  ps.relative_remaining_capacity = self.calculate_relative_remaining_capacity()
119  ps.charging = self.charging
120  ps.time_remaining = self.calculate_time_remaining()
121  ps.temperature = self.temperature
122  self.pub_power_state.publish(ps)
123  return ps
124 
125 if __name__ == "__main__":
126  rospy.init_node("power_state_aggregator")
128  rospy.loginfo("power state aggregator running")
129  rate = rospy.Rate(10)
130  while not rospy.is_shutdown():
131  PSA.publish()
132  try:
133  rate.sleep()
134  except rospy.exceptions.ROSInterruptException as e:
135  pass
136 


cob_bms_driver
Author(s): mig-mc , Mathias Lüdtke
autogenerated on Wed Apr 7 2021 02:11:37