silverhammer_highspeed_receiver_with_internal_buffer.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 
3 import rospy
4 from jsk_network_tools.msg import FC2OCSLargeData, SilverhammerInternalBuffer
6 from threading import Lock
7 from StringIO import StringIO
8 from std_msgs.msg import Time
9 from io import BytesIO
10 from socket import *
11 from struct import pack
12 import diagnostic_updater
13 from diagnostic_msgs.msg import DiagnosticStatus
14 import roslib
15 from roslib.message import get_message_class
16 
17 #ifdef USE_THREAD
18 # from threading import Thread
19 # from Queue import Queue
20 #else
21 from multiprocessing import Process, Queue
22 #endif
23 
25  def __init__(self):
26  message_class_str = rospy.get_param("~message",
27  "jsk_network_tools/FC2OCSLargeData")
28  try:
29  self.message_class = get_message_class(message_class_str)
30  except:
31  raise Exception("invalid topic type: %s"%message_class_str)
32  self.lock = Lock()
34  self.diagnostic_updater.setHardwareID("none")
35  self.diagnostic_updater.add("HighspeedReceiver", self.diagnosticCallback)
36  self.latch = rospy.get_param("~latch", True)
37  self.pesimistic = rospy.get_param("~pesimistic", False)
38  self.fragment_packets_torelance = rospy.get_param("~fragment_packets_torelance", 20)
39  self.timestamp_overwrite_topics = rospy.get_param("~timestamp_overwrite_topics", [])
40  self.publish_only_if_updated_topics = rospy.get_param("~publish_only_if_updated_topics", [])
41  self.prev_seq_ids = {}
42  self.receive_port = rospy.get_param("~receive_port", 16484)
43  self.receive_ip = rospy.get_param("~receive_ip", "localhost")
44  self.topic_prefix = rospy.get_param("~topic_prefix", "/from_fc")
45  if not self.topic_prefix.startswith("/"):
46  self.topic_prefix = "/" + self.topic_prefix
47  if self.topic_prefix == "/":
48  self.topic_prefix = ""
50  self.topic_prefix,
51  latch=self.latch)
52  self.packet_size = rospy.get_param("~packet_size", 1400) #2Hz
53  self.launched_time = rospy.Time.now()
54  self.last_received_time = rospy.Time(0)
55  self.last_received_time_pub = rospy.Publisher("~last_received_time", Time)
57  self.diagnostic_timer = rospy.Timer(rospy.Duration(1.0 / 10),
59  self.packets_queue = Queue()
60  def diagnosticCallback(self, stat):
61  # always OK
62  stat.summary(DiagnosticStatus.OK, "OK")
63  with self.lock:
64  now = rospy.Time.now()
65  stat.add("Uptime [sec]",
66  (now - self.launched_time).to_sec())
67  stat.add("Time from last input [sec]",
68  (now - self.last_received_time).to_sec())
69  stat.add("UDP address", self.receive_ip)
70  stat.add("UDP port", self.receive_port)
71  return stat
72  def diagnosticTimerCallback(self, event):
73  self.diagnostic_updater.update()
74  with self.lock:
75  self.last_received_time_pub.publish(self.last_received_time)
76  def run(self):
77  sub = rospy.Subscriber("~input", SilverhammerInternalBuffer, self.callback)
78  rospy.spin()
79  def callback(self, msg):
80  b = StringIO()
81  b.write(msg.data)
82  deserialized_data = []
83  rospy.loginfo("data size: %d" % len(msg.data))
84  rospy.msg.deserialize_messages(b, deserialized_data,
85  self.message_class)
86  rospy.loginfo("received %d message" % len(deserialized_data))
87 
88  if len(deserialized_data) > 0:
89  # publish data
90  msg = deserialized_data[0]
91  messages = decomposeLargeMessage(msg, self.topic_prefix)
92  now = rospy.Time.now()
93  for pub in self.publishers:
94  if pub.name in messages:
95  if not pub.name in self.publish_only_if_updated_topics:
96  rospy.loginfo("publishing %s" % pub.name)
97  if pub.name in self.timestamp_overwrite_topics:
98  if (hasattr(messages[pub.name], "header") and
99  hasattr(messages[pub.name].header, "stamp")):
100  messages[pub.name].header.stamp = now
101  pub.publish(messages[pub.name])
102  #pub.publish(messages[pub.name])
103  else:
104  rospy.logwarn("""cannot find '%s' in deserialized messages %s""" % (pub.name, messages.keys()))
105  synchronized_publishers = []
106  at_lest_one_topic = False
107  # Check if there is any topic to update
108  for pub in self.publishers:
109  if pub.name in messages:
110  if pub.name in self.publish_only_if_updated_topics:
111  synchronized_publishers.append(pub)
112  if (hasattr(messages[pub.name], "header") and
113  hasattr(messages[pub.name].header, "stamp")):
114  messages[pub.name].header.stamp = now # Overwrite with the same timestamp
115  if (hasattr(messages[pub.name], "header") and
116  hasattr(messages[pub.name].header, "seq")):
117  # Skip rule
118  if (pub.name in self.prev_seq_ids.keys() and
119  messages[pub.name].header.seq == self.prev_seq_ids[pub.name]):
120  rospy.loginfo("skip publishing %s " % (pub.name))
121  pass
122  else:
123  rospy.loginfo("messages[%s].header.seq: %d"% (pub.name,
124  messages[pub.name].header.seq))
125  rospy.loginfo("self.prev_seq_ids[%s]: %d" % (pub.name,
126  messages[pub.name].header.seq))
127  self.prev_seq_ids[pub.name] = messages[pub.name].header.seq
128  at_lest_one_topic = True
129  if at_lest_one_topic:
130  for pub in synchronized_publishers:
131  pub.publish(messages[pub.name])
132  else:
133  rospy.logerr("missed some packets")
134 
135 
136 if __name__ == "__main__":
137  rospy.init_node("silverhammer_highspeed_receiver")
138  receiver = SilverHammerReceiver()
139  receiver.run()
def publishersFromMessage(msg, prefix="", latch=False)


jsk_network_tools
Author(s): Yusuke Furuta
autogenerated on Tue Feb 6 2018 03:45:07