silverhammer_highspeed_receiver.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 
00003 import rospy
00004 from jsk_network_tools.msg import FC2OCSLargeData
00005 from jsk_network_tools.silverhammer_util import *
00006 from threading import Lock
00007 from StringIO import StringIO
00008 from std_msgs.msg import Time
00009 from io import BytesIO
00010 from socket import *
00011 from struct import pack
00012 import diagnostic_updater
00013 from diagnostic_msgs.msg import DiagnosticStatus
00014 import roslib
00015 from roslib.message import get_message_class
00016 
00017 #ifdef USE_THREAD
00018 # from threading import Thread
00019 # from Queue import Queue
00020 #else
00021 from multiprocessing import Process, Queue
00022 #endif
00023 
00024 def receive_process_func(packets_queue, receive_ip, receive_port, packet_size, message_class, topic_prefix, pesimistic, fragment_packets_torelance):
00025     # create socket
00026     socket_server = socket(AF_INET, SOCK_DGRAM)
00027     socket_server.setsockopt(SOL_SOCKET, SO_RCVBUF, 1564475392)
00028     recv_buffer_size = socket_server.getsockopt(SOL_SOCKET, SO_RCVBUF)
00029     rospy.loginfo("kernel receive socket buffer: %d" % recv_buffer_size)
00030     if recv_buffer_size < 1564475392:
00031         rospy.logwarn("kernel receive socket buffer must be at least 1564475392 bytes.")
00032         import os, rospkg
00033         pkg_path = rospkg.RosPack().get_path('jsk_network_tools')
00034         cmd_path = os.path.join(pkg_path, 'scripts', 'expand_udp_receive_buffer.sh')
00035         rospy.loginfo('changing kernel parameter...')
00036         try:
00037             res = os.system('sudo ' + cmd_path)
00038             if res == 0:
00039                 rospy.loginfo('successfly changed kernel parameter')
00040             else:
00041                 raise Exception('return code is non zero')
00042         except Exception as e:
00043             rospy.logerr('failed to change expand kernel udp receive buffer size.')
00044             rospy.logwarn('maybe you don\'t yet add NOPASSWD attribute to sudo user group?')
00045             rospy.logwarn('try to change from "%sudo ALL=(ALL) ALL" to "%sudo ALL=(ALL) NOPASSWD:ALL"')
00046         rospy.loginfo('continuing without existing buffer size.')
00047 
00048     rospy.logwarn("try to bind %s:%d" % (receive_ip, receive_port))
00049     socket_server.bind((receive_ip, receive_port))
00050 
00051     packets = {}
00052     last_received_data_seq_id = -1
00053 
00054     while not rospy.is_shutdown():
00055         recv_data, addr = socket_server.recvfrom(packet_size)
00056         packet = LargeDataUDPPacket.fromData(recv_data, packet_size)
00057         if packet.seq_id in packets:
00058             packets[packet.seq_id] += [packet]
00059         else:
00060             packets[packet.seq_id] = [packet]
00061         last_received_data_seq_id = packet.seq_id
00062 
00063         # enqueue complete packets
00064         for seq_id, pcts in packets.items():
00065             if pcts[0].num == len(pcts):
00066                 packets_queue.put(packets.pop(seq_id))
00067 
00068         # prune incomplete packets
00069         for seq_id, pcts in packets.items():
00070             if seq_id < last_received_data_seq_id - fragment_packets_torelance and len(pcts) != pcts[0].num:
00071                 if pesimistic:
00072                     rospy.logwarn("packets(seq: %d, recv: %d/%d) are pruned." % (seq_id, len(pcts), pcts[0].num))
00073                     del packets[seq_id]
00074                 else:
00075                     rospy.logwarn("packets(seq: %d, recv: %d/%d) are incomplete data." % (seq_id, len(pcts), pcts[0].num))
00076                     packets_queue.put(packets.pop(seq_id))
00077 
00078 
00079 class SilverHammerReceiver:
00080     def __init__(self):
00081         message_class_str = rospy.get_param("~message", 
00082                                             "jsk_network_tools/FC2OCSLargeData")
00083         try:
00084             self.message_class = get_message_class(message_class_str)
00085         except:
00086             raise Exception("invalid topic type: %s"%message_class_str)
00087         self.lock = Lock()
00088         self.diagnostic_updater = diagnostic_updater.Updater()
00089         self.diagnostic_updater.setHardwareID("none")
00090         self.diagnostic_updater.add("HighspeedReceiver", self.diagnosticCallback)
00091         self.latch = rospy.get_param("~latch", True)
00092         self.pesimistic = rospy.get_param("~pesimistic", False)
00093         self.fragment_packets_torelance = rospy.get_param("~fragment_packets_torelance", 20)
00094         self.timestamp_overwrite_topics = rospy.get_param("~timestamp_overwrite_topics", [])
00095         self.publish_only_if_updated_topics = rospy.get_param("~publish_only_if_updated_topics", [])
00096         self.prev_seq_ids = {}
00097         self.receive_port = rospy.get_param("~receive_port", 16484)
00098         self.receive_ip = rospy.get_param("~receive_ip", "localhost")
00099         self.topic_prefix = rospy.get_param("~topic_prefix", "/from_fc")
00100         if not self.topic_prefix.startswith("/"):
00101             self.topic_prefix = "/" + self.topic_prefix
00102         if self.topic_prefix == "/":
00103             self.topic_prefix = ""
00104         self.publishers = publishersFromMessage(self.message_class,
00105                                                 self.topic_prefix, 
00106                                                 latch=self.latch)
00107         self.packet_size = rospy.get_param("~packet_size", 1400)   #2Hz
00108         self.launched_time = rospy.Time.now()
00109         self.last_received_time = rospy.Time(0)
00110         self.last_received_time_pub = rospy.Publisher("~last_received_time", Time)
00111         self.last_published_seq_id = -1
00112         self.diagnostic_timer = rospy.Timer(rospy.Duration(1.0 / 10),
00113                                             self.diagnosticTimerCallback)
00114         self.packets_queue = Queue()
00115 #ifdef USE_THREAD
00116 #        self.receive_process = Thread(target=receive_process_func, args=(self.packets_queue,
00117 #else
00118         self.receive_process = Process(target=receive_process_func, args=(self.packets_queue,
00119 #endif
00120                                                                           self.receive_ip,
00121                                                                           self.receive_port,
00122                                                                           self.packet_size,
00123                                                                           self.message_class,
00124                                                                           self.topic_prefix,
00125                                                                           self.pesimistic,
00126                                                                           self.fragment_packets_torelance))
00127     def on_shutdown(self):
00128         if self.receive_process.is_alive():
00129             try:
00130                 self.receive_process.terminate()
00131                 self.receive_process.join(timeout=3)
00132                 if self.receive_process.is_alive():
00133                     raise
00134             except Exception as e:
00135                 if "no attribute 'terminate'" in e.message:
00136                     return
00137                 pid = self.receive_process.pid
00138                 rospy.logerr("failed to terminate process %d: %s" % (pid, e))
00139                 try:
00140                     rospy.loginfo("trying to kill process %d" % pid)
00141                     import os, signal
00142                     os.kill(pid, signal.SIGKILL)
00143                 except Exception as ex:
00144                     rospy.logfatal("failed to kill process %d: %s." % (pid, ex))
00145                     rospy.logfatal("It will be zombie process" % pid)
00146 
00147     def diagnosticCallback(self, stat):
00148         # always OK
00149         stat.summary(DiagnosticStatus.OK, "OK")
00150         with self.lock:
00151             now = rospy.Time.now()
00152             stat.add("Uptime [sec]",
00153                      (now - self.launched_time).to_sec())
00154             stat.add("Time from last input [sec]", 
00155                      (now - self.last_received_time).to_sec())
00156             stat.add("UDP address", self.receive_ip)
00157             stat.add("UDP port", self.receive_port)
00158         return stat
00159     def diagnosticTimerCallback(self, event):
00160         self.diagnostic_updater.update()
00161         with self.lock:
00162             self.last_received_time_pub.publish(self.last_received_time)
00163     def run(self):
00164         self.receive_process.start()
00165         while not rospy.is_shutdown():
00166             rospy.loginfo("%d packets are in queue" % self.packets_queue.qsize())
00167             packets = self.packets_queue.get()
00168             packets.sort(key=lambda p: p.id)
00169             self.last_received_time = rospy.Time.now()
00170             try:
00171                 self.concatenatePackets(packets)
00172             except Exception,e:
00173                 rospy.logerr("failed to concatenate packets: %s", e.message)
00174         else:
00175             self.receive_process.join()
00176 
00177     def concatenatePackets(self, packets):
00178         if packets[0].seq_id < self.last_published_seq_id:
00179             rospy.logwarn("publishing out-of-ordered packets %d -> %d" % (self.last_published_seq_id, packets[0].seq_id))
00180         self.last_published_seq_id = packets[0].seq_id
00181         packet_data_length = len(packets[0].data)
00182         packet_index = 0
00183         b = StringIO()
00184         for i in range(packets[0].num):
00185             if packets[packet_index].id == i:
00186                 packet = packets[packet_index]
00187                 b.write(packet.data)
00188                 packet_index = packet_index + 1
00189             else:
00190                 # fill by dummy data
00191                 b.write(chr(0) * packet_data_length)
00192         deserialized_data = []
00193         rospy.msg.deserialize_messages(b, deserialized_data,
00194                                        self.message_class)
00195         rospy.loginfo("received %d message" % len(deserialized_data))
00196         if len(deserialized_data) > 0:
00197             # publish data
00198             msg = deserialized_data[0]
00199             messages = decomposeLargeMessage(msg, self.topic_prefix)
00200             now = rospy.Time.now()
00201             for pub in self.publishers:
00202                 if pub.name in messages:
00203                     if not pub.name in self.publish_only_if_updated_topics:
00204                         rospy.loginfo("publishing %s" % pub.name)
00205                         if pub.name in self.timestamp_overwrite_topics:
00206                             if (hasattr(messages[pub.name], "header") and
00207                                 hasattr(messages[pub.name].header, "stamp")):
00208                                 messages[pub.name].header.stamp = now
00209                         pub.publish(messages[pub.name])
00210                     #pub.publish(messages[pub.name])
00211                 else:
00212                     rospy.logwarn("""cannot find '%s' in deserialized messages %s""" % (pub.name, messages.keys()))
00213             synchronized_publishers = []
00214             at_lest_one_topic = False
00215             # Check if there is any topic to update
00216             for pub in self.publishers:
00217                 if pub.name in messages:
00218                     if pub.name in self.publish_only_if_updated_topics:
00219                         synchronized_publishers.append(pub)
00220                         if (hasattr(messages[pub.name], "header") and
00221                             hasattr(messages[pub.name].header, "stamp")):
00222                             messages[pub.name].header.stamp = now # Overwrite with the same timestamp
00223                         if (hasattr(messages[pub.name], "header") and
00224                             hasattr(messages[pub.name].header, "seq")):
00225                             # Skip rule
00226                             if (pub.name in self.prev_seq_ids.keys() and 
00227                                 messages[pub.name].header.seq == self.prev_seq_ids[pub.name]):
00228                                 rospy.loginfo("skip publishing %s " % (pub.name))
00229                                 pass
00230                             else:
00231                                 rospy.loginfo("messages[%s].header.seq: %d"% (pub.name,
00232                                                                               messages[pub.name].header.seq))
00233                                 rospy.loginfo("self.prev_seq_ids[%s]: %d"  % (pub.name,
00234                                                                               messages[pub.name].header.seq))
00235                                 self.prev_seq_ids[pub.name] = messages[pub.name].header.seq
00236                                 at_lest_one_topic = True
00237             if at_lest_one_topic:
00238                 for pub in synchronized_publishers:
00239                     pub.publish(messages[pub.name])
00240         else:
00241             rospy.logerr("missed some packets")
00242 
00243 
00244 if __name__ == "__main__":
00245     rospy.init_node("silverhammer_highspeed_receiver")
00246     receiver = SilverHammerReceiver()
00247     rospy.on_shutdown(receiver.on_shutdown)
00248     receiver.run()


jsk_network_tools
Author(s): Yusuke Furuta
autogenerated on Fri Sep 8 2017 03:38:47