4 from jsk_network_tools.msg
import FC2OCSLargeData
6 from threading
import Lock
7 from StringIO
import StringIO
8 from std_msgs.msg
import Time
11 from struct
import pack
12 import diagnostic_updater
13 from diagnostic_msgs.msg
import DiagnosticStatus
15 from roslib.message
import get_message_class
21 from multiprocessing
import Process, Queue
24 def receive_process_func(packets_queue, receive_ip, receive_port, packet_size, message_class, topic_prefix, pesimistic, fragment_packets_torelance):
26 socket_server =
socket(AF_INET, SOCK_DGRAM)
27 socket_server.setsockopt(SOL_SOCKET, SO_RCVBUF, 1564475392)
28 recv_buffer_size = socket_server.getsockopt(SOL_SOCKET, SO_RCVBUF)
29 rospy.loginfo(
"kernel receive socket buffer: %d" % recv_buffer_size)
30 if recv_buffer_size < 1564475392:
31 rospy.logwarn(
"kernel receive socket buffer must be at least 1564475392 bytes.")
33 pkg_path = rospkg.RosPack().get_path(
'jsk_network_tools')
34 cmd_path = os.path.join(pkg_path,
'scripts',
'expand_udp_receive_buffer.sh')
35 rospy.loginfo(
'changing kernel parameter...')
37 res = os.system(
'sudo ' + cmd_path)
39 rospy.loginfo(
'successfly changed kernel parameter')
41 raise Exception(
'return code is non zero')
42 except Exception
as e:
43 rospy.logerr(
'failed to change expand kernel udp receive buffer size.')
44 rospy.logwarn(
'maybe you don\'t yet add NOPASSWD attribute to sudo user group?')
45 rospy.logwarn(
'try to change from "%sudo ALL=(ALL) ALL" to "%sudo ALL=(ALL) NOPASSWD:ALL"')
46 rospy.loginfo(
'continuing without existing buffer size.')
48 rospy.logwarn(
"try to bind %s:%d" % (receive_ip, receive_port))
49 socket_server.bind((receive_ip, receive_port))
52 last_received_data_seq_id = -1
54 while not rospy.is_shutdown():
55 recv_data, addr = socket_server.recvfrom(packet_size)
56 packet = LargeDataUDPPacket.fromData(recv_data, packet_size)
57 if packet.seq_id
in packets:
58 packets[packet.seq_id] += [packet]
60 packets[packet.seq_id] = [packet]
61 last_received_data_seq_id = packet.seq_id
64 for seq_id, pcts
in packets.items():
65 if pcts[0].num == len(pcts):
66 packets_queue.put(packets.pop(seq_id))
69 for seq_id, pcts
in packets.items():
70 if seq_id < last_received_data_seq_id - fragment_packets_torelance
and len(pcts) != pcts[0].num:
72 rospy.logwarn(
"packets(seq: %d, recv: %d/%d) are pruned." % (seq_id, len(pcts), pcts[0].num))
75 rospy.logwarn(
"packets(seq: %d, recv: %d/%d) are incomplete data." % (seq_id, len(pcts), pcts[0].num))
76 packets_queue.put(packets.pop(seq_id))
81 message_class_str = rospy.get_param(
"~message",
82 "jsk_network_tools/FC2OCSLargeData")
86 raise Exception(
"invalid topic type: %s"%message_class_str)
89 self.diagnostic_updater.setHardwareID(
"none")
91 self.
latch = rospy.get_param(
"~latch",
True)
98 self.
receive_ip = rospy.get_param(
"~receive_ip",
"localhost")
100 if not self.topic_prefix.startswith(
"/"):
128 if self.receive_process.is_alive():
130 self.receive_process.terminate()
131 self.receive_process.join(timeout=3)
132 if self.receive_process.is_alive():
134 except Exception
as e:
135 if "no attribute 'terminate'" in e.message:
137 pid = self.receive_process.pid
138 rospy.logerr(
"failed to terminate process %d: %s" % (pid, e))
140 rospy.loginfo(
"trying to kill process %d" % pid)
142 os.kill(pid, signal.SIGKILL)
143 except Exception
as ex:
144 rospy.logfatal(
"failed to kill process %d: %s." % (pid, ex))
145 rospy.logfatal(
"It will be zombie process" % pid)
149 stat.summary(DiagnosticStatus.OK,
"OK")
151 now = rospy.Time.now()
152 stat.add(
"Uptime [sec]",
154 stat.add(
"Time from last input [sec]",
160 self.diagnostic_updater.update()
164 self.receive_process.start()
165 while not rospy.is_shutdown():
166 rospy.loginfo(
"%d packets are in queue" % self.packets_queue.qsize())
167 packets = self.packets_queue.get()
168 packets.sort(key=
lambda p: p.id)
173 rospy.logerr(
"failed to concatenate packets: %s", e.message)
175 self.receive_process.join()
179 rospy.logwarn(
"publishing out-of-ordered packets %d -> %d" % (self.
last_published_seq_id, packets[0].seq_id))
181 packet_data_length = len(packets[0].data)
184 for i
in range(packets[0].num):
185 if packets[packet_index].id == i:
186 packet = packets[packet_index]
188 packet_index = packet_index + 1
191 b.write(chr(0) * packet_data_length)
192 deserialized_data = []
193 rospy.msg.deserialize_messages(b, deserialized_data,
195 rospy.loginfo(
"received %d message" % len(deserialized_data))
196 if len(deserialized_data) > 0:
198 msg = deserialized_data[0]
200 now = rospy.Time.now()
202 if pub.name
in messages:
204 rospy.loginfo(
"publishing %s" % pub.name)
206 if (hasattr(messages[pub.name],
"header")
and 207 hasattr(messages[pub.name].header,
"stamp")):
208 messages[pub.name].header.stamp = now
209 pub.publish(messages[pub.name])
212 rospy.logwarn(
"""cannot find '%s' in deserialized messages %s""" % (pub.name, messages.keys()))
213 synchronized_publishers = []
214 at_lest_one_topic =
False 217 if pub.name
in messages:
219 synchronized_publishers.append(pub)
220 if (hasattr(messages[pub.name],
"header")
and 221 hasattr(messages[pub.name].header,
"stamp")):
222 messages[pub.name].header.stamp = now
223 if (hasattr(messages[pub.name],
"header")
and 224 hasattr(messages[pub.name].header,
"seq")):
226 if (pub.name
in self.prev_seq_ids.keys()
and 227 messages[pub.name].header.seq == self.
prev_seq_ids[pub.name]):
228 rospy.loginfo(
"skip publishing %s " % (pub.name))
231 rospy.loginfo(
"messages[%s].header.seq: %d"% (pub.name,
232 messages[pub.name].header.seq))
233 rospy.loginfo(
"self.prev_seq_ids[%s]: %d" % (pub.name,
234 messages[pub.name].header.seq))
235 self.
prev_seq_ids[pub.name] = messages[pub.name].header.seq
236 at_lest_one_topic =
True 237 if at_lest_one_topic:
238 for pub
in synchronized_publishers:
239 pub.publish(messages[pub.name])
241 rospy.logerr(
"missed some packets")
244 if __name__ ==
"__main__":
245 rospy.init_node(
"silverhammer_highspeed_receiver")
247 rospy.on_shutdown(receiver.on_shutdown)
publish_only_if_updated_topics
def diagnosticCallback(self, stat)
def receive_process_func(packets_queue, receive_ip, receive_port, packet_size, message_class, topic_prefix, pesimistic, fragment_packets_torelance)
timestamp_overwrite_topics
fragment_packets_torelance
def diagnosticTimerCallback(self, event)
def concatenatePackets(self, packets)