00001
00002
00003 import rospy
00004 from jsk_network_tools.msg import FC2OCSLargeData
00005 from jsk_network_tools.silverhammer_util import *
00006 from threading import Lock
00007 from StringIO import StringIO
00008 from std_msgs.msg import Time
00009 from io import BytesIO
00010 from socket import *
00011 from struct import pack
00012 import diagnostic_updater
00013 from diagnostic_msgs.msg import DiagnosticStatus
00014 import roslib
00015 from roslib.message import get_message_class
00016
00017
00018
00019
00020
00021 from multiprocessing import Process, Queue
00022
00023
00024 def receive_process_func(packets_queue, receive_ip, receive_port, packet_size, message_class, topic_prefix, pesimistic, fragment_packets_torelance):
00025
00026 socket_server = socket(AF_INET, SOCK_DGRAM)
00027 socket_server.setsockopt(SOL_SOCKET, SO_RCVBUF, 1564475392)
00028 recv_buffer_size = socket_server.getsockopt(SOL_SOCKET, SO_RCVBUF)
00029 rospy.loginfo("kernel receive socket buffer: %d" % recv_buffer_size)
00030 if recv_buffer_size < 1564475392:
00031 rospy.logwarn("kernel receive socket buffer must be at least 1564475392 bytes.")
00032 import os, rospkg
00033 pkg_path = rospkg.RosPack().get_path('jsk_network_tools')
00034 cmd_path = os.path.join(pkg_path, 'scripts', 'expand_udp_receive_buffer.sh')
00035 rospy.loginfo('changing kernel parameter...')
00036 try:
00037 res = os.system('sudo ' + cmd_path)
00038 if res == 0:
00039 rospy.loginfo('successfly changed kernel parameter')
00040 else:
00041 raise Exception('return code is non zero')
00042 except Exception as e:
00043 rospy.logerr('failed to change expand kernel udp receive buffer size.')
00044 rospy.logwarn('maybe you don\'t yet add NOPASSWD attribute to sudo user group?')
00045 rospy.logwarn('try to change from "%sudo ALL=(ALL) ALL" to "%sudo ALL=(ALL) NOPASSWD:ALL"')
00046 rospy.loginfo('continuing without existing buffer size.')
00047
00048 rospy.logwarn("try to bind %s:%d" % (receive_ip, receive_port))
00049 socket_server.bind((receive_ip, receive_port))
00050
00051 packets = {}
00052 last_received_data_seq_id = -1
00053
00054 while not rospy.is_shutdown():
00055 recv_data, addr = socket_server.recvfrom(packet_size)
00056 packet = LargeDataUDPPacket.fromData(recv_data, packet_size)
00057 if packet.seq_id in packets:
00058 packets[packet.seq_id] += [packet]
00059 else:
00060 packets[packet.seq_id] = [packet]
00061 last_received_data_seq_id = packet.seq_id
00062
00063
00064 for seq_id, pcts in packets.items():
00065 if pcts[0].num == len(pcts):
00066 packets_queue.put(packets.pop(seq_id))
00067
00068
00069 for seq_id, pcts in packets.items():
00070 if seq_id < last_received_data_seq_id - fragment_packets_torelance and len(pcts) != pcts[0].num:
00071 if pesimistic:
00072 rospy.logwarn("packets(seq: %d, recv: %d/%d) are pruned." % (seq_id, len(pcts), pcts[0].num))
00073 del packets[seq_id]
00074 else:
00075 rospy.logwarn("packets(seq: %d, recv: %d/%d) are incomplete data." % (seq_id, len(pcts), pcts[0].num))
00076 packets_queue.put(packets.pop(seq_id))
00077
00078
00079 class SilverHammerReceiver:
00080 def __init__(self):
00081 message_class_str = rospy.get_param("~message",
00082 "jsk_network_tools/FC2OCSLargeData")
00083 try:
00084 self.message_class = get_message_class(message_class_str)
00085 except:
00086 raise Exception("invalid topic type: %s"%message_class_str)
00087 self.lock = Lock()
00088 self.diagnostic_updater = diagnostic_updater.Updater()
00089 self.diagnostic_updater.setHardwareID("none")
00090 self.diagnostic_updater.add("HighspeedReceiver", self.diagnosticCallback)
00091 self.latch = rospy.get_param("~latch", True)
00092 self.pesimistic = rospy.get_param("~pesimistic", False)
00093 self.fragment_packets_torelance = rospy.get_param("~fragment_packets_torelance", 20)
00094 self.timestamp_overwrite_topics = rospy.get_param("~timestamp_overwrite_topics", [])
00095 self.publish_only_if_updated_topics = rospy.get_param("~publish_only_if_updated_topics", [])
00096 self.prev_seq_ids = {}
00097 self.receive_port = rospy.get_param("~receive_port", 16484)
00098 self.receive_ip = rospy.get_param("~receive_ip", "localhost")
00099 self.topic_prefix = rospy.get_param("~topic_prefix", "/from_fc")
00100 if not self.topic_prefix.startswith("/"):
00101 self.topic_prefix = "/" + self.topic_prefix
00102 if self.topic_prefix == "/":
00103 self.topic_prefix = ""
00104 self.publishers = publishersFromMessage(self.message_class,
00105 self.topic_prefix,
00106 latch=self.latch)
00107 self.packet_size = rospy.get_param("~packet_size", 1400)
00108 self.launched_time = rospy.Time.now()
00109 self.last_received_time = rospy.Time(0)
00110 self.last_received_time_pub = rospy.Publisher("~last_received_time", Time)
00111 self.last_published_seq_id = -1
00112 self.diagnostic_timer = rospy.Timer(rospy.Duration(1.0 / 10),
00113 self.diagnosticTimerCallback)
00114 self.packets_queue = Queue()
00115
00116
00117
00118 self.receive_process = Process(target=receive_process_func, args=(self.packets_queue,
00119
00120 self.receive_ip,
00121 self.receive_port,
00122 self.packet_size,
00123 self.message_class,
00124 self.topic_prefix,
00125 self.pesimistic,
00126 self.fragment_packets_torelance))
00127 def on_shutdown(self):
00128 if self.receive_process.is_alive():
00129 try:
00130 self.receive_process.terminate()
00131 self.receive_process.join(timeout=3)
00132 if self.receive_process.is_alive():
00133 raise
00134 except Exception as e:
00135 if "no attribute 'terminate'" in e.message:
00136 return
00137 pid = self.receive_process.pid
00138 rospy.logerr("failed to terminate process %d: %s" % (pid, e))
00139 try:
00140 rospy.loginfo("trying to kill process %d" % pid)
00141 import os, signal
00142 os.kill(pid, signal.SIGKILL)
00143 except Exception as ex:
00144 rospy.logfatal("failed to kill process %d: %s." % (pid, ex))
00145 rospy.logfatal("It will be zombie process" % pid)
00146
00147 def diagnosticCallback(self, stat):
00148
00149 stat.summary(DiagnosticStatus.OK, "OK")
00150 with self.lock:
00151 now = rospy.Time.now()
00152 stat.add("Uptime [sec]",
00153 (now - self.launched_time).to_sec())
00154 stat.add("Time from last input [sec]",
00155 (now - self.last_received_time).to_sec())
00156 stat.add("UDP address", self.receive_ip)
00157 stat.add("UDP port", self.receive_port)
00158 return stat
00159 def diagnosticTimerCallback(self, event):
00160 self.diagnostic_updater.update()
00161 with self.lock:
00162 self.last_received_time_pub.publish(self.last_received_time)
00163 def run(self):
00164 self.receive_process.start()
00165 while not rospy.is_shutdown():
00166 rospy.loginfo("%d packets are in queue" % self.packets_queue.qsize())
00167 packets = self.packets_queue.get()
00168 packets.sort(key=lambda p: p.id)
00169 self.last_received_time = rospy.Time.now()
00170 try:
00171 self.concatenatePackets(packets)
00172 except Exception,e:
00173 rospy.logerr("failed to concatenate packets: %s", e.message)
00174 else:
00175 self.receive_process.join()
00176
00177 def concatenatePackets(self, packets):
00178 if packets[0].seq_id < self.last_published_seq_id:
00179 rospy.logwarn("publishing out-of-ordered packets %d -> %d" % (self.last_published_seq_id, packets[0].seq_id))
00180 self.last_published_seq_id = packets[0].seq_id
00181 packet_data_length = len(packets[0].data)
00182 packet_index = 0
00183 b = StringIO()
00184 for i in range(packets[0].num):
00185 if packets[packet_index].id == i:
00186 packet = packets[packet_index]
00187 b.write(packet.data)
00188 packet_index = packet_index + 1
00189 else:
00190
00191 b.write(chr(0) * packet_data_length)
00192 deserialized_data = []
00193 rospy.msg.deserialize_messages(b, deserialized_data,
00194 self.message_class)
00195 rospy.loginfo("received %d message" % len(deserialized_data))
00196 if len(deserialized_data) > 0:
00197
00198 msg = deserialized_data[0]
00199 messages = decomposeLargeMessage(msg, self.topic_prefix)
00200 now = rospy.Time.now()
00201 for pub in self.publishers:
00202 if pub.name in messages:
00203 if not pub.name in self.publish_only_if_updated_topics:
00204 rospy.loginfo("publishing %s" % pub.name)
00205 if pub.name in self.timestamp_overwrite_topics:
00206 if (hasattr(messages[pub.name], "header") and
00207 hasattr(messages[pub.name].header, "stamp")):
00208 messages[pub.name].header.stamp = now
00209 pub.publish(messages[pub.name])
00210
00211 else:
00212 rospy.logwarn("""cannot find '%s' in deserialized messages %s""" % (pub.name, messages.keys()))
00213 synchronized_publishers = []
00214 at_lest_one_topic = False
00215
00216 for pub in self.publishers:
00217 if pub.name in messages:
00218 if pub.name in self.publish_only_if_updated_topics:
00219 synchronized_publishers.append(pub)
00220 if (hasattr(messages[pub.name], "header") and
00221 hasattr(messages[pub.name].header, "stamp")):
00222 messages[pub.name].header.stamp = now
00223 if (hasattr(messages[pub.name], "header") and
00224 hasattr(messages[pub.name].header, "seq")):
00225
00226 if (pub.name in self.prev_seq_ids.keys() and
00227 messages[pub.name].header.seq == self.prev_seq_ids[pub.name]):
00228 rospy.loginfo("skip publishing %s " % (pub.name))
00229 pass
00230 else:
00231 rospy.loginfo("messages[%s].header.seq: %d"% (pub.name,
00232 messages[pub.name].header.seq))
00233 rospy.loginfo("self.prev_seq_ids[%s]: %d" % (pub.name,
00234 messages[pub.name].header.seq))
00235 self.prev_seq_ids[pub.name] = messages[pub.name].header.seq
00236 at_lest_one_topic = True
00237 if at_lest_one_topic:
00238 for pub in synchronized_publishers:
00239 pub.publish(messages[pub.name])
00240 else:
00241 rospy.logerr("missed some packets")
00242
00243
00244 if __name__ == "__main__":
00245 rospy.init_node("silverhammer_highspeed_receiver")
00246 receiver = SilverHammerReceiver()
00247 rospy.on_shutdown(receiver.on_shutdown)
00248 receiver.run()