silverhammer_highspeed_streamer.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 
00003 import rospy
00004 from jsk_network_tools.msg import FC2OCSLargeData
00005 from jsk_network_tools.silverhammer_util import *
00006 from threading import Lock
00007 from StringIO import StringIO
00008 from io import BytesIO
00009 from socket import *
00010 import diagnostic_updater
00011 from diagnostic_msgs.msg import DiagnosticStatus
00012 import roslib
00013 from roslib.message import get_message_class
00014 from std_msgs.msg import Time
00015 from dynamic_reconfigure.server import Server
00016 from jsk_network_tools.cfg import SilverhammerHighspeedStreamerConfig
00017 
00018 class SilverHammerStreamer:
00019     def __init__(self):
00020         message_class_str = rospy.get_param("~message", 
00021                                             "jsk_network_tools/FC2OCSLargeData")
00022         try:
00023             self.message_class = get_message_class(message_class_str)
00024         except:
00025             raise Exception("invalid topic type: %s"%message_class_str)
00026         self.lock = Lock()
00027         self.dynamic_reconfigure = Server(SilverhammerHighspeedStreamerConfig, self.dynamicReconfigureCallback)
00028         self.launched_time = rospy.Time.now()
00029         self.packet_interval = None
00030         self.diagnostic_updater = diagnostic_updater.Updater()
00031         self.diagnostic_updater.setHardwareID("none")
00032         # register function to publish diagnostic
00033         self.diagnostic_updater.add("HighspeedStreamer", self.diagnosticCallback)
00034         self.send_port = rospy.get_param("~to_port", 16484)
00035         self.send_ip = rospy.get_param("~to_ip", "localhost")
00036         self.last_send_time = rospy.Time(0)
00037         self.last_input_received_time = rospy.Time(0)
00038         self.last_send_time_pub = rospy.Publisher("~last_send_time", Time)
00039         self.last_input_received_time_pub = rospy.Publisher(
00040             "~last_input_received_time", Time)
00041         self.send_num = 0
00042         self.rate = rospy.get_param("~send_rate", 2)   #2Hz
00043         self.socket_client = socket(AF_INET, SOCK_DGRAM)
00044         self.packet_size = rospy.get_param("~packet_size", 1400) # for MTU:=1500
00045         subscriber_info = subscribersFromMessage(self.message_class())
00046         self.messages = {}
00047         self.subscribe(subscriber_info)
00048         self.counter = 0
00049         self.send_timer = rospy.Timer(rospy.Duration(1.0 / self.rate),
00050                                       self.sendTimerCallback)
00051         self.diagnostic_timer = rospy.Timer(rospy.Duration(1.0 / 10),
00052                                             self.diagnosticTimerCallback)
00053     def dynamicReconfigureCallback(self, config, level):
00054         with self.lock:
00055             self.bandwidth = config.bandwidth
00056             self.packet_sleep_sum = config.packet_sleep_sum
00057             return config
00058     def diagnosticCallback(self, stat):
00059         # always OK
00060         stat.summary(DiagnosticStatus.OK, "OK")
00061         with self.lock:
00062             now = rospy.Time.now()
00063             stat.add("Uptime [sec]",
00064                      (now - self.launched_time).to_sec())
00065             stat.add("Time from the last sending [sec]",
00066                      (now - self.last_send_time).to_sec())
00067             stat.add("Number of transmission", self.send_num)
00068             stat.add("Time from the last input [sec]",
00069                      (now - self.last_input_received_time).to_sec())
00070             # properties
00071             stat.add("UDP address", self.send_ip)
00072             stat.add("UDP port", self.send_port)
00073             stat.add("Expected Hz to send", "%f Hz" % self.rate)
00074             stat.add("Packet size", "%d bytes" % self.packet_size)
00075             if self.packet_interval:
00076                 stat.add("Latest nterval duration between packets",
00077                          self.packet_interval)
00078             self.last_send_time_pub.publish(self.last_send_time)
00079             self.last_input_received_time_pub.publish(self.last_input_received_time)
00080         return stat
00081     def diagnosticTimerCallback(self, event):
00082         self.diagnostic_updater.update()
00083     def timeTimerCallback(self, event):
00084         with self.lock:
00085             if self.last_send_time:
00086                 self.last_send_time_pub.publish(self.last_send_time)
00087             else:
00088                 self.last_send_time_pub.publish(Time(data=rospy.Time(0)))
00089             if self.last_input_received_time:
00090                 self.last_input_received_time_pub.publish(
00091                     self.last_input_received_time)
00092             else:
00093                 self.last_input_received_time_pub.publish(
00094                     Time(data=rospy.Time(0)))
00095     def sendTimerCallback(self, event):
00096         buffer = StringIO()
00097         with self.lock:
00098             msg = self.message_class()
00099             subscriber_info = subscribersFromMessage(msg)
00100             for topic, message_class in subscriber_info:
00101                 field_name = topic[1:].replace("/", "__")
00102                 if topic in self.messages:
00103                     setattr(msg, field_name, self.messages[topic])
00104             rospy.msg.serialize_message(buffer, 0, msg)
00105             self.last_send_time = rospy.Time.now()
00106         # send buffer as UDP
00107         self.sendBuffer(buffer.getvalue(), buffer.len * 8)
00108     def sendBuffer(self, buffer, buffer_size):
00109         self.send_num = self.send_num + 1
00110         packets = separateBufferIntoPackets(
00111             self.counter, buffer, self.packet_size)
00112         total_sec = float(buffer_size) / self.bandwidth
00113         packet_interval = total_sec / len(packets)
00114         self.packet_interval = packet_interval
00115         rospy.loginfo("sending %d packets", len(packets))
00116         rospy.loginfo("sending %d bits with %f interval" 
00117                       % (buffer_size, packet_interval))
00118         rospy.loginfo("total time to send is %f sec" % total_sec)
00119         r = rospy.Rate(1 / packet_interval / self.packet_sleep_sum)
00120         for p, i in zip(packets, range(len(packets))):
00121             self.socket_client.sendto(p.pack(), (self.send_ip, self.send_port))
00122             if i % self.packet_sleep_sum == 0:
00123                 r.sleep()
00124         self.counter = self.counter + 1
00125         if self.counter > 65535:
00126             self.counter = 0
00127     def subscribe(self, subscriber_infos):
00128         self.subscribers = []
00129         for topic, message_class in subscriber_infos:
00130             sub = rospy.Subscriber(topic, message_class, 
00131                                    self.genMessageCallback(topic))
00132             self.subscribers.append(sub)
00133     def messageCallback(self, msg, topic):
00134         with self.lock:
00135             self.messages[topic] = msg
00136             self.last_input_received_time = rospy.Time.now()
00137     def genMessageCallback(self, topic):
00138         return lambda msg: self.messageCallback(msg, topic)
00139 
00140 if __name__ == "__main__":
00141     rospy.init_node("silverhammer_highspeed_streamer")
00142     streamer = SilverHammerStreamer()
00143     rospy.spin()
00144     


jsk_network_tools
Author(s): Yusuke Furuta
autogenerated on Fri Sep 8 2017 03:38:47