Go to the documentation of this file.00001
00002
00003 import rospy
00004 from jsk_network_tools.msg import FC2OCSLargeData, SilverhammerInternalBuffer
00005 from jsk_network_tools.silverhammer_util import *
00006 from threading import Lock
00007 from StringIO import StringIO
00008 from std_msgs.msg import Time
00009 from io import BytesIO
00010 from socket import *
00011 from struct import pack
00012 import diagnostic_updater
00013 from diagnostic_msgs.msg import DiagnosticStatus
00014 import roslib
00015 from roslib.message import get_message_class
00016
00017
00018
00019
00020
00021 from multiprocessing import Process, Queue
00022
00023
00024 class SilverHammerReceiver:
00025 def __init__(self):
00026 message_class_str = rospy.get_param("~message",
00027 "jsk_network_tools/FC2OCSLargeData")
00028 try:
00029 self.message_class = get_message_class(message_class_str)
00030 except:
00031 raise Exception("invalid topic type: %s"%message_class_str)
00032 self.lock = Lock()
00033 self.diagnostic_updater = diagnostic_updater.Updater()
00034 self.diagnostic_updater.setHardwareID("none")
00035 self.diagnostic_updater.add("HighspeedReceiver", self.diagnosticCallback)
00036 self.latch = rospy.get_param("~latch", True)
00037 self.pesimistic = rospy.get_param("~pesimistic", False)
00038 self.fragment_packets_torelance = rospy.get_param("~fragment_packets_torelance", 20)
00039 self.timestamp_overwrite_topics = rospy.get_param("~timestamp_overwrite_topics", [])
00040 self.publish_only_if_updated_topics = rospy.get_param("~publish_only_if_updated_topics", [])
00041 self.prev_seq_ids = {}
00042 self.receive_port = rospy.get_param("~receive_port", 16484)
00043 self.receive_ip = rospy.get_param("~receive_ip", "localhost")
00044 self.topic_prefix = rospy.get_param("~topic_prefix", "/from_fc")
00045 if not self.topic_prefix.startswith("/"):
00046 self.topic_prefix = "/" + self.topic_prefix
00047 if self.topic_prefix == "/":
00048 self.topic_prefix = ""
00049 self.publishers = publishersFromMessage(self.message_class,
00050 self.topic_prefix,
00051 latch=self.latch)
00052 self.packet_size = rospy.get_param("~packet_size", 1400)
00053 self.launched_time = rospy.Time.now()
00054 self.last_received_time = rospy.Time(0)
00055 self.last_received_time_pub = rospy.Publisher("~last_received_time", Time)
00056 self.last_published_seq_id = -1
00057 self.diagnostic_timer = rospy.Timer(rospy.Duration(1.0 / 10),
00058 self.diagnosticTimerCallback)
00059 self.packets_queue = Queue()
00060 def diagnosticCallback(self, stat):
00061
00062 stat.summary(DiagnosticStatus.OK, "OK")
00063 with self.lock:
00064 now = rospy.Time.now()
00065 stat.add("Uptime [sec]",
00066 (now - self.launched_time).to_sec())
00067 stat.add("Time from last input [sec]",
00068 (now - self.last_received_time).to_sec())
00069 stat.add("UDP address", self.receive_ip)
00070 stat.add("UDP port", self.receive_port)
00071 return stat
00072 def diagnosticTimerCallback(self, event):
00073 self.diagnostic_updater.update()
00074 with self.lock:
00075 self.last_received_time_pub.publish(self.last_received_time)
00076 def run(self):
00077 sub = rospy.Subscriber("~input", SilverhammerInternalBuffer, self.callback)
00078 rospy.spin()
00079 def callback(self, msg):
00080 b = StringIO()
00081 b.write(msg.data)
00082 deserialized_data = []
00083 rospy.loginfo("data size: %d" % len(msg.data))
00084 rospy.msg.deserialize_messages(b, deserialized_data,
00085 self.message_class)
00086 rospy.loginfo("received %d message" % len(deserialized_data))
00087
00088 if len(deserialized_data) > 0:
00089
00090 msg = deserialized_data[0]
00091 messages = decomposeLargeMessage(msg, self.topic_prefix)
00092 now = rospy.Time.now()
00093 for pub in self.publishers:
00094 if pub.name in messages:
00095 if not pub.name in self.publish_only_if_updated_topics:
00096 rospy.loginfo("publishing %s" % pub.name)
00097 if pub.name in self.timestamp_overwrite_topics:
00098 if (hasattr(messages[pub.name], "header") and
00099 hasattr(messages[pub.name].header, "stamp")):
00100 messages[pub.name].header.stamp = now
00101 pub.publish(messages[pub.name])
00102
00103 else:
00104 rospy.logwarn("""cannot find '%s' in deserialized messages %s""" % (pub.name, messages.keys()))
00105 synchronized_publishers = []
00106 at_lest_one_topic = False
00107
00108 for pub in self.publishers:
00109 if pub.name in messages:
00110 if pub.name in self.publish_only_if_updated_topics:
00111 synchronized_publishers.append(pub)
00112 if (hasattr(messages[pub.name], "header") and
00113 hasattr(messages[pub.name].header, "stamp")):
00114 messages[pub.name].header.stamp = now
00115 if (hasattr(messages[pub.name], "header") and
00116 hasattr(messages[pub.name].header, "seq")):
00117
00118 if (pub.name in self.prev_seq_ids.keys() and
00119 messages[pub.name].header.seq == self.prev_seq_ids[pub.name]):
00120 rospy.loginfo("skip publishing %s " % (pub.name))
00121 pass
00122 else:
00123 rospy.loginfo("messages[%s].header.seq: %d"% (pub.name,
00124 messages[pub.name].header.seq))
00125 rospy.loginfo("self.prev_seq_ids[%s]: %d" % (pub.name,
00126 messages[pub.name].header.seq))
00127 self.prev_seq_ids[pub.name] = messages[pub.name].header.seq
00128 at_lest_one_topic = True
00129 if at_lest_one_topic:
00130 for pub in synchronized_publishers:
00131 pub.publish(messages[pub.name])
00132 else:
00133 rospy.logerr("missed some packets")
00134
00135
00136 if __name__ == "__main__":
00137 rospy.init_node("silverhammer_highspeed_receiver")
00138 receiver = SilverHammerReceiver()
00139 receiver.run()