00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018 import sys
00019 sys.path.insert(1,"../")
00020 sys.path.insert(1,"../RTM_IDL")
00021
00022 import OpenRTM_aist
00023 import RTC
00024
00025 import unittest
00026
00027
00028 configsample_spec = ["implementation_id", "TestComp",
00029 "type_name", "TestComp",
00030 "description", "Test example component",
00031 "version", "1.0",
00032 "vendor", "Shinji Kurihara, AIST",
00033 "category", "example",
00034 "activity_type", "DataFlowComponent",
00035 "max_instance", "10",
00036 "language", "Python",
00037 "lang_type", "script",
00038
00039 "conf.default.int_param0", "0",
00040 "conf.default.int_param1", "1",
00041 "conf.default.double_param0", "0.11",
00042 "conf.default.double_param1", "9.9",
00043 "conf.default.str_param0", "hoge",
00044 "conf.default.str_param1", "dara",
00045 "conf.default.vector_param0", "0.0,1.0,2.0,3.0,4.0",
00046 ""]
00047
00048 com = None
00049
00050
00051 class TestComp(OpenRTM_aist.DataFlowComponentBase):
00052
00053 def __init__(self, manager):
00054 OpenRTM_aist.DataFlowComponentBase.__init__(self, manager)
00055
00056 def onInitialize(self):
00057 print "onInitialize"
00058 return RTC.RTC_OK
00059
00060 def onFinalize(self):
00061 print "onFinalize"
00062 return RTC.RTC_OK
00063
00064 def onStartup(self, ec_id):
00065 print "onStartup"
00066 return RTC.RTC_OK
00067
00068 def onShutdown(self, ec_id):
00069 print "onSutdown"
00070 return RTC.RTC_OK
00071
00072 def onActivated(self, ec_id):
00073 print "onActivated"
00074 return RTC.RTC_OK
00075
00076 def onDeactivated(self, ec_id):
00077 print "onDeactivated"
00078 return RTC.RTC_OK
00079
00080 def onExecute(self, ec_id):
00081 print "onExecute"
00082 return RTC.RTC_OK
00083
00084 def onAborting(self, ec_id):
00085 print "onAborting"
00086 return RTC.RTC_OK
00087
00088 def onReset(self, ec_id):
00089 print "onReset"
00090 return RTC.RTC_OK
00091
00092 def onStateUpdate(self, ec_id):
00093 print "onStateUpdate"
00094 return RTC.RTC_OK
00095
00096 def onRateChanged(self, ec_id):
00097 print "onRateChanged"
00098 return RTC.RTC_OK
00099
00100
00101 def TestCompInit(manager):
00102 global com
00103 profile = OpenRTM_aist.Properties(defaults_str=configsample_spec)
00104 manager.registerFactory(profile,
00105 TestComp,
00106 OpenRTM_aist.Delete)
00107 com = manager.createComponent("TestComp")
00108
00109
00110 class TestPeriodicECSharedComposite(unittest.TestCase):
00111
00112 def setUp(self):
00113 self.manager = OpenRTM_aist.Manager.init(sys.argv)
00114 self.composite = OpenRTM_aist.PeriodicECSharedComposite(self.manager)
00115 return
00116
00117 def test_organization(self):
00118 global com
00119 self.manager.setModuleInitProc(TestCompInit)
00120 self.manager.activateManager()
00121 organization = OpenRTM_aist.PeriodicECOrganization(self.composite)
00122 self.assertEqual(organization.set_members([com.getObjRef()]),True)
00123
00124 self.assertEqual(organization.remove_member("TestComp0"),True)
00125 self.assertEqual(organization.add_members([com.getObjRef()]),True)
00126 organization.removeAllMembers()
00127 return
00128
00129
00130 def test_onInitialize(self):
00131 self.assertEqual(self.composite.onInitialize(), RTC.RTC_OK)
00132 return
00133
00134
00135 def test_onActivated(self):
00136 self.assertEqual(self.composite.onActivated(None), RTC.RTC_OK)
00137 return
00138
00139
00140 def test_onDeactivated(self):
00141 self.assertEqual(self.composite.onDeactivated(None), RTC.RTC_OK)
00142 return
00143
00144
00145 def test_onReset(self):
00146 self.assertEqual(self.composite.onReset(None), RTC.RTC_OK)
00147 return
00148
00149
00150 def test_delegatePort(self):
00151 return
00152
00153
00154
00155
00156 if __name__ == '__main__':
00157 unittest.main()