power_state_aggregator.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 #
3 # Copyright 2017 Fraunhofer Institute for Manufacturing Engineering and Automation (IPA)
4 #
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 
17 
18 import rospy
19 import numpy
20 import warnings
21 from std_msgs.msg import Float64
22 from cob_msgs.msg import PowerState
23 from sensor_msgs.msg import BatteryState
24 
26 
27  def __init__(self):
28  # get parameters
29  self.current_buffer_size = rospy.get_param('~current_buffer_size', 901) # approx. 60sec, topic rate 15Hz, use odd number to prevent 0.0 mean
30  self.pub_power_state = rospy.Publisher('power_state', PowerState, queue_size=1)
31  self.pub_battery_state = rospy.Publisher('battery_state', BatteryState, queue_size=1)
32  self.voltage = None
33  self.current = None
34  self.last_currents = []
35  self.last_update = rospy.Time(0)
36  self.connected = False
37  self.charging = False
38  self.remaining_capacity = None
40  self.temperature = None
41  rospy.Subscriber("voltage", Float64, self.voltage_cb)
42  rospy.Subscriber("current", Float64, self.current_cb)
43  rospy.Subscriber("remaining_capacity", Float64, self.remaining_capacity_cb)
44  rospy.Subscriber("full_charge_capacity", Float64, self.full_charge_capacity_cb)
45  rospy.Subscriber("temperature", Float64, self.temperature_cb)
46 
47  def voltage_cb(self, msg):
48  self.last_update = rospy.Time.now()
49  self.voltage = msg.data
50 
51  def current_cb(self, msg):
52  self.last_update = rospy.Time.now()
53  self.current = msg.data
54 
55  # fill current into list of past currents for filtering purposes
56  if len(self.last_currents) >= self.current_buffer_size:
57  self.last_currents.pop(0)
58  self.last_currents.append(msg.data)
59 
60  if msg.data > 0:
61  self.charging = True
62  else:
63  self.charging = False
64 
65  if msg.data > -1: # we use a limit of -1Ampere because if the battery is 100% full and the robot is still docked, there is no more current going into the battery. -1 A is biggger than the "Ruhestrom", so this should be ok until BMS is fixed and delivers a proper flag for docked or not_docked.
66  self.connected = True
67  else:
68  self.connected = False
69 
70  def remaining_capacity_cb(self, msg):
71  self.last_update = rospy.Time.now()
72  self.remaining_capacity = msg.data
73 
74  def full_charge_capacity_cb(self, msg):
75  self.last_update = rospy.Time.now()
76  self.full_charge_capacity = msg.data
77 
78  def temperature_cb(self, msg):
79  self.last_update = rospy.Time.now()
80  self.temperature = msg.data
81 
83  if not self.charging and self.voltage != None and self.current != None:
84  return round(self.voltage * abs(self.current), 3)
85  else:
86  return 0.0
87 
89  try:
90  return round(100.0*(self.remaining_capacity/self.full_charge_capacity), 3)
91  except ZeroDivisionError as e:
92  rospy.logerr("ZeroDivisionError: full_charge_capacity is 0.0: %s" % (e))
93  except:
94  rospy.logwarn("something went wrong, cannot calculate relative remaining capacity. full_charge_capacity=%s, remaining_capacity=%s" % (self.full_charge_capacity, self.remaining_capacity))
95  return 0.0
96 
98  warnings.filterwarnings('error')
99  time_remaining_max = 10.0 # assume 10h - for cases where current approx. 0A
100  time_remaining = time_remaining_max
101  try:
102  if len(self.last_currents) > 0:
103  current = numpy.mean(self.last_currents)
104 
105  if self.full_charge_capacity != None and self.remaining_capacity != None:
106  if self.charging:
107  time_remaining = round((self.full_charge_capacity - self.remaining_capacity) / abs(current), 3)
108  else:
109  time_remaining = round(self.remaining_capacity / abs(current), 3)
110  else:
111  pass
112  else:
113  pass
114  except ZeroDivisionError as e:
115  rospy.logerr("ZeroDivisionError: current is 0.0: {}".format(e))
116  except Warning as w:
117  rospy.logerr("Warning: {}".format(w))
118  except Exception as e:
119  rospy.logerr("Exception: {}".format(e))
120  # rospy.logdebug("calculate_time_remaining")
121  # rospy.logdebug("time_remaining_max: {}".format(time_remaining_max))
122  # rospy.logdebug("time_remaining: {}".format(time_remaining))
123  # rospy.logdebug("self.last_currents: {}".format(self.last_currents))
124  # rospy.logdebug("current: {}".format(current))
125  # rospy.logdebug("self.full_charge_capacity: {}".format(self.full_charge_capacity))
126  # rospy.logdebug("self.remaining_capacity: {}".format(self.remaining_capacity))
127  # rospy.logdebug("self.charging: {}".format(self.charging))
128  return min(time_remaining, time_remaining_max)
129 
130  def publish(self):
131  if self.voltage != None and self.current != None and self.remaining_capacity != None and self.full_charge_capacity != None and self.temperature != None and (rospy.Time.now() - self.last_update) < rospy.Duration(1):
132  power_consumption = self.calculate_power_consumption()
133  relative_remaining_capacity = self.calculate_relative_remaining_capacity()
134  time_remaining = self.calculate_time_remaining()
135 
136  ps = PowerState()
137  ps.header.stamp = self.last_update
138  ps.voltage = self.voltage
139  ps.current = self.current
140  ps.power_consumption = power_consumption
141  ps.remaining_capacity = self.remaining_capacity
142  ps.relative_remaining_capacity = relative_remaining_capacity
143  ps.connected = self.connected
144  ps.charging = self.charging
145  ps.time_remaining = time_remaining
146  ps.temperature = self.temperature
147 
148  bs = BatteryState()
149  bs.header.stamp = self.last_update
150  bs.voltage = self.voltage
151  bs.current = self.current
152  bs.charge = self.remaining_capacity
153  bs.design_capacity = self.full_charge_capacity
154  bs.percentage = relative_remaining_capacity / 100.0
155  bs.power_supply_status = BatteryState.POWER_SUPPLY_STATUS_CHARGING if self.charging else BatteryState.POWER_SUPPLY_STATUS_NOT_CHARGING
156  bs.power_supply_health = BatteryState.POWER_SUPPLY_HEALTH_GOOD
157  bs.power_supply_technology = BatteryState.POWER_SUPPLY_TECHNOLOGY_UNKNOWN
158  bs.present = True
159  bs.cell_voltage = [self.voltage]
160  bs.location = "emulated_battery"
161  bs.serial_number = "emulated_battery"
162 
163  self.pub_power_state.publish(ps)
164  self.pub_battery_state.publish(bs)
165 
166 if __name__ == "__main__":
167  rospy.init_node("power_state_aggregator")
169  rospy.loginfo("power state aggregator running")
170  rate = rospy.Rate(10)
171  while not rospy.is_shutdown():
172  PSA.publish()
173  try:
174  rate.sleep()
175  except rospy.exceptions.ROSInterruptException as e:
176  pass
177 
power_state_aggregator.PowerStateAggregator.remaining_capacity_cb
def remaining_capacity_cb(self, msg)
Definition: power_state_aggregator.py:70
power_state_aggregator.PowerStateAggregator.connected
connected
Definition: power_state_aggregator.py:36
power_state_aggregator.PowerStateAggregator.temperature
temperature
Definition: power_state_aggregator.py:40
power_state_aggregator.PowerStateAggregator.calculate_time_remaining
def calculate_time_remaining(self)
Definition: power_state_aggregator.py:97
power_state_aggregator.PowerStateAggregator.full_charge_capacity_cb
def full_charge_capacity_cb(self, msg)
Definition: power_state_aggregator.py:74
power_state_aggregator.PowerStateAggregator.current_buffer_size
current_buffer_size
Definition: power_state_aggregator.py:29
power_state_aggregator.PowerStateAggregator.voltage
voltage
Definition: power_state_aggregator.py:32
power_state_aggregator.PowerStateAggregator.calculate_power_consumption
def calculate_power_consumption(self)
Definition: power_state_aggregator.py:82
power_state_aggregator.PowerStateAggregator.full_charge_capacity
full_charge_capacity
Definition: power_state_aggregator.py:39
power_state_aggregator.PowerStateAggregator.pub_battery_state
pub_battery_state
Definition: power_state_aggregator.py:31
power_state_aggregator.PowerStateAggregator.publish
def publish(self)
Definition: power_state_aggregator.py:130
power_state_aggregator.PowerStateAggregator.remaining_capacity
remaining_capacity
Definition: power_state_aggregator.py:38
power_state_aggregator.PowerStateAggregator.pub_power_state
pub_power_state
Definition: power_state_aggregator.py:30
power_state_aggregator.PowerStateAggregator
Definition: power_state_aggregator.py:25
power_state_aggregator.PowerStateAggregator.charging
charging
Definition: power_state_aggregator.py:37
power_state_aggregator.PowerStateAggregator.calculate_relative_remaining_capacity
def calculate_relative_remaining_capacity(self)
Definition: power_state_aggregator.py:88
power_state_aggregator.PowerStateAggregator.voltage_cb
def voltage_cb(self, msg)
Definition: power_state_aggregator.py:47
power_state_aggregator.PowerStateAggregator.last_currents
last_currents
Definition: power_state_aggregator.py:34
power_state_aggregator.PowerStateAggregator.last_update
last_update
Definition: power_state_aggregator.py:35
power_state_aggregator.PowerStateAggregator.temperature_cb
def temperature_cb(self, msg)
Definition: power_state_aggregator.py:78
power_state_aggregator.PowerStateAggregator.current
current
Definition: power_state_aggregator.py:33
power_state_aggregator.PowerStateAggregator.__init__
def __init__(self)
Definition: power_state_aggregator.py:27
power_state_aggregator.PowerStateAggregator.current_cb
def current_cb(self, msg)
Definition: power_state_aggregator.py:51


cob_bms_driver
Author(s): mig-mc , Mathias Lüdtke
autogenerated on Wed Nov 8 2023 03:47:34