PortAdmin.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 # -*- coding: euc-jp -*-
3 
4 
16 
17 import traceback
18 import sys
19 from omniORB import CORBA
20 
21 import RTC
22 import OpenRTM_aist
23 
24 
25 
26 
41 class PortAdmin:
42  """
43  """
44 
45 
46 
47 
54  class comp_op:
55  def __init__(self, name=None, factory=None):
56  if name:
57  self._name = name
58  if factory:
59  self._name = factory.getProfile().name
60 
61  def __call__(self, obj):
62  name_ = obj.getProfile().name
63  return self._name == name_
64 
65 
66 
73  def __init__(self, name):
74  self._name = name
75 
76  def __call__(self, p):
77  prof = p.get_port_profile()
78  name_ = prof.name
79  return self._name == name_
80 
81 
82 
88  class del_port:
89  def __init__(self, pa):
90  self._pa = pa
91  return
92 
93  def __call__(self, p):
94  self._pa.deletePort(p)
95 
96 
97  class find_port:
98  # find_port(const PortService_ptr& p) : m_port(p) {};
99  def __init__(self, p):
100  self._port = p
101 
102  # bool operator()(const PortService_ptr& p)
103  def __call__(self, p):
104  return self._port._is_equivalent(p)
105 
106 
107 
120  def __init__(self, orb, poa):
121  # ORB ���֥������� self._orb = orb # POA ���֥������� self._poa = poa # Port�Υ��֥������ȥ�ե���󥹤Υꥹ��. PortServiceList self._portRefs = [] # �����Х�Ȥ�ľ�ܳ�Ǽ���륪�֥������ȥޥ͡����� self._portServants = OpenRTM_aist.ObjectManager(self.comp_op) self._rtcout = OpenRTM_aist.Manager.instance().getLogbuf("PortAdmin") ## # @if jp # # @brief Port �ꥹ�Ȥμ��� # # registerPort() �ˤ����Ͽ���줿 Port �� �ꥹ�Ȥ�������롣 # # @param self # # @return Port �ꥹ�� # # @else # # @brief Get PortServiceList # # This operation returns the pointer to the PortServiceList of Ports regsitered # by registerPort(). # # @return PortServiceList+ The pointer points PortServiceList # # @endif def getPortServiceList(self): return self._portRefs ## # @if jp # # @brief PorProfile �ꥹ�Ȥμ��� # # addPort() �ˤ����Ͽ���줿 Port �� Profile �ꥹ�Ȥ�������롣 # # @return PortProfile �ꥹ�� # # @else # # @brief Get PorProfileList # # This operation gets the Profile list of Ports registered by # addPort(). # # @return The pointer points PortProfile list # # @endif # def getPortProfileList(self): ret = [] for p in self._portRefs: ret.append(p.get_port_profile()) return ret ## # @if jp # # @brief Port �Υ��֥������Ȼ��Ȥμ��� # # port_name �ǻ��ꤷ�� Port �Υ��֥������Ȼ��Ȥ��֤��� # port_name �ǻ��ꤹ�� Port �Ϥ��餫���� registerPort() ����Ͽ����Ƥ� # �ʤ���Фʤ�ʤ��� # # @param self # @param port_name ���Ȥ��֤�Port��̾�� # # @return Port_ptr Port�Υ��֥������Ȼ��� # # @else # # @brief Get PortServiceList # # This operation returns the pointer to the PortServiceList of Ports regsitered # by registerPort(). # # @param port_name The name of Port to be returned the reference. # # @return Port_ptr Port's object reference. # # @endif def getPortRef(self, port_name): index = OpenRTM_aist.CORBA_SeqUtil.find(self._portRefs, self.find_port_name(port_name)) if index >= 0: return self._portRefs[index] return None ## # @if jp # # @brief Port �Υ����Х�ȤΥݥ��󥿤μ��� # # port_name �ǻ��ꤷ�� Port �Υ����Х�ȤΥݥ��󥿤��֤��� # port_name �ǻ��ꤹ�� Port �Ϥ��餫���� registerPort() ����Ͽ����Ƥ� # �ʤ���Фʤ�ʤ��� # # @param self # @param port_name ���Ȥ��֤�Port��̾�� # # @return PortBase* Port�����Х�ȴ��쥯�饹�Υݥ��� # # @else # # @brief Getpointer to the Port's servant # # This operation returns the pointer to the PortBase servant regsitered # by registerPort(). # # @param port_name The name of Port to be returned the servant pointer. # # @return PortBase* Port's servant's pointer. # # @endif def getPort(self, port_name): return self._portServants.find(port_name) ## # @if jp # # @brief Port ����Ͽ���� # # ���� port �ǻ��ꤵ�줿 Port �Υ����Х�Ȥ���Ͽ���롣 # ��Ͽ���줿 Port �Υ����Х�Ȥϥ��󥹥ȥ饯����Ϳ����줿POA ��� # activate ���졢���Υ��֥������Ȼ��Ȥ�Port��Profile�˥��åȤ���롣 # # @param self # @param port Port �����Х�� # # @else # # @brief Regsiter Port # # This operation registers the Port's servant given by argument. # The given Port's servant will be activated on the POA that is given # to the constructor, and the created object reference is set # to the Port's profile. # # @param port The Port's servant. # # @endif # void registerPort(PortBase& port); def registerPort(self, port): if not self.addPort(port): self._rtcout.RTC_ERROR("registerPort() failed.") return # void registerPort(PortService_ptr port); # def registerPortByReference(self, port_ref): # self.addPortByReference(port_ref) # return # new interface. since 1.0.0-RELEASE # void addPort(PortBase& port); def addPort(self, port): if isinstance(port, RTC._objref_PortService): index = OpenRTM_aist.CORBA_SeqUtil.find(self._portRefs, self.find_port_name(port.get_port_profile().name)) if index >= 0: return False self._portRefs.append(port) return True else: index = OpenRTM_aist.CORBA_SeqUtil.find(self._portRefs, self.find_port_name(port.getName())) if index >= 0: return False self._portRefs.append(port.getPortRef()) return self._portServants.registerObject(port) return False # new interface. since 1.0.0-RELEASE # void addPort(PortService_ptr port); # def addPortByReference(self, port_ref): # self._portRefs.append(port_ref) # return ## # @if jp # # @brief Port ����Ͽ�������� # # ���� port �ǻ��ꤵ�줿 Port ����Ͽ�������롣 # ������� Port �� deactivate ���졢Port��Profile�Υ�ե���󥹤ˤϡ� # nil�ͤ���������롣 # # @param self # @param port Port �����Х�� # # @else # # @brief Delete the Port's registration # # This operation unregisters the Port's registration. # When the Port is unregistered, Port is deactivated, and the object # reference in the Port's profile is set to nil. # # @param port The Port's servant. # # @endif def deletePort(self, port): if not self.removePort(port): self._rtcout.RTC_ERROR("deletePort(PortBase&) failed.") return # new interface. since 1.0.0-RELEASE def removePort(self, port): try: if isinstance(port,RTC._objref_PortService): OpenRTM_aist.CORBA_SeqUtil.erase_if(self._portRefs, self.find_port(port)) return True port.disconnect_all() tmp = port.getProfile().name OpenRTM_aist.CORBA_SeqUtil.erase_if(self._portRefs, self.find_port_name(tmp)) self._poa.deactivate_object(self._poa.servant_to_id(port)) port.setPortRef(RTC.PortService._nil) if not self._portServants.unregisterObject(tmp): return False return True except: self._rtcout.RTC_ERROR(OpenRTM_aist.Logger.print_exception()) return False ## # @if jp # # @brief ̾�λ���ˤ��Port ����Ͽ�������� # # �����ǻ��ꤵ�줿̾������� Port ����Ͽ�������롣 # ������� Port �� deactivate ���졢Port��Profile�Υ�ե���󥹤ˤϡ� # nil�ͤ���������롣 # # @param self # @param port_name Port ��̾�� # # @else # # @brief Delete the Port' registration # # This operation delete the Port's registration specified by port_ name. # When the Port is unregistered, Port is deactivated, and the object # reference in the Port's profile is set to nil. # # @param port_name The Port's name. # # @endif def deletePortByName(self, port_name): if not port_name: return p = self._portServants.find(port_name) self.removePort(p) return ## # @if jp # @brief ���Ƥ� Port �Υ��󥿡��ե������� activates ���� # @else # @brief Activate all Port interfaces # @endif # void PortAdmin::activatePorts() def activatePorts(self): ports = self._portServants.getObjects() for port in ports: port.activateInterfaces() return ## # @if jp # @brief ���Ƥ� Port �Υ��󥿡��ե������� deactivates ���� # @else # @brief Deactivate all Port interfaces # @endif # void PortAdmin::deactivatePorts() def deactivatePorts(self): ports = self._portServants.getObjects() for port in ports: port.deactivateInterfaces() return ## # @if jp # # @brief ���Ƥ� Port ��deactivate����Ͽ�������� # # ��Ͽ����Ƥ������Ƥ�Port���Ф��ơ������Х�Ȥ�deactivate��Ԥ��� # ��Ͽ�ꥹ�Ȥ��������롣 # # @param self # # @else # # @brief Unregister the Port # # This operation deactivates the all Port and deletes the all Port's # registrations from the list. # # @endif def finalizePorts(self): self.deactivatePorts() ports = [] ports = self._portServants.getObjects() len_ = len(ports) for i in range(len_): idx = (len_ - 1) - i self.removePort(ports[idx])
122  self._orb = orb
123 
124  # POA ���֥������� self._poa = poa # Port�Υ��֥������ȥ�ե���󥹤Υꥹ��. PortServiceList self._portRefs = [] # �����Х�Ȥ�ľ�ܳ�Ǽ���륪�֥������ȥޥ͡����� self._portServants = OpenRTM_aist.ObjectManager(self.comp_op) self._rtcout = OpenRTM_aist.Manager.instance().getLogbuf("PortAdmin") ## # @if jp # # @brief Port �ꥹ�Ȥμ��� # # registerPort() �ˤ����Ͽ���줿 Port �� �ꥹ�Ȥ�������롣 # # @param self # # @return Port �ꥹ�� # # @else # # @brief Get PortServiceList # # This operation returns the pointer to the PortServiceList of Ports regsitered # by registerPort(). # # @return PortServiceList+ The pointer points PortServiceList # # @endif def getPortServiceList(self): return self._portRefs ## # @if jp # # @brief PorProfile �ꥹ�Ȥμ��� # # addPort() �ˤ����Ͽ���줿 Port �� Profile �ꥹ�Ȥ�������롣 # # @return PortProfile �ꥹ�� # # @else # # @brief Get PorProfileList # # This operation gets the Profile list of Ports registered by # addPort(). # # @return The pointer points PortProfile list # # @endif # def getPortProfileList(self): ret = [] for p in self._portRefs: ret.append(p.get_port_profile()) return ret ## # @if jp # # @brief Port �Υ��֥������Ȼ��Ȥμ��� # # port_name �ǻ��ꤷ�� Port �Υ��֥������Ȼ��Ȥ��֤��� # port_name �ǻ��ꤹ�� Port �Ϥ��餫���� registerPort() ����Ͽ����Ƥ� # �ʤ���Фʤ�ʤ��� # # @param self # @param port_name ���Ȥ��֤�Port��̾�� # # @return Port_ptr Port�Υ��֥������Ȼ��� # # @else # # @brief Get PortServiceList # # This operation returns the pointer to the PortServiceList of Ports regsitered # by registerPort(). # # @param port_name The name of Port to be returned the reference. # # @return Port_ptr Port's object reference. # # @endif def getPortRef(self, port_name): index = OpenRTM_aist.CORBA_SeqUtil.find(self._portRefs, self.find_port_name(port_name)) if index >= 0: return self._portRefs[index] return None ## # @if jp # # @brief Port �Υ����Х�ȤΥݥ��󥿤μ��� # # port_name �ǻ��ꤷ�� Port �Υ����Х�ȤΥݥ��󥿤��֤��� # port_name �ǻ��ꤹ�� Port �Ϥ��餫���� registerPort() ����Ͽ����Ƥ� # �ʤ���Фʤ�ʤ��� # # @param self # @param port_name ���Ȥ��֤�Port��̾�� # # @return PortBase* Port�����Х�ȴ��쥯�饹�Υݥ��� # # @else # # @brief Getpointer to the Port's servant # # This operation returns the pointer to the PortBase servant regsitered # by registerPort(). # # @param port_name The name of Port to be returned the servant pointer. # # @return PortBase* Port's servant's pointer. # # @endif def getPort(self, port_name): return self._portServants.find(port_name) ## # @if jp # # @brief Port ����Ͽ���� # # ���� port �ǻ��ꤵ�줿 Port �Υ����Х�Ȥ���Ͽ���롣 # ��Ͽ���줿 Port �Υ����Х�Ȥϥ��󥹥ȥ饯����Ϳ����줿POA ��� # activate ���졢���Υ��֥������Ȼ��Ȥ�Port��Profile�˥��åȤ���롣 # # @param self # @param port Port �����Х�� # # @else # # @brief Regsiter Port # # This operation registers the Port's servant given by argument. # The given Port's servant will be activated on the POA that is given # to the constructor, and the created object reference is set # to the Port's profile. # # @param port The Port's servant. # # @endif # void registerPort(PortBase& port); def registerPort(self, port): if not self.addPort(port): self._rtcout.RTC_ERROR("registerPort() failed.") return # void registerPort(PortService_ptr port); # def registerPortByReference(self, port_ref): # self.addPortByReference(port_ref) # return # new interface. since 1.0.0-RELEASE # void addPort(PortBase& port); def addPort(self, port): if isinstance(port, RTC._objref_PortService): index = OpenRTM_aist.CORBA_SeqUtil.find(self._portRefs, self.find_port_name(port.get_port_profile().name)) if index >= 0: return False self._portRefs.append(port) return True else: index = OpenRTM_aist.CORBA_SeqUtil.find(self._portRefs, self.find_port_name(port.getName())) if index >= 0: return False self._portRefs.append(port.getPortRef()) return self._portServants.registerObject(port) return False # new interface. since 1.0.0-RELEASE # void addPort(PortService_ptr port); # def addPortByReference(self, port_ref): # self._portRefs.append(port_ref) # return ## # @if jp # # @brief Port ����Ͽ�������� # # ���� port �ǻ��ꤵ�줿 Port ����Ͽ�������롣 # ������� Port �� deactivate ���졢Port��Profile�Υ�ե���󥹤ˤϡ� # nil�ͤ���������롣 # # @param self # @param port Port �����Х�� # # @else # # @brief Delete the Port's registration # # This operation unregisters the Port's registration. # When the Port is unregistered, Port is deactivated, and the object # reference in the Port's profile is set to nil. # # @param port The Port's servant. # # @endif def deletePort(self, port): if not self.removePort(port): self._rtcout.RTC_ERROR("deletePort(PortBase&) failed.") return # new interface. since 1.0.0-RELEASE def removePort(self, port): try: if isinstance(port,RTC._objref_PortService): OpenRTM_aist.CORBA_SeqUtil.erase_if(self._portRefs, self.find_port(port)) return True port.disconnect_all() tmp = port.getProfile().name OpenRTM_aist.CORBA_SeqUtil.erase_if(self._portRefs, self.find_port_name(tmp)) self._poa.deactivate_object(self._poa.servant_to_id(port)) port.setPortRef(RTC.PortService._nil) if not self._portServants.unregisterObject(tmp): return False return True except: self._rtcout.RTC_ERROR(OpenRTM_aist.Logger.print_exception()) return False ## # @if jp # # @brief ̾�λ���ˤ��Port ����Ͽ�������� # # �����ǻ��ꤵ�줿̾������� Port ����Ͽ�������롣 # ������� Port �� deactivate ���졢Port��Profile�Υ�ե���󥹤ˤϡ� # nil�ͤ���������롣 # # @param self # @param port_name Port ��̾�� # # @else # # @brief Delete the Port' registration # # This operation delete the Port's registration specified by port_ name. # When the Port is unregistered, Port is deactivated, and the object # reference in the Port's profile is set to nil. # # @param port_name The Port's name. # # @endif def deletePortByName(self, port_name): if not port_name: return p = self._portServants.find(port_name) self.removePort(p) return ## # @if jp # @brief ���Ƥ� Port �Υ��󥿡��ե������� activates ���� # @else # @brief Activate all Port interfaces # @endif # void PortAdmin::activatePorts() def activatePorts(self): ports = self._portServants.getObjects() for port in ports: port.activateInterfaces() return ## # @if jp # @brief ���Ƥ� Port �Υ��󥿡��ե������� deactivates ���� # @else # @brief Deactivate all Port interfaces # @endif # void PortAdmin::deactivatePorts() def deactivatePorts(self): ports = self._portServants.getObjects() for port in ports: port.deactivateInterfaces() return ## # @if jp # # @brief ���Ƥ� Port ��deactivate����Ͽ�������� # # ��Ͽ����Ƥ������Ƥ�Port���Ф��ơ������Х�Ȥ�deactivate��Ԥ��� # ��Ͽ�ꥹ�Ȥ��������롣 # # @param self # # @else # # @brief Unregister the Port # # This operation deactivates the all Port and deletes the all Port's # registrations from the list. # # @endif def finalizePorts(self): self.deactivatePorts() ports = [] ports = self._portServants.getObjects() len_ = len(ports) for i in range(len_): idx = (len_ - 1) - i self.removePort(ports[idx])
125  self._poa = poa
126 
127  # Port�Υ��֥������ȥ�ե���󥹤Υꥹ��. PortServiceList
128  self._portRefs = []
129 
130  # �����Х�Ȥ�ľ�ܳ�Ǽ���륪�֥������ȥޥ͡����� self._portServants = OpenRTM_aist.ObjectManager(self.comp_op) self._rtcout = OpenRTM_aist.Manager.instance().getLogbuf("PortAdmin") ## # @if jp # # @brief Port �ꥹ�Ȥμ��� # # registerPort() �ˤ����Ͽ���줿 Port �� �ꥹ�Ȥ�������롣 # # @param self # # @return Port �ꥹ�� # # @else # # @brief Get PortServiceList # # This operation returns the pointer to the PortServiceList of Ports regsitered # by registerPort(). # # @return PortServiceList+ The pointer points PortServiceList # # @endif def getPortServiceList(self): return self._portRefs ## # @if jp # # @brief PorProfile �ꥹ�Ȥμ��� # # addPort() �ˤ����Ͽ���줿 Port �� Profile �ꥹ�Ȥ�������롣 # # @return PortProfile �ꥹ�� # # @else # # @brief Get PorProfileList # # This operation gets the Profile list of Ports registered by # addPort(). # # @return The pointer points PortProfile list # # @endif # def getPortProfileList(self): ret = [] for p in self._portRefs: ret.append(p.get_port_profile()) return ret ## # @if jp # # @brief Port �Υ��֥������Ȼ��Ȥμ��� # # port_name �ǻ��ꤷ�� Port �Υ��֥������Ȼ��Ȥ��֤��� # port_name �ǻ��ꤹ�� Port �Ϥ��餫���� registerPort() ����Ͽ����Ƥ� # �ʤ���Фʤ�ʤ��� # # @param self # @param port_name ���Ȥ��֤�Port��̾�� # # @return Port_ptr Port�Υ��֥������Ȼ��� # # @else # # @brief Get PortServiceList # # This operation returns the pointer to the PortServiceList of Ports regsitered # by registerPort(). # # @param port_name The name of Port to be returned the reference. # # @return Port_ptr Port's object reference. # # @endif def getPortRef(self, port_name): index = OpenRTM_aist.CORBA_SeqUtil.find(self._portRefs, self.find_port_name(port_name)) if index >= 0: return self._portRefs[index] return None ## # @if jp # # @brief Port �Υ����Х�ȤΥݥ��󥿤μ��� # # port_name �ǻ��ꤷ�� Port �Υ����Х�ȤΥݥ��󥿤��֤��� # port_name �ǻ��ꤹ�� Port �Ϥ��餫���� registerPort() ����Ͽ����Ƥ� # �ʤ���Фʤ�ʤ��� # # @param self # @param port_name ���Ȥ��֤�Port��̾�� # # @return PortBase* Port�����Х�ȴ��쥯�饹�Υݥ��� # # @else # # @brief Getpointer to the Port's servant # # This operation returns the pointer to the PortBase servant regsitered # by registerPort(). # # @param port_name The name of Port to be returned the servant pointer. # # @return PortBase* Port's servant's pointer. # # @endif def getPort(self, port_name): return self._portServants.find(port_name) ## # @if jp # # @brief Port ����Ͽ���� # # ���� port �ǻ��ꤵ�줿 Port �Υ����Х�Ȥ���Ͽ���롣 # ��Ͽ���줿 Port �Υ����Х�Ȥϥ��󥹥ȥ饯����Ϳ����줿POA ��� # activate ���졢���Υ��֥������Ȼ��Ȥ�Port��Profile�˥��åȤ���롣 # # @param self # @param port Port �����Х�� # # @else # # @brief Regsiter Port # # This operation registers the Port's servant given by argument. # The given Port's servant will be activated on the POA that is given # to the constructor, and the created object reference is set # to the Port's profile. # # @param port The Port's servant. # # @endif # void registerPort(PortBase& port); def registerPort(self, port): if not self.addPort(port): self._rtcout.RTC_ERROR("registerPort() failed.") return # void registerPort(PortService_ptr port); # def registerPortByReference(self, port_ref): # self.addPortByReference(port_ref) # return # new interface. since 1.0.0-RELEASE # void addPort(PortBase& port); def addPort(self, port): if isinstance(port, RTC._objref_PortService): index = OpenRTM_aist.CORBA_SeqUtil.find(self._portRefs, self.find_port_name(port.get_port_profile().name)) if index >= 0: return False self._portRefs.append(port) return True else: index = OpenRTM_aist.CORBA_SeqUtil.find(self._portRefs, self.find_port_name(port.getName())) if index >= 0: return False self._portRefs.append(port.getPortRef()) return self._portServants.registerObject(port) return False # new interface. since 1.0.0-RELEASE # void addPort(PortService_ptr port); # def addPortByReference(self, port_ref): # self._portRefs.append(port_ref) # return ## # @if jp # # @brief Port ����Ͽ�������� # # ���� port �ǻ��ꤵ�줿 Port ����Ͽ�������롣 # ������� Port �� deactivate ���졢Port��Profile�Υ�ե���󥹤ˤϡ� # nil�ͤ���������롣 # # @param self # @param port Port �����Х�� # # @else # # @brief Delete the Port's registration # # This operation unregisters the Port's registration. # When the Port is unregistered, Port is deactivated, and the object # reference in the Port's profile is set to nil. # # @param port The Port's servant. # # @endif def deletePort(self, port): if not self.removePort(port): self._rtcout.RTC_ERROR("deletePort(PortBase&) failed.") return # new interface. since 1.0.0-RELEASE def removePort(self, port): try: if isinstance(port,RTC._objref_PortService): OpenRTM_aist.CORBA_SeqUtil.erase_if(self._portRefs, self.find_port(port)) return True port.disconnect_all() tmp = port.getProfile().name OpenRTM_aist.CORBA_SeqUtil.erase_if(self._portRefs, self.find_port_name(tmp)) self._poa.deactivate_object(self._poa.servant_to_id(port)) port.setPortRef(RTC.PortService._nil) if not self._portServants.unregisterObject(tmp): return False return True except: self._rtcout.RTC_ERROR(OpenRTM_aist.Logger.print_exception()) return False ## # @if jp # # @brief ̾�λ���ˤ��Port ����Ͽ�������� # # �����ǻ��ꤵ�줿̾������� Port ����Ͽ�������롣 # ������� Port �� deactivate ���졢Port��Profile�Υ�ե���󥹤ˤϡ� # nil�ͤ���������롣 # # @param self # @param port_name Port ��̾�� # # @else # # @brief Delete the Port' registration # # This operation delete the Port's registration specified by port_ name. # When the Port is unregistered, Port is deactivated, and the object # reference in the Port's profile is set to nil. # # @param port_name The Port's name. # # @endif def deletePortByName(self, port_name): if not port_name: return p = self._portServants.find(port_name) self.removePort(p) return ## # @if jp # @brief ���Ƥ� Port �Υ��󥿡��ե������� activates ���� # @else # @brief Activate all Port interfaces # @endif # void PortAdmin::activatePorts() def activatePorts(self): ports = self._portServants.getObjects() for port in ports: port.activateInterfaces() return ## # @if jp # @brief ���Ƥ� Port �Υ��󥿡��ե������� deactivates ���� # @else # @brief Deactivate all Port interfaces # @endif # void PortAdmin::deactivatePorts() def deactivatePorts(self): ports = self._portServants.getObjects() for port in ports: port.deactivateInterfaces() return ## # @if jp # # @brief ���Ƥ� Port ��deactivate����Ͽ�������� # # ��Ͽ����Ƥ������Ƥ�Port���Ф��ơ������Х�Ȥ�deactivate��Ԥ��� # ��Ͽ�ꥹ�Ȥ��������롣 # # @param self # # @else # # @brief Unregister the Port # # This operation deactivates the all Port and deletes the all Port's # registrations from the list. # # @endif def finalizePorts(self): self.deactivatePorts() ports = [] ports = self._portServants.getObjects() len_ = len(ports) for i in range(len_): idx = (len_ - 1) - i self.removePort(ports[idx])
132 
133  self._rtcout = OpenRTM_aist.Manager.instance().getLogbuf("PortAdmin")
134 
135 
157  return self._portRefs
158 
159 
160 
181  ret = []
182  for p in self._portRefs:
183  ret.append(p.get_port_profile())
184 
185  return ret
186 
187 
188 
214  def getPortRef(self, port_name):
215  index = OpenRTM_aist.CORBA_SeqUtil.find(self._portRefs, self.find_port_name(port_name))
216  if index >= 0:
217  return self._portRefs[index]
218  return None
219 
220 
221 
247  def getPort(self, port_name):
248  return self._portServants.find(port_name)
249 
250 
251 
276  def registerPort(self, port):
277  if not self.addPort(port):
278  self._rtcout.RTC_ERROR("registerPort() failed.")
279  return
280 
281  # void registerPort(PortService_ptr port);
282  # def registerPortByReference(self, port_ref):
283  # self.addPortByReference(port_ref)
284  # return
285 
286  # new interface. since 1.0.0-RELEASE
287  # void addPort(PortBase& port);
288  def addPort(self, port):
289  if isinstance(port, RTC._objref_PortService):
291  self.find_port_name(port.get_port_profile().name))
292  if index >= 0:
293  return False
294  self._portRefs.append(port)
295  return True
296  else:
298  self.find_port_name(port.getName()))
299  if index >= 0:
300  return False
301  self._portRefs.append(port.getPortRef())
302  return self._portServants.registerObject(port)
303  return False
304 
305  # new interface. since 1.0.0-RELEASE
306  # void addPort(PortService_ptr port);
307  # def addPortByReference(self, port_ref):
308  # self._portRefs.append(port_ref)
309  # return
310 
311 
334  def deletePort(self, port):
335  if not self.removePort(port):
336  self._rtcout.RTC_ERROR("deletePort(PortBase&) failed.")
337  return
338 
339  # new interface. since 1.0.0-RELEASE
340  def removePort(self, port):
341  try:
342  if isinstance(port,RTC._objref_PortService):
344  return True
345 
346  port.disconnect_all()
347  tmp = port.getProfile().name
349 
350  self._poa.deactivate_object(self._poa.servant_to_id(port))
351  port.setPortRef(RTC.PortService._nil)
352 
353  if not self._portServants.unregisterObject(tmp):
354  return False
355  return True
356  except:
357  self._rtcout.RTC_ERROR(OpenRTM_aist.Logger.print_exception())
358  return False
359 
360 
361 
384  def deletePortByName(self, port_name):
385  if not port_name:
386  return
387 
388  p = self._portServants.find(port_name)
389  self.removePort(p)
390  return
391 
392 
393 
400  def activatePorts(self):
401  ports = self._portServants.getObjects()
402  for port in ports:
403  port.activateInterfaces()
404 
405  return
406 
407 
408 
415  def deactivatePorts(self):
416  ports = self._portServants.getObjects()
417  for port in ports:
418  port.deactivateInterfaces()
419 
420  return
421 
422 
423 
441  def finalizePorts(self):
442  self.deactivatePorts()
443  ports = []
444  ports = self._portServants.getObjects()
445  len_ = len(ports)
446  for i in range(len_):
447  idx = (len_ - 1) - i
448  self.removePort(ports[idx])
def deletePortByName(self, port_name)
Delete the Port' registration.
Definition: PortAdmin.py:384
def deletePort(self, port)
Delete the Port's registration.
Definition: PortAdmin.py:334
def deactivatePorts(self)
Deactivate all Port interfaces void PortAdmin::deactivatePorts()
Definition: PortAdmin.py:415
def finalizePorts(self)
Unregister the Port.
Definition: PortAdmin.py:441
def getPortRef(self, port_name)
Get PortServiceList.
Definition: PortAdmin.py:214
def getPortServiceList(self)
Get PortServiceList.
Definition: PortAdmin.py:156
def __init__(self, orb, poa)
Constructor.
Definition: PortAdmin.py:120
def __init__(self, name=None, factory=None)
Definition: PortAdmin.py:55
def registerPort(self, port)
Regsiter Port.
Definition: PortAdmin.py:276
def find(seq, f)
Return the index of CORBA sequence element that functor matches.
Functor to delete the Port.
Definition: PortAdmin.py:88
def getPort(self, port_name)
Getpointer to the Port's servant.
Definition: PortAdmin.py:247
def append(dest, src)
Definition: NVUtil.py:386
def getPortProfileList(self)
Get PorProfileList.
Definition: PortAdmin.py:180
def activatePorts(self)
Activate all Port interfaces void PortAdmin::activatePorts()
Definition: PortAdmin.py:400


openrtm_aist_python
Author(s): Shinji Kurihara
autogenerated on Mon Feb 28 2022 23:01:06