Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039
00040
00041
00042
00043
00044 import rospy
00045
00046
00047 import applanix_msgs.msg
00048
00049
00050 from port import Port
00051 from applanix_msgs.mapping import groups, msgs
00052 from handlers import GroupHandler, MessageHandler
00053 import translator
00054
00055
00056 from cStringIO import StringIO
00057 from threading import Lock
00058
00059
00060 class DataPort(Port):
00061 ALLMSGS_SEND_TIMEOUT = rospy.Duration.from_sec(0.01)
00062
00063 def run(self):
00064 self.sock.settimeout(1.0)
00065 errors = 0
00066
00067
00068 all_msgs = applanix_msgs.msg.AllMsgs()
00069 all_msgs_pub = rospy.Publisher("config", all_msgs.__class__, queue_size=5, latch=True)
00070
00071
00072 listener = SubscribeListenerManager()
00073
00074
00075 handlers = {}
00076 for group_num in groups.keys():
00077 include = True
00078 for prefix in self.opts['exclude_prefixes']:
00079 if groups[group_num][0].startswith(prefix): include = False
00080 if include:
00081 handlers[(applanix_msgs.msg.CommonHeader.START_GROUP, group_num)] = \
00082 GroupHandler(*groups[group_num], listener=listener.listener_for(group_num))
00083 for msg_num in msgs.keys():
00084 handlers[(applanix_msgs.msg.CommonHeader.START_MESSAGE, msg_num)] = \
00085 MessageHandler(*msgs[msg_num], all_msgs=all_msgs)
00086
00087 pkt_counters = {}
00088 bad_pkts = set()
00089
00090 while not self.finish.is_set():
00091 try:
00092 pkt_id, pkt_str = self.recv()
00093 if pkt_id != None:
00094 handlers[pkt_id].handle(StringIO(pkt_str))
00095
00096 except ValueError as e:
00097
00098 rospy.logwarn(str(e))
00099 continue
00100
00101 except KeyError:
00102
00103 if pkt_id not in pkt_counters:
00104 rospy.logwarn("Unhandled packet: %s.%d" % pkt_id)
00105
00106 except translator.TranslatorError:
00107 if pkt_id not in bad_pkts:
00108 rospy.logwarn("Error parsing %s.%d" % pkt_id)
00109 bad_pkts.add(pkt)
00110
00111 if pkt_id not in pkt_counters:
00112 pkt_counters[pkt_id] = 0
00113 pkt_counters[pkt_id] += 1
00114
00115
00116
00117
00118 if all_msgs.last_changed > all_msgs.last_sent and \
00119 rospy.get_rostime() > all_msgs.last_changed + self.ALLMSGS_SEND_TIMEOUT:
00120 all_msgs_pub.publish(all_msgs)
00121 all_msgs.last_sent = rospy.get_rostime()
00122
00123
00124 class SubscribeListenerManager():
00125 def __init__(self):
00126 self.lock = Lock()
00127 self.groups = set()
00128 self.publisher = rospy.Publisher("subscribed_groups", applanix_msgs.msg.Groups, queue_size=5, latch=True)
00129 self.publish()
00130
00131 def publish(self):
00132 self.publisher.publish(groups=list(self.groups))
00133
00134 def listener_for(self, group_num):
00135 return self.Listener(self, group_num)
00136
00137 class Listener(rospy.SubscribeListener):
00138 def __init__(self, manager, group_num):
00139 self.manager = manager
00140 self.group_num = group_num
00141
00142 def peer_subscribe(self, topic_name, topic_publish, peer_publish):
00143 with self.manager.lock:
00144 self.manager.groups.add(self.group_num)
00145 self.manager.publish()
00146
00147 def peer_unsubscribe(self, topic_name, num_peers):
00148 if num_peers == 0:
00149 with self.lock:
00150 self.manager.groups.discard(self.group_num)
00151 self.manager.publish()