ntrip_forwarding.py
Go to the documentation of this file.
1 #! /usr/bin/env python
2 
3 # Software License Agreement (BSD License)
4 #
5 # Copyright (c) 2015-2020, Dataspeed Inc.
6 # All rights reserved.
7 #
8 # Redistribution and use in source and binary forms, with or without modification,
9 # are permitted provided that the following conditions are met:
10 #
11 # * Redistributions of source code must retain the above copyright notice,
12 # this list of conditions and the following disclaimer.
13 # * Redistributions in binary form must reproduce the above copyright notice,
14 # this list of conditions and the following disclaimer in the documentation
15 # and/or other materials provided with the distribution.
16 # * Neither the name of Dataspeed Inc. nor the names of its
17 # contributors may be used to endorse or promote products derived from this
18 # software without specific prior written permission.
19 #
20 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
21 # ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
22 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
23 # DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
24 # FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
25 # DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
26 # SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
27 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
28 # OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
29 # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
30 
31 import rospy
32 from std_msgs.msg import String
33 import base64
34 import socket
35 import errno
36 import time
37 import threading
38 import select
39 import sys
40 
41 if (sys.version_info > (3, 0)):
42  import queue as Queue
43 else:
44  import Queue as Queue
45 
46 # Testing
47 test_mode = False
48 dummy_gga = '$GPGGA,134451.797,4250.202,N,08320.949,W,1,12,1.0,0.0,M,0.0,M,,*7D'
49 
50 # Global variables
51 rtcm_queue = Queue.Queue()
52 gga_queue = Queue.Queue()
53 
54 
55 class NtripSocketThread (threading.Thread):
56  def __init__(self, caster_ip, caster_port, mountpoint, username, password):
57  threading.Thread.__init__(self)
58  self.stop_event = threading.Event()
60  self.sent_gga = False
61  self.ntrip_tcp_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
62  self.connected_to_caster = False
63  self.username = username
64  self.password = password
65  self.mountpoint = mountpoint
66  self.caster_ip = caster_ip
67  self.caster_port = caster_port
68 
70  print('Connecting to NTRIP caster at %s:%d' % (self.caster_ip, self.caster_port))
71 
72  try:
73  self.ntrip_tcp_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
74  self.ntrip_tcp_sock.settimeout(5.0)
75  self.ntrip_tcp_sock.connect((self.caster_ip, self.caster_port))
76  self.ntrip_tcp_sock.settimeout(None)
77  print('Successfully opened socket')
78  except Exception as ex:
79  print('Error connecting socket: %s' % ex)
80  self.ntrip_tcp_sock.settimeout(None)
81  return False
82 
83  encoded_credentials = base64.b64encode((self.username + ':' + self.password).encode('ascii'))
84  if (sys.version_info > (3, 0)):
85  server_request = ('GET /%s HTTP/1.0' % self.mountpoint).encode('utf-8') + b'\x0D' + b'\x0A' \
86  + 'User-Agent: NTRIP ABC/1.2.3'.encode('utf-8') + b'\x0D' + b'\x0A' \
87  + 'Accept: */*'.encode('utf-8') + b'\x0D' + b'\x0A' \
88  + 'Connection: close'.encode('utf-8') + b'\x0D' + b'\x0A' \
89  + 'Authorization: Basic '.encode('utf-8') + encoded_credentials + b'\x0D' + b'\x0A' + b'\x0D' + b'\x0A'
90  else:
91  server_request = 'GET /%s HTTP/1.0\r\nUser-Agent: NTRIP ABC/1.2.3\r\nAccept: */*\r\nConnection: close\r\nAuthorization: Basic %s\r\n\r\n' % (self.mountpoint, encoded_credentials)
92 
93  self.ntrip_tcp_sock.sendall(server_request)
94 
95  while True:
96  try:
97  response = self.ntrip_tcp_sock.recv(10000)
98  except socket.error as e:
99  err = e.args[0]
100  if err == errno.EAGAIN or err == errno.EWOULDBLOCK:
101  continue
102  else:
103  # a "real" error occurred
104  print(e)
105  return False
106  else:
107  if ('ICY 200 OK').encode() in response:
108  print('Successfully connected to NTRIP caster')
109  return True
110  else:
111  print('Received unexpected response from caster:\n%s' % response)
112  return False
113 
114  def run(self):
115  print('Starting NTRIP TCP socket thread')
116  while not self.stop_event.isSet():
117 
118  if not self.connected_to_caster:
119  if self.connect_to_ntrip_caster():
120  self.connected_to_caster = True
121  else:
122  time.sleep(0.05)
123  continue
124 
125  # Receive RTCM messages from NTRIP caster and put in queue to send to GPS receiver
126  try:
127  ready_to_read, ready_to_write, in_error = select.select([self.ntrip_tcp_sock, ], [self.ntrip_tcp_sock, ], [], 5)
128  except select.error:
129  self.ntrip_tcp_sock.close()
130  self.connected_to_caster = False
131  print('Error calling select(): resetting connection to NTRIP caster')
132  continue
133 
134  if len(ready_to_read) > 0:
135  rtcm_msg = self.ntrip_tcp_sock.recv(100000)
136  if len(rtcm_msg) > 0:
137  if (sys.version_info > (3, 0)):
138  if rtcm_msg[0] == 211:
139  rtcm_msg_len = 256 * rtcm_msg[1] + rtcm_msg[2]
140  rtcm_msg_no = (256 * rtcm_msg[3] + rtcm_msg[4]) / 16
141  print('Received RTCM message %d with length %d' % (rtcm_msg_no, rtcm_msg_len))
142  else:
143  if ord(rtcm_msg[0]) == 211:
144  rtcm_msg_len = 256 * ord(rtcm_msg[1]) + ord(rtcm_msg[2])
145  rtcm_msg_no = (256 * ord(rtcm_msg[3]) + ord(rtcm_msg[4])) / 16
146  print('Received RTCM message %d with length %d' % (rtcm_msg_no, rtcm_msg_len))
147 
148  rtcm_queue.put(rtcm_msg)
149  self.no_rtcm_data_count = 0
150 
151  # Get GPGGA messages from receive queue and send
152  # to NTRIP server to keep connection alive
153  if len(ready_to_write) > 0:
154  try:
155  gga_msg = gga_queue.get_nowait()
156  print('Sending new GGA message to NTRIP caster %s' % gga_msg)
157  self.ntrip_tcp_sock.send(gga_msg.encode('utf-8'))
158  self.sent_gga = True
159  except Queue.Empty:
160  pass
161 
162  if self.no_rtcm_data_count > 200:
163  print('No RTCM messages for 10 seconds; resetting connection to NTRIP caster')
164  self.ntrip_tcp_sock.close()
165  self.connected_to_caster = False
166  self.no_rtcm_data_count = 0
167 
168  if self.sent_gga:
169  self.no_rtcm_data_count += 1
170 
171  time.sleep(0.05)
172 
173  print('Stopping NTRIP TCP socket thread')
174  self.ntrip_tcp_sock.close()
175 
176  def stop(self):
177  self.stop_event.set()
178 
179 
180 class ReceiverThread (threading.Thread):
181  def __init__(self, broadcast_ip, broadcast_port):
182  threading.Thread.__init__(self)
183  self.stop_event = threading.Event()
184  self.receiver_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
185  self.receiver_sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
186  self.broadcast_ip = broadcast_ip
187  self.broadcast_port = broadcast_port
188 
189  def run(self):
190  print('Starting relay socket thread')
191  while not self.stop_event.isSet():
192  # Get RTCM messages from NTRIP TCP socket queue and send to GPS receiver over UDP
193  try:
194  rtcm_msg = rtcm_queue.get_nowait()
195  print('Broadcasting RTCM message to %s:%d' % (self.broadcast_ip, self.broadcast_port))
196  self.receiver_sock.sendto(rtcm_msg, (self.broadcast_ip, self.broadcast_port))
197  except Queue.Empty:
198  # Nothing in the RTCM message queue this time
199  pass
200 
201  time.sleep(0.05)
202 
203  def stop(self):
204  self.stop_event.set()
205 
206 
207 def stop_threads(workers):
208  for worker in workers:
209  worker.stop()
210  worker.join()
211 
212 
213 def start_threads(caster_ip, caster_port, mountpoint, username, password, broadcast_ip, broadcast_port):
214  workers = [NtripSocketThread(caster_ip, caster_port, mountpoint, username, password), ReceiverThread(broadcast_ip, broadcast_port)]
215 
216  for worker in workers:
217  worker.start()
218  return workers
219 
220 
222  def __init__(self):
223  rospy.init_node('ntrip_forwarding')
224 
225  self.caster_ip = rospy.get_param('~caster_ip', default='')
226  self.caster_port = rospy.get_param('~caster_port', default=0)
227  self.mountpoint = rospy.get_param('~mountpoint', default='')
228  self.username = rospy.get_param('~ntrip_username', default='')
229  self.password = rospy.get_param('~ntrip_password', default='')
230 
231  self.broadcast_ip = rospy.get_param('~broadcast_ip', default='195.0.0.255')
232  self.broadcast_port = rospy.get_param('~broadcast_port', default=50472)
233 
234  self.gga_timer = rospy.Timer(rospy.Duration(5.0), self.gga_timer_cb)
235  rospy.Subscriber('gps/gga', String, self.recv_gga)
236 
237  self.gga_msg = ''
239 
240  def recv_gga(self, msg):
241  self.gga_msg = msg.data
242 
243  def gga_timer_cb(self, event):
244  if test_mode:
245  gga_queue.put(dummy_gga)
246  else:
247  if len(self.gga_msg) > 0:
248  gga_queue.put(self.gga_msg)
249 
250  def on_shutdown(self):
251  print('Shutting down')
252  stop_threads(self.workers)
253 
254 
255 if __name__ == '__main__':
256 
257  ros_interface = RosInterface()
258  rospy.on_shutdown(ros_interface.on_shutdown)
259 
260  rospy.spin()
def __init__(self, caster_ip, caster_port, mountpoint, username, password)
def __init__(self, broadcast_ip, broadcast_port)
def start_threads(caster_ip, caster_port, mountpoint, username, password, broadcast_ip, broadcast_port)


oxford_gps_eth
Author(s): Kevin Hallenbeck
autogenerated on Sun Aug 9 2020 04:05:34