transport.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 # -*- coding: utf-8 -*-
00003 # Copyright: Yuki Furuta <furushchev@jsk.imi.i.u-tokyo.ac.jp>
00004 
00005 
00006 import socket
00007 import struct
00008 import rospy
00009 from threading import Thread, Event
00010 
00011 class SocketTransport(Thread):
00012     def __init__(self, host, port, max_retry):
00013         super(SocketTransport, self).__init__()
00014         self.host = host
00015         self.port = port
00016         self.max_retry = max_retry  # if 0, try connect forever
00017         self.send_buffer_size = 1024
00018         self.recv_buffer_size = 1024
00019 
00020         self.socket = None
00021 
00022         # var for thread
00023         self.poll_rate = 0.1  # 10 Hz
00024         self.alive = None
00025         self.callbacks = []
00026 
00027     def start(self):
00028         self.alive = Event()
00029         self.connect()
00030         super(SocketTransport, self).start()
00031         rospy.loginfo("[%s] Started" % type(self).__name__)
00032 
00033     def join(self):
00034         if self.alive is not None:
00035             self.alive.set()
00036         if self.is_connected():
00037             self.disconnect()
00038         if self.is_alive():
00039             super(SocketTransport, self).join()
00040         rospy.loginfo("[%s] Stopped" % type(self).__name__)
00041 
00042     def run(self):
00043         recvbuf = ""
00044         while not self.alive.wait(self.poll_rate):
00045             try:
00046                 self.socket.settimeout(1)
00047                 recvbuf += self.socket.recv(self.recv_buffer_size)
00048             except socket.error as e:
00049                 if e.message == "timed out":
00050                     continue
00051                 self.reconnect()
00052             try:
00053                 parsed_data, parsed_length = self.parse(recvbuf)
00054                 for data in parsed_data:
00055                     for cb in self.callbacks:
00056                         cb(data)
00057                 if len(recvbuf) > parsed_length:
00058                     recvbuf = recvbuf[parsed_length:]
00059                 else:
00060                     recvbuf = ""
00061             except ValueError:
00062                 pass
00063             except RuntimeError as e:
00064                 rospy.logerr("Failed to parse data: %s" % str(e))
00065 
00066     def is_connected(self):
00067         if self.socket is None:
00068             return False
00069         try:
00070             self.socket.getpeername()
00071             return True
00072         except socket.error as e:
00073             if e.errno == 107:
00074                 # no connection error
00075                 return False
00076             else:
00077                 raise e
00078 
00079     def connect(self):
00080         retry = 0
00081         err = None
00082         info = (self.host, self.port)
00083         while not rospy.is_shutdown() and (self.max_retry == 0 or retry < self.max_retry):
00084             s = None
00085             try:
00086                 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
00087                 s.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, self.send_buffer_size)
00088                 s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
00089                 s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
00090                 s.connect(info)
00091                 rospy.loginfo("[%s] Connected to server %s", type(self).__name__, info)
00092                 self.socket = s
00093                 return True
00094             except socket.error as e:
00095                 err = e
00096                 if s:
00097                     s.close()
00098                 retry += 1
00099                 rospy.sleep(1.0)
00100         raise IOError("[%s] Failed to connect to %s after %d retries: %s" % (type(self).__name__, info, retry, str(err)))
00101 
00102     def disconnect(self):
00103         self.socket.close()
00104         self.socket = None
00105         rospy.loginfo("[%s] Disconnected", type(self).__name__)
00106 
00107     def reconnect(self):
00108         if self.is_connected():
00109             self.disconnect()
00110         self.connect()
00111 
00112     def send(self, data):
00113         try:
00114             self.socket.sendall(data)
00115         except socket.error:
00116             self.reconnect()
00117         except AttributeError:
00118             self.connect()
00119 
00120     def on_received_data(self, cb):
00121         self.callbacks.append(cb)
00122 
00123     def parse(self, data):
00124         raise NotImplementedError()
00125 
00126 
00127 if __name__ == '__main__':
00128     pass


julius_ros
Author(s): Yuki Furuta
autogenerated on Wed Jul 10 2019 03:24:05