Go to the documentation of this file.00001
00002
00003 import rospy
00004 from jsk_network_tools.msg import FC2OCSLargeData
00005 from jsk_network_tools.silverhammer_util import *
00006 from threading import Lock
00007 from StringIO import StringIO
00008 from io import BytesIO
00009 from socket import *
00010 from struct import pack
00011
00012 import roslib
00013 from roslib.message import get_message_class
00014
00015 class SilverHammerReceiver:
00016 def __init__(self):
00017 message_class_str = rospy.get_param("~message",
00018 "jsk_network_tools/FC2OCSLargeData")
00019 try:
00020 self.message_class = get_message_class(message_class_str)
00021 except:
00022 raise Exception("invalid topic type: %s"%message_class_str)
00023 self.lock = Lock()
00024 self.receive_port = rospy.get_param("~receive_port", 16484)
00025 self.receive_ip = rospy.get_param("~receive_ip", "localhost")
00026 self.topic_prefix = rospy.get_param("~topic_prefix", "/from_fc")
00027 if not self.topic_prefix.startswith("/"):
00028 self.topic_prefix = "/" + self.topic_prefix
00029 if self.topic_prefix == "/":
00030 self.topic_prefix = ""
00031 self.publishers = publishersFromMessage(self.message_class, self.topic_prefix)
00032 self.socket_server = socket(AF_INET, SOCK_DGRAM)
00033 self.socket_server.bind((self.receive_ip, self.receive_port))
00034 self.packet_size = rospy.get_param("~packet_size", 1000)
00035 self.packets = []
00036 def run(self):
00037 while not rospy.is_shutdown():
00038 recv_data, addr = self.socket_server.recvfrom(self.packet_size)
00039 packet = LargeDataUDPPacket.fromData(recv_data, self.packet_size)
00040 self.packets.append(packet)
00041 if packet.num - 1 == packet.id:
00042
00043 try:
00044 self.concatenatePackets()
00045 except Exception,e:
00046 rospy.logerr("failed to concatenate packets: %s", e.message)
00047 finally:
00048 self.packets = []
00049 elif packet.seq_id != self.packets[-1].seq_id:
00050 rospy.logerr("packet lossed!")
00051 self.packets = [packet]
00052 def concatenatePackets(self):
00053 if self.packets[0].id == 0:
00054 seq_id = self.packets[0].seq_id
00055
00056 if len([p for p in self.packets if p.seq_id == seq_id]) == len(self.packets):
00057
00058
00059 self.packets.sort(key=lambda p: p.id)
00060 packet_data_length = len(self.packets[0].data)
00061 packet_index = 0
00062 b = StringIO()
00063 if self.packets[0].num != len(self.packets):
00064 rospy.logwarn("%d packet is missed", self.packets[0].num - len(self.packets))
00065 for i in range(self.packets[0].num):
00066 if self.packets[packet_index].id == i:
00067 packet = self.packets[packet_index]
00068 b.write(packet.data)
00069 packet_index = packet_index + 1
00070 else:
00071
00072 b.write(chr(0) * packet_data_length)
00073 deserialized_data = []
00074 rospy.msg.deserialize_messages(b, deserialized_data,
00075 self.message_class)
00076 rospy.loginfo("received %d message" % len(deserialized_data))
00077 if len(deserialized_data) > 0:
00078
00079 msg = deserialized_data[0]
00080 messages = decomposeLargeMessage(msg, self.topic_prefix)
00081 for pub in self.publishers:
00082 if messages.has_key(pub.name):
00083 rospy.loginfo("publishing %s" % pub.name)
00084 pub.publish(messages[pub.name])
00085 else:
00086 rospy.logwarn("""cannot find '%s' in deserialized messages %s""" % (pub.name, messages.keys()))
00087
00088
00089
00090 else:
00091 rospy.logerr("missed some packets")
00092 else:
00093 rospy.logerr("failed to receive first packet")
00094
00095
00096
00097 if __name__ == "__main__":
00098 rospy.init_node("silverhammer_highspeed_receiver")
00099 receiver = SilverHammerReceiver()
00100 receiver.run()
00101