00001
00002
00003 from __future__ import with_statement
00004
00005 import roslib; roslib.load_manifest('network_monitor_udp')
00006 import rospy
00007
00008 import sys
00009 import network_monitor_udp.msg as msgs
00010 import threading
00011 import time
00012 import traceback
00013 import threading
00014 import subprocess
00015 from diagnostic_msgs.msg import DiagnosticStatus, KeyValue, DiagnosticArray
00016
00017 inf = 1e1000
00018
00019 class NetworkWatchdog:
00020 def __init__(self, topic, timeout, max_latency, max_loss, action, action_name):
00021 self.action = action
00022 self.timeout = timeout
00023 self.max_latency = max_latency
00024 self.max_loss = max_loss
00025 self.action_name = action_name
00026
00027 self.start_time = rospy.get_time()
00028 self.prev_stamp = self.start_time
00029 self.trigger_count = 0
00030 self.outage_count = 0
00031 self.outage_length = 0
00032 self.longest_outage = 0
00033 self.longest_outage_time = None
00034 self.latest_trigger_time = None
00035 self.longest_action_duration = -inf
00036 self.longest_action_time = None
00037
00038 self.interruption_time = inf
00039 self.interruption_time_orig = inf
00040 self.action_running = False
00041 self.outage_in_progress = False
00042
00043 self.mutex = threading.Lock()
00044
00045 self.diag_pub = rospy.Publisher("/diagnostics", DiagnosticArray)
00046 rospy.Subscriber(topic, msgs.UdpMonitor, self.callback)
00047
00048 def callback(self, msg):
00049 sum = 1
00050 i = 0
00051 while i < len(msg.latency_bin_limits) and msg.latency_bin_limits[i] <= self.max_latency:
00052 sum -= msg.latency_bin_values[i]
00053 i += 1
00054
00055 stamp = msg.header.stamp.to_sec()
00056
00057 trigger = False
00058
00059 with self.mutex:
00060 if sum < self.max_loss:
00061 if self.outage_in_progress:
00062 self.outage_in_progress = False
00063 self.interruption_time = inf
00064 rospy.loginfo("Network is back up. Outage length %.1f s."%(stamp - self.interruption_time_orig))
00065 else:
00066 if not self.outage_in_progress:
00067 self.outage_in_progress = True
00068 rospy.logwarn("Network outage detected.")
00069 self.interruption_time = self.prev_stamp
00070 self.interruption_time_orig = self.prev_stamp
00071 self.outage_count += 1
00072 if stamp - self.interruption_time > self.timeout and not self.action_running:
00073 self.action_running = True
00074 self.trigger_count += 1
00075 trigger = True
00076 self.outage_length = stamp - self.interruption_time_orig
00077 if self.outage_length > self.longest_outage:
00078 self.longest_outage = self.outage_length
00079 self.longest_outage_time = self.interruption_time_orig
00080 self.prev_stamp = stamp
00081
00082 if trigger:
00083 self.latest_trigger_time = rospy.get_time()
00084 rospy.logerr("Network watchdog action triggered: %s", self.action_name)
00085 self.diagnostics()
00086 threading.Thread(target=self.do_action).start()
00087
00088 def do_action(self):
00089 try:
00090 self.action()
00091 except Exception:
00092 print "Caught exception during action."
00093 traceback.print_exc(10)
00094 now = rospy.get_time()
00095 with self.mutex:
00096 if self.interruption_time != inf:
00097 self.interruption_time = now
00098 self.action_running = False
00099 duration = now - self.latest_trigger_time
00100 if self.longest_action_duration < duration:
00101 self.longest_action_duration = duration
00102 self.longest_action_time = self.latest_trigger_time
00103
00104 def diagnostics(self):
00105 ds = DiagnosticStatus()
00106 ds.name = rospy.get_caller_id().lstrip('/') + ": " + "Network Watchdog"
00107 ds.hardware_id = "none"
00108
00109 with self.mutex:
00110 now = rospy.get_time()
00111 time_to_timeout = self.timeout - (now - self.interruption_time)
00112 elapsed_orig = now - self.interruption_time_orig
00113
00114
00115 ds.values.append(KeyValue("Timeout (s)", "%.1f"%self.timeout))
00116 ds.values.append(KeyValue("Latency threshold (ms)", str(int(self.max_latency * 1000))))
00117 ds.values.append(KeyValue("Loss threshold (%)", "%.1f"%(self.max_loss * 100)))
00118 ds.values.append(KeyValue("Timeout command", str(self.action_name)))
00119
00120
00121 ds.values.append(KeyValue("Network is down", str(self.outage_in_progress)))
00122 ds.values.append(KeyValue("Outage count", str(self.outage_count)))
00123 if self.outage_in_progress:
00124 ds.values.append(KeyValue("Time since outage start (s)", "%.1f"%elapsed_orig))
00125 if self.outage_length:
00126 ds.values.append(KeyValue("Latest outage length", "%.1f"%self.outage_length))
00127 ds.values.append(KeyValue("Longest outage length (s)", "%.1f"%self.longest_outage))
00128
00129
00130 ds.values.append(KeyValue("Timeout command in progress", str(self.action_running)))
00131 ds.values.append(KeyValue("Trigger count", str(self.trigger_count)))
00132 if self.outage_in_progress:
00133 if not self.action_running:
00134 ds.values.append(KeyValue("Time to timeout (s)", "%.1f"%(time_to_timeout)))
00135 if self.action_running:
00136 ds.values.append(KeyValue("Time since trigger", str(now - self.latest_trigger_time)))
00137 if self.longest_action_time:
00138 ds.values.append(KeyValue("Longest timeout command run time", "%.1f"%self.longest_action_duration))
00139
00140
00141 ds.values.append(KeyValue("Node start time", time.ctime(self.start_time)))
00142 if self.interruption_time_orig != inf:
00143 ds.values.append(KeyValue("Latest outage start time", time.ctime(self.interruption_time_orig)))
00144 if self.longest_outage_time:
00145 ds.values.append(KeyValue("Longest outage start time", time.ctime(self.longest_outage_time)))
00146 if self.latest_trigger_time:
00147 ds.values.append(KeyValue("Latest trigger time", time.ctime(self.latest_trigger_time)))
00148 if self.longest_action_time:
00149 ds.values.append(KeyValue("Time of longest timeout command", time.ctime(self.longest_action_time)))
00150
00151
00152 if self.interruption_time == inf:
00153 ds.message = "Network is up"
00154 ds.level = DiagnosticStatus.OK
00155 if self.action_running:
00156 ds.message += "; Timeout command in progress (%.0f s)"%(now - self.latest_trigger_time)
00157 else:
00158 if self.action_running:
00159 ds.message = "Timeout command in progress (%.0f s; %.0f s)"%(elapsed_orig, now - self.latest_trigger_time)
00160 ds.level = DiagnosticStatus.ERROR
00161 else:
00162 ds.message = "Network is down (%.0f s; %.0f s)"%(elapsed_orig, time_to_timeout)
00163 ds.level = DiagnosticStatus.WARN
00164
00165 da = DiagnosticArray()
00166 da.header.stamp = rospy.get_rostime()
00167 da.status.append(ds)
00168 self.diag_pub.publish(da)
00169
00170 class Action:
00171 def __init__(self, command):
00172 self.command = command
00173
00174 def run(self):
00175 subprocess.call(['/bin/sh', '-c', self.command], stdin = subprocess.PIPE)
00176
00177 def main():
00178 try:
00179 rospy.init_node('udp_monitor_watchdog', anonymous=True)
00180
00181 timeout = rospy.get_param("~timeout", 120)
00182 max_latency = rospy.get_param("~max_latency", 0.5)
00183 max_loss = rospy.get_param("~max_loss", 0.95)
00184 command = rospy.get_param("~command", "true")
00185
00186 nw = NetworkWatchdog('udpmonitor', timeout, max_latency, max_loss, Action(command).run, command)
00187
00188 sleeptime = rospy.Duration(1)
00189 while not rospy.is_shutdown():
00190 try:
00191 nw.diagnostics()
00192 except:
00193
00194 traceback.print_exc(10)
00195 rospy.sleep(sleeptime)
00196 except KeyboardInterrupt:
00197 pass
00198 if nw.action_running:
00199 print "Waiting for trigger command to complete."
00200
00201 if __name__ == "__main__":
00202 main()