32 from std_msgs.msg
import String
41 if (sys.version_info > (3, 0)):
48 dummy_gga =
'$GPGGA,134451.797,4250.202,N,08320.949,W,1,12,1.0,0.0,M,0.0,M,,*7D' 51 rtcm_queue = Queue.Queue()
52 gga_queue = Queue.Queue()
56 def __init__(self, caster_ip, caster_port, mountpoint, username, password):
57 threading.Thread.__init__(self)
73 self.
ntrip_tcp_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
74 self.ntrip_tcp_sock.settimeout(5.0)
76 self.ntrip_tcp_sock.settimeout(
None)
77 print(
'Successfully opened socket')
78 except Exception
as ex:
79 print(
'Error connecting socket: %s' % ex)
80 self.ntrip_tcp_sock.settimeout(
None)
83 encoded_credentials = base64.b64encode((self.
username +
':' + self.
password).encode(
'ascii'))
84 if (sys.version_info > (3, 0)):
85 server_request = (
'GET /%s HTTP/1.0' % self.
mountpoint).encode(
'utf-8') + b
'\x0D' + b
'\x0A' \
86 +
'User-Agent: NTRIP ABC/1.2.3'.encode(
'utf-8') + b
'\x0D' + b
'\x0A' \
87 +
'Accept: */*'.encode(
'utf-8') + b
'\x0D' + b
'\x0A' \
88 +
'Connection: close'.encode(
'utf-8') + b
'\x0D' + b
'\x0A' \
89 +
'Authorization: Basic '.encode(
'utf-8') + encoded_credentials + b
'\x0D' + b
'\x0A' + b
'\x0D' + b
'\x0A' 91 server_request =
'GET /%s HTTP/1.0\r\nUser-Agent: NTRIP ABC/1.2.3\r\nAccept: */*\r\nConnection: close\r\nAuthorization: Basic %s\r\n\r\n' % (self.
mountpoint, encoded_credentials)
93 self.ntrip_tcp_sock.sendall(server_request)
97 response = self.ntrip_tcp_sock.recv(10000)
98 except socket.error
as e:
100 if err == errno.EAGAIN
or err == errno.EWOULDBLOCK:
107 if (
'ICY 200 OK').encode()
in response:
108 print(
'Successfully connected to NTRIP caster')
111 print(
'Received unexpected response from caster:\n%s' % response)
115 print(
'Starting NTRIP TCP socket thread')
116 while not self.stop_event.isSet():
129 self.ntrip_tcp_sock.close()
131 print(
'Error calling select(): resetting connection to NTRIP caster')
134 if len(ready_to_read) > 0:
135 rtcm_msg = self.ntrip_tcp_sock.recv(100000)
136 if len(rtcm_msg) > 0:
137 if (sys.version_info > (3, 0)):
138 if rtcm_msg[0] == 211:
139 rtcm_msg_len = 256 * rtcm_msg[1] + rtcm_msg[2]
140 rtcm_msg_no = (256 * rtcm_msg[3] + rtcm_msg[4]) / 16
141 print(
'Received RTCM message %d with length %d' % (rtcm_msg_no, rtcm_msg_len))
143 if ord(rtcm_msg[0]) == 211:
144 rtcm_msg_len = 256 * ord(rtcm_msg[1]) + ord(rtcm_msg[2])
145 rtcm_msg_no = (256 * ord(rtcm_msg[3]) + ord(rtcm_msg[4])) / 16
146 print(
'Received RTCM message %d with length %d' % (rtcm_msg_no, rtcm_msg_len))
148 rtcm_queue.put(rtcm_msg)
153 if len(ready_to_write) > 0:
155 gga_msg = gga_queue.get_nowait()
156 print(
'Sending new GGA message to NTRIP caster %s' % gga_msg)
157 self.ntrip_tcp_sock.send(gga_msg.encode(
'utf-8'))
163 print(
'No RTCM messages for 10 seconds; resetting connection to NTRIP caster')
164 self.ntrip_tcp_sock.close()
173 print(
'Stopping NTRIP TCP socket thread')
174 self.ntrip_tcp_sock.close()
177 self.stop_event.set()
182 threading.Thread.__init__(self)
185 self.receiver_sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
190 print(
'Starting relay socket thread')
191 while not self.stop_event.isSet():
194 rtcm_msg = rtcm_queue.get_nowait()
204 self.stop_event.set()
208 for worker
in workers:
213 def start_threads(caster_ip, caster_port, mountpoint, username, password, broadcast_ip, broadcast_port):
216 for worker
in workers:
223 rospy.init_node(
'ntrip_forwarding')
225 self.
caster_ip = rospy.get_param(
'~caster_ip', default=
'')
228 self.
username = rospy.get_param(
'~ntrip_username', default=
'')
229 self.
password = rospy.get_param(
'~ntrip_password', default=
'')
231 self.
broadcast_ip = rospy.get_param(
'~broadcast_ip', default=
'195.0.0.255')
235 rospy.Subscriber(
'gps/gga', String, self.
recv_gga)
245 gga_queue.put(dummy_gga)
251 print(
'Shutting down')
255 if __name__ ==
'__main__':
258 rospy.on_shutdown(ros_interface.on_shutdown)
def __init__(self, caster_ip, caster_port, mountpoint, username, password)
def __init__(self, broadcast_ip, broadcast_port)
def start_threads(caster_ip, caster_port, mountpoint, username, password, broadcast_ip, broadcast_port)
def gga_timer_cb(self, event)
def stop_threads(workers)
def connect_to_ntrip_caster(self)