00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020 import csv
00021 import time
00022 import rospy
00023
00024 from mavros.srv import ParamPull, ParamPush, ParamGet, ParamSet
00025
00026
00027 class Parameter(object):
00028 """Class representing one parameter"""
00029 def __init__(self, param_id, param_value=0):
00030 self.param_id = param_id
00031 self.param_value = param_value
00032
00033 def __repr__(self):
00034 return "<Parameter '{}': {}>".format(self.param_id, self.param_value)
00035
00036
00037 class ParamFile(object):
00038 """Base class for param file parsers"""
00039 def __init__(self, args):
00040 pass
00041
00042 def read(self, file_):
00043 """Returns a iterable of Parameters"""
00044 raise NotImplementedError
00045
00046 def write(self, file_, parametes):
00047 """Writes Parameters to file"""
00048 raise NotImplementedError
00049
00050
00051 class MissionPlannerParam(ParamFile):
00052 """Parse MissionPlanner param files"""
00053
00054 class CSVDialect(csv.Dialect):
00055 delimiter = ','
00056 doublequote = False
00057 skipinitialspace = True
00058 lineterminator = '\r\n'
00059 quoting = csv.QUOTE_NONE
00060
00061 def read(self, file_):
00062 to_numeric = lambda x: float(x) if '.' in x else int(x)
00063
00064 for data in csv.reader(file_, self.CSVDialect):
00065 if data[0].startswith('#'):
00066 continue
00067
00068 if len(data) != 2:
00069 raise ValueError("wrong field count")
00070
00071 yield Parameter(data[0].strip(), to_numeric(data[1]));
00072
00073 def write(self, file_, parameters):
00074 writer = csv.writer(file_, self.CSVDialect)
00075 writer.writerow(("#NOTE: " + time.strftime("%d.%m.%Y %T") ,))
00076 for p in parameters:
00077 writer.writerow((p.param_id, p.param_value))
00078
00079
00080 class QGroundControlParam(ParamFile):
00081 """Parse QGC param files"""
00082
00083 class CSVDialect(csv.Dialect):
00084 delimiter = '\t'
00085 doublequote = False
00086 skipinitialspace = True
00087 lineterminator = '\n'
00088 quoting = csv.QUOTE_NONE
00089
00090 def __init__(self, args):
00091 self.mavros_ns = args.mavros_ns
00092
00093 def read(self, file_):
00094 to_numeric = lambda x: float(x) if '.' in x else int(x)
00095
00096 for data in csv.reader(file_, self.CSVDialect):
00097 if data[0].startswith('#'):
00098 continue
00099
00100 if len(data) != 5:
00101 raise ValueError("wrong field count")
00102
00103 yield Parameter(data[2].strip(), to_numeric(data[3]));
00104
00105 def write(self, file_, parameters):
00106 def to_type(x):
00107 if isinstance(x, float):
00108 return 9
00109 elif isinstance(x, int):
00110 return 6
00111 else:
00112 raise ValueError("unknown type: " + repr(type(x)))
00113
00114 sysid = rospy.get_param(self.mavros_ns + "/target_system_id", 1)
00115 compid = rospy.get_param(self.mavros_ns + "/target_component_id", 1)
00116
00117 writer = csv.writer(file_, self.CSVDialect)
00118 writer.writerow(("# NOTE: " + time.strftime("%d.%m.%Y %T"), ))
00119 writer.writerow(("# Onboard parameters saved by mavparam for ({}, {})".format(sysid, compid), ))
00120 writer.writerow(("# MAV ID" , "COMPONENT ID", "PARAM NAME", "VALUE", "(TYPE)"))
00121 for p in parameters:
00122 writer.writerow((sysid, compid, p.param_id, p.param_value, to_type(p.param_value), ))
00123
00124
00125 def param_ret_value(ret):
00126 if ret.integer != 0:
00127 return ret.integer
00128 elif ret.real != 0.0:
00129 return ret.real
00130 else:
00131 return 0
00132
00133
00134 def param_get(param_id, ns="/mavros"):
00135 try:
00136 get_cl = rospy.ServiceProxy(ns + "/param/get", ParamGet)
00137 ret = get_cl(param_id=param_id)
00138 except rospy.ServiceException as ex:
00139 raise IOError(str(ex))
00140
00141 if not ret.success:
00142 raise IOError("Request failed.")
00143
00144 return param_ret_value(ret)
00145
00146
00147 def param_set(param_id, value, ns="/mavros"):
00148 if isinstance(value, float):
00149 val_f = value
00150 val_i = 0
00151 else:
00152 val_f = 0.0
00153 val_i = value
00154
00155 try:
00156 set_cl = rospy.ServiceProxy(ns + "/param/set", ParamSet)
00157 ret = set_cl(param_id=param_id,
00158 integer=val_i,
00159 real=val_f
00160 )
00161 except rospy.ServiceException as ex:
00162 raise IOError(str(ex))
00163
00164 if not ret.success:
00165 raise IOError("Request failed.")
00166
00167 return param_ret_value(ret)
00168
00169
00170 def param_get_all(force_pull=False, ns="/mavros"):
00171 try:
00172 pull_cl = rospy.ServiceProxy(ns + "/param/pull", ParamPull)
00173 ret = pull_cl(force_pull=force_pull)
00174 except rospy.ServiceException as ex:
00175 raise IOError(str(ex))
00176
00177 if not ret.success:
00178 raise IOError("Request failed.")
00179
00180 params = rospy.get_param(ns + "/param")
00181
00182 return (ret.param_received,
00183 sorted((Parameter(k, v) for k, v in params.iteritems()),
00184 cmp=lambda x, y: cmp(x.param_id, y.param_id))
00185 )
00186
00187 def param_set_list(param_list, ns="/mavros"):
00188
00189 for p in param_list:
00190 rospy.set_param(ns + "/param/" + p.param_id, p.param_value)
00191
00192
00193 try:
00194 push_cl = rospy.ServiceProxy(ns + "/param/push", ParamPush)
00195 ret = push_cl()
00196 except rospy.ServiceException as ex:
00197 raise IOError(str(ex))
00198
00199 if not ret.success:
00200 raise IOError("Request failed.")
00201
00202 return ret.param_transfered