00001
00002
00003
00004
00005
00006
00007
00008
00009
00010 import csv
00011 import time
00012 import rospy
00013 import mavros
00014
00015 from mavros_msgs.msg import ParamValue
00016 from mavros_msgs.srv import ParamPull, ParamPush, ParamGet, ParamSet
00017
00018
00019 class Parameter(object):
00020 """Class representing one parameter"""
00021 def __init__(self, param_id, param_value=0):
00022 self.param_id = param_id
00023 self.param_value = param_value
00024
00025 def __repr__(self):
00026 return "<Parameter '{}': {}>".format(self.param_id, self.param_value)
00027
00028
00029 class ParamFile(object):
00030 """Base class for param file parsers"""
00031 def __init__(self, args):
00032 pass
00033
00034 def read(self, file_):
00035 """Returns a iterable of Parameters"""
00036 raise NotImplementedError
00037
00038 def write(self, file_, parametes):
00039 """Writes Parameters to file"""
00040 raise NotImplementedError
00041
00042
00043 class MissionPlannerParam(ParamFile):
00044 """Parse MissionPlanner param files"""
00045
00046 class CSVDialect(csv.Dialect):
00047 delimiter = ','
00048 doublequote = False
00049 skipinitialspace = True
00050 lineterminator = '\r\n'
00051 quoting = csv.QUOTE_NONE
00052
00053 def read(self, file_):
00054 to_numeric = lambda x: float(x) if '.' in x else int(x)
00055
00056 for data in csv.reader(file_, self.CSVDialect):
00057 if data[0].startswith('#'):
00058 continue
00059
00060 if len(data) != 2:
00061 raise ValueError("wrong field count")
00062
00063 yield Parameter(data[0].strip(), to_numeric(data[1]));
00064
00065 def write(self, file_, parameters):
00066 writer = csv.writer(file_, self.CSVDialect)
00067 writer.writerow(("#NOTE: " + time.strftime("%d.%m.%Y %T") ,))
00068 for p in parameters:
00069 writer.writerow((p.param_id, p.param_value))
00070
00071
00072 class QGroundControlParam(ParamFile):
00073 """Parse QGC param files"""
00074
00075 class CSVDialect(csv.Dialect):
00076 delimiter = '\t'
00077 doublequote = False
00078 skipinitialspace = True
00079 lineterminator = '\n'
00080 quoting = csv.QUOTE_NONE
00081
00082 def read(self, file_):
00083 to_numeric = lambda x: float(x) if '.' in x else int(x)
00084
00085 for data in csv.reader(file_, self.CSVDialect):
00086 if data[0].startswith('#'):
00087 continue
00088
00089 if len(data) != 5:
00090 raise ValueError("wrong field count")
00091
00092 yield Parameter(data[2].strip(), to_numeric(data[3]));
00093
00094 def write(self, file_, parameters):
00095 def to_type(x):
00096 if isinstance(x, float):
00097 return 9
00098 elif isinstance(x, int):
00099 return 6
00100 else:
00101 raise ValueError("unknown type: " + repr(type(x)))
00102
00103 sysid = rospy.get_param(mavros.get_topic('target_system_id'), 1)
00104 compid = rospy.get_param(mavros.get_topic('target_component_id'), 1)
00105
00106 writer = csv.writer(file_, self.CSVDialect)
00107 writer.writerow(("# NOTE: " + time.strftime("%d.%m.%Y %T"), ))
00108 writer.writerow(("# Onboard parameters saved by mavparam for ({}, {})".format(sysid, compid), ))
00109 writer.writerow(("# MAV ID" , "COMPONENT ID", "PARAM NAME", "VALUE", "(TYPE)"))
00110 for p in parameters:
00111 writer.writerow((sysid, compid, p.param_id, p.param_value, to_type(p.param_value), ))
00112
00113
00114 def param_ret_value(ret):
00115 if ret.value.integer != 0:
00116 return ret.value.integer
00117 elif ret.value.real != 0.0:
00118 return ret.value.real
00119 else:
00120 return 0
00121
00122
00123 def param_get(param_id):
00124 try:
00125 get = rospy.ServiceProxy(mavros.get_topic('param', 'get'), ParamGet)
00126 ret = get(param_id=param_id)
00127 except rospy.ServiceException as ex:
00128 raise IOError(str(ex))
00129
00130 if not ret.success:
00131 raise IOError("Request failed.")
00132
00133 return param_ret_value(ret)
00134
00135
00136 def param_set(param_id, value):
00137 if isinstance(value, float):
00138 val = ParamValue(integer=0, real=value)
00139 else:
00140 val = ParamValue(integer=value, real=0.0)
00141
00142 try:
00143 set = rospy.ServiceProxy(mavros.get_topic('param', 'set'), ParamSet)
00144 ret = set(param_id=param_id, value=val)
00145 except rospy.ServiceException as ex:
00146 raise IOError(str(ex))
00147
00148 if not ret.success:
00149 raise IOError("Request failed.")
00150
00151 return param_ret_value(ret)
00152
00153
00154 def param_get_all(force_pull=False):
00155 try:
00156 pull = rospy.ServiceProxy(mavros.get_topic('param', 'pull'), ParamPull)
00157 ret = pull(force_pull=force_pull)
00158 except rospy.ServiceException as ex:
00159 raise IOError(str(ex))
00160
00161 if not ret.success:
00162 raise IOError("Request failed.")
00163
00164 params = rospy.get_param(mavros.get_topic('param'))
00165
00166 return (ret.param_received,
00167 sorted((Parameter(k, v) for k, v in params.iteritems()),
00168 cmp=lambda x, y: cmp(x.param_id, y.param_id))
00169 )
00170
00171
00172 def param_set_list(param_list):
00173
00174 for p in param_list:
00175 rospy.set_param(mavros.get_topic('param', p.param_id), p.param_value)
00176
00177
00178 try:
00179 push = rospy.ServiceProxy(mavros.get_topic('param', 'push'), ParamPush)
00180 ret = push()
00181 except rospy.ServiceException as ex:
00182 raise IOError(str(ex))
00183
00184 if not ret.success:
00185 raise IOError("Request failed.")
00186
00187 return ret.param_transfered