transport.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3 # Copyright: Yuki Furuta <furushchev@jsk.imi.i.u-tokyo.ac.jp>
4 
5 
6 import sys
7 import socket
8 import struct
9 import rospy
10 from threading import Thread, Event
11 
12 class SocketTransport(Thread):
13  def __init__(self, host, port, max_retry):
14  super(SocketTransport, self).__init__()
15  self.host = host
16  self.port = port
17  self.max_retry = max_retry # if 0, try connect forever
18  self.send_buffer_size = 1024
19  self.recv_buffer_size = 1024
20 
21  self.socket = None
22 
23  # var for thread
24  self.poll_rate = 0.1 # 10 Hz
25  self.alive = None
26  self.callbacks = []
27 
28  def start(self):
29  self.alive = Event()
30  self.connect()
31  super(SocketTransport, self).start()
32  rospy.loginfo("[%s] Started" % type(self).__name__)
33 
34  def join(self):
35  if self.alive is not None:
36  self.alive.set()
37  if self.is_connected():
38  self.disconnect()
39  if self.is_alive():
40  super(SocketTransport, self).join()
41  rospy.loginfo("[%s] Stopped" % type(self).__name__)
42 
43  def run(self):
44  recvbuf = ""
45  while not self.alive.wait(self.poll_rate):
46  try:
47  self.socket.settimeout(1)
48  buf = self.socket.recv(self.recv_buffer_size)
49  if sys.version_info.major >= 3 and type(buf) == bytes: # on Python3 convert buf to str
50  buf = buf.decode()
51  recvbuf += buf
52  except socket.error as e:
53  if sys.version_info.major >= 3 and type(e) == socket.timeout:
54  continue
55  if sys.version_info.major < 3 and e.message == "timed out":
56  continue
57  self.reconnect()
58  try:
59  parsed_data, parsed_length = self.parse(recvbuf)
60  for data in parsed_data:
61  for cb in self.callbacks:
62  cb(data)
63  if len(recvbuf) > parsed_length:
64  recvbuf = recvbuf[parsed_length:]
65  else:
66  recvbuf = ""
67  except ValueError:
68  pass
69  except RuntimeError as e:
70  rospy.logerr("Failed to parse data: %s" % str(e))
71 
72  def is_connected(self):
73  if self.socket is None:
74  return False
75  try:
76  self.socket.getpeername()
77  return True
78  except socket.error as e:
79  if e.errno == 107:
80  # no connection error
81  return False
82  else:
83  raise e
84 
85  def connect(self):
86  retry = 0
87  err = None
88  info = (self.host, self.port)
89  while not rospy.is_shutdown() and (self.max_retry == 0 or retry < self.max_retry):
90  s = None
91  try:
92  s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
93  s.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, self.send_buffer_size)
94  s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
95  s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
96  s.connect(info)
97  rospy.loginfo("[%s] Connected to server %s", type(self).__name__, info)
98  self.socket = s
99  return True
100  except socket.error as e:
101  err = e
102  if s:
103  s.close()
104  retry += 1
105  rospy.sleep(1.0)
106  raise IOError("[%s] Failed to connect to %s after %d retries: %s" % (type(self).__name__, info, retry, str(err)))
107 
108  def disconnect(self):
109  self.socket.close()
110  self.socket = None
111  rospy.loginfo("[%s] Disconnected", type(self).__name__)
112 
113  def reconnect(self):
114  if self.is_connected():
115  self.disconnect()
116  self.connect()
117 
118  def send(self, data):
119  if sys.version_info.major >= 3 and type(data) == str: # on Python3 convert data to bytes
120  data = data.encode()
121  try:
122  self.socket.sendall(data)
123  except socket.error:
124  self.reconnect()
125  except AttributeError:
126  self.connect()
127 
128  def on_received_data(self, cb):
129  self.callbacks.append(cb)
130 
131  def parse(self, data):
132  raise NotImplementedError()
133 
134 
135 if __name__ == '__main__':
136  pass
def __init__(self, host, port, max_retry)
Definition: transport.py:13


julius_ros
Author(s): Yuki Furuta
autogenerated on Tue May 11 2021 02:55:36