SdoConfiguration.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 # -*- coding: euc-jp -*-
3 
4 
16 
17 import sys
18 import copy
19 import threading
20 
21 import OpenRTM_aist
22 
33 import SDOPackage, SDOPackage__POA
34 
35 
36 
37 # SdoConfiguration with SeqEx 159120
38 # SdoConfiguration with SeqUtil 114504 114224
39 
40 
41 
def toProperties(prop, conf):
    """Copy the NVList of a configuration set into a Properties object.

    prop -- destination object passed to NVUtil.copyToProperties
    conf -- source object; only its ``configuration_data`` attribute is read
    """
    copy_to_properties = OpenRTM_aist.NVUtil.copyToProperties
    copy_to_properties(prop, conf.configuration_data)
63 
64 
65 
def toConfigurationSet(conf, prop):
    """Fill a configuration set from a Properties object.

    conf -- destination; its ``description``, ``id`` and
            ``configuration_data`` members are written
    prop -- source; supplies the "description" property, its own name
            (used as the id) and the values copied by
            NVUtil.copyFromProperties
    """
    description = prop.getProperty("description")
    conf.description = description
    conf.id = prop.getName()
    copy_from_properties = OpenRTM_aist.NVUtil.copyFromProperties
    copy_from_properties(conf.configuration_data, prop)
91 
92 
93 
94 
164 class Configuration_impl(SDOPackage__POA.Configuration):
165  """
166  """
167 
168 
188  def __init__(self, configAdmin, sdoServiceAdmin):
189  """
190  \var self._deviceProfile SDO DeviceProfile with mutex lock
191  """
192  self._deviceProfile = SDOPackage.DeviceProfile("","","","",[])
193  self._dprofile_mutex = threading.RLock()
194 
195  """
196  \var self._serviceProfiles SDO ServiceProfileList
197  """
199  self._sprofile_mutex = threading.RLock()
200 
201  self._parameters = []
202  self._params_mutex = threading.RLock()
203 
204  self._configsets = configAdmin
205  self._config_mutex = threading.RLock()
206 
207  self._sdoservice = sdoServiceAdmin
208 
209  """
210  \var self._organizations SDO OrganizationList
211  """
212  self._organizations = []
213  self._org_mutex = threading.RLock()
214 
215  self._objref = self._this()
216  self._rtcout = OpenRTM_aist.Manager.instance().getLogbuf("rtobject.sdo_config")
217 
218 
219  #============================================================
220  #
221  # <<< CORBA interfaces >>>
222  #
223  #============================================================
224 
225 
266  def set_device_profile(self, dProfile):
267  self._rtcout.RTC_TRACE("set_device_profile()")
268  if dProfile is None:
269  raise SDOPackage.InvalidParameter("dProfile is empty.")
270 
271  try:
273  self._deviceProfile = dProfile
274  except:
275  self._rtcout.RTC_ERROR(OpenRTM_aist.Logger.print_exception())
276  raise SDOPackage.InternalError("Unknown Error")
277 
278  return True
279 
280 
281 
def add_service_profile(self, sProfile):
    """[CORBA interface] Add a ServiceProfile to the SDO.

    The profile is handed to the SDO service admin's
    ``addSdoServiceConsumer`` and the result of that call is returned
    unchanged.

    sProfile -- the ServiceProfile to register; must not be None

    Raises SDOPackage.InvalidParameter when ``sProfile`` is None, and
    SDOPackage.InternalError when the admin call raises an ordinary
    exception.
    """
    self._rtcout.RTC_TRACE("add_service_profile()")
    if sProfile is None:
        raise SDOPackage.InvalidParameter("sProfile is empty.")

    try:
        return self._sdoservice.addSdoServiceConsumer(sProfile)
    except Exception:
        # Only ordinary errors are translated into InternalError;
        # SystemExit and KeyboardInterrupt are left to propagate.
        self._rtcout.RTC_ERROR(OpenRTM_aist.Logger.print_exception())
        raise SDOPackage.InternalError("Configuration.add_service_profile")
339 
340 
341 
377  def add_organization(self, org):
378  self._rtcout.RTC_TRACE("add_organization()")
379  if org is None:
380  raise SDOPackage.InvalidParameter("org is empty.")
381 
382  try:
384  except:
385  self._rtcout.RTC_ERROR(OpenRTM_aist.Logger.print_exception())
386  raise SDOPackage.InternalError("Configuration.add_organization")
387 
388  return True
389 
390 
391 
def remove_service_profile(self, id_):
    """[CORBA interface] Remove a ServiceProfile from the SDO.

    The id is handed to the SDO service admin's
    ``removeSdoServiceConsumer`` and the result of that call is returned
    unchanged.

    id_ -- identifier of the ServiceProfile to remove; must not be None

    Raises SDOPackage.InvalidParameter when ``id_`` is None, and
    SDOPackage.InternalError when the admin call raises an ordinary
    exception.
    """
    self._rtcout.RTC_TRACE("remove_service_profile(%s)", id_)
    if id_ is None:
        raise SDOPackage.InvalidParameter("id is empty.")

    try:
        return self._sdoservice.removeSdoServiceConsumer(id_)
    except Exception:
        # Only ordinary errors are translated into InternalError;
        # SystemExit and KeyboardInterrupt are left to propagate.
        self._rtcout.RTC_ERROR(OpenRTM_aist.Logger.print_exception())
        raise SDOPackage.InternalError("Configuration.remove_service_profile")
445 
446 
447 
487  def remove_organization(self, organization_id):
488  self._rtcout.RTC_TRACE("remove_organization(%s)", organization_id)
489  if organization_id is None:
490  raise SDOPackage.InvalidParameter("organization_id is empty.")
491 
492  try:
493  guard = OpenRTM_aist.ScopedLock(self._org_mutex)
495  self.org_id(organization_id))
496  except:
497  self._rtcout.RTC_ERROR(OpenRTM_aist.Logger.print_exception())
498  raise SDOPackage.InternalError("Configuration.remove_organization")
499 
500  return True
501 
502 
503 
538  self._rtcout.RTC_TRACE("get_configuration_parameters()")
539  try:
541  param = copy.copy(self._parameters)
542  return param
543  except:
544  self._rtcout.RTC_ERROR(OpenRTM_aist.Logger.print_exception())
545  raise SDOPackage.InternalError("Configuration.get_configuration_parameters")
546 
547  return []
548 
549 
550 
583  self._rtcout.RTC_TRACE("get_configuration_parameter_values()")
585  nvlist = []
586  return nvlist
587 
588 
589 
632  self._rtcout.RTC_TRACE("get_configuration_parameter_value(%s)", name)
633  if not name:
634  self._rtcout.RTC_ERROR(OpenRTM_aist.Logger.print_exception())
635  raise SDOPackage.InvalidParameter("Name is empty.")
636 
637  return None
638 
639 
640 
def set_configuration_parameter(self, name, value):
    """[CORBA interface] Modify a configuration parameter value.

    As written, the method only validates its arguments: it stores
    nothing and returns True whenever both arguments are present.

    Raises SDOPackage.InvalidParameter when ``name`` or ``value`` is None.
    """
    self._rtcout.RTC_TRACE("set_configuration_parameter(%s, value)", name)
    both_given = name is not None and value is not None
    if not both_given:
        raise SDOPackage.InvalidParameter("Name/Value is empty.")
    return True
689 
690 
691 
728  self._rtcout.RTC_TRACE("get_configuration_sets()")
729  try:
731 
732  cf = self._configsets.getConfigurationSets()
733  len_ = len(cf)
734 
735  config_sets = [SDOPackage.ConfigurationSet("","",[]) for i in range(len_)]
736  for i in range(len_):
737  toConfigurationSet(config_sets[i], cf[i])
738 
739  return config_sets
740 
741  except:
742  self._rtcout.RTC_ERROR(OpenRTM_aist.Logger.print_exception())
743  raise SDOPackage.InternalError("Configuration.get_configuration_sets")
744 
745  return []
746 
747 
787  def get_configuration_set(self, config_id):
788  self._rtcout.RTC_TRACE("get_configuration_set(%s)", config_id)
789  if not config_id:
790  raise SDOPackage.InvalidParameter("ID is empty")
791 
793 
794  try:
795  if not self._configsets.haveConfig(config_id):
796  self._rtcout.RTC_ERROR("No such ConfigurationSet")
797  raise SDOPackage.InternalError("No such ConfigurationSet")
798  except:
799  self._rtcout.RTC_ERROR(OpenRTM_aist.Logger.print_exception())
800  raise SDOPackage.InternalError("Unknown exception")
801 
802 
803  configset = self._configsets.getConfigurationSet(config_id)
804 
805  try:
806  config = SDOPackage.ConfigurationSet("","",[])
807  toConfigurationSet(config, configset)
808  return config
809  except:
810  self._rtcout.RTC_ERROR(OpenRTM_aist.Logger.print_exception())
811  raise SDOPackage.InternalError("Configuration::get_configuration_set()")
812 
813  return SDOPackage.ConfigurationSet("","",[])
814 
815 
816 
def set_configuration_set_values(self, configuration_set):
    """[CORBA interface] Set the values of a ConfigurationSet.

    The configuration set is converted into an OpenRTM_aist.Properties
    object keyed by its id and handed to the configuration admin's
    ``setConfigurationSetValues``; that call's result is returned.

    configuration_set -- the ConfigurationSet to apply; it and its
                         ``id`` must be non-empty

    Raises SDOPackage.InvalidParameter when the set or its id is empty,
    and SDOPackage.InternalError when the conversion or the admin call
    raises an ordinary exception.
    """
    self._rtcout.RTC_TRACE("set_configuration_set_values()")
    if not configuration_set or not configuration_set.id:
        raise SDOPackage.InvalidParameter("ID is empty.")

    try:
        conf = OpenRTM_aist.Properties(key=configuration_set.id)
        toProperties(conf, configuration_set)

        # The port-name format changed from <port_name> to
        # <instance_name>.<port_name> (since r1648). Entries with more
        # than two dot-separated parts are therefore reduced to
        # "<first part>.<last part>"; other entries are kept as they are.
        if conf.findNode("exported_ports"):
            shortened = []
            for port in conf.getProperty("exported_ports").split(","):
                parts = port.split(".")
                if len(parts) > 2:
                    port = parts[0] + "." + parts[-1]
                shortened.append(port)
            conf.setProperty("exported_ports", ",".join(shortened))

        return self._configsets.setConfigurationSetValues(conf)
    except Exception:
        # Only ordinary errors are translated into InternalError;
        # SystemExit and KeyboardInterrupt are left to propagate.
        self._rtcout.RTC_ERROR(OpenRTM_aist.Logger.print_exception())
        raise SDOPackage.InternalError("Configuration::set_configuration_set_values()")
895 
896 
897 
948  self._rtcout.RTC_TRACE("get_active_configuration_set()")
949  if not self._configsets.isActive():
950  raise SDOPackage.NotAvailable("NotAvailable: Configuration.get_active_configuration_set()")
951 
952  try:
954  config = SDOPackage.ConfigurationSet("","",[])
955  toConfigurationSet(config, self._configsets.getActiveConfigurationSet())
956  return config
957  except:
958  self._rtcout.RTC_ERROR(OpenRTM_aist.Logger.print_exception())
959  raise SDOPackage.InternalError("Configuration.get_active_configuration_set()")
960 
961  return SDOPackage.ConfigurationSet("","",[])
962 
963 
964 
1005  def add_configuration_set(self, configuration_set):
1006  self._rtcout.RTC_TRACE("add_configuration_set()")
1007  if configuration_set is None:
1008  raise SDOPackage.InvalidParameter("configuration_set is empty.")
1009 
1010  try:
1012  config_id = configuration_set.id
1013  config = OpenRTM_aist.Properties(key=config_id)
1014  toProperties(config, configuration_set)
1015  return self._configsets.addConfigurationSet(config)
1016  except:
1017  self._rtcout.RTC_ERROR(OpenRTM_aist.Logger.print_exception())
1018  raise SDOPackage.InternalError("Configuration::add_configuration_set()")
1019 
1020  return True
1021 
1022 
1023 
1062  def remove_configuration_set(self, config_id):
1063  self._rtcout.RTC_TRACE("remove_configuration_set(%s)", config_id)
1064  if not config_id:
1065  raise SDOPackage.InvalidParameter("ID is empty.")
1066 
1067  try:
1069  return self._configsets.removeConfigurationSet(config_id)
1070  except:
1071  self._rtcout.RTC_ERROR(OpenRTM_aist.Logger.print_exception())
1072  raise SDOPackage.InternalError("Configuration.remove_configuration_set()")
1073 
1074  return False
1075 
1076 
1077 
def activate_configuration_set(self, config_id):
    """[CORBA interface] Activate a ConfigurationSet.

    config_id -- identifier of the ConfigurationSet to activate; must be
                 non-empty

    Returns True when the configuration admin's
    ``activateConfigurationSet`` reports success.

    Raises SDOPackage.InvalidParameter when ``config_id`` is empty, and
    SDOPackage.InternalError when the admin reports failure.
    """
    self._rtcout.RTC_TRACE("activate_configuration_set(%s)", config_id)
    if not config_id:
        raise SDOPackage.InvalidParameter("ID is empty.")

    activated = self._configsets.activateConfigurationSet(config_id)
    if not activated:
        raise SDOPackage.InternalError("Configuration.activate_configuration_set()")
    return True
1139 
1140 
1141  #============================================================
1142  # end of CORBA interface definition
1143  #============================================================
1144 
1145 
def getObjRef(self):
    """Return the object reference held in ``self._objref``."""
    objref = self._objref
    return objref
1161 
1162 
1163 
def getDeviceProfile(self):
    """Return the DeviceProfile held in ``self._deviceProfile``.

    The stored object itself is returned, not a copy.
    """
    profile = self._deviceProfile
    return profile
1179 
1180 
1181 
1196  return self._serviceProfiles
1197 
1198 
1199 
1216  def getServiceProfile(self, id):
1218  self.service_id(id))
1219 
1220  if index < 0:
1221  return SDOPackage.ServiceProfile("","",[],None)
1222 
1223  return self._serviceProfiles[index]
1224 
1225 
1226 
def getOrganizations(self):
    """Return the organization list held in ``self._organizations``.

    The stored list itself is returned, not a copy.
    """
    organizations = self._organizations
    return organizations
1242 
1243 
1244 
def getUUID(self):
    """Return a new value produced by ``OpenRTM_aist.uuid1()``."""
    make_uuid = OpenRTM_aist.uuid1
    return make_uuid()
1260 
1261 
1262  # functor for NVList
1263 
class nv_name:
    """Functor for NVList: matches an element by its ``name``."""

    def __init__(self, name_):
        # Stored as a string so the comparison is always str-to-str.
        self._name = str(name_)

    def __call__(self, nv):
        """Return True when ``str(nv.name)`` equals the stored name."""
        return str(nv.name) == self._name
1277 
1278 
1279  # functor for ServiceProfile
1280 
class service_id:
    """Functor for ServiceProfile: matches an element by its ``id``."""

    def __init__(self, id_):
        # Stored as a string so the comparison is always str-to-str.
        self._id = str(id_)

    def __call__(self, s):
        """Return True when ``str(s.id)`` equals the stored id."""
        return str(s.id) == self._id
1294 
1295 
1296  # functor for Organization
1297 
class org_id:
    """Functor for Organization: matches by ``get_organization_id()``."""

    def __init__(self, id_):
        # Stored as a string so the comparison is always str-to-str.
        self._id = str(id_)

    def __call__(self, o):
        """Return True when the organization's id, as a string, equals the stored id."""
        return str(o.get_organization_id()) == self._id
1311 
1312 
1313  # functor for ConfigurationSet
1314 
class config_id:
    """Functor for ConfigurationSet: matches an element by its ``id``."""

    def __init__(self, id_):
        # Stored as a string so the comparison is always str-to-str.
        self._id = str(id_)

    def __call__(self, c):
        """Return True when ``str(c.id)`` equals the stored id."""
        return str(c.id) == self._id
def set_device_profile(self, dProfile)
[CORBA interface] Set DeviceProfile of SDO
def add_organization(self, org)
[CORBA interface] Add Organization
def get_configuration_parameters(self)
[CORBA interface] Get the list of configuration parameters
def remove_service_profile(self, id_)
[CORBA interface] Remove ServiceProfile
def split(input, delimiter)
Split string by delimiter.
Definition: StringUtil.py:323
def push_back(seq, elem)
Push the new element back to the CORBA sequence.
Configuration implementation class.
def copyToProperties(prop, nvlist)
Copy to Properties from NVList.
Definition: NVUtil.py:118
The Properties class represents a persistent set of properties.
Definition: Properties.py:83
def add_service_profile(self, sProfile)
[CORBA interface] Set SDO's ServiceProfile
def remove_organization(self, organization_id)
[CORBA interface] Remove the reference of Organization
def toConfigurationSet(conf, prop)
Copy to NVList from Properties.
def remove_configuration_set(self, config_id)
[CORBA interface] Remove ConfigurationSet
def add_configuration_set(self, configuration_set)
[CORBA interface] Add ConfigurationSet
def set_configuration_parameter(self, name, value)
[CORBA interface] Modify the parameter value
def get_configuration_set(self, config_id)
[CORBA interface] Getting a ConfigurationSet
def get_configuration_parameter_value(self, name)
[CORBA interface] Getting value of configuration parameter
def __init__(self, configAdmin, sdoServiceAdmin)
class constructor
def get_configuration_parameter_values(self)
[CORBA interface] Get the list of configuration parameter values
def get_active_configuration_set(self)
[CORBA interface] Get active ConfigurationSet
def find(seq, f)
Return the index of CORBA sequence element that functor matches.
def get_configuration_sets(self)
[CORBA interface] Getting list of ConfigurationSet
def set_configuration_set_values(self, configuration_set)
[CORBA interface] Set ConfigurationSet
def activate_configuration_set(self, config_id)
[CORBA interface] Activate ConfigurationSet
def copyFromProperties(nv, prop)
Copy to NVList from Properties.
Definition: NVUtil.py:85
def toProperties(prop, conf)
Copy to Properties from NVList.


openrtm_aist_python
Author(s): Shinji Kurihara
autogenerated on Mon Feb 28 2022 23:01:06