1 from omniORB
import CORBA, any, cdrUnmarshal, cdrMarshal
5 import RTC, OpenRTM, SDOPackage, RTM
6 from OpenRTM
import CdrData, OutPortCdr, InPortCdr
10 import string, math, socket
37 ports = self.ref.get_ports()
39 prof = p.get_port_profile()
40 name = prof.name.split(
'.')[1]
50 p = self.
ports[unicode(name)]
53 self.
ports[unicode(name)] = p
def service(self, instance_name, type_name="", port_name=""):
    """Look up a service of this component by delegating to ``findService``.

    This object is passed as the component argument, and the remaining
    arguments are forwarded in the order ``findService`` expects
    (``port_name``, ``type_name``, ``instance_name``).

    :param instance_name: instance name of the service to look for
    :param type_name: type name of the service; defaults to ``""``
    :param port_name: name of the port to search; defaults to ``""``
    :return: whatever ``findService`` returns for these arguments
    """
    found = findService(self, port_name, type_name, instance_name)
    return found
96 cfg = self.ref.get_configuration()
97 cfgsets = cfg.get_configuration_sets()
99 print(
"configuration set is not found")
102 for d
in cfgset.configuration_data:
104 return any.from_any(d.value)
113 def start(self, ec=None, timeout=3.0):
119 ret = ec.activate_component(self.
ref)
120 if ret != RTC.RTC_OK:
121 print (
'[rtm.py] \033[31m Failed to start %s(%s)\033[0m' % \
130 print (
'[rtm.py] \033[31m Failed to start %s(timeout)\033[0m' % \
140 def stop(self, ec=None, timeout=3.0):
146 ret = ec.deactivate_component(self.
ref)
147 if ret != RTC.RTC_OK:
148 print (
'[rtm.py] \033[31m Failed to stop %s(%s)\033[0m' % \
157 print (
'[rtm.py] \033[31m Failed to stop %s(timeout)\033[0m' % \
166 def reset(self, ec=None, timeout=3.0):
171 return ec.reset_component(self.
ref) == RTC.RTC_OK
181 return ec.get_component_state(self.
ref)
205 cprof = self.ref.get_component_profile()
206 return cprof.instance_name
218 uname = os.uname()[0]
219 if uname ==
"Darwin":
230 def load(self, basename, initfunc=""):
231 path = basename + self.
soext 235 self.ref.load_module(path, initfunc)
237 print(
"failed to load", path)
249 print(
'RTC named "' + name +
'" already exists.')
253 args +=
'?instance_name=' + name
254 ref = self.ref.create_component(args)
277 fps = self.ref.get_factory_profiles()
279 for p
in afp.properties:
280 if p.name ==
"implementation_id":
281 fs.append(any.from_any(p.value))
289 crefs = self.ref.get_components()
307 nc = NameComponent(name, kind)
316 global rootnc, orb, nshost, nsport, mgrhost, mgrport
326 rtm_argv = [sys.argv[0]]
327 for i
in range(len(sys.argv)):
328 if sys.argv[i] ==
'-o':
329 rtm_argv.extend([
'-o', sys.argv[i+1]])
331 mc = OpenRTM_aist.ManagerConfig();
332 mc.parseArgs(rtm_argv)
335 print(
"\033[34m[rtm.py] nshost already set as " + str(nshost) +
"\033[0m")
338 nshost = mc._argprop.getProperty(
"corba.nameservers").split(
":")[0]
342 nshost = socket.gethostname()
343 print(
"\033[34m[rtm.py] Failed to parse corba.nameservers, use " + str(nshost) +
" as nshost \033[0m")
346 print(
"\033[34m[rtm.py] nsport already set as " + str(nsport) +
"\033[0m")
349 nsport = int(mc._argprop.getProperty(
"corba.nameservers").split(
":")[1])
354 print(
"\033[34m[rtm.py] Failed to parse corba.nameservers, use " + str(nsport) +
" as nsport \033[0m")
357 print(
"\033[34m[rtm.py] mgrhost already set as " + str(mgrhost) +
"\033[0m")
362 print(
"\033[34m[rtm.py] mgrport already set as " + str(mgrport) +
"\033[0m")
365 mgrport = int(mc._argprop.getProperty(
"corba.master_manager").split(
":")[1])
370 print(
"\033[34m[rtm.py] Failed to parse corba.master_manager, use " + str(mgrport) +
"\033[0m")
372 print(
"\033[34m[rtm.py] configuration ORB with %s:%s\033[0m"%(nshost, nsport))
373 print(
"\033[34m[rtm.py] configuration RTCManager with %s:%s\033[0m"%(mgrhost, mgrport))
374 os.environ[
'ORBInitRef'] =
'NameService=corbaloc:iiop:%s:%s/NameService' % \
378 orb = CORBA.ORB_init(sys.argv, CORBA.ORB_ID)
379 nameserver = orb.resolve_initial_references(
"NameService")
380 rootnc = nameserver._narrow(CosNaming.NamingContext)
381 except omniORB.CORBA.ORB.InvalidName:
382 _, e, _ = sys.exc_info()
383 sys.exit(
'[ERROR] Invalide Name (hostname=%s).\n' % (nshost) +
384 'Make sure the hostname is correct.\n' + str(e))
385 except omniORB.CORBA.TRANSIENT:
387 nameserver = orb.string_to_object(
'corbaloc:iiop:%s:%s/NameService'%(nshost, nsport))
388 rootnc = nameserver._narrow(CosNaming.NamingContext)
390 _, e, _ = sys.exc_info()
391 sys.exit(
'[ERROR] Connection Failed with the Nameserver (hostname=%s port=%s).\n' % (nshost, nsport) +
392 'Make sure the hostname is correct and the Nameserver is running.\n' + str(e))
394 _, e, _ = sys.exc_info()
405 props = System.getProperties()
407 args = [
"-ORBInitRef", corbaloc]
408 orb = ORB.init(args, props)
410 nameserver = orb.resolve_initial_references(
"NameService")
411 return NamingContextHelper.narrow(nameserver)
422 nc = CosNaming.NameComponent(name, kind)
427 print(
"[ERROR] findObject(%r,kind=%r,rnc=%r) rootnc is not found" % (name, kind, rnc))
428 return rnc.resolve(path)
439 print(
"[ERROR] findRTCmanager(hostname=%r,rnc=%r) rootnc is not defined, need to call initCORBA()" % \
445 hostname = socket.gethostname()
447 socket.gethostbyaddr(hostname)
449 _, e, _ = sys.exc_info()
450 sys.exit(
'[ERROR] %s\n' % (str(e)) +
'[ERROR] Could not get hostname for %s\n' % (hostname) +
451 '[ERROR] Make sure that you set the target hostname and address in DNS or /etc/hosts in linux/unix.')
454 def getManagerFromNS(hostname, mgr=None):
462 def getManagerDirectly(hostname, mgr=None):
464 corbaloc =
"corbaloc:iiop:" + hostname +
":" + str(mgrport) +
"/manager" 465 print(
"\033[34m[rtm.py] trying to findRTCManager on port" + str(mgrport) +
"\033[0m")
467 obj = orb.string_to_object(corbaloc)
476 print(
'import CORBA failed in findRTCmanager and neglect it for old python environment.')
479 hostnames = [hostname, hostname.split(
".")[0],
480 socket.gethostbyaddr(hostname)[0],
481 socket.gethostbyaddr(hostname)[0].split(
".")[0]]
483 mgr = getManagerDirectly(h)
or getManagerFromNS(h)
486 print(
"Manager not found")
503 rtc =
RTcomponent(obj._narrow(RTC.DataFlowComponent))
504 cxts = rtc.ref.get_participating_contexts()
518 ports = rtc.get_ports()
519 cprof = rtc.get_component_profile()
520 portname = cprof.instance_name +
"." + name
522 prof = p.get_port_profile()
523 if prof.name == portname:
539 if not ec._is_equivalent(rtc.ec):
542 if ec.add_component(rtc.ref) == RTC.RTC_OK:
545 print(
'error in add_component()')
547 print(rtc.name() +
'is already serialized')
549 _, e, _ = sys.exc_info()
550 print(
"[rtm.py] \033[31m error in serialize %s of %s %s\033[0m" % (rtc.name(), [[r, r.name()]
for r
in rtcs], str(e)))
559 op = outP.get_port_profile()
560 for con_prof
in op.connector_profiles:
561 ports = con_prof.ports
562 if len(ports) == 2
and outP._is_equivalent(ports[0])
and \
563 inP._is_equivalent(ports[1]):
574 op = outP.get_port_profile()
575 iname = inP.get_port_profile().name
576 for con_prof
in op.connector_profiles:
577 ports = con_prof.ports
579 pname = ports[1].get_port_profile().name
581 print(
'[rtm.py] Disconnect %s - %s' %(op.name, iname))
582 outP.disconnect(con_prof.connector_id)
592 prof = port.get_port_profile()
593 prop = prof.properties
595 if p.name ==
"dataport.data_type":
596 return any.from_any(p.value)
608 def connectPorts(outP, inPs, subscription="flush", dataflow="Push", bufferlength=1, rate=1000, pushpolicy="new", interfaceType="corba_cdr"):
609 if not isinstance(inPs, list):
612 print(
'[rtm.py] \033[31m Failed to connect %s to %s(%s)\033[0m' % \
613 (outP, [inP.get_port_profile().name
if inP
else inP
for inP
in inPs], inPs))
617 print(
'[rtm.py] \033[31m Failed to connect %s to %s(%s)\033[0m' % \
618 (outP.get_port_profile().name, inP, inPs))
621 print(
'[rtm.py] %s and %s are already connected' % \
622 (outP.get_port_profile().name, inP.get_port_profile().name))
625 print(
'[rtm.py] \033[31m %s and %s have different data types\033[0m' % \
626 (outP.get_port_profile().name, inP.get_port_profile().name))
628 nv1 = SDOPackage.NameValue(
"dataport.interface_type", any.to_any(interfaceType))
629 nv2 = SDOPackage.NameValue(
"dataport.dataflow_type", any.to_any(dataflow))
630 nv3 = SDOPackage.NameValue(
"dataport.subscription_type", any.to_any(subscription))
631 nv4 = SDOPackage.NameValue(
"dataport.buffer.length", any.to_any(str(bufferlength)))
632 nv5 = SDOPackage.NameValue(
"dataport.publisher.push_rate", any.to_any(str(rate)))
633 nv6 = SDOPackage.NameValue(
"dataport.publisher.push_policy", any.to_any(pushpolicy))
634 nv7 = SDOPackage.NameValue(
"dataport.data_type", any.to_any(
dataTypeOfPort(outP)))
635 con_prof = RTC.ConnectorProfile(
"connector0",
"", [outP, inP],
636 [nv1, nv2, nv3, nv4, nv5, nv6, nv7])
637 print(
'[rtm.py] Connect ' + outP.get_port_profile().name +
' - ' + \
638 inP.get_port_profile().name+
' (dataflow_type='+dataflow+
', subscription_type='+ subscription+
', bufferlength='+str(bufferlength)+
', push_rate='+str(rate)+
', push_policy='+pushpolicy+
')')
639 ret, prof = inP.connect(con_prof)
640 if ret != RTC.RTC_OK:
641 print(
"failed to connect")
645 print(
"connet() returned RTC_OK, but not connected")
653 return cdrMarshal(any.to_any(data).typecode(), data,
True)
661 component_path = fullname.split(
'.')
662 package_name = component_path[0]
663 component_path = component_path[1:]
665 while component_path:
666 class_name = component_path[0]
667 component_path = component_path[1:]
669 attr = getattr(attr, class_name)
671 __import__(str(package_name))
672 attr = getattr(sys.modules[package_name], class_name)
682 return cdrUnmarshal(any.to_any(
classFromString(classname)).typecode(), cdr,
True)
698 global connector_list, orb
700 connector_name =
"writeDataPort" 706 for p
in connector_list:
707 if p[
"port"]._is_equivalent(port):
708 if port.get_connector_profile(p[
"prof"].connector_id).name == connector_name:
711 connector_list.remove(p)
717 nv1 = SDOPackage.NameValue(
"dataport.interface_type", any.to_any(
"corba_cdr"))
718 nv2 = SDOPackage.NameValue(
"dataport.dataflow_type", any.to_any(
"Push"))
719 nv3 = SDOPackage.NameValue(
"dataport.subscription_type", any.to_any(
"flush"))
720 con_prof = RTC.ConnectorProfile(connector_name,
"", [port], [nv1, nv2, nv3])
722 ret, prof = port.connect(con_prof)
724 if ret != RTC.RTC_OK:
725 print(
"failed to connect")
727 connector_list.append({
"port":port,
"prof":prof})
730 for p
in prof.properties:
731 if p.name ==
'dataport.corba_cdr.inport_ior':
732 ior = any.from_any(p.value)
733 obj = orb.string_to_object(ior)
734 inport = obj._narrow(InPortCdr)
736 if inport.put(cdr) != OpenRTM.PORT_OK:
737 print(
"failed to put")
740 port.disconnect(prof.connector_id)
741 for p
in connector_list:
742 if prof.connector_id == p[
"prof"].connector_id:
743 connector_list.remove(p)
745 return prof.connector_id
757 global connector_list, orb
760 connector_name =
"readDataPort" 762 for p
in connector_list:
763 if p[
"port"]._is_equivalent(port):
764 if port.get_connector_profile(p[
"prof"].connector_id).name == connector_name:
767 connector_list.remove(p)
774 pprof = port.get_port_profile()
775 for prop
in pprof.properties:
776 if prop.name ==
"dataport.data_type":
777 classname = any.from_any(prop.value)
781 nv1 = SDOPackage.NameValue(
"dataport.interface_type", any.to_any(
"corba_cdr"))
782 nv2 = SDOPackage.NameValue(
"dataport.dataflow_type", any.to_any(
"Pull"))
783 nv3 = SDOPackage.NameValue(
"dataport.subscription_type", any.to_any(
"flush"))
784 con_prof = RTC.ConnectorProfile(connector_name,
"", [port], [nv1, nv2, nv3])
786 ret, prof = port.connect(con_prof)
788 if ret != RTC.RTC_OK:
789 print(
"failed to connect")
792 connector_list.append({
"port":port,
"prof":prof})
794 for p
in prof.properties:
795 if p.name ==
'dataport.corba_cdr.outport_ior':
796 ior = any.from_any(p.value)
797 obj = orb.string_to_object(ior)
798 outport = obj._narrow(OutPortCdr)
802 ret, data = outport.get()
803 if ret == OpenRTM.PORT_OK:
805 port.disconnect(prof.connector_id)
806 for p
in connector_list:
807 if prof.connector_id == p[
"prof"].connector_id:
808 connector_list.remove(p)
810 tokens = classname.split(
':')
812 classname = tokens[1].replace(
'/',
'.')
827 global connector_list
828 for port
in connector_list:
829 port[
"port"].disconnect(port[
"prof"].connector_id)
830 del connector_list[:]
842 prof = rtc.ref.get_component_profile()
844 port_prof = prof.port_profiles
846 p = rtc.port(port_name)
848 print(
"can't find a port named" + port_name)
851 port_prof = [p.get_port_profile()]
859 if aif.instance_name == instance_name
and \
860 (type_name ==
"" or aif.type_name == type_name)
and \
861 aif.polarity == PROVIDED:
864 print(
"can't find a service named", instance_name)
866 con_prof = RTC.ConnectorProfile(
"noname",
"", [port], [])
867 ret, con_prof = port.connect(con_prof)
868 ior = any.from_any(con_prof.properties[0].value)
869 return orb.string_to_object(ior)
879 cfg = rtc.get_configuration()
880 cfgsets = cfg.get_configuration_sets()
881 if len(cfgsets) == 0:
882 print(
"configuration set is not found")
889 for d
in cfgset.configuration_data:
891 d.value = any.to_any(value)
892 cfg.set_configuration_set_values(cfgset)
897 cfg.activate_configuration_set(
'default')
906 cfg = rtc.get_configuration()
907 cfgsets = cfg.get_configuration_sets()
908 if len(cfgsets) == 0:
909 print(
"configuration set is not found")
912 for nv
in cfgsets[0].configuration_data:
913 ret[nv.name] = any.from_any(nv.value)
def narrow(ior, klass, package="OpenHRP"):
    """Narrow an object reference to a class looked up by name.

    The class is resolved as attribute ``klass`` of the module registered
    in ``sys.modules`` under ``package``. If that module has not been
    imported yet, it is imported on demand instead of failing with a bare
    ``KeyError``.

    :param ior: object reference exposing a ``_narrow`` method
    :param klass: name of the class inside ``package``
    :param package: name of the module that defines ``klass``
    :return: the result of ``ior._narrow`` called with the resolved class
    :raises ImportError: if ``package`` cannot be imported
    :raises AttributeError: if ``package`` has no attribute ``klass``
    """
    if package not in sys.modules:
        # __import__ registers the module (dotted names included) in
        # sys.modules, so the lookup below works for either case.
        __import__(str(package))
    return ior._narrow(getattr(sys.modules[package], klass))
931 return sys.version.count(
"GCC") == 0
934 if __name__ ==
'__main__':
def getRootNamingContext(corbaloc)
get root naming context
def isActive(self, ec=None)
check the main execution context is active or not
def findService(rtc, port_name, type_name, instance_name)
get a service of RT component
def getProperties(self)
get name-value list of the default configuration set
def readDataPort(port, timeout=1.0, disconnect=True)
read data from a data port
wrapper class of RT component
def disconnectPorts(outP, inP)
disconnect ports
def narrow(ior, klass, package="OpenHRP")
narrow ior
def setConfiguration(rtc, nvlist)
update default configuration set
def start(self, ec=None, timeout=3.0)
activate this component
def load(self, basename, initfunc="")
load RT component factory
def classFromString(fullname)
get class object from class name
def delete(self, name)
delete an instance of RT component
def stop(self, ec=None, timeout=3.0)
deactivate this component
def getLifeCycleState(self, ec=None)
get life cycle state of the main execution context
def writeDataPort(port, data, tm=1.0, disconnect=True)
write data to a data port
def findRTCmanager(hostname=None, rnc=None)
get RTCmanager
def port(self, name)
get IOR of port
def get_factory_names(self)
get list of factory names
def getConfiguration(rtc)
get default configuration set
def findObject(name, kind="", rnc=None)
get IOR of the object
def isConnected(outP, inP)
check two ports are connected or not
def unbindObject(name, kind)
unbind an object reference
def create(self, module, name=None)
create an instance of RT component
wrapper class of RTCmanager
def cdr2data(cdr, classname)
convert data from CDR format
def reset(self, ec=None, timeout=3.0)
reset this component
def initCORBA()
initialize ORB
def get_components(self)
get list of components
def findPort(rtc, name)
get a port of RT component
def __init__(self, ref)
constructor
def findRTC(name, rnc=None)
get RT component
def data2cdr(data)
convert data into CDR format
def setConfiguration(self, nvlist)
update default configuration set
def name(self)
get instance name
def isInactive(self, ec=None)
check the main execution context is inactive or not
def restart(self)
restart Manager
def serializeComponents(rtcs, stopEC=True)
set up execution context of the first RTC so that RTCs are executed sequentially
def connectPorts(outP, inPs, subscription="flush", dataflow="Push", bufferlength=1, rate=1000, pushpolicy="new", interfaceType="corba_cdr")
connect ports
def isJython()
check if jython or python
def dataTypeOfPort(port)
get data type of a port
def setProperty(self, name, value)
update value of the default configuration set
def service(self, instance_name, type_name="", port_name="")
get IOR of the service
def __init__(self, ref)
constructor
def getProperty(self, name)
get value of the property in the default configuration set