ntrip_client.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 
3 import ssl
4 import time
5 import base64
6 import socket
7 import select
8 import logging
9 
10 from .nmea_parser import NMEAParser
11 from .rtcm_parser import RTCMParser
12 
13 _CHUNK_SIZE = 1024
14 _SOURCETABLE_RESPONSES = [
15  'SOURCETABLE 200 OK'
16 ]
17 _SUCCESS_RESPONSES = [
18  'ICY 200 OK',
19  'HTTP/1.0 200 OK',
20  'HTTP/1.1 200 OK'
21 ]
22 _UNAUTHORIZED_RESPONSES = [
23  '401'
24 ]
25 
27 
28  # Public constants
29  DEFAULT_RECONNECT_ATTEMPT_MAX = 10
30  DEFAULT_RECONNECT_ATEMPT_WAIT_SECONDS = 5
31  DEFAULT_RTCM_TIMEOUT_SECONDS = 4
32 
33  def __init__(self, host, port, mountpoint, ntrip_version, username, password, logerr=logging.error, logwarn=logging.warning, loginfo=logging.info, logdebug=logging.debug):
34  # Bit of a strange pattern here, but save the log functions so we can be agnostic of ROS
35  self._logerr = logerr
36  self._logwarn = logwarn
37  self._loginfo = loginfo
38  self._logdebug = logdebug
39 
40  # Save the server info
41  self._host = host
42  self._port = port
43  self._mountpoint = mountpoint
44  self._ntrip_version = ntrip_version
45  if username is not None and password is not None:
46  self._basic_credentials = base64.b64encode('{}:{}'.format(
47  username, password).encode('utf-8')).decode('utf-8')
48  else:
49  self._basic_credentials = None
50 
51  # Initialize this so we don't throw an exception when closing
52  self._raw_socket = None
53  self._server_socket = None
54 
55  # Setup some parsers to parse incoming messages
57  logerr=logerr,
58  logwarn=logwarn,
59  loginfo=loginfo,
60  logdebug=logdebug
61  )
63  logerr=logerr,
64  logwarn=logwarn,
65  loginfo=loginfo,
66  logdebug=logdebug
67  )
68 
69  # Public SSL configuration
70  self.ssl = False
71  self.cert = None
72  self.key = None
73  self.ca_cert = None
74 
75  # Setup some state
76  self._shutdown = False
77  self._connected = False
78 
79  # Private reconnect info
85  self._first_rtcm_received = False
87 
88  # Public reconnect info
92 
93  def connect(self):
94  # Create a socket object that we will use to connect to the server
95  self._server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
96  self._server_socket.settimeout(5)
97 
98  # Connect the socket to the server
99  try:
100  self._server_socket.connect((self._host, self._port))
101  except Exception as e:
102  self._logerr(
103  'Unable to connect socket to server at http://{}:{}'.format(self._host, self._port))
104  self._logerr('Exception: {}'.format(str(e)))
105  return False
106 
107  # If SSL, wrap the socket
108  if self.ssl:
109  # Configre the context based on the config
110  self._ssl_context = ssl.create_default_context()
111  if self.cert:
112  self._ssl_context.load_cert_chain(self.cert, self.key)
113  if self.ca_cert:
114  self._ssl_context.load_verify_locations(self.ca_cert)
115 
116  # Save the old socket for later just in case, and create a new SSL socket
117  self._raw_socket = self._server_socket
118  self._server_socket = self._ssl_context.wrap_socket(self._raw_socket, server_hostname=self._host)
119 
120  # Send the HTTP Request
121  try:
122  self._server_socket.send(self._form_request())
123  except Exception as e:
124  self._logerr(
125  'Unable to send request to server at http://{}:{}'.format(self._host, self._port))
126  self._logerr('Exception: {}'.format(str(e)))
127  return False
128 
129  # Get the response from the server
130  response = ''
131  try:
132  response = self._server_socket.recv(_CHUNK_SIZE).decode('utf-8')
133  except Exception as e:
134  self._logerr(
135  'Unable to read response from server at http://{}:{}'.format(self._host, self._port))
136  self._logerr('Exception: {}'.format(str(e)))
137  return False
138 
139  # Properly handle the response
140  if any(success in response for success in _SUCCESS_RESPONSES):
141  self._connected = True
142 
143  # Some debugging hints about the kind of error we received
144  known_error = False
145  if any(sourcetable in response for sourcetable in _SOURCETABLE_RESPONSES):
146  self._logwarn('Received sourcetable response from the server. This probably means the mountpoint specified is not valid')
147  known_error = True
148  elif any(unauthorized in response for unauthorized in _UNAUTHORIZED_RESPONSES):
149  self._logwarn('Received unauthorized response from the server. Check your username, password, and mountpoint to make sure they are correct.')
150  known_error = True
151  elif not self._connected and (self._ntrip_version == None or self._ntrip_version == ''):
152  self._logwarn('Received unknown error from the server. Note that the NTRIP version was not specified in the launch file. This is not necesarilly the cause of this error, but it may be worth checking your NTRIP casters documentation to see if the NTRIP version needs to be specified.')
153  known_error = True
154 
155  # Wish we could just return from the above checks, but some casters return both a success and an error in the response
156  # If we received any known error, even if we received a success it should be considered a failure
157  if known_error or not self._connected:
158  self._logerr('Invalid response received from http://{}:{}/{}'.format(
159  self._host, self._port, self._mountpoint))
160  self._logerr('Response: {}'.format(response))
161  return False
162  else:
163  self._loginfo(
164  'Connected to http://{}:{}/{}'.format(self._host, self._port, self._mountpoint))
165  return True
166 
167 
168  def disconnect(self):
169  # Disconnect the socket
170  self._connected = False
171  try:
172  if self._server_socket:
173  self._server_socket.shutdown(socket.SHUT_RDWR)
174  if self._raw_socket:
175  self._raw_socket.shutdown(socket.SHUT_RDWR)
176  except Exception as e:
177  self._logdebug('Encountered exception when shutting down the socket. This can likely be ignored')
178  self._logdebug('Exception: {}'.format(e))
179  try:
180  if self._server_socket:
181  self._server_socket.close()
182  if self._raw_socket:
183  self._raw_socket.close()
184  except Exception as e:
185  self._logdebug('Encountered exception when closing the socket. This can likely be ignored')
186  self._logdebug('Exception: {}'.format(e))
187 
188  def reconnect(self):
189  if self._connected:
190  while not self._shutdown:
191  self._reconnect_attempt_count += 1
192  self.disconnect()
193  connect_success = self.connect()
194  if not connect_success and self._reconnect_attempt_count < self.reconnect_attempt_max:
195  self._logerr('Reconnect to http://{}:{} failed. Retrying in {} seconds'.format(self._host, self._port, self.reconnect_attempt_wait_seconds))
196  time.sleep(self.reconnect_attempt_wait_seconds)
198  self._reconnect_attempt_count = 0
199  raise Exception("Reconnect was attempted {} times, but never succeeded".format(self._reconnect_attempt_count))
200  break
201  elif connect_success:
202  self._reconnect_attempt_count = 0
203  break
204  else:
205  self._logdebug('Reconnect called while still connected, ignoring')
206 
207  def send_nmea(self, sentence):
208  if not self._connected:
209  self._logwarn('NMEA sent before client was connected, discarding NMEA')
210  return
211 
212  # Not sure if this is the right thing to do, but python will escape the return characters at the end of the string, so do this manually
213  if sentence[-4:] == '\\r\\n':
214  sentence = sentence[:-4] + '\r\n'
215  elif sentence[-2:] != '\r\n':
216  sentence = sentence + '\r\n'
217 
218  # Check if it is a valid NMEA sentence
219  if not self.nmea_parser.is_valid_sentence(sentence):
220  self._logwarn("Invalid NMEA sentence, not sending to server")
221  return
222 
223  # Encode the data and send it to the socket
224  try:
225  self._server_socket.send(sentence.encode('utf-8'))
226  except Exception as e:
227  self._logwarn('Unable to send NMEA sentence to server.')
228  self._logwarn('Exception: {}'.format(str(e)))
229  self._nmea_send_failed_count += 1
231  self._logwarn("NMEA sentence failed to send to server {} times, restarting".format(self._nmea_send_failed_count))
232  self.reconnect()
233  self._nmea_send_failed_count = 0
234  self.send_nmea(sentence) # Try sending the NMEA sentence again
235 
236 
237  def recv_rtcm(self):
238  if not self._connected:
239  self._logwarn(
240  'RTCM requested before client was connected, returning empty list')
241  return []
242 
243  # If it has been too long since we received an RTCM packet, reconnect
244  if time.time() - self.rtcm_timeout_seconds >= self._recv_rtcm_last_packet_timestamp and self._first_rtcm_received:
245  self._logerr('RTCM data not received for {} seconds, reconnecting'.format(self.rtcm_timeout_seconds))
246  self.reconnect()
247  self._first_rtcm_received = False
248 
249  # Check if there is any data available on the socket
250  read_sockets, _, _ = select.select([self._server_socket], [], [], 0)
251  if not read_sockets:
252  return []
253 
254  # Since we only ever pass the server socket to the list of read sockets, we can just read from that
255  # Read all available data into a buffer
256  data = b''
257  while True:
258  try:
259  chunk = self._server_socket.recv(_CHUNK_SIZE)
260  data += chunk
261  if len(chunk) < _CHUNK_SIZE:
262  break
263  except Exception as e:
264  self._logerr('Error while reading {} bytes from socket'.format(_CHUNK_SIZE))
265  if not self._socket_is_open():
266  self._logerr('Socket appears to be closed. Reconnecting')
267  self.reconnect()
268  return []
269  break
270  self._logdebug('Read {} bytes'.format(len(data)))
271 
272  # If 0 bytes were read from the socket even though we were told data is available multiple times,
273  # it can be safely assumed that we can reconnect as the server has closed the connection
274  if len(data) == 0:
275  self._read_zero_bytes_count += 1
277  self._logwarn('Reconnecting because we received 0 bytes from the socket even though it said there was data available {} times'.format(self._read_zero_bytes_count))
278  self.reconnect()
279  self._read_zero_bytes_count = 0
280  return []
281  else:
282  # Looks like we received valid data, so note when the data was received
283  self._recv_rtcm_last_packet_timestamp = time.time()
284  self._first_rtcm_received = True
285 
286  # Send the data to the RTCM parser to parse it
287  return self.rtcm_parser.parse(data) if data else []
288 
289  def shutdown(self):
290  # Set some state, and then disconnect
291  self._shutdown = True
292  self.disconnect()
293 
294  def _form_request(self):
295  if self._ntrip_version != None and self._ntrip_version != '':
296  request_str = 'GET /{} HTTP/1.0\r\nNtrip-Version: {}\r\nUser-Agent: NTRIP ntrip_client_ros\r\n'.format(
297  self._mountpoint, self._ntrip_version)
298  else:
299  request_str = 'GET /{} HTTP/1.0\r\nUser-Agent: NTRIP ntrip_client_ros\r\n'.format(
300  self._mountpoint)
301  if self._basic_credentials is not None:
302  request_str += 'Authorization: Basic {}\r\n'.format(
303  self._basic_credentials)
304  request_str += '\r\n'
305  return request_str.encode('utf-8')
306 
307  def _socket_is_open(self):
308  try:
309  # this will try to read bytes without blocking and also without removing them from buffer (peek only)
310  data = self._server_socket.recv(_CHUNK_SIZE, socket.MSG_DONTWAIT | socket.MSG_PEEK)
311  if len(data) == 0:
312  return False
313  except BlockingIOError:
314  return True # socket is open and reading from it would block
315  except ConnectionResetError:
316  self._logwarn('Connection reset by peer')
317  return False # socket was closed for some other reason
318  except socket.timeout:
319  return True # timeout likely means that the socket is still open
320  except Exception as e:
321  self._logwarn('Socket appears to be closed')
322  self._logwarn('Exception: {}'.format(e))
323  return False
324  return True
def __init__(self, host, port, mountpoint, ntrip_version, username, password, logerr=logging.error, logwarn=logging.warning, loginfo=logging.info, logdebug=logging.debug)
Definition: ntrip_client.py:33


ntrip_client
Author(s): Parker Hannifin Corp
autogenerated on Thu Aug 18 2022 02:39:24