transport.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3 # Copyright: Yuki Furuta <furushchev@jsk.imi.i.u-tokyo.ac.jp>
4 
5 
6 import socket
7 import struct
8 import rospy
9 from threading import Thread, Event
10 
11 class SocketTransport(Thread):
12  def __init__(self, host, port, max_retry):
13  super(SocketTransport, self).__init__()
14  self.host = host
15  self.port = port
16  self.max_retry = max_retry # if 0, try connect forever
17  self.send_buffer_size = 1024
18  self.recv_buffer_size = 1024
19 
20  self.socket = None
21 
22  # var for thread
23  self.poll_rate = 0.1 # 10 Hz
24  self.alive = None
25  self.callbacks = []
26 
27  def start(self):
28  self.alive = Event()
29  self.connect()
30  super(SocketTransport, self).start()
31  rospy.loginfo("[%s] Started" % type(self).__name__)
32 
33  def join(self):
34  if self.alive is not None:
35  self.alive.set()
36  if self.is_connected():
37  self.disconnect()
38  if self.is_alive():
39  super(SocketTransport, self).join()
40  rospy.loginfo("[%s] Stopped" % type(self).__name__)
41 
42  def run(self):
43  recvbuf = ""
44  while not self.alive.wait(self.poll_rate):
45  try:
46  self.socket.settimeout(1)
47  recvbuf += self.socket.recv(self.recv_buffer_size)
48  except socket.error as e:
49  if e.message == "timed out":
50  continue
51  self.reconnect()
52  try:
53  parsed_data, parsed_length = self.parse(recvbuf)
54  for data in parsed_data:
55  for cb in self.callbacks:
56  cb(data)
57  if len(recvbuf) > parsed_length:
58  recvbuf = recvbuf[parsed_length:]
59  else:
60  recvbuf = ""
61  except ValueError:
62  pass
63  except RuntimeError as e:
64  rospy.logerr("Failed to parse data: %s" % str(e))
65 
66  def is_connected(self):
67  if self.socket is None:
68  return False
69  try:
70  self.socket.getpeername()
71  return True
72  except socket.error as e:
73  if e.errno == 107:
74  # no connection error
75  return False
76  else:
77  raise e
78 
79  def connect(self):
80  retry = 0
81  err = None
82  info = (self.host, self.port)
83  while not rospy.is_shutdown() and (self.max_retry == 0 or retry < self.max_retry):
84  s = None
85  try:
86  s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
87  s.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, self.send_buffer_size)
88  s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
89  s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
90  s.connect(info)
91  rospy.loginfo("[%s] Connected to server %s", type(self).__name__, info)
92  self.socket = s
93  return True
94  except socket.error as e:
95  err = e
96  if s:
97  s.close()
98  retry += 1
99  rospy.sleep(1.0)
100  raise IOError("[%s] Failed to connect to %s after %d retries: %s" % (type(self).__name__, info, retry, str(err)))
101 
102  def disconnect(self):
103  self.socket.close()
104  self.socket = None
105  rospy.loginfo("[%s] Disconnected", type(self).__name__)
106 
107  def reconnect(self):
108  if self.is_connected():
109  self.disconnect()
110  self.connect()
111 
112  def send(self, data):
113  try:
114  self.socket.sendall(data)
115  except socket.error:
116  self.reconnect()
117  except AttributeError:
118  self.connect()
119 
120  def on_received_data(self, cb):
121  self.callbacks.append(cb)
122 
123  def parse(self, data):
124  raise NotImplementedError()
125 
126 
127 if __name__ == '__main__':
128  pass
def __init__(self, host, port, max_retry)
Definition: transport.py:12


julius_ros
Author(s): Yuki Furuta
autogenerated on Wed Jul 10 2019 03:47:05