silverhammer_highspeed_receiver.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 
3 import rospy
4 from jsk_network_tools.msg import FC2OCSLargeData
6 from threading import Lock
7 from StringIO import StringIO
8 from std_msgs.msg import Time
9 from io import BytesIO
10 from socket import *
11 from struct import pack
12 import diagnostic_updater
13 from diagnostic_msgs.msg import DiagnosticStatus
14 import roslib
15 from roslib.message import get_message_class
16 
17 #ifdef USE_THREAD
18 # from threading import Thread
19 # from Queue import Queue
20 #else
21 from multiprocessing import Process, Queue
22 #endif
23 
24 def receive_process_func(packets_queue, receive_ip, receive_port, packet_size, message_class, topic_prefix, pesimistic, fragment_packets_torelance):
25  # create socket
26  socket_server = socket(AF_INET, SOCK_DGRAM)
27  socket_server.setsockopt(SOL_SOCKET, SO_RCVBUF, 1564475392)
28  recv_buffer_size = socket_server.getsockopt(SOL_SOCKET, SO_RCVBUF)
29  rospy.loginfo("kernel receive socket buffer: %d" % recv_buffer_size)
30  if recv_buffer_size < 1564475392:
31  rospy.logwarn("kernel receive socket buffer must be at least 1564475392 bytes.")
32  import os, rospkg
33  pkg_path = rospkg.RosPack().get_path('jsk_network_tools')
34  cmd_path = os.path.join(pkg_path, 'scripts', 'expand_udp_receive_buffer.sh')
35  rospy.loginfo('changing kernel parameter...')
36  try:
37  res = os.system('sudo ' + cmd_path)
38  if res == 0:
39  rospy.loginfo('successfly changed kernel parameter')
40  else:
41  raise Exception('return code is non zero')
42  except Exception as e:
43  rospy.logerr('failed to change expand kernel udp receive buffer size.')
44  rospy.logwarn('maybe you don\'t yet add NOPASSWD attribute to sudo user group?')
45  rospy.logwarn('try to change from "%sudo ALL=(ALL) ALL" to "%sudo ALL=(ALL) NOPASSWD:ALL"')
46  rospy.loginfo('continuing without existing buffer size.')
47 
48  rospy.logwarn("try to bind %s:%d" % (receive_ip, receive_port))
49  socket_server.bind((receive_ip, receive_port))
50 
51  packets = {}
52  last_received_data_seq_id = -1
53 
54  while not rospy.is_shutdown():
55  recv_data, addr = socket_server.recvfrom(packet_size)
56  packet = LargeDataUDPPacket.fromData(recv_data, packet_size)
57  if packet.seq_id in packets:
58  packets[packet.seq_id] += [packet]
59  else:
60  packets[packet.seq_id] = [packet]
61  last_received_data_seq_id = packet.seq_id
62 
63  # enqueue complete packets
64  for seq_id, pcts in packets.items():
65  if pcts[0].num == len(pcts):
66  packets_queue.put(packets.pop(seq_id))
67 
68  # prune incomplete packets
69  for seq_id, pcts in packets.items():
70  if seq_id < last_received_data_seq_id - fragment_packets_torelance and len(pcts) != pcts[0].num:
71  if pesimistic:
72  rospy.logwarn("packets(seq: %d, recv: %d/%d) are pruned." % (seq_id, len(pcts), pcts[0].num))
73  del packets[seq_id]
74  else:
75  rospy.logwarn("packets(seq: %d, recv: %d/%d) are incomplete data." % (seq_id, len(pcts), pcts[0].num))
76  packets_queue.put(packets.pop(seq_id))
77 
78 
80  def __init__(self):
81  message_class_str = rospy.get_param("~message",
82  "jsk_network_tools/FC2OCSLargeData")
83  try:
84  self.message_class = get_message_class(message_class_str)
85  except:
86  raise Exception("invalid topic type: %s"%message_class_str)
87  self.lock = Lock()
89  self.diagnostic_updater.setHardwareID("none")
90  self.diagnostic_updater.add("HighspeedReceiver", self.diagnosticCallback)
91  self.latch = rospy.get_param("~latch", True)
92  self.pesimistic = rospy.get_param("~pesimistic", False)
93  self.fragment_packets_torelance = rospy.get_param("~fragment_packets_torelance", 20)
94  self.timestamp_overwrite_topics = rospy.get_param("~timestamp_overwrite_topics", [])
95  self.publish_only_if_updated_topics = rospy.get_param("~publish_only_if_updated_topics", [])
96  self.prev_seq_ids = {}
97  self.receive_port = rospy.get_param("~receive_port", 16484)
98  self.receive_ip = rospy.get_param("~receive_ip", "localhost")
99  self.topic_prefix = rospy.get_param("~topic_prefix", "/from_fc")
100  if not self.topic_prefix.startswith("/"):
101  self.topic_prefix = "/" + self.topic_prefix
102  if self.topic_prefix == "/":
103  self.topic_prefix = ""
105  self.topic_prefix,
106  latch=self.latch)
107  self.packet_size = rospy.get_param("~packet_size", 1400) #2Hz
108  self.launched_time = rospy.Time.now()
109  self.last_received_time = rospy.Time(0)
110  self.last_received_time_pub = rospy.Publisher("~last_received_time", Time)
112  self.diagnostic_timer = rospy.Timer(rospy.Duration(1.0 / 10),
114  self.packets_queue = Queue()
115 #ifdef USE_THREAD
116 # self.receive_process = Thread(target=receive_process_func, args=(self.packets_queue,
117 #else
118  self.receive_process = Process(target=receive_process_func, args=(self.packets_queue,
119 #endif
120  self.receive_ip,
121  self.receive_port,
122  self.packet_size,
123  self.message_class,
124  self.topic_prefix,
125  self.pesimistic,
127  def on_shutdown(self):
128  if self.receive_process.is_alive():
129  try:
130  self.receive_process.terminate()
131  self.receive_process.join(timeout=3)
132  if self.receive_process.is_alive():
133  raise
134  except Exception as e:
135  if "no attribute 'terminate'" in e.message:
136  return
137  pid = self.receive_process.pid
138  rospy.logerr("failed to terminate process %d: %s" % (pid, e))
139  try:
140  rospy.loginfo("trying to kill process %d" % pid)
141  import os, signal
142  os.kill(pid, signal.SIGKILL)
143  except Exception as ex:
144  rospy.logfatal("failed to kill process %d: %s." % (pid, ex))
145  rospy.logfatal("It will be zombie process" % pid)
146 
147  def diagnosticCallback(self, stat):
148  # always OK
149  stat.summary(DiagnosticStatus.OK, "OK")
150  with self.lock:
151  now = rospy.Time.now()
152  stat.add("Uptime [sec]",
153  (now - self.launched_time).to_sec())
154  stat.add("Time from last input [sec]",
155  (now - self.last_received_time).to_sec())
156  stat.add("UDP address", self.receive_ip)
157  stat.add("UDP port", self.receive_port)
158  return stat
159  def diagnosticTimerCallback(self, event):
160  self.diagnostic_updater.update()
161  with self.lock:
162  self.last_received_time_pub.publish(self.last_received_time)
163  def run(self):
164  self.receive_process.start()
165  while not rospy.is_shutdown():
166  rospy.loginfo("%d packets are in queue" % self.packets_queue.qsize())
167  packets = self.packets_queue.get()
168  packets.sort(key=lambda p: p.id)
169  self.last_received_time = rospy.Time.now()
170  try:
171  self.concatenatePackets(packets)
172  except Exception,e:
173  rospy.logerr("failed to concatenate packets: %s", e.message)
174  else:
175  self.receive_process.join()
176 
177  def concatenatePackets(self, packets):
178  if packets[0].seq_id < self.last_published_seq_id:
179  rospy.logwarn("publishing out-of-ordered packets %d -> %d" % (self.last_published_seq_id, packets[0].seq_id))
180  self.last_published_seq_id = packets[0].seq_id
181  packet_data_length = len(packets[0].data)
182  packet_index = 0
183  b = StringIO()
184  for i in range(packets[0].num):
185  if packets[packet_index].id == i:
186  packet = packets[packet_index]
187  b.write(packet.data)
188  packet_index = packet_index + 1
189  else:
190  # fill by dummy data
191  b.write(chr(0) * packet_data_length)
192  deserialized_data = []
193  rospy.msg.deserialize_messages(b, deserialized_data,
194  self.message_class)
195  rospy.loginfo("received %d message" % len(deserialized_data))
196  if len(deserialized_data) > 0:
197  # publish data
198  msg = deserialized_data[0]
199  messages = decomposeLargeMessage(msg, self.topic_prefix)
200  now = rospy.Time.now()
201  for pub in self.publishers:
202  if pub.name in messages:
203  if not pub.name in self.publish_only_if_updated_topics:
204  rospy.loginfo("publishing %s" % pub.name)
205  if pub.name in self.timestamp_overwrite_topics:
206  if (hasattr(messages[pub.name], "header") and
207  hasattr(messages[pub.name].header, "stamp")):
208  messages[pub.name].header.stamp = now
209  pub.publish(messages[pub.name])
210  #pub.publish(messages[pub.name])
211  else:
212  rospy.logwarn("""cannot find '%s' in deserialized messages %s""" % (pub.name, messages.keys()))
213  synchronized_publishers = []
214  at_lest_one_topic = False
215  # Check if there is any topic to update
216  for pub in self.publishers:
217  if pub.name in messages:
218  if pub.name in self.publish_only_if_updated_topics:
219  synchronized_publishers.append(pub)
220  if (hasattr(messages[pub.name], "header") and
221  hasattr(messages[pub.name].header, "stamp")):
222  messages[pub.name].header.stamp = now # Overwrite with the same timestamp
223  if (hasattr(messages[pub.name], "header") and
224  hasattr(messages[pub.name].header, "seq")):
225  # Skip rule
226  if (pub.name in self.prev_seq_ids.keys() and
227  messages[pub.name].header.seq == self.prev_seq_ids[pub.name]):
228  rospy.loginfo("skip publishing %s " % (pub.name))
229  pass
230  else:
231  rospy.loginfo("messages[%s].header.seq: %d"% (pub.name,
232  messages[pub.name].header.seq))
233  rospy.loginfo("self.prev_seq_ids[%s]: %d" % (pub.name,
234  messages[pub.name].header.seq))
235  self.prev_seq_ids[pub.name] = messages[pub.name].header.seq
236  at_lest_one_topic = True
237  if at_lest_one_topic:
238  for pub in synchronized_publishers:
239  pub.publish(messages[pub.name])
240  else:
241  rospy.logerr("missed some packets")
242 
243 
244 if __name__ == "__main__":
245  rospy.init_node("silverhammer_highspeed_receiver")
246  receiver = SilverHammerReceiver()
247  rospy.on_shutdown(receiver.on_shutdown)
248  receiver.run()
def receive_process_func(packets_queue, receive_ip, receive_port, packet_size, message_class, topic_prefix, pesimistic, fragment_packets_torelance)
def publishersFromMessage(msg, prefix="", latch=False)


jsk_network_tools
Author(s): Yusuke Furuta
autogenerated on Tue Feb 6 2018 03:45:07