param.py
Go to the documentation of this file.
00001 # -*- python -*-
00002 # vim:set ts=4 sw=4 et:
00003 #
00004 # Copyright 2014 Vladimir Ermakov.
00005 #
00006 # This program is free software; you can redistribute it and/or modify
00007 # it under the terms of the GNU General Public License as published by
00008 # the Free Software Foundation; either version 3 of the License, or
00009 # (at your option) any later version.
00010 #
00011 # This program is distributed in the hope that it will be useful, but
00012 # WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
00013 # or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
00014 # for more details.
00015 #
00016 # You should have received a copy of the GNU General Public License along
00017 # with this program; if not, write to the Free Software Foundation, Inc.,
00018 # 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
00019 
00020 import csv
00021 import time
00022 import rospy
00023 
00024 from mavros.srv import ParamPull, ParamPush, ParamGet, ParamSet
00025 
00026 
00027 class Parameter(object):
00028     """Class representing one parameter"""
00029     def __init__(self, param_id, param_value=0):
00030         self.param_id = param_id
00031         self.param_value = param_value
00032 
00033     def __repr__(self):
00034         return "<Parameter '{}': {}>".format(self.param_id, self.param_value)
00035 
00036 
00037 class ParamFile(object):
00038     """Base class for param file parsers"""
00039     def __init__(self, args):
00040         pass
00041 
00042     def read(self, file_):
00043         """Returns a iterable of Parameters"""
00044         raise NotImplementedError
00045 
00046     def write(self, file_, parametes):
00047         """Writes Parameters to file"""
00048         raise NotImplementedError
00049 
00050 
00051 class MissionPlannerParam(ParamFile):
00052     """Parse MissionPlanner param files"""
00053 
00054     class CSVDialect(csv.Dialect):
00055         delimiter = ','
00056         doublequote = False
00057         skipinitialspace = True
00058         lineterminator = '\r\n'
00059         quoting = csv.QUOTE_NONE
00060 
00061     def read(self, file_):
00062         to_numeric = lambda x: float(x) if '.' in x else int(x)
00063 
00064         for data in csv.reader(file_, self.CSVDialect):
00065             if data[0].startswith('#'):
00066                 continue # skip comments
00067 
00068             if len(data) != 2:
00069                 raise ValueError("wrong field count")
00070 
00071             yield Parameter(data[0].strip(), to_numeric(data[1]));
00072 
00073     def write(self, file_, parameters):
00074         writer = csv.writer(file_, self.CSVDialect)
00075         writer.writerow(("#NOTE: " + time.strftime("%d.%m.%Y %T") ,))
00076         for p in parameters:
00077             writer.writerow((p.param_id, p.param_value))
00078 
00079 
00080 class QGroundControlParam(ParamFile):
00081     """Parse QGC param files"""
00082 
00083     class CSVDialect(csv.Dialect):
00084         delimiter = '\t'
00085         doublequote = False
00086         skipinitialspace = True
00087         lineterminator = '\n'
00088         quoting = csv.QUOTE_NONE
00089 
00090     def __init__(self, args):
00091         self.mavros_ns = args.mavros_ns
00092 
00093     def read(self, file_):
00094         to_numeric = lambda x: float(x) if '.' in x else int(x)
00095 
00096         for data in csv.reader(file_, self.CSVDialect):
00097             if data[0].startswith('#'):
00098                 continue # skip comments
00099 
00100             if len(data) != 5:
00101                 raise ValueError("wrong field count")
00102 
00103             yield Parameter(data[2].strip(), to_numeric(data[3]));
00104 
00105     def write(self, file_, parameters):
00106         def to_type(x):
00107             if isinstance(x, float):
00108                 return 9 # REAL32
00109             elif isinstance(x, int):
00110                 return 6 # INT32
00111             else:
00112                 raise ValueError("unknown type: " + repr(type(x)))
00113 
00114         sysid = rospy.get_param(self.mavros_ns + "/target_system_id", 1)
00115         compid = rospy.get_param(self.mavros_ns + "/target_component_id", 1)
00116 
00117         writer = csv.writer(file_, self.CSVDialect)
00118         writer.writerow(("# NOTE: " + time.strftime("%d.%m.%Y %T"), ))
00119         writer.writerow(("# Onboard parameters saved by mavparam for ({}, {})".format(sysid, compid), ))
00120         writer.writerow(("# MAV ID" , "COMPONENT ID", "PARAM NAME", "VALUE", "(TYPE)"))
00121         for p in parameters:
00122             writer.writerow((sysid, compid, p.param_id, p.param_value, to_type(p.param_value), )) # XXX
00123 
00124 
00125 def param_ret_value(ret):
00126     if ret.integer != 0:
00127         return ret.integer
00128     elif ret.real != 0.0:
00129         return ret.real
00130     else:
00131         return 0
00132 
00133 
00134 def param_get(param_id, ns="/mavros"):
00135     try:
00136         get_cl = rospy.ServiceProxy(ns + "/param/get", ParamGet)
00137         ret = get_cl(param_id=param_id)
00138     except rospy.ServiceException as ex:
00139         raise IOError(str(ex))
00140 
00141     if not ret.success:
00142         raise IOError("Request failed.")
00143 
00144     return param_ret_value(ret)
00145 
00146 
00147 def param_set(param_id, value, ns="/mavros"):
00148     if isinstance(value, float):
00149         val_f = value
00150         val_i = 0
00151     else:
00152         val_f = 0.0
00153         val_i = value
00154 
00155     try:
00156         set_cl = rospy.ServiceProxy(ns + "/param/set", ParamSet)
00157         ret = set_cl(param_id=param_id,
00158                      integer=val_i,
00159                      real=val_f
00160                      )
00161     except rospy.ServiceException as ex:
00162         raise IOError(str(ex))
00163 
00164     if not ret.success:
00165         raise IOError("Request failed.")
00166 
00167     return param_ret_value(ret)
00168 
00169 
00170 def param_get_all(force_pull=False, ns="/mavros"):
00171     try:
00172         pull_cl = rospy.ServiceProxy(ns + "/param/pull", ParamPull)
00173         ret = pull_cl(force_pull=force_pull)
00174     except rospy.ServiceException as ex:
00175         raise IOError(str(ex))
00176 
00177     if not ret.success:
00178         raise IOError("Request failed.")
00179 
00180     params = rospy.get_param(ns + "/param")
00181 
00182     return (ret.param_received,
00183             sorted((Parameter(k, v) for k, v in params.iteritems()),
00184                    cmp=lambda x, y: cmp(x.param_id, y.param_id))
00185             )
00186 
00187 def param_set_list(param_list, ns="/mavros"):
00188     # 1. load parameters to parameter server
00189     for p in param_list:
00190         rospy.set_param(ns + "/param/" + p.param_id, p.param_value)
00191 
00192     # 2. request push all
00193     try:
00194         push_cl = rospy.ServiceProxy(ns + "/param/push", ParamPush)
00195         ret = push_cl()
00196     except rospy.ServiceException as ex:
00197         raise IOError(str(ex))
00198 
00199     if not ret.success:
00200         raise IOError("Request failed.")
00201 
00202     return ret.param_transfered


mavros
Author(s): Vladimir Ermakov
autogenerated on Wed Aug 26 2015 12:29:13