python/rtm.py
Go to the documentation of this file.
1 from omniORB import CORBA, any, cdrUnmarshal, cdrMarshal
2 import CosNaming
3 
4 import OpenRTM_aist
5 import RTC, OpenRTM, SDOPackage, RTM
6 from OpenRTM import CdrData, OutPortCdr, InPortCdr
7 from RTC import *
8 
9 import sys
10 import string, math, socket
11 import os
12 import time
13 import re
14 
15 
18 rootnc = None
19 nshost = None
20 nsport = None
21 mgrhost = None
22 mgrport = None
23 
24 
28 
32  def __init__(self, ref):
33  self.ref = ref
34  self.owned_ecs = ref.get_owned_contexts()
35  self.ec = self.owned_ecs[0]
36  self.ports = {}
37  ports = self.ref.get_ports()
38  for p in ports:
39  prof = p.get_port_profile()
40  name = prof.name.split('.')[1]
41  self.ports[name] = p
42 
43 
48  def port(self, name):
49  try:
50  p = self.ports[unicode(name)]
51  except KeyError:
52  p = findPort(self.ref, name)
53  self.ports[unicode(name)] = p
54  return p
55 
56 
63  def service(self, instance_name, type_name="", port_name=""):
64  return findService(self, port_name, type_name, instance_name)
65 
66 
71  def setConfiguration(self, nvlist):
72  return setConfiguration(self.ref, nvlist)
73 
74 
80  def setProperty(self, name, value):
81  return self.setConfiguration([[name, value]])
82 
83 
87  def getProperties(self):
88  return getConfiguration(self.ref)
89 
90 
95  def getProperty(self, name):
96  cfg = self.ref.get_configuration()
97  cfgsets = cfg.get_configuration_sets()
98  if len(cfgsets) == 0:
99  print("configuration set is not found")
100  return None
101  cfgset = cfgsets[0]
102  for d in cfgset.configuration_data:
103  if d.name == name:
104  return any.from_any(d.value)
105  return None
106 
107 
113  def start(self, ec=None, timeout=3.0):
114  if ec == None:
115  ec = self.ec
116  if ec != None:
117  if self.isActive(ec):
118  return True
119  ret = ec.activate_component(self.ref)
120  if ret != RTC.RTC_OK:
121  print ('[rtm.py] \033[31m Failed to start %s(%s)\033[0m' % \
122  (self.name(), ret))
123  return False
124  tm = 0
125  while tm < timeout:
126  if self.isActive(ec):
127  return True
128  time.sleep(0.01)
129  tm += 0.01
130  print ('[rtm.py] \033[31m Failed to start %s(timeout)\033[0m' % \
131  self.name())
132  return False
133 
134 
140  def stop(self, ec=None, timeout=3.0):
141  if ec == None:
142  ec = self.ec
143  if ec != None:
144  if self.isInactive(ec):
145  return True
146  ret = ec.deactivate_component(self.ref)
147  if ret != RTC.RTC_OK:
148  print ('[rtm.py] \033[31m Failed to stop %s(%s)\033[0m' % \
149  (self.name(), ret))
150  return False
151  tm = 0
152  while tm < timeout:
153  if self.isInactive(ec):
154  return True
155  time.sleep(0.01)
156  tm += 0.01
157  print ('[rtm.py] \033[31m Failed to stop %s(timeout)\033[0m' % \
158  self.name())
159  return False
160 
161 
166  def reset(self, ec=None, timeout=3.0):
167  if self.getLifeCycleState(ec) != RTC.ERROR_STATE:
168  return True
169  if ec == None:
170  ec = self.ec
171  return ec.reset_component(self.ref) == RTC.RTC_OK
172 
177  def getLifeCycleState(self, ec=None):
178  if ec == None:
179  ec = self.ec
180  if ec != None:
181  return ec.get_component_state(self.ref)
182  else:
183  return None
184 
185 
190  def isActive(self, ec=None):
191  return RTC.ACTIVE_STATE == self.getLifeCycleState(ec)
192 
193 
198  def isInactive(self, ec=None):
199  return RTC.INACTIVE_STATE == self.getLifeCycleState(ec)
200 
201 
204  def name(self):
205  cprof = self.ref.get_component_profile()
206  return cprof.instance_name
207 
208 
212 
216  def __init__(self, ref):
217  self.ref = ref
218  uname = os.uname()[0]
219  if uname == "Darwin":
220  self.soext = ".dylib"
221  else:
222  self.soext = ".so"
223 
224 
230  def load(self, basename, initfunc=""):
231  path = basename + self.soext
232  if initfunc == "":
233  basename + "Init"
234  try:
235  self.ref.load_module(path, initfunc)
236  except:
237  print("failed to load", path)
238 
239 
245  def create(self, module, name=None):
246  if name != None:
247  rtc = findRTC(name)
248  if rtc != None:
249  print('RTC named "' + name + '" already exists.')
250  return rtc
251  args = module
252  if name != None:
253  args += '?instance_name=' + name
254  ref = self.ref.create_component(args)
255  if ref == None:
256  return None
257  else:
258  return RTcomponent(ref)
259 
260 
264  def delete(self, name):
265  # ref = self.ref.delete_component(name)
266  ref = findRTC(name).ref.exit() # delte_component did not actually kill component, so use rtc.exit https://github.com/fkanehiro/hrpsys-base/pull/512#issuecomment-80430387
267  if ref == RTC_OK:
268  return True
269  else:
270  return False
271 
272 
275  def get_factory_names(self):
276  fs = []
277  fps = self.ref.get_factory_profiles()
278  for afp in fps:
279  for p in afp.properties:
280  if p.name == "implementation_id":
281  fs.append(any.from_any(p.value))
282  return fs
283 
284 
287  def get_components(self):
288  cs = []
289  crefs = self.ref.get_components()
290  for cref in crefs:
291  c = RTcomponent(cref)
292  cs.append(c)
293  return cs
294 
295 
297  def restart(self):
298  self.ref.shutdown()
299  time.sleep(1)
300 
301 
306 def unbindObject(name, kind):
307  nc = NameComponent(name, kind)
308  path = [nc]
309  rootnc.unbind(path)
310  return None
311 
312 
315 def initCORBA():
316  global rootnc, orb, nshost, nsport, mgrhost, mgrport
317 
318  # from omniorb's document
319  # When CORBA::ORB_init() is called, the value for each configuration
320  # parameter is searched for in the following order:
321  # Command line arguments
322  # Environment variables
323  # so init_CORBA will follow this order
324  # first check command line argument
325 
326  rtm_argv = [sys.argv[0]] # extract rtm related args only
327  for i in range(len(sys.argv)):
328  if sys.argv[i] == '-o':
329  rtm_argv.extend(['-o', sys.argv[i+1]])
330 
331  mc = OpenRTM_aist.ManagerConfig();
332  mc.parseArgs(rtm_argv)
333 
334  if nshost != None: # these values can be set via other script like "import rtm; rtm.nshost=XXX"
335  print("\033[34m[rtm.py] nshost already set as " + str(nshost) + "\033[0m")
336  else:
337  try:
338  nshost = mc._argprop.getProperty("corba.nameservers").split(":")[0]
339  if not nshost:
340  raise
341  except:
342  nshost = socket.gethostname() # default
343  print("\033[34m[rtm.py] Failed to parse corba.nameservers, use " + str(nshost) + " as nshost \033[0m")
344 
345  if nsport != None:
346  print("\033[34m[rtm.py] nsport already set as " + str(nsport) + "\033[0m")
347  else:
348  try:
349  nsport = int(mc._argprop.getProperty("corba.nameservers").split(":")[1])
350  if not nsport:
351  raise
352  except:
353  nsport = 15005 # default
354  print("\033[34m[rtm.py] Failed to parse corba.nameservers, use " + str(nsport) + " as nsport \033[0m")
355 
356  if mgrhost != None:
357  print("\033[34m[rtm.py] mgrhost already set as " + str(mgrhost) + "\033[0m")
358  else:
359  mgrhost = nshost
360 
361  if mgrport != None:
362  print("\033[34m[rtm.py] mgrport already set as " + str(mgrport) + "\033[0m")
363  else:
364  try:
365  mgrport = int(mc._argprop.getProperty("corba.master_manager").split(":")[1])
366  if not mgrport:
367  raise
368  except:
369  mgrport = 2810 # default
370  print("\033[34m[rtm.py] Failed to parse corba.master_manager, use " + str(mgrport) + "\033[0m")
371 
372  print("\033[34m[rtm.py] configuration ORB with %s:%s\033[0m"%(nshost, nsport))
373  print("\033[34m[rtm.py] configuration RTCManager with %s:%s\033[0m"%(mgrhost, mgrport))
374  os.environ['ORBInitRef'] = 'NameService=corbaloc:iiop:%s:%s/NameService' % \
375  (nshost, nsport)
376 
377  try:
378  orb = CORBA.ORB_init(sys.argv, CORBA.ORB_ID)
379  nameserver = orb.resolve_initial_references("NameService")
380  rootnc = nameserver._narrow(CosNaming.NamingContext)
381  except omniORB.CORBA.ORB.InvalidName:
382  _, e, _ = sys.exc_info()
383  sys.exit('[ERROR] Invalide Name (hostname=%s).\n' % (nshost) +
384  'Make sure the hostname is correct.\n' + str(e))
385  except omniORB.CORBA.TRANSIENT:
386  try:
387  nameserver = orb.string_to_object('corbaloc:iiop:%s:%s/NameService'%(nshost, nsport))
388  rootnc = nameserver._narrow(CosNaming.NamingContext)
389  except:
390  _, e, _ = sys.exc_info()
391  sys.exit('[ERROR] Connection Failed with the Nameserver (hostname=%s port=%s).\n' % (nshost, nsport) +
392  'Make sure the hostname is correct and the Nameserver is running.\n' + str(e))
393  except Exception:
394  _, e, _ = sys.exc_info()
395  print(str(e))
396 
397  return None
398 
399 
404 def getRootNamingContext(corbaloc):
405  props = System.getProperties()
406 
407  args = ["-ORBInitRef", corbaloc]
408  orb = ORB.init(args, props)
409 
410  nameserver = orb.resolve_initial_references("NameService")
411  return NamingContextHelper.narrow(nameserver)
412 
413 
421 def findObject(name, kind="", rnc=None):
422  nc = CosNaming.NameComponent(name, kind)
423  path = [nc]
424  if not rnc:
425  rnc = rootnc
426  if not rnc:
427  print("[ERROR] findObject(%r,kind=%r,rnc=%r) rootnc is not found" % (name, kind, rnc))
428  return rnc.resolve(path)
429 
430 
437 def findRTCmanager(hostname=None, rnc=None):
438  if not rootnc:
439  print("[ERROR] findRTCmanager(hostname=%r,rnc=%r) rootnc is not defined, need to call initCORBA()" % \
440  (hostname, rnc))
441  if not hostname:
442  hostname = nshost
443  cxt = None
444  if not hostname:
445  hostname = socket.gethostname()
446  try:
447  socket.gethostbyaddr(hostname)
448  except Exception:
449  _, e, _ = sys.exc_info()
450  sys.exit('[ERROR] %s\n' % (str(e)) + '[ERROR] Could not get hostname for %s\n' % (hostname) +
451  '[ERROR] Make sure that you set the target hostname and address in DNS or /etc/hosts in linux/unix.')
452 
453 
454  def getManagerFromNS(hostname, mgr=None):
455  try:
456  obj = findObject("manager", "mgr", findObject(hostname, "host_cxt", rnc))
457  mgr = RTCmanager(obj._narrow(RTM.Manager))
458  except:
459  mgr = None
460  return mgr
461 
462  def getManagerDirectly(hostname, mgr=None):
463  global orb, mgrport
464  corbaloc = "corbaloc:iiop:" + hostname + ":" + str(mgrport) + "/manager"
465  print("\033[34m[rtm.py] trying to findRTCManager on port" + str(mgrport) + "\033[0m")
466  try:
467  obj = orb.string_to_object(corbaloc)
468  mgr = RTCmanager(obj._narrow(RTM.Manager))
469  except:
470  mgr = None
471  return mgr
472 
473  try:
474  import CORBA
475  except:
476  print('import CORBA failed in findRTCmanager and neglect it for old python environment.')
477  # fqdn
478  mgr = None
479  hostnames = [hostname, hostname.split(".")[0],
480  socket.gethostbyaddr(hostname)[0],
481  socket.gethostbyaddr(hostname)[0].split(".")[0]]
482  for h in hostnames:
483  mgr = getManagerDirectly(h) or getManagerFromNS(h)
484  if mgr:
485  return mgr
486  print("Manager not found")
487  return None
488 
489 
490 
497 def findRTC(name, rnc=None):
498  try:
499  obj = findObject(name, "rtc", rnc)
500  try:
501  rtc = RTcomponent(obj._narrow(RTC.RTObject))
502  except TypeError:
503  rtc = RTcomponent(obj._narrow(RTC.DataFlowComponent))
504  cxts = rtc.ref.get_participating_contexts()
505  if len(cxts) > 0:
506  rtc.ec = cxts[0]
507  return rtc
508  except:
509  return None
510 
511 
517 def findPort(rtc, name):
518  ports = rtc.get_ports()
519  cprof = rtc.get_component_profile()
520  portname = cprof.instance_name + "." + name
521  for p in ports:
522  prof = p.get_port_profile()
523  if prof.name == portname:
524  return p
525  return None
526 
527 
533 def serializeComponents(rtcs, stopEC=True):
534  if len(rtcs) < 2:
535  return
536  ec = rtcs[0].ec
537  for rtc in rtcs[1:]:
538  try:
539  if not ec._is_equivalent(rtc.ec):
540  if stopEC:
541  rtc.ec.stop()
542  if ec.add_component(rtc.ref) == RTC.RTC_OK:
543  rtc.ec = ec
544  else:
545  print('error in add_component()')
546  else:
547  print(rtc.name() + 'is already serialized')
548  except Exception:
549  _, e, _ = sys.exc_info()
550  print("[rtm.py] \033[31m error in serialize %s of %s %s\033[0m" % (rtc.name(), [[r, r.name()] for r in rtcs], str(e)))
551  raise e
552 
553 
558 def isConnected(outP, inP):
559  op = outP.get_port_profile()
560  for con_prof in op.connector_profiles:
561  ports = con_prof.ports
562  if len(ports) == 2 and outP._is_equivalent(ports[0]) and \
563  inP._is_equivalent(ports[1]):
564  return True
565  return False
566 
567 
573 def disconnectPorts(outP, inP):
574  op = outP.get_port_profile()
575  iname = inP.get_port_profile().name
576  for con_prof in op.connector_profiles:
577  ports = con_prof.ports
578  if len(ports) == 2:
579  pname = ports[1].get_port_profile().name
580  if pname == iname:
581  print('[rtm.py] Disconnect %s - %s' %(op.name, iname))
582  outP.disconnect(con_prof.connector_id)
583  return True
584  return False
585 
586 
591 def dataTypeOfPort(port):
592  prof = port.get_port_profile()
593  prop = prof.properties
594  for p in prop:
595  if p.name == "dataport.data_type":
596  return any.from_any(p.value)
597  return None
598 
599 
608 def connectPorts(outP, inPs, subscription="flush", dataflow="Push", bufferlength=1, rate=1000, pushpolicy="new", interfaceType="corba_cdr"):
609  if not isinstance(inPs, list):
610  inPs = [inPs]
611  if not outP:
612  print('[rtm.py] \033[31m Failed to connect %s to %s(%s)\033[0m' % \
613  (outP, [inP.get_port_profile().name if inP else inP for inP in inPs], inPs))
614  return
615  for inP in inPs:
616  if not inP:
617  print('[rtm.py] \033[31m Failed to connect %s to %s(%s)\033[0m' % \
618  (outP.get_port_profile().name, inP, inPs))
619  continue
620  if isConnected(outP, inP) == True:
621  print('[rtm.py] %s and %s are already connected' % \
622  (outP.get_port_profile().name, inP.get_port_profile().name))
623  continue
624  if dataTypeOfPort(outP) != dataTypeOfPort(inP):
625  print('[rtm.py] \033[31m %s and %s have different data types\033[0m' % \
626  (outP.get_port_profile().name, inP.get_port_profile().name))
627  continue
628  nv1 = SDOPackage.NameValue("dataport.interface_type", any.to_any(interfaceType))
629  nv2 = SDOPackage.NameValue("dataport.dataflow_type", any.to_any(dataflow))
630  nv3 = SDOPackage.NameValue("dataport.subscription_type", any.to_any(subscription))
631  nv4 = SDOPackage.NameValue("dataport.buffer.length", any.to_any(str(bufferlength)))
632  nv5 = SDOPackage.NameValue("dataport.publisher.push_rate", any.to_any(str(rate)))
633  nv6 = SDOPackage.NameValue("dataport.publisher.push_policy", any.to_any(pushpolicy))
634  nv7 = SDOPackage.NameValue("dataport.data_type", any.to_any(dataTypeOfPort(outP)))
635  con_prof = RTC.ConnectorProfile("connector0", "", [outP, inP],
636  [nv1, nv2, nv3, nv4, nv5, nv6, nv7])
637  print('[rtm.py] Connect ' + outP.get_port_profile().name + ' - ' + \
638  inP.get_port_profile().name+' (dataflow_type='+dataflow+', subscription_type='+ subscription+', bufferlength='+str(bufferlength)+', push_rate='+str(rate)+', push_policy='+pushpolicy+')')
639  ret, prof = inP.connect(con_prof)
640  if ret != RTC.RTC_OK:
641  print("failed to connect")
642  continue
643  # confirm connection
644  if isConnected(outP, inP) == False:
645  print("connet() returned RTC_OK, but not connected")
646 
647 
652 def data2cdr(data):
653  return cdrMarshal(any.to_any(data).typecode(), data, True)
654 
655 
660 def classFromString(fullname):
661  component_path = fullname.split('.')
662  package_name = component_path[0]
663  component_path = component_path[1:]
664  attr = None
665  while component_path:
666  class_name = component_path[0]
667  component_path = component_path[1:]
668  if attr:
669  attr = getattr(attr, class_name)
670  else:
671  __import__(str(package_name))
672  attr = getattr(sys.modules[package_name], class_name)
673  return attr
674 
675 
681 def cdr2data(cdr, classname):
682  return cdrUnmarshal(any.to_any(classFromString(classname)).typecode(), cdr, True)
683 
684 
685 
686 connector_list = []
687 
688 
697 def writeDataPort(port, data, tm=1.0, disconnect=True):
698  global connector_list, orb
699 
700  connector_name = "writeDataPort"
701 
702 
703 
704  prof = None
705 
706  for p in connector_list:
707  if p["port"]._is_equivalent(port):
708  if port.get_connector_profile(p["prof"].connector_id).name == connector_name:
709  prof = p["prof"]
710  else:
711  connector_list.remove(p)
712 
713 
714 
715 
716  if prof is None:
717  nv1 = SDOPackage.NameValue("dataport.interface_type", any.to_any("corba_cdr"))
718  nv2 = SDOPackage.NameValue("dataport.dataflow_type", any.to_any("Push"))
719  nv3 = SDOPackage.NameValue("dataport.subscription_type", any.to_any("flush"))
720  con_prof = RTC.ConnectorProfile(connector_name, "", [port], [nv1, nv2, nv3])
721 
722  ret, prof = port.connect(con_prof)
723 
724  if ret != RTC.RTC_OK:
725  print("failed to connect")
726  return None
727  connector_list.append({"port":port,"prof":prof})
728 
729 
730  for p in prof.properties:
731  if p.name == 'dataport.corba_cdr.inport_ior':
732  ior = any.from_any(p.value)
733  obj = orb.string_to_object(ior)
734  inport = obj._narrow(InPortCdr)
735  cdr = data2cdr(data)
736  if inport.put(cdr) != OpenRTM.PORT_OK:
737  print("failed to put")
738  if disconnect:
739  time.sleep(tm)
740  port.disconnect(prof.connector_id)
741  for p in connector_list:
742  if prof.connector_id == p["prof"].connector_id:
743  connector_list.remove(p)
744  else:
745  return prof.connector_id
746  return None
747 
748 
749 
750 
756 def readDataPort(port, timeout=1.0, disconnect=True):
757  global connector_list, orb
758 
759 
760  connector_name = "readDataPort"
761  prof = None
762  for p in connector_list:
763  if p["port"]._is_equivalent(port):
764  if port.get_connector_profile(p["prof"].connector_id).name == connector_name:
765  prof = p["prof"]
766  else:
767  connector_list.remove(p)
768 
769 
770 
771 
772 
773 
774  pprof = port.get_port_profile()
775  for prop in pprof.properties:
776  if prop.name == "dataport.data_type":
777  classname = any.from_any(prop.value)
778 
779  if prof is None:
780 
781  nv1 = SDOPackage.NameValue("dataport.interface_type", any.to_any("corba_cdr"))
782  nv2 = SDOPackage.NameValue("dataport.dataflow_type", any.to_any("Pull"))
783  nv3 = SDOPackage.NameValue("dataport.subscription_type", any.to_any("flush"))
784  con_prof = RTC.ConnectorProfile(connector_name, "", [port], [nv1, nv2, nv3])
785 
786  ret, prof = port.connect(con_prof)
787 
788  if ret != RTC.RTC_OK:
789  print("failed to connect")
790  return None
791 
792  connector_list.append({"port":port,"prof":prof})
793 
794  for p in prof.properties:
795  if p.name == 'dataport.corba_cdr.outport_ior':
796  ior = any.from_any(p.value)
797  obj = orb.string_to_object(ior)
798  outport = obj._narrow(OutPortCdr)
799  tm = 0
800  while tm < timeout:
801  try:
802  ret, data = outport.get()
803  if ret == OpenRTM.PORT_OK:
804  if disconnect:
805  port.disconnect(prof.connector_id)
806  for p in connector_list:
807  if prof.connector_id == p["prof"].connector_id:
808  connector_list.remove(p)
809 
810  tokens = classname.split(':')
811  if len(tokens) == 3: # for 1.1?
812  classname = tokens[1].replace('/', '.')
813  return cdr2data(data, classname)
814  except:
815  pass
816  time.sleep(0.1)
817  tm = tm + 0.1
818 
819 
820 
821  return None
822 
823 
827  global connector_list
828  for port in connector_list:
829  port["port"].disconnect(port["prof"].connector_id)
830  del connector_list[:]
831 
832 
840 def findService(rtc, port_name, type_name, instance_name):
841  if port_name == "":
842  prof = rtc.ref.get_component_profile()
843  # print("RTC name:",prof.instance_name)
844  port_prof = prof.port_profiles
845  else:
846  p = rtc.port(port_name)
847  if p == None:
848  print("can't find a port named" + port_name)
849  return None
850  else:
851  port_prof = [p.get_port_profile()]
852  port = None
853  for pp in port_prof:
854  # print("name:", pp.name)
855  ifs = pp.interfaces
856  for aif in ifs:
857  #print("IF name:", aif.instance_name)
858  #print("IF type:", aif.type_name)
859  if aif.instance_name == instance_name and \
860  (type_name == "" or aif.type_name == type_name) and \
861  aif.polarity == PROVIDED:
862  port = pp.port_ref
863  if port == None:
864  print("can't find a service named", instance_name)
865  return None
866  con_prof = RTC.ConnectorProfile("noname", "", [port], [])
867  ret, con_prof = port.connect(con_prof)
868  ior = any.from_any(con_prof.properties[0].value)
869  return orb.string_to_object(ior)
870 
871 
877 def setConfiguration(rtc, nvlist):
878  ret = True
879  cfg = rtc.get_configuration()
880  cfgsets = cfg.get_configuration_sets()
881  if len(cfgsets) == 0:
882  print("configuration set is not found")
883  return
884  cfgset = cfgsets[0]
885  for nv in nvlist:
886  name = nv[0]
887  value = nv[1]
888  found = False
889  for d in cfgset.configuration_data:
890  if d.name == name:
891  d.value = any.to_any(value)
892  cfg.set_configuration_set_values(cfgset)
893  found = True
894  break
895  if not found:
896  ret = False
897  cfg.activate_configuration_set('default')
898  return ret
899 
900 
906  cfg = rtc.get_configuration()
907  cfgsets = cfg.get_configuration_sets()
908  if len(cfgsets) == 0:
909  print("configuration set is not found")
910  return None
911  ret = {}
912  for nv in cfgsets[0].configuration_data:
913  ret[nv.name] = any.from_any(nv.value)
914  return ret
915 
916 
917 
923 def narrow(ior, klass, package="OpenHRP"):
924  return ior._narrow(getattr(sys.modules[package], klass))
925 
926 
930 def isJython():
931  return sys.version.count("GCC") == 0
932 
933 
934 if __name__ == '__main__':
935  initCORBA()
def getRootNamingContext(corbaloc)
get root naming context
Definition: python/rtm.py:404
def isActive(self, ec=None)
check the main execution context is active or not
Definition: python/rtm.py:190
def findService(rtc, port_name, type_name, instance_name)
get a service of RT component
Definition: python/rtm.py:840
def getProperties(self)
get name-value list of the default configuration set
Definition: python/rtm.py:87
def readDataPort(port, timeout=1.0, disconnect=True)
read data from a data port
Definition: python/rtm.py:756
wrapper class of RT component
Definition: python/rtm.py:27
def disconnectPorts(outP, inP)
disconnect ports
Definition: python/rtm.py:573
def narrow(ior, klass, package="OpenHRP")
narrow ior
Definition: python/rtm.py:923
def setConfiguration(rtc, nvlist)
update default configuration set
Definition: python/rtm.py:877
def start(self, ec=None, timeout=3.0)
activate this component
Definition: python/rtm.py:113
def load(self, basename, initfunc="")
load RT component factory
Definition: python/rtm.py:230
def classFromString(fullname)
get class object from class name
Definition: python/rtm.py:660
def delete(self, name)
create an instance of RT component
Definition: python/rtm.py:264
def stop(self, ec=None, timeout=3.0)
deactivate this component
Definition: python/rtm.py:140
def getLifeCycleState(self, ec=None)
get life cycle state of the main execution context
Definition: python/rtm.py:177
def writeDataPort(port, data, tm=1.0, disconnect=True)
write data to a data port
Definition: python/rtm.py:697
def findRTCmanager(hostname=None, rnc=None)
get RTCmanager
Definition: python/rtm.py:437
def port(self, name)
get IOR of port
Definition: python/rtm.py:48
def get_factory_names(self)
get list of factory names
Definition: python/rtm.py:275
def getConfiguration(rtc)
get default configuration set
Definition: python/rtm.py:905
def findObject(name, kind="", rnc=None)
get IOR of the object
Definition: python/rtm.py:421
def isConnected(outP, inP)
check two ports are connected or not
Definition: python/rtm.py:558
def unbindObject(name, kind)
unbind an object reference
Definition: python/rtm.py:306
def create(self, module, name=None)
create an instance of RT component
Definition: python/rtm.py:245
wrapper class of RTCmanager
Definition: python/rtm.py:211
def cdr2data(cdr, classname)
convert data from CDR format
Definition: python/rtm.py:681
def reset(self, ec=None, timeout=3.0)
reset this component
Definition: python/rtm.py:166
def initCORBA()
initialize ORB
Definition: python/rtm.py:315
def get_components(self)
get list of components
Definition: python/rtm.py:287
def deleteAllConnector()
Definition: python/rtm.py:826
def findPort(rtc, name)
get a port of RT component
Definition: python/rtm.py:517
def __init__(self, ref)
constructor
Definition: python/rtm.py:216
def findRTC(name, rnc=None)
get RT component
Definition: python/rtm.py:497
def data2cdr(data)
convert data into CDR format
Definition: python/rtm.py:652
def setConfiguration(self, nvlist)
update default configuration set
Definition: python/rtm.py:71
def name(self)
get instance name
Definition: python/rtm.py:204
def isInactive(self, ec=None)
check the main execution context is inactive or not
Definition: python/rtm.py:198
def restart(self)
restart Manager
Definition: python/rtm.py:297
def serializeComponents(rtcs, stopEC=True)
set up execution context of the first RTC so that RTCs are executed sequentially
Definition: python/rtm.py:533
def connectPorts(outP, inPs, subscription="flush", dataflow="Push", bufferlength=1, rate=1000, pushpolicy="new", interfaceType="corba_cdr")
connect ports
Definition: python/rtm.py:608
def isJython()
check if jython or python
Definition: python/rtm.py:930
def dataTypeOfPort(port)
get data type of a port
Definition: python/rtm.py:591
def setProperty(self, name, value)
update value of the default configuration set
Definition: python/rtm.py:80
def service(self, instance_name, type_name="", port_name="")
get IOR of the service
Definition: python/rtm.py:63
def __init__(self, ref)
constructor
Definition: python/rtm.py:32
def getProperty(self, name)
get value of the property in the default configuration set
Definition: python/rtm.py:95


hrpsys
Author(s): AIST, Fumio Kanehiro
autogenerated on Thu May 6 2021 02:41:51