Go to the documentation of this file.00001
00002
00003
00004
00005
00006 import socket
00007 import struct
00008 import rospy
00009 from threading import Thread, Event
00010
00011 class SocketTransport(Thread):
00012 def __init__(self, host, port, max_retry):
00013 super(SocketTransport, self).__init__()
00014 self.host = host
00015 self.port = port
00016 self.max_retry = max_retry
00017 self.send_buffer_size = 1024
00018 self.recv_buffer_size = 1024
00019
00020 self.socket = None
00021
00022
00023 self.poll_rate = 0.1
00024 self.alive = None
00025 self.callbacks = []
00026
00027 def start(self):
00028 self.alive = Event()
00029 self.connect()
00030 super(SocketTransport, self).start()
00031 rospy.loginfo("[%s] Started" % type(self).__name__)
00032
00033 def join(self):
00034 if self.alive is not None:
00035 self.alive.set()
00036 if self.is_connected():
00037 self.disconnect()
00038 if self.is_alive():
00039 super(SocketTransport, self).join()
00040 rospy.loginfo("[%s] Stopped" % type(self).__name__)
00041
00042 def run(self):
00043 recvbuf = ""
00044 while not self.alive.wait(self.poll_rate):
00045 try:
00046 self.socket.settimeout(1)
00047 recvbuf += self.socket.recv(self.recv_buffer_size)
00048 except socket.error as e:
00049 if e.message == "timed out":
00050 continue
00051 self.reconnect()
00052 try:
00053 parsed_data, parsed_length = self.parse(recvbuf)
00054 for data in parsed_data:
00055 for cb in self.callbacks:
00056 cb(data)
00057 if len(recvbuf) > parsed_length:
00058 recvbuf = recvbuf[parsed_length:]
00059 else:
00060 recvbuf = ""
00061 except ValueError:
00062 pass
00063 except RuntimeError as e:
00064 rospy.logerr("Failed to parse data: %s" % str(e))
00065
00066 def is_connected(self):
00067 if self.socket is None:
00068 return False
00069 try:
00070 self.socket.getpeername()
00071 return True
00072 except socket.error as e:
00073 if e.errno == 107:
00074
00075 return False
00076 else:
00077 raise e
00078
00079 def connect(self):
00080 retry = 0
00081 err = None
00082 info = (self.host, self.port)
00083 while not rospy.is_shutdown() and (self.max_retry == 0 or retry < self.max_retry):
00084 s = None
00085 try:
00086 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
00087 s.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, self.send_buffer_size)
00088 s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
00089 s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
00090 s.connect(info)
00091 rospy.loginfo("[%s] Connected to server %s", type(self).__name__, info)
00092 self.socket = s
00093 return True
00094 except socket.error as e:
00095 err = e
00096 if s:
00097 s.close()
00098 retry += 1
00099 rospy.sleep(1.0)
00100 raise IOError("[%s] Failed to connect to %s after %d retries: %s" % (type(self).__name__, info, retry, str(err)))
00101
00102 def disconnect(self):
00103 self.socket.close()
00104 self.socket = None
00105 rospy.loginfo("[%s] Disconnected", type(self).__name__)
00106
00107 def reconnect(self):
00108 if self.is_connected():
00109 self.disconnect()
00110 self.connect()
00111
00112 def send(self, data):
00113 try:
00114 self.socket.sendall(data)
00115 except socket.error:
00116 self.reconnect()
00117 except AttributeError:
00118 self.connect()
00119
00120 def on_received_data(self, cb):
00121 self.callbacks.append(cb)
00122
00123 def parse(self, data):
00124 raise NotImplementedError()
00125
00126
00127 if __name__ == '__main__':
00128 pass