param.py
Go to the documentation of this file.
00001 # -*- coding: utf-8 -*-
00002 # vim:set ts=4 sw=4 et:
00003 #
00004 # Copyright 2014 Vladimir Ermakov.
00005 #
00006 # This file is part of the mavros package and subject to the license terms
00007 # in the top-level LICENSE file of the mavros repository.
00008 # https://github.com/mavlink/mavros/tree/master/LICENSE.md
00009 
00010 import csv
00011 import time
00012 import rospy
00013 import mavros
00014 
00015 from mavros_msgs.msg import ParamValue
00016 from mavros_msgs.srv import ParamPull, ParamPush, ParamGet, ParamSet
00017 
00018 
00019 class Parameter(object):
00020     """Class representing one parameter"""
00021     def __init__(self, param_id, param_value=0):
00022         self.param_id = param_id
00023         self.param_value = param_value
00024 
00025     def __repr__(self):
00026         return "<Parameter '{}': {}>".format(self.param_id, self.param_value)
00027 
00028 
00029 class ParamFile(object):
00030     """Base class for param file parsers"""
00031     def __init__(self, args):
00032         pass
00033 
00034     def read(self, file_):
00035         """Returns a iterable of Parameters"""
00036         raise NotImplementedError
00037 
00038     def write(self, file_, parametes):
00039         """Writes Parameters to file"""
00040         raise NotImplementedError
00041 
00042 
00043 class MissionPlannerParam(ParamFile):
00044     """Parse MissionPlanner param files"""
00045 
00046     class CSVDialect(csv.Dialect):
00047         delimiter = ','
00048         doublequote = False
00049         skipinitialspace = True
00050         lineterminator = '\r\n'
00051         quoting = csv.QUOTE_NONE
00052 
00053     def read(self, file_):
00054         to_numeric = lambda x: float(x) if '.' in x else int(x)
00055 
00056         for data in csv.reader(file_, self.CSVDialect):
00057             if data[0].startswith('#'):
00058                 continue # skip comments
00059 
00060             if len(data) != 2:
00061                 raise ValueError("wrong field count")
00062 
00063             yield Parameter(data[0].strip(), to_numeric(data[1]));
00064 
00065     def write(self, file_, parameters):
00066         writer = csv.writer(file_, self.CSVDialect)
00067         writer.writerow(("#NOTE: " + time.strftime("%d.%m.%Y %T") ,))
00068         for p in parameters:
00069             writer.writerow((p.param_id, p.param_value))
00070 
00071 
00072 class QGroundControlParam(ParamFile):
00073     """Parse QGC param files"""
00074 
00075     class CSVDialect(csv.Dialect):
00076         delimiter = '\t'
00077         doublequote = False
00078         skipinitialspace = True
00079         lineterminator = '\n'
00080         quoting = csv.QUOTE_NONE
00081 
00082     def read(self, file_):
00083         to_numeric = lambda x: float(x) if '.' in x else int(x)
00084 
00085         for data in csv.reader(file_, self.CSVDialect):
00086             if data[0].startswith('#'):
00087                 continue # skip comments
00088 
00089             if len(data) != 5:
00090                 raise ValueError("wrong field count")
00091 
00092             yield Parameter(data[2].strip(), to_numeric(data[3]));
00093 
00094     def write(self, file_, parameters):
00095         def to_type(x):
00096             if isinstance(x, float):
00097                 return 9 # REAL32
00098             elif isinstance(x, int):
00099                 return 6 # INT32
00100             else:
00101                 raise ValueError("unknown type: " + repr(type(x)))
00102 
00103         sysid = rospy.get_param(mavros.get_topic('target_system_id'), 1)
00104         compid = rospy.get_param(mavros.get_topic('target_component_id'), 1)
00105 
00106         writer = csv.writer(file_, self.CSVDialect)
00107         writer.writerow(("# NOTE: " + time.strftime("%d.%m.%Y %T"), ))
00108         writer.writerow(("# Onboard parameters saved by mavparam for ({}, {})".format(sysid, compid), ))
00109         writer.writerow(("# MAV ID" , "COMPONENT ID", "PARAM NAME", "VALUE", "(TYPE)"))
00110         for p in parameters:
00111             writer.writerow((sysid, compid, p.param_id, p.param_value, to_type(p.param_value), )) # XXX
00112 
00113 
00114 def param_ret_value(ret):
00115     if ret.value.integer != 0:
00116         return ret.value.integer
00117     elif ret.value.real != 0.0:
00118         return ret.value.real
00119     else:
00120         return 0
00121 
00122 
00123 def param_get(param_id):
00124     try:
00125         get = rospy.ServiceProxy(mavros.get_topic('param', 'get'), ParamGet)
00126         ret = get(param_id=param_id)
00127     except rospy.ServiceException as ex:
00128         raise IOError(str(ex))
00129 
00130     if not ret.success:
00131         raise IOError("Request failed.")
00132 
00133     return param_ret_value(ret)
00134 
00135 
00136 def param_set(param_id, value):
00137     if isinstance(value, float):
00138         val = ParamValue(integer=0, real=value)
00139     else:
00140         val = ParamValue(integer=value, real=0.0)
00141 
00142     try:
00143         set = rospy.ServiceProxy(mavros.get_topic('param', 'set'), ParamSet)
00144         ret = set(param_id=param_id, value=val)
00145     except rospy.ServiceException as ex:
00146         raise IOError(str(ex))
00147 
00148     if not ret.success:
00149         raise IOError("Request failed.")
00150 
00151     return param_ret_value(ret)
00152 
00153 
00154 def param_get_all(force_pull=False):
00155     try:
00156         pull = rospy.ServiceProxy(mavros.get_topic('param', 'pull'), ParamPull)
00157         ret = pull(force_pull=force_pull)
00158     except rospy.ServiceException as ex:
00159         raise IOError(str(ex))
00160 
00161     if not ret.success:
00162         raise IOError("Request failed.")
00163 
00164     params = rospy.get_param(mavros.get_topic('param'))
00165 
00166     return (ret.param_received,
00167             sorted((Parameter(k, v) for k, v in params.iteritems()),
00168                    cmp=lambda x, y: cmp(x.param_id, y.param_id))
00169             )
00170 
00171 
00172 def param_set_list(param_list):
00173     # 1. load parameters to parameter server
00174     for p in param_list:
00175         rospy.set_param(mavros.get_topic('param', p.param_id), p.param_value)
00176 
00177     # 2. request push all
00178     try:
00179         push = rospy.ServiceProxy(mavros.get_topic('param', 'push'), ParamPush)
00180         ret = push()
00181     except rospy.ServiceException as ex:
00182         raise IOError(str(ex))
00183 
00184     if not ret.success:
00185         raise IOError("Request failed.")
00186 
00187     return ret.param_transfered


mavros
Author(s): Vladimir Ermakov
autogenerated on Sat Jun 8 2019 19:55:19