silverhammer_highspeed_receiver.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 
00003 import rospy
00004 from jsk_network_tools.msg import FC2OCSLargeData
00005 from jsk_network_tools.silverhammer_util import *
00006 from threading import Lock
00007 from StringIO import StringIO
00008 from io import BytesIO
00009 from socket import *
00010 from struct import pack
00011 
00012 import roslib
00013 from roslib.message import get_message_class
00014 
00015 class SilverHammerReceiver:
00016     def __init__(self):
00017         message_class_str = rospy.get_param("~message", 
00018                                             "jsk_network_tools/FC2OCSLargeData")
00019         try:
00020             self.message_class = get_message_class(message_class_str)
00021         except:
00022             raise Exception("invalid topic type: %s"%message_class_str)
00023         self.lock = Lock()
00024         self.receive_port = rospy.get_param("~receive_port", 16484)
00025         self.receive_ip = rospy.get_param("~receive_ip", "localhost")
00026         self.topic_prefix = rospy.get_param("~topic_prefix", "/from_fc")
00027         if not self.topic_prefix.startswith("/"):
00028             self.topic_prefix = "/" + self.topic_prefix
00029         if self.topic_prefix == "/":
00030             self.topic_prefix = ""
00031         self.publishers = publishersFromMessage(self.message_class, self.topic_prefix)
00032         self.socket_server = socket(AF_INET, SOCK_DGRAM)
00033         self.socket_server.bind((self.receive_ip, self.receive_port))
00034         self.packet_size = rospy.get_param("~packet_size", 1000)   #2Hz
00035         self.packets = []
00036     def run(self):
00037         while not rospy.is_shutdown():
00038             recv_data, addr = self.socket_server.recvfrom(self.packet_size)
00039             packet = LargeDataUDPPacket.fromData(recv_data, self.packet_size)
00040             self.packets.append(packet)
00041             if packet.num - 1 == packet.id:
00042                 # the end of packet
00043                 try:
00044                     self.concatenatePackets()
00045                 except Exception,e:
00046                     rospy.logerr("failed to concatenate packets: %s", e.message)
00047                 finally:
00048                     self.packets = []
00049             elif packet.seq_id != self.packets[-1].seq_id:
00050                 rospy.logerr("packet lossed!")
00051                 self.packets = [packet]   #reset packet
00052     def concatenatePackets(self):
00053         if self.packets[0].id == 0:
00054             seq_id = self.packets[0].seq_id
00055             # all the packet has same seq_id
00056             if len([p for p in self.packets if p.seq_id == seq_id]) == len(self.packets):
00057                 # received in order
00058                 # sort order
00059                 self.packets.sort(key=lambda p: p.id)
00060                 packet_data_length = len(self.packets[0].data)
00061                 packet_index = 0
00062                 b = StringIO()
00063                 if self.packets[0].num != len(self.packets):
00064                     rospy.logwarn("%d packet is missed", self.packets[0].num - len(self.packets))
00065                 for i in range(self.packets[0].num):
00066                     if self.packets[packet_index].id == i:
00067                         packet = self.packets[packet_index]
00068                         b.write(packet.data)
00069                         packet_index = packet_index + 1
00070                     else:
00071                         # fill by dummy data
00072                         b.write(chr(0) * packet_data_length)
00073                 deserialized_data = []
00074                 rospy.msg.deserialize_messages(b, deserialized_data,
00075                                                self.message_class)
00076                 rospy.loginfo("received %d message" % len(deserialized_data))
00077                 if len(deserialized_data) > 0:
00078                     # publish data
00079                     msg = deserialized_data[0]
00080                     messages = decomposeLargeMessage(msg, self.topic_prefix)
00081                     for pub in self.publishers:
00082                         if messages.has_key(pub.name):
00083                             rospy.loginfo("publishing %s" % pub.name)
00084                             pub.publish(messages[pub.name])
00085                         else:
00086                             rospy.logwarn("""cannot find '%s' in deserialized messages %s""" % (pub.name, messages.keys()))
00087                 # else:
00088                 #     rospy.logerr("order is not correct")
00089                 #     rospy.logerr("collected packets: %d, expected packets: %d", len(self.packets), self.packets[0].num)
00090             else:
00091                 rospy.logerr("missed some packets")
00092         else:
00093             rospy.logerr("failed to receive first packet")
00094         
00095 
00096 
00097 if __name__ == "__main__":
00098     rospy.init_node("silverhammer_highspeed_receiver")
00099     receiver = SilverHammerReceiver()
00100     receiver.run()
00101     


jsk_network_tools
Author(s): Yusuke Furuta
autogenerated on Sun Jan 25 2015 00:30:33