00001
00002
00003 import rospy
00004 from jsk_network_tools.msg import FC2OCSLargeData
00005 from jsk_network_tools.silverhammer_util import *
00006 from threading import Lock
00007 from StringIO import StringIO
00008 from io import BytesIO
00009 from socket import *
00010 import diagnostic_updater
00011 from diagnostic_msgs.msg import DiagnosticStatus
00012 import roslib
00013 from roslib.message import get_message_class
00014 from std_msgs.msg import Time
00015 from dynamic_reconfigure.server import Server
00016 from jsk_network_tools.cfg import SilverhammerHighspeedStreamerConfig
00017
00018 class SilverHammerStreamer:
00019 def __init__(self):
00020 message_class_str = rospy.get_param("~message",
00021 "jsk_network_tools/FC2OCSLargeData")
00022 try:
00023 self.message_class = get_message_class(message_class_str)
00024 except:
00025 raise Exception("invalid topic type: %s"%message_class_str)
00026 self.lock = Lock()
00027 self.dynamic_reconfigure = Server(SilverhammerHighspeedStreamerConfig, self.dynamicReconfigureCallback)
00028 self.launched_time = rospy.Time.now()
00029 self.packet_interval = None
00030 self.diagnostic_updater = diagnostic_updater.Updater()
00031 self.diagnostic_updater.setHardwareID("none")
00032
00033 self.diagnostic_updater.add("HighspeedStreamer", self.diagnosticCallback)
00034 self.send_port = rospy.get_param("~to_port", 16484)
00035 self.send_ip = rospy.get_param("~to_ip", "localhost")
00036 self.last_send_time = rospy.Time(0)
00037 self.last_input_received_time = rospy.Time(0)
00038 self.last_send_time_pub = rospy.Publisher("~last_send_time", Time)
00039 self.last_input_received_time_pub = rospy.Publisher(
00040 "~last_input_received_time", Time)
00041 self.send_num = 0
00042 self.rate = rospy.get_param("~send_rate", 2)
00043 self.socket_client = socket(AF_INET, SOCK_DGRAM)
00044 self.packet_size = rospy.get_param("~packet_size", 1400)
00045 subscriber_info = subscribersFromMessage(self.message_class())
00046 self.messages = {}
00047 self.subscribe(subscriber_info)
00048 self.counter = 0
00049 self.send_timer = rospy.Timer(rospy.Duration(1.0 / self.rate),
00050 self.sendTimerCallback)
00051 self.diagnostic_timer = rospy.Timer(rospy.Duration(1.0 / 10),
00052 self.diagnosticTimerCallback)
00053 def dynamicReconfigureCallback(self, config, level):
00054 with self.lock:
00055 self.bandwidth = config.bandwidth
00056 self.packet_sleep_sum = config.packet_sleep_sum
00057 return config
00058 def diagnosticCallback(self, stat):
00059
00060 stat.summary(DiagnosticStatus.OK, "OK")
00061 with self.lock:
00062 now = rospy.Time.now()
00063 stat.add("Uptime [sec]",
00064 (now - self.launched_time).to_sec())
00065 stat.add("Time from the last sending [sec]",
00066 (now - self.last_send_time).to_sec())
00067 stat.add("Number of transmission", self.send_num)
00068 stat.add("Time from the last input [sec]",
00069 (now - self.last_input_received_time).to_sec())
00070
00071 stat.add("UDP address", self.send_ip)
00072 stat.add("UDP port", self.send_port)
00073 stat.add("Expected Hz to send", "%f Hz" % self.rate)
00074 stat.add("Packet size", "%d bytes" % self.packet_size)
00075 if self.packet_interval:
00076 stat.add("Latest nterval duration between packets",
00077 self.packet_interval)
00078 self.last_send_time_pub.publish(self.last_send_time)
00079 self.last_input_received_time_pub.publish(self.last_input_received_time)
00080 return stat
00081 def diagnosticTimerCallback(self, event):
00082 self.diagnostic_updater.update()
00083 def timeTimerCallback(self, event):
00084 with self.lock:
00085 if self.last_send_time:
00086 self.last_send_time_pub.publish(self.last_send_time)
00087 else:
00088 self.last_send_time_pub.publish(Time(data=rospy.Time(0)))
00089 if self.last_input_received_time:
00090 self.last_input_received_time_pub.publish(
00091 self.last_input_received_time)
00092 else:
00093 self.last_input_received_time_pub.publish(
00094 Time(data=rospy.Time(0)))
00095 def sendTimerCallback(self, event):
00096 buffer = StringIO()
00097 with self.lock:
00098 msg = self.message_class()
00099 subscriber_info = subscribersFromMessage(msg)
00100 for topic, message_class in subscriber_info:
00101 field_name = topic[1:].replace("/", "__")
00102 if topic in self.messages:
00103 setattr(msg, field_name, self.messages[topic])
00104 rospy.msg.serialize_message(buffer, 0, msg)
00105 self.last_send_time = rospy.Time.now()
00106
00107 self.sendBuffer(buffer.getvalue(), buffer.len * 8)
00108 def sendBuffer(self, buffer, buffer_size):
00109 self.send_num = self.send_num + 1
00110 packets = separateBufferIntoPackets(
00111 self.counter, buffer, self.packet_size)
00112 total_sec = float(buffer_size) / self.bandwidth
00113 packet_interval = total_sec / len(packets)
00114 self.packet_interval = packet_interval
00115 rospy.loginfo("sending %d packets", len(packets))
00116 rospy.loginfo("sending %d bits with %f interval"
00117 % (buffer_size, packet_interval))
00118 rospy.loginfo("total time to send is %f sec" % total_sec)
00119 r = rospy.Rate(1 / packet_interval / self.packet_sleep_sum)
00120 for p, i in zip(packets, range(len(packets))):
00121 self.socket_client.sendto(p.pack(), (self.send_ip, self.send_port))
00122 if i % self.packet_sleep_sum == 0:
00123 r.sleep()
00124 self.counter = self.counter + 1
00125 if self.counter > 65535:
00126 self.counter = 0
00127 def subscribe(self, subscriber_infos):
00128 self.subscribers = []
00129 for topic, message_class in subscriber_infos:
00130 sub = rospy.Subscriber(topic, message_class,
00131 self.genMessageCallback(topic))
00132 self.subscribers.append(sub)
00133 def messageCallback(self, msg, topic):
00134 with self.lock:
00135 self.messages[topic] = msg
00136 self.last_input_received_time = rospy.Time.now()
00137 def genMessageCallback(self, topic):
00138 return lambda msg: self.messageCallback(msg, topic)
00139
00140 if __name__ == "__main__":
00141 rospy.init_node("silverhammer_highspeed_streamer")
00142 streamer = SilverHammerStreamer()
00143 rospy.spin()
00144