swiftnav_piksi_tcp.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 
3 import subprocess
4 import os
5 import struct
6 import time
7 import sys
8 import rospy
9 import numpy as np
10 import math
11 from sbp.client.drivers.network_drivers import TCPDriver
12 from sbp.client import Handler, Framer
13 from sbp.settings import SBP_MSG_SETTINGS_READ_RESP, MsgSettingsWrite, MsgSettingsReadReq
14 from sbp.imu import SBP_MSG_IMU_RAW
15 from sbp.mag import SBP_MSG_MAG_RAW
16 from sbp.navigation import SBP_MSG_BASELINE_HEADING_DEP_A, SBP_MSG_POS_LLH, SBP_MSG_BASELINE_NED
17 from datetime import datetime
18 from nav_msgs.msg import Odometry
19 from sensor_msgs.msg import Imu, MagneticField, NavSatFix
20 from std_msgs.msg import String, Int32, Bool
21 from geometry_msgs.msg import TwistStamped
22 
23 class SwiftNavDriver(object):
24 
25  def __init__(self):
26  rospy.loginfo("[RR_SWIFTNAV_PIKSI] Initializing")
27 
28  # Initialize message and publisher structures
29  self.drive_direction = "forward"
30  self.comms_enabled = False
31  self.ncat_process = None
32  self.previous_x = 0
33  self.previous_y = 0
34 
35  # TOPIC: swift_gps/llh/fix_mode
36  # This topic reports the fix_mode of llh position
37  # 0 - Invalid
38  # 1 - Single Point Position (SSP)
39  # 2 - Differential GNSS (DGNSS)
40  # 3 - Float RTK
41  # 4 - Fixed RTK
42  # 5 - Dead Reckoning
43  # 6 - Satellite-based Augmentation System (SBAS)
44 
45  # ROS Publishers
46  self.pub_imu = rospy.Publisher('/swift_gps/imu/raw', Imu, queue_size=10)
47  self.pub_mag = rospy.Publisher('/swift_gps/mag/raw', MagneticField, queue_size=10)
48  self.pub_llh = rospy.Publisher('/swift_gps/llh/position', NavSatFix, queue_size=3)
49  self.pub_llh_n_sats = rospy.Publisher('/swift_gps/llh/n_sats', Int32, queue_size=3)
50  self.pub_llh_fix_mode = rospy.Publisher('/swift_gps/llh/fix_mode', Int32, queue_size=3)
51  self.pub_ecef_odom = rospy.Publisher('/swift_gps/baseline/ecef/position', Odometry, queue_size=3)
52 
53  # ROS Subscriber
54  self.sub_rtk_cmd = rospy.Subscriber("/swift_gps/enable_comms", Bool, self.enable_comms_cb)
55  self.sub_cmd_vel = rospy.Subscriber("/cmd_vel/managed", TwistStamped, self.cmd_vel_cb)
56 
57  # ROS Parameters
58  rospy.loginfo("[RR_SWIFTNAV_PIKSI] Loading ROS Parameters")
59 
60  path_piksi_ip_address = rospy.search_param('piksi_ip_address')
61  self.piksi_ip_address = rospy.get_param(path_piksi_ip_address, '1.2.3.10')
62  rospy.loginfo("[RR_SWIFTNAV_PIKSI] Piksi IP address: %s", self.piksi_ip_address)
63 
64  path_piksi_port = rospy.search_param('piksi_port')
65  self.piksi_port = rospy.get_param(path_piksi_port, '55555')
66  rospy.loginfo("[RR_SWIFTNAV_PIKSI] Piksi Port: %s", self.piksi_port)
67 
68  path_base_station_ip_address = rospy.search_param('base_station_ip_address')
69  self.base_station_ip_address = rospy.get_param(path_base_station_ip_address, '111.111.111.111')
70  rospy.loginfo("[RR_SWIFTNAV_PIKSI] Base Station IP address: %s", self.base_station_ip_address)
71 
72  path_base_station_port = rospy.search_param('base_station_port')
73  self.base_station_port = rospy.get_param(path_base_station_port, '55555')
74  rospy.loginfo("[RR_SWIFTNAV_PIKSI] Base Station Port: %s", self.base_station_port)
75 
76  path_computer_ip_address = rospy.search_param('computer_ip_address')
77  self.computer_ip_address = rospy.get_param(path_computer_ip_address, '1.2.3.55')
78  rospy.loginfo("[RR_SWIFTNAV_PIKSI] Computer IP address: %s", self.computer_ip_address)
79 
80  # Create SwiftNav Callbacks
81  with TCPDriver(self.piksi_ip_address, self.piksi_port) as driver:
82  with Handler(Framer(driver.read, driver.write)) as source:
83  driver.flush()
84  time.sleep(2)
85  source.add_callback(self.publish_baseline_msg, SBP_MSG_BASELINE_NED)
86  source.add_callback(self.publish_imu_msg,SBP_MSG_IMU_RAW)
87  source.add_callback(self.publish_mag_msg,SBP_MSG_MAG_RAW)
88  source.add_callback(self.publish_llh_msg,SBP_MSG_POS_LLH)
89  source.start
90 
91  rospy.spin()
92 
93 
94  def cmd_vel_cb(self, cmd_vel):
95  if cmd_vel.twist.linear.x > 0:
96  self.drive_direction = "forward"
97  if cmd_vel.twist.linear.x < 0:
98  self.drive_direction = "reverse"
99 
100 
101  def enable_comms_cb(self, msg):
102  if (msg.data == True):
103  # Note: ncat is the linux networking tool used to tunnel the RTK data through the main PC
104  if (self.ncat_process is None):
105  str_cmd = '/usr/bin/ncat -l ' + str(self.computer_ip_address) + ' ' + str(self.piksi_port) + ' --sh-exec "/usr/bin/ncat ' + str(self.base_station_ip_address) + ' ' + str(self.base_station_port) + '"'
106  rospy.loginfo(str_cmd)
107  self.ncat_process = subprocess.Popen(str_cmd, shell=True)
108  self.comms_enabled = True
109  rospy.loginfo("[RR_SWIFNAV_PIKSI] GPS comms enabled, ncat started")
110  else:
111  rospy.logwarn("[RR_SWIFTNAV_PIKSI] GPS comms already enabled, ignoring request")
112  if (msg.data == False):
113  if (self.ncat_process is not None):
114  subprocess.call(["kill", "-9", "%d" % self.ncat_process.pid])
115  self.ncat_process.wait()
116  os.system('killall ncat')
117  rospy.loginfo("[RR_SWIFT_NAV_PIKSI] GPS comms disables, ncat stopped")
118  self.comms_enabled = False
119  self.ncat_process=None
120  else:
121  rospy.logwarn("[RR_SWIFTNAV_PIKSI] RTK GPS already disabled, ignoring request")
122 
123  def publish_baseline_msg(self, msg, **metadata):
124  if not self.comms_enabled:
125  return
126 
127  # Obtain position and accuracies and convert from mm to m
128  x_pos = float(msg.e)/1000
129  y_pos = float(msg.n)/1000
130  z_pos = float(msg.d)/1000
131  h_accuracy = float(msg.h_accuracy)/1000
132  v_accuracy = float(msg.v_accuracy)/1000
133 
134  if (x_pos,y_pos) == (0.0,0.0):
135  rospy.logwarn_throttle(10,"SwiftNav GPS baseline reported x=0 y=0. Message not published")
136  return
137 
138  # Build the ROS Odometry message
139  ecef_odom_msg = Odometry()
140  ecef_odom_msg.child_frame_id = 'gps_link'
141  ecef_odom_msg.header.stamp = rospy.Time.now()
142  ecef_odom_msg.header.frame_id = 'map'
143  ecef_odom_msg.pose.pose.position.x = x_pos
144  ecef_odom_msg.pose.pose.position.y = y_pos
145  ecef_odom_msg.pose.pose.position.z = 0
146 
147  # Calculate distance travelled since last RTK measurement
148  if self.drive_direction=="forward":
149  delta_x = x_pos - self.previous_x
150  delta_y = y_pos - self.previous_y
151  if self.drive_direction=="reverse":
152  delta_x = self.previous_x - x_pos
153  delta_y = self.previous_y - y_pos
154  distance_travelled = np.sqrt(np.power(delta_x,2) + np.power(delta_y,2))
155 
156  # Normalize the orientation vector
157  if (distance_travelled==0):
158  delta_x_hat = 0
159  delta_y_hat = 0
160  else:
161  delta_x_hat = delta_x / distance_travelled
162  delta_y_hat = delta_y / distance_travelled
163 
164  if (distance_travelled>0.04):
165  angle = np.arctan2(delta_y_hat, delta_x_hat)
166  ecef_odom_msg.pose.pose.orientation.z = 1*np.sin(angle/2)
167  ecef_odom_msg.pose.pose.orientation.w = np.cos(angle/2)
168 
169  # Update the old positions
170  self.previous_x = x_pos
171  self.previous_y = y_pos
172 
173  # Calculate the position covariances using the accuracy reported by the Piksi
174  cov_x = cov_y = h_accuracy
175 
176  # Calculate the orientation covariance, the further we have moved the more accurate orientation is
177  if (0<=distance_travelled and distance_travelled<=0.04):
178  theta_accuracy = 1000
179  elif(0.04<distance_travelled and distance_travelled<=0.01):
180  theta_accuracy = 0.348
181  elif(0.01<distance_travelled and distance_travelled<=0.4):
182  theta_accuracy = 0.174
183  elif(0.4<distance_travelled):
184  theta_accuracy = 0.14
185  else:
186  theta_accuracy = -1
187  rospy.logerr_throttle(5,"distance travelled was negative")
188 
189  cov_theta = theta_accuracy
190  ecef_odom_msg.pose.covariance = [cov_x, 0, 0, 0, 0, 0,
191  0, cov_y, 0, 0, 0, 0,
192  0, 0, 0, 0, 0, 0,
193  0, 0, 0, 0, 0, 0,
194  0, 0, 0, 0, 0, 0,
195  0, 0, 0, 0, 0, cov_theta]
196  # Publish earth-centered-earth-fixed message
197  self.pub_ecef_odom.publish(ecef_odom_msg)
198 
199  def publish_mag_msg(self, msg, **metadata):
200  mag_msg = MagneticField()
201  mag_msg.header.stamp = rospy.Time.now()
202  mag_msg.header.frame_id = 'gps_link'
203  # sbp reports in microteslas, the MagneticField() msg requires field in teslas
204  magscale = 1E-6
205  mag_msg.magnetic_field.x = msg.mag_x*magscale
206  mag_msg.magnetic_field.y = msg.mag_y*magscale
207  mag_msg.magnetic_field.z = msg.mag_z*magscale
208  mag_msg.magnetic_field_covariance = [0,0,0,
209  0,0,0,
210  0,0,0]
211  #publish to /gps/mag/raw
212  self.pub_mag.publish(mag_msg)
213 
214  def publish_imu_msg(self, msg, **metadata):
215  imu_msg = Imu()
216  imu_msg.header.stamp = rospy.Time.now()
217  imu_msg.header.frame_id = 'gps_link'
218  # acc_range scale settings to +- 4g (8192 LSB/g), gyro_range to +-500 (65.6 LSB/deg/s)
219  ascale=9.8/8192.0 # output in meters per second-squared
220  gscale=3.14159/180/65.6 # output in radians per second
221  imu_msg.angular_velocity.x = msg.gyr_x*gscale
222  imu_msg.angular_velocity.y = msg.gyr_y*gscale
223  imu_msg.angular_velocity.z = msg.gyr_z*gscale
224  imu_msg.linear_acceleration.x = msg.acc_x*ascale
225  imu_msg.linear_acceleration.y = msg.acc_y*ascale
226  imu_msg.linear_acceleration.z = msg.acc_z*ascale
227  imu_msg.orientation_covariance = [0,0,0,
228  0,0,0,
229  0,0,0]
230  imu_msg.angular_velocity_covariance= [0,0,0,
231  0,0,0,
232  0,0,0.01]
233  imu_msg.linear_acceleration_covariance= [0.01,0,0,
234  0,0.01,0,
235  0,0,0.01]
236  # Publish to /gps/imu/raw
237  self.pub_imu.publish(imu_msg)
238 
239 
240  def publish_llh_msg(self, msg, **metadata):
241  llh_msg = NavSatFix()
242  llh_msg.latitude = msg.lat
243  llh_msg.longitude = msg.lon
244  llh_msg.altitude = msg.height
245  llh_msg.position_covariance_type = 2
246  llh_msg.position_covariance = [9,0,0,
247  0,9,0,
248  0,0,9]
249  # Publish ROS messages
250  self.pub_llh.publish(llh_msg)
251  self.pub_llh_n_sats.publish(Int32(msg.n_sats))
252  self.pub_llh_fix_mode.publish(Int32(msg.flags))
253 
254 if __name__ == "__main__":
255  rospy.init_node('rr_swiftnav_gps_node')
256  swift_nav_driver = SwiftNavDriver()
257 
258 
259 
def publish_llh_msg(self, msg, metadata)
def publish_mag_msg(self, msg, metadata)
def publish_imu_msg(self, msg, metadata)
def publish_baseline_msg(self, msg, metadata)


rr_swiftnav_piksi
Author(s):
autogenerated on Sun Feb 14 2021 03:32:52