silverhammer_highspeed_streamer.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 
3 import rospy
4 from jsk_network_tools.msg import FC2OCSLargeData
6 from threading import Lock
7 from StringIO import StringIO
8 from io import BytesIO
9 from socket import *
10 import diagnostic_updater
11 from diagnostic_msgs.msg import DiagnosticStatus
12 import roslib
13 from roslib.message import get_message_class
14 from std_msgs.msg import Time
15 from dynamic_reconfigure.server import Server
16 from jsk_network_tools.cfg import SilverhammerHighspeedStreamerConfig
17 
19  def __init__(self):
20  message_class_str = rospy.get_param("~message",
21  "jsk_network_tools/FC2OCSLargeData")
22  try:
23  self.message_class = get_message_class(message_class_str)
24  except:
25  raise Exception("invalid topic type: %s"%message_class_str)
26  self.lock = Lock()
27  self.dynamic_reconfigure = Server(SilverhammerHighspeedStreamerConfig, self.dynamicReconfigureCallback)
28  self.launched_time = rospy.Time.now()
29  self.packet_interval = None
31  self.diagnostic_updater.setHardwareID("none")
32  # register function to publish diagnostic
33  self.diagnostic_updater.add("HighspeedStreamer", self.diagnosticCallback)
34  self.send_port = rospy.get_param("~to_port", 16484)
35  self.send_ip = rospy.get_param("~to_ip", "localhost")
36  self.last_send_time = rospy.Time(0)
37  self.last_input_received_time = rospy.Time(0)
38  self.last_send_time_pub = rospy.Publisher("~last_send_time", Time)
39  self.last_input_received_time_pub = rospy.Publisher(
40  "~last_input_received_time", Time)
41  self.send_num = 0
42  self.rate = rospy.get_param("~send_rate", 2) #2Hz
43  self.socket_client = socket(AF_INET, SOCK_DGRAM)
44  self.packet_size = rospy.get_param("~packet_size", 1400) # for MTU:=1500
45  subscriber_info = subscribersFromMessage(self.message_class())
46  self.messages = {}
47  self.subscribe(subscriber_info)
48  self.counter = 0
49  self.send_timer = rospy.Timer(rospy.Duration(1.0 / self.rate),
50  self.sendTimerCallback)
51  self.diagnostic_timer = rospy.Timer(rospy.Duration(1.0 / 10),
53  def dynamicReconfigureCallback(self, config, level):
54  with self.lock:
55  self.bandwidth = config.bandwidth
56  self.packet_sleep_sum = config.packet_sleep_sum
57  return config
58  def diagnosticCallback(self, stat):
59  # always OK
60  stat.summary(DiagnosticStatus.OK, "OK")
61  with self.lock:
62  now = rospy.Time.now()
63  stat.add("Uptime [sec]",
64  (now - self.launched_time).to_sec())
65  stat.add("Time from the last sending [sec]",
66  (now - self.last_send_time).to_sec())
67  stat.add("Number of transmission", self.send_num)
68  stat.add("Time from the last input [sec]",
69  (now - self.last_input_received_time).to_sec())
70  # properties
71  stat.add("UDP address", self.send_ip)
72  stat.add("UDP port", self.send_port)
73  stat.add("Expected Hz to send", "%f Hz" % self.rate)
74  stat.add("Packet size", "%d bytes" % self.packet_size)
75  if self.packet_interval:
76  stat.add("Latest nterval duration between packets",
77  self.packet_interval)
78  self.last_send_time_pub.publish(self.last_send_time)
79  self.last_input_received_time_pub.publish(self.last_input_received_time)
80  return stat
81  def diagnosticTimerCallback(self, event):
82  self.diagnostic_updater.update()
83  def timeTimerCallback(self, event):
84  with self.lock:
85  if self.last_send_time:
86  self.last_send_time_pub.publish(self.last_send_time)
87  else:
88  self.last_send_time_pub.publish(Time(data=rospy.Time(0)))
90  self.last_input_received_time_pub.publish(
92  else:
93  self.last_input_received_time_pub.publish(
94  Time(data=rospy.Time(0)))
95  def sendTimerCallback(self, event):
96  buffer = StringIO()
97  with self.lock:
98  msg = self.message_class()
99  subscriber_info = subscribersFromMessage(msg)
100  for topic, message_class in subscriber_info:
101  field_name = topic[1:].replace("/", "__")
102  if topic in self.messages:
103  setattr(msg, field_name, self.messages[topic])
104  rospy.msg.serialize_message(buffer, 0, msg)
105  self.last_send_time = rospy.Time.now()
106  # send buffer as UDP
107  self.sendBuffer(buffer.getvalue(), buffer.len * 8)
108  def sendBuffer(self, buffer, buffer_size):
109  self.send_num = self.send_num + 1
110  packets = separateBufferIntoPackets(
111  self.counter, buffer, self.packet_size)
112  total_sec = float(buffer_size) / self.bandwidth
113  packet_interval = total_sec / len(packets)
114  self.packet_interval = packet_interval
115  rospy.loginfo("sending %d packets", len(packets))
116  rospy.loginfo("sending %d bits with %f interval"
117  % (buffer_size, packet_interval))
118  rospy.loginfo("total time to send is %f sec" % total_sec)
119  r = rospy.Rate(1 / packet_interval / self.packet_sleep_sum)
120  for p, i in zip(packets, range(len(packets))):
121  self.socket_client.sendto(p.pack(), (self.send_ip, self.send_port))
122  if i % self.packet_sleep_sum == 0:
123  r.sleep()
124  self.counter = self.counter + 1
125  if self.counter > 65535:
126  self.counter = 0
127  def subscribe(self, subscriber_infos):
128  self.subscribers = []
129  for topic, message_class in subscriber_infos:
130  sub = rospy.Subscriber(topic, message_class,
131  self.genMessageCallback(topic))
132  self.subscribers.append(sub)
133  def messageCallback(self, msg, topic):
134  with self.lock:
135  self.messages[topic] = msg
136  self.last_input_received_time = rospy.Time.now()
137  def genMessageCallback(self, topic):
138  return lambda msg: self.messageCallback(msg, topic)
139 
140 if __name__ == "__main__":
141  rospy.init_node("silverhammer_highspeed_streamer")
142  streamer = SilverHammerStreamer()
143  rospy.spin()
144 
def separateBufferIntoPackets(seq_id, buffer, packet_size)


jsk_network_tools
Author(s): Yusuke Furuta
autogenerated on Tue Feb 6 2018 03:45:07