silverhammer_highspeed_receiver_with_internal_buffer.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 
00003 import rospy
00004 from jsk_network_tools.msg import FC2OCSLargeData, SilverhammerInternalBuffer
00005 from jsk_network_tools.silverhammer_util import *
00006 from threading import Lock
00007 from StringIO import StringIO
00008 from std_msgs.msg import Time
00009 from io import BytesIO
00010 from socket import *
00011 from struct import pack
00012 import diagnostic_updater
00013 from diagnostic_msgs.msg import DiagnosticStatus
00014 import roslib
00015 from roslib.message import get_message_class
00016 
00017 #ifdef USE_THREAD
00018 # from threading import Thread
00019 # from Queue import Queue
00020 #else
00021 from multiprocessing import Process, Queue
00022 #endif
00023 
00024 class SilverHammerReceiver:
00025     def __init__(self):
00026         message_class_str = rospy.get_param("~message", 
00027                                             "jsk_network_tools/FC2OCSLargeData")
00028         try:
00029             self.message_class = get_message_class(message_class_str)
00030         except:
00031             raise Exception("invalid topic type: %s"%message_class_str)
00032         self.lock = Lock()
00033         self.diagnostic_updater = diagnostic_updater.Updater()
00034         self.diagnostic_updater.setHardwareID("none")
00035         self.diagnostic_updater.add("HighspeedReceiver", self.diagnosticCallback)
00036         self.latch = rospy.get_param("~latch", True)
00037         self.pesimistic = rospy.get_param("~pesimistic", False)
00038         self.fragment_packets_torelance = rospy.get_param("~fragment_packets_torelance", 20)
00039         self.timestamp_overwrite_topics = rospy.get_param("~timestamp_overwrite_topics", [])
00040         self.publish_only_if_updated_topics = rospy.get_param("~publish_only_if_updated_topics", [])
00041         self.prev_seq_ids = {}
00042         self.receive_port = rospy.get_param("~receive_port", 16484)
00043         self.receive_ip = rospy.get_param("~receive_ip", "localhost")
00044         self.topic_prefix = rospy.get_param("~topic_prefix", "/from_fc")
00045         if not self.topic_prefix.startswith("/"):
00046             self.topic_prefix = "/" + self.topic_prefix
00047         if self.topic_prefix == "/":
00048             self.topic_prefix = ""
00049         self.publishers = publishersFromMessage(self.message_class,
00050                                                 self.topic_prefix, 
00051                                                 latch=self.latch)
00052         self.packet_size = rospy.get_param("~packet_size", 1400)   #2Hz
00053         self.launched_time = rospy.Time.now()
00054         self.last_received_time = rospy.Time(0)
00055         self.last_received_time_pub = rospy.Publisher("~last_received_time", Time)
00056         self.last_published_seq_id = -1
00057         self.diagnostic_timer = rospy.Timer(rospy.Duration(1.0 / 10),
00058                                             self.diagnosticTimerCallback)
00059         self.packets_queue = Queue()
00060     def diagnosticCallback(self, stat):
00061         # always OK
00062         stat.summary(DiagnosticStatus.OK, "OK")
00063         with self.lock:
00064             now = rospy.Time.now()
00065             stat.add("Uptime [sec]",
00066                      (now - self.launched_time).to_sec())
00067             stat.add("Time from last input [sec]", 
00068                      (now - self.last_received_time).to_sec())
00069             stat.add("UDP address", self.receive_ip)
00070             stat.add("UDP port", self.receive_port)
00071         return stat
00072     def diagnosticTimerCallback(self, event):
00073         self.diagnostic_updater.update()
00074         with self.lock:
00075             self.last_received_time_pub.publish(self.last_received_time)
00076     def run(self):
00077         sub = rospy.Subscriber("~input", SilverhammerInternalBuffer, self.callback)
00078         rospy.spin()
00079     def callback(self, msg):
00080         b = StringIO()
00081         b.write(msg.data)
00082         deserialized_data = []
00083         rospy.loginfo("data size: %d" % len(msg.data))
00084         rospy.msg.deserialize_messages(b, deserialized_data,
00085                                        self.message_class)
00086         rospy.loginfo("received %d message" % len(deserialized_data))
00087         
00088         if len(deserialized_data) > 0:
00089             # publish data
00090             msg = deserialized_data[0]
00091             messages = decomposeLargeMessage(msg, self.topic_prefix)
00092             now = rospy.Time.now()
00093             for pub in self.publishers:
00094                 if pub.name in messages:
00095                     if not pub.name in self.publish_only_if_updated_topics:
00096                         rospy.loginfo("publishing %s" % pub.name)
00097                         if pub.name in self.timestamp_overwrite_topics:
00098                             if (hasattr(messages[pub.name], "header") and
00099                                 hasattr(messages[pub.name].header, "stamp")):
00100                                 messages[pub.name].header.stamp = now
00101                         pub.publish(messages[pub.name])
00102                     #pub.publish(messages[pub.name])
00103                 else:
00104                     rospy.logwarn("""cannot find '%s' in deserialized messages %s""" % (pub.name, messages.keys()))
00105             synchronized_publishers = []
00106             at_lest_one_topic = False
00107             # Check if there is any topic to update
00108             for pub in self.publishers:
00109                 if pub.name in messages:
00110                     if pub.name in self.publish_only_if_updated_topics:
00111                         synchronized_publishers.append(pub)
00112                         if (hasattr(messages[pub.name], "header") and
00113                             hasattr(messages[pub.name].header, "stamp")):
00114                             messages[pub.name].header.stamp = now # Overwrite with the same timestamp
00115                         if (hasattr(messages[pub.name], "header") and
00116                             hasattr(messages[pub.name].header, "seq")):
00117                             # Skip rule
00118                             if (pub.name in self.prev_seq_ids.keys() and 
00119                                 messages[pub.name].header.seq == self.prev_seq_ids[pub.name]):
00120                                 rospy.loginfo("skip publishing %s " % (pub.name))
00121                                 pass
00122                             else:
00123                                 rospy.loginfo("messages[%s].header.seq: %d"% (pub.name,
00124                                                                               messages[pub.name].header.seq))
00125                                 rospy.loginfo("self.prev_seq_ids[%s]: %d"  % (pub.name,
00126                                                                               messages[pub.name].header.seq))
00127                                 self.prev_seq_ids[pub.name] = messages[pub.name].header.seq
00128                                 at_lest_one_topic = True
00129             if at_lest_one_topic:
00130                 for pub in synchronized_publishers:
00131                     pub.publish(messages[pub.name])
00132         else:
00133             rospy.logerr("missed some packets")
00134 
00135 
00136 if __name__ == "__main__":
00137     rospy.init_node("silverhammer_highspeed_receiver")
00138     receiver = SilverHammerReceiver()
00139     receiver.run()


jsk_network_tools
Author(s): Yusuke Furuta
autogenerated on Fri Sep 8 2017 03:38:47