19 sys.path.insert(1,
"../")
20 sys.path.insert(1,
"../RTM_IDL")
26 from omniORB
import CORBA, PortableServer
27 from omniORB
import any
30 configsample_spec = [
"implementation_id",
"TestComp",
31 "type_name",
"TestComp",
32 "description",
"Test example component",
34 "vendor",
"Shinji Kurihara, AIST",
35 "category",
"example",
36 "activity_type",
"DataFlowComponent",
39 "lang_type",
"script",
41 "conf.default.int_param0",
"0",
42 "conf.default.int_param1",
"1",
43 "conf.default.double_param0",
"0.11",
44 "conf.default.double_param1",
"9.9",
45 "conf.default.str_param0",
"hoge",
46 "conf.default.str_param1",
"dara",
47 "conf.default.vector_param0",
"0.0,1.0,2.0,3.0,4.0",
54 OpenRTM_aist.RTObject_impl.__init__(self, orb=orb_, poa=poa_)
105 manager.registerFactory(profile,
108 com = manager.createComponent(
"TestComp")
120 def init(self, rtobj, profile):
122 self._profile = profile
135 class TestRTObject_impl(unittest.TestCase):
137 self.
_orb = CORBA.ORB_init(sys.argv)
138 self.
_poa = self.
_orb.resolve_initial_references(
"RootPOA")
139 self.
_poa._get_the_POAManager().activate()
147 OpenRTM_aist.Manager.instance().shutdownManager()
153 ec = rtobj.getObjRef().get_context(0)
154 self.assertEqual(ec,
None)
155 ec_args =
"PeriodicExecutionContext"+
"?" +
"rate=1000" 156 ec=OpenRTM_aist.Manager.instance().createContext(ec_args)
157 ec.bindComponent(rtobj)
158 self.assertNotEqual(rtobj.getObjRef().get_owned_contexts(),[])
159 self.assertEqual(rtobj.is_alive(ec.getObjRef()),
True)
160 ec.remove_component(rtobj.getObjRef())
168 self.assertEqual(rtobj.getObjRef().get_owned_contexts(),[])
169 ec_args =
"PeriodicExecutionContext"+
"?" +
"rate=1000" 170 ec=OpenRTM_aist.Manager.instance().createContext(ec_args)
171 ec.bindComponent(rtobj)
172 self.assertNotEqual(rtobj.getObjRef().get_owned_contexts(),[])
173 ec.remove_component(rtobj.getObjRef())
181 self.assertEqual(rtobj.getObjRef().get_participating_contexts(),[])
186 print rtobj.getObjRef().get_context(0)
191 rtobj.setInstanceName(
"TestComp0")
192 prof = rtobj.getObjRef().get_component_profile()
193 self.assertEqual(prof.instance_name,
"TestComp0")
198 self.assertEqual(rtobj.getObjRef().get_ports(), [])
205 id = rtobj.getObjRef().attach_context(ec.getObjRef())
206 print "attach_context: ", id
207 print rtobj.getObjRef().detach_context(id)
208 poa = OpenRTM_aist.Manager.instance().getPOA()
209 poa.deactivate_object(poa.servant_to_id(ec))
214 self.assertEqual(rtobj.getObjRef().get_owned_organizations(),[])
219 rtobj.setInstanceName(
"TestComp0")
220 self.assertEqual(rtobj.getObjRef().get_sdo_id(),
"TestComp0")
226 rtobj.setProperties(prop)
227 self.assertEqual(rtobj.getObjRef().get_sdo_type(),
"Test example component")
232 prof = rtobj.getObjRef().get_device_profile()
233 self.assertEqual(prof.device_type,
"")
238 self.assertEqual(rtobj.getObjRef().get_service_profiles(),[])
252 rtobj =
TestComp(self._orb, self._poa)
253 print rtobj.getObjRef().get_configuration()
261 rtobj = TestComp(self._orb, self._poa)
262 self.assertEqual(rtobj.getObjRef().get_organizations(), [])
267 self.assertEqual(rtobj.getObjRef().get_status_list(), [])
275 rtobj = TestComp(self._orb, self._poa)
276 self.assertEqual(rtobj.getInstanceName(),
"")
278 rtobj.setInstanceName(
"TestComp0")
279 rtobj.setProperties(prop)
280 self.assertEqual(rtobj.getInstanceName(),
"TestComp0")
281 self.assertEqual(rtobj.getTypeName(),
"TestComp")
282 self.assertEqual(rtobj.getDescription(),
"Test example component")
283 self.assertEqual(rtobj.getVersion(),
"1.0")
284 self.assertEqual(rtobj.getVendor(),
"Shinji Kurihara, AIST")
285 self.assertEqual(rtobj.getCategory(),
"example")
286 self.assertNotEqual(rtobj.getNamingNames(),[
"TestComp0.rtc"])
291 rtobj.setObjRef(
"test")
292 self.assertEqual(rtobj.getObjRef(),
"test")
298 self.assertEqual(rtobj.bindParameter(
"config", conf_, 0),
True)
299 rtobj.updateParameters(
"")
306 rtobj.registerOutPort(
"out",outp)
310 rtobj.registerInPort(
"in",inp)
312 rtobj.deletePort(outp)
313 rtobj.deletePort(inp)
315 rtobj.finalizePorts()
321 ec_args =
"PeriodicExecutionContext"+
"?" +
"rate=1000" 322 ec=OpenRTM_aist.Manager.instance().createContext(ec_args)
323 ec.bindComponent(rtobj)
324 self.assertNotEqual(rtobj.getExecutionContext(0),
None)
329 ec_args =
"PeriodicExecutionContext"+
"?" +
"rate=1000" 330 ec=OpenRTM_aist.Manager.instance().createContext(ec_args)
331 ec.bindComponent(rtobj)
332 self.assertEqual(rtobj.getExecutionRate(0),1000.0)
337 ec_args =
"PeriodicExecutionContext"+
"?" +
"rate=1000" 338 ec=OpenRTM_aist.Manager.instance().createContext(ec_args)
339 ec.bindComponent(rtobj)
340 self.assertEqual(rtobj.setExecutionRate(0,10000),RTC.RTC_OK)
341 self.assertEqual(rtobj.getExecutionRate(0),10000.0)
346 ec_args =
"PeriodicExecutionContext"+
"?" +
"rate=1000" 347 ec=OpenRTM_aist.Manager.instance().createContext(ec_args)
348 ec.bindComponent(rtobj)
349 self.assertEqual(rtobj.isOwnExecutionContext(0),
True)
354 ec_args =
"PeriodicExecutionContext"+
"?" +
"rate=1000" 355 ec=OpenRTM_aist.Manager.instance().createContext(ec_args)
357 ec.bindComponent(rtobj)
358 self.assertEqual(rtobj.activate(0),RTC.RTC_OK)
361 ret = rtobj.deactivate(0)
363 self.assertEqual(ret,RTC.RTC_OK)
369 ec_args =
"PeriodicExecutionContext"+
"?" +
"rate=1000" 370 ec=OpenRTM_aist.Manager.instance().createContext(ec_args)
371 ec.bindComponent(rtobj)
372 self.assertEqual(rtobj.activate(0),RTC.RTC_OK)
374 ec._comps[0]._sm._sm.goTo(RTC.ERROR_STATE)
376 self.assertEqual(rtobj.reset(0),RTC.RTC_OK)
382 prof = SDOPackage.ServiceProfile(
"id",
"interface_type",
384 SDOPackage.SDOService._nil)
386 prov.init(rtobj,prof)
387 self.assertEqual(rtobj.addSdoServiceProvider(prof, prov),
True)
388 self.assertEqual(rtobj.removeSdoServiceProvider(
"id"),
True)
392 import MySdoServiceConsumer
393 OpenRTM_aist.Manager.instance().load(
"MySdoServiceConsumer.py",
394 "MySdoServiceConsumerInit")
396 prof = SDOPackage.ServiceProfile(OpenRTM_aist.toTypename(OpenRTM.ComponentObserver),OpenRTM_aist.toTypename(OpenRTM.ComponentObserver),
398 SDOPackage.SDOService._nil)
399 self.assertEqual(rtobj.addSdoServiceConsumer(prof),
True)
400 self.assertEqual(rtobj.removeSdoServiceConsumer(OpenRTM_aist.toTypename(OpenRTM.ComponentObserver)),
True)
404 print "prelistenerFunc called !!!!" 410 rtobj.addPreComponentActionListener(OpenRTM_aist.PreComponentActionListenerType.PRE_ON_INITIALIZE,
413 rtobj.removePreComponentActionListener(OpenRTM_aist.PreComponentActionListenerType.PRE_ON_INITIALIZE,
416 rtobj.addPreComponentActionListener(OpenRTM_aist.PreComponentActionListenerType.PRE_ON_FINALIZE,
419 rtobj.removePreComponentActionListener(OpenRTM_aist.PreComponentActionListenerType.PRE_ON_FINALIZE,
424 print "postlistenerFunc called !!!!" 430 rtobj.addPostComponentActionListener(OpenRTM_aist.PostComponentActionListenerType.POST_ON_FINALIZE,
433 rtobj.removePostComponentActionListener(OpenRTM_aist.PostComponentActionListenerType.POST_ON_FINALIZE,
440 rtobj.addPortActionListener(OpenRTM_aist.PortActionListenerType.ADD_PORT,
443 rtobj.removePortActionListener(OpenRTM_aist.PortActionListenerType.ADD_PORT,
450 rtobj.addExecutionContextActionListener(OpenRTM_aist.ExecutionContextActionListenerType.EC_ATTACHED,
453 rtobj.removeExecutionContextActionListener(OpenRTM_aist.ExecutionContextActionListenerType.EC_ATTACHED,
460 rtobj.addPortConnectListener(OpenRTM_aist.PortConnectListenerType.ON_NOTIFY_CONNECT,
463 rtobj.removePortConnectListener(OpenRTM_aist.PortConnectListenerType.ON_NOTIFY_CONNECT,
468 print "portconretlistenerFunc called !!!!" 474 rtobj.addPortConnectRetListener(OpenRTM_aist.PortConnectRetListenerType.ON_CONNECTED,
477 rtobj.removePortConnectRetListener(OpenRTM_aist.PortConnectRetListenerType.ON_CONNECTED,
482 print "configparamlistenerFunc called !!!!" 488 rtobj.addConfigurationParamListener(OpenRTM_aist.ConfigurationParamListenerType.ON_UPDATE_CONFIG_PARAM,
491 rtobj.removeConfigurationParamListener(OpenRTM_aist.ConfigurationParamListenerType.ON_UPDATE_CONFIG_PARAM,
498 rtobj.addConfigurationSetListener(OpenRTM_aist.ConfigurationSetListenerType.ON_SET_CONFIG_SET,
501 rtobj.removeConfigurationSetListener(OpenRTM_aist.ConfigurationSetListenerType.ON_SET_CONFIG_SET,
508 rtobj.addConfigurationSetNameListener(OpenRTM_aist.ConfigurationSetNameListenerType.ON_UPDATE_CONFIG_SET,
511 rtobj.removeConfigurationSetNameListener(OpenRTM_aist.ConfigurationSetNameListenerType.ON_UPDATE_CONFIG_SET,
519 rtobj = TestComp(self._orb, self._poa)
520 ec_args =
"PeriodicExecutionContext"+
"?" +
"rate=1000" 521 ec=OpenRTM_aist.Manager.instance().createContext(ec_args)
522 ec.bindComponent(rtobj)
523 print "preOnInitialize()" 524 rtobj.preOnInitialize(0)
529 ec_args =
"PeriodicExecutionContext"+
"?" +
"rate=1000" 530 ec=OpenRTM_aist.Manager.instance().createContext(ec_args)
531 ec.bindComponent(rtobj)
532 rtobj.preOnFinalize(0)
537 ec_args =
"PeriodicExecutionContext"+
"?" +
"rate=1000" 538 ec=OpenRTM_aist.Manager.instance().createContext(ec_args)
539 ec.bindComponent(rtobj)
540 rtobj.preOnStartup(0)
545 ec_args =
"PeriodicExecutionContext"+
"?" +
"rate=1000" 546 ec=OpenRTM_aist.Manager.instance().createContext(ec_args)
547 ec.bindComponent(rtobj)
548 rtobj.preOnShutdown(0)
553 ec_args =
"PeriodicExecutionContext"+
"?" +
"rate=1000" 554 ec=OpenRTM_aist.Manager.instance().createContext(ec_args)
555 ec.bindComponent(rtobj)
556 rtobj.preOnActivated(0)
561 ec_args =
"PeriodicExecutionContext"+
"?" +
"rate=1000" 562 ec=OpenRTM_aist.Manager.instance().createContext(ec_args)
563 ec.bindComponent(rtobj)
564 rtobj.preOnDeactivated(0)
569 ec_args =
"PeriodicExecutionContext"+
"?" +
"rate=1000" 570 ec=OpenRTM_aist.Manager.instance().createContext(ec_args)
571 ec.bindComponent(rtobj)
572 rtobj.preOnAborting(0)
577 ec_args =
"PeriodicExecutionContext"+
"?" +
"rate=1000" 578 ec=OpenRTM_aist.Manager.instance().createContext(ec_args)
579 ec.bindComponent(rtobj)
585 ec_args =
"PeriodicExecutionContext"+
"?" +
"rate=1000" 586 ec=OpenRTM_aist.Manager.instance().createContext(ec_args)
587 ec.bindComponent(rtobj)
593 ec_args =
"PeriodicExecutionContext"+
"?" +
"rate=1000" 594 ec=OpenRTM_aist.Manager.instance().createContext(ec_args)
595 ec.bindComponent(rtobj)
596 rtobj.preOnExecute(0)
601 ec_args =
"PeriodicExecutionContext"+
"?" +
"rate=1000" 602 ec=OpenRTM_aist.Manager.instance().createContext(ec_args)
603 ec.bindComponent(rtobj)
604 rtobj.preOnStateUpdate(0)
609 ec_args =
"PeriodicExecutionContext"+
"?" +
"rate=1000" 610 ec=OpenRTM_aist.Manager.instance().createContext(ec_args)
611 ec.bindComponent(rtobj)
612 rtobj.preOnRateChanged(0)
617 ec_args =
"PeriodicExecutionContext"+
"?" +
"rate=1000" 618 ec=OpenRTM_aist.Manager.instance().createContext(ec_args)
619 ec.bindComponent(rtobj)
620 rtobj.postOnInitialize(0,
True)
625 ec_args =
"PeriodicExecutionContext"+
"?" +
"rate=1000" 626 ec=OpenRTM_aist.Manager.instance().createContext(ec_args)
627 ec.bindComponent(rtobj)
628 rtobj.postOnFinalize(0,
True)
633 ec_args =
"PeriodicExecutionContext"+
"?" +
"rate=1000" 634 ec=OpenRTM_aist.Manager.instance().createContext(ec_args)
635 ec.bindComponent(rtobj)
636 rtobj.postOnStartup(0,
True)
641 ec_args =
"PeriodicExecutionContext"+
"?" +
"rate=1000" 642 ec=OpenRTM_aist.Manager.instance().createContext(ec_args)
643 ec.bindComponent(rtobj)
644 rtobj.postOnShutdown(0,
True)
649 ec_args =
"PeriodicExecutionContext"+
"?" +
"rate=1000" 650 ec=OpenRTM_aist.Manager.instance().createContext(ec_args)
651 ec.bindComponent(rtobj)
652 rtobj.postOnActivated(0,
True)
657 ec_args =
"PeriodicExecutionContext"+
"?" +
"rate=1000" 658 ec=OpenRTM_aist.Manager.instance().createContext(ec_args)
659 ec.bindComponent(rtobj)
660 rtobj.postOnDeactivated(0,
True)
665 ec_args =
"PeriodicExecutionContext"+
"?" +
"rate=1000" 666 ec=OpenRTM_aist.Manager.instance().createContext(ec_args)
667 ec.bindComponent(rtobj)
668 rtobj.postOnAborting(0,
True)
673 ec_args =
"PeriodicExecutionContext"+
"?" +
"rate=1000" 674 ec=OpenRTM_aist.Manager.instance().createContext(ec_args)
675 ec.bindComponent(rtobj)
676 rtobj.postOnError(0,
True)
681 ec_args =
"PeriodicExecutionContext"+
"?" +
"rate=1000" 682 ec=OpenRTM_aist.Manager.instance().createContext(ec_args)
683 ec.bindComponent(rtobj)
684 rtobj.postOnReset(0,
True)
689 ec_args =
"PeriodicExecutionContext"+
"?" +
"rate=1000" 690 ec=OpenRTM_aist.Manager.instance().createContext(ec_args)
691 ec.bindComponent(rtobj)
692 rtobj.postOnExecute(0,
True)
697 ec_args =
"PeriodicExecutionContext"+
"?" +
"rate=1000" 698 ec=OpenRTM_aist.Manager.instance().createContext(ec_args)
699 ec.bindComponent(rtobj)
700 rtobj.postOnStateUpdate(0,
True)
705 ec_args =
"PeriodicExecutionContext"+
"?" +
"rate=1000" 706 ec=OpenRTM_aist.Manager.instance().createContext(ec_args)
707 ec.bindComponent(rtobj)
708 rtobj.postOnRateChanged(0,
True)
713 ec_args =
"PeriodicExecutionContext"+
"?" +
"rate=1000" 714 ec=OpenRTM_aist.Manager.instance().createContext(ec_args)
715 ec.bindComponent(rtobj)
721 ec_args =
"PeriodicExecutionContext"+
"?" +
"rate=1000" 722 ec=OpenRTM_aist.Manager.instance().createContext(ec_args)
723 ec.bindComponent(rtobj)
724 rtobj.onRemovePort(0)
729 ec_args =
"PeriodicExecutionContext"+
"?" +
"rate=1000" 730 ec=OpenRTM_aist.Manager.instance().createContext(ec_args)
731 ec.bindComponent(rtobj)
732 rtobj.onAttachExecutionContext(0)
737 ec_args =
"PeriodicExecutionContext"+
"?" +
"rate=1000" 738 ec=OpenRTM_aist.Manager.instance().createContext(ec_args)
739 ec.bindComponent(rtobj)
740 rtobj.onDetachExecutionContext(0)
745 if __name__ ==
'__main__':
def test_get_configuration(self)
def test_preOnShutdown(self)
def test_preOnInitialize(self)
def onExecute(self, ec_id)
def test_addRemoveSdoServiceConsumer(self)
def test_onRemovePort(self)
def reinit(self, profile)
def onActivated(self, ec_id)
def test_preOnExecute(self)
def test_addRemovePreComponentActionListener(self)
def test_postOnStateUpdate(self)
def onAborting(self, ec_id)
def test_postOnFinalize(self)
def test_get_service_profile(self)
def test_get_status_list(self)
def test_onAttachExecutionContext(self)
def onShutdown(self, ec_id)
def TestCompInit(manager)
def test_preOnReset(self)
def test_postOnStartup(self)
The Properties class represents a persistent set of properties.
def init(self, rtobj, profile)
def test_getExecutionRate(self)
def test_preOnFinalize(self)
PeriodicExecutionContext class.
def test_get_organizations(self)
def test_addRemoveConfigurationSetNameListener(self)
def test_get_status(self)
def test_get_component_profile(self)
def test_addRemovePortConnectListener(self)
def test_PortTestCase(self)
def test_postOnShutdown(self)
def onDeactivated(self, ec_id)
def test_get_device_profile(self)
def test_addRemovePostComponentActionListener(self)
def test_attach_context(self)
def test_get_sdo_id(self)
def test_get_context(self)
def test_getPropTestCase(self)
def onRateChanged(self, ec_id)
def test_get_owned_contexts(self)
def test_postOnExecute(self)
def test_postOnReset(self)
def postlistenerFunc(self, id, ret)
def test_get_sdo_type(self)
def test_preOnRateChanged(self)
def test_onDetachExecutionContext(self)
def test_isOwnExecutionContext(self)
def test_postOnRateChanged(self)
def test_get_monitoring(self)
def test_get_service_profiles(self)
def __init__(self, orb_, poa_)
def test_getExecutionContext(self)
def test_preOnDeactivated(self)
def test_preOnActivated(self)
def test_postOnAborting(self)
def test_addRemoveConfigurationSetListener(self)
def test_addRemoveSdoServiceProvider(self)
def onStateUpdate(self, ec_id)
def test_get_sdo_service(self)
def test_get_participating_contexts(self)
def test_postOnDeactivated(self)
def test_postOnInitialize(self)
def test_addRemoveConfigurationParamListener(self)
def test_postOnError(self)
def test_preOnError(self)
def portconretlistenerFunc(self, pname, cprof, ret)
def test_addRemovePortConnectRetListener(self)
def test_get_owned_organizations(self)
def test_addRemovePortActionListener(self)
def test_preOnStartup(self)
def test_preOnAborting(self)
def onStartup(self, ec_id)
def test_setExecutionRate(self)
def test_bindParameter(self)
def test_activate_deactivate(self)
def test_postOnActivated(self)
def prelistenerFunc(self, id)
def test_addRemoveExecutionContextActionListener(self)
def newNV(name, value)
Create NameVale.
def configparamlistenerFunc(self, pname, cprof, ret)
def test_preOnStateUpdate(self)