00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018 import omniORB.any
00019 from omniORB import CORBA
00020 import threading
00021
00022 import sys
00023 sys.path.insert(1,"../")
00024 sys.path.insert(1,"../RTM_IDL")
00025
00026 import unittest
00027 import OpenRTM_aist
00028 import SDOPackage, SDOPackage__POA
00029
00030 from SdoOrganization import *
00031
00032
00033 class TestComp(OpenRTM_aist.DataFlowComponentBase):
00034 def __init__(self, manager):
00035 OpenRTM_aist.DataFlowComponentBase.__init__(self, manager)
00036
00037 def onInitialize(self):
00038 print "onInitialize"
00039 return RTC.RTC_OK
00040
00041 def onFinalize(self):
00042 print "onFinalize"
00043 return RTC.RTC_OK
00044
00045 def onStartup(self, ec_id):
00046 print "onStartup"
00047 return RTC.RTC_OK
00048
00049 def onShutdown(self, ec_id):
00050 print "onSutdown"
00051 return RTC.RTC_OK
00052
00053 def onActivated(self, ec_id):
00054 print "onActivated"
00055 return RTC.RTC_OK
00056
00057 def onDeactivated(self, ec_id):
00058 print "onDeactivated"
00059 return RTC.RTC_OK
00060
00061 def onExecute(self, ec_id):
00062 print "onExecute"
00063 return RTC.RTC_OK
00064
00065 def onAborting(self, ec_id):
00066 print "onAborting"
00067 return RTC.RTC_OK
00068
00069 def onReset(self, ec_id):
00070 print "onReset"
00071 return RTC.RTC_OK
00072
00073 def onStateUpdate(self, ec_id):
00074 print "onStateUpdate"
00075 return RTC.RTC_OK
00076
00077 def onRateChanged(self, ec_id):
00078 print "onRateChanged"
00079 return RTC.RTC_OK
00080
00081
00082 class TestOrganization_impl(unittest.TestCase):
00083 def setUp(self):
00084 rtc = TestComp(OpenRTM_aist.Manager.instance())
00085 self.org = Organization_impl(rtc.getObjRef())
00086
00087 def tearDown(self):
00088 OpenRTM_aist.Manager.instance().shutdownManager()
00089 return
00090
00091 def test_get_organization_id(self):
00092 print self.org.get_organization_id()
00093
00094 def test_get_organization_property(self):
00095 nv_list = [SDOPackage.NameValue("test", 100)]
00096 self.org.add_organization_property(SDOPackage.OrganizationProperty(nv_list))
00097 self.assertEqual(self.org.get_organization_property().properties[0].name, "test")
00098 self.org.remove_organization_property("test")
00099 ret_val = self.org.get_organization_property()
00100 self.assertEqual(len(ret_val.properties), 0)
00101
00102 self.assertRaises(SDOPackage.InvalidParameter, self.org.remove_organization_property, None )
00103
00104 def test_get_organization_property_value(self):
00105 self.org.set_organization_property_value("test", 100)
00106 self.assertRaises(SDOPackage.InvalidParameter, self.org.set_organization_property_value, None, None )
00107
00108 val = omniORB.any.from_any(self.org.get_organization_property_value("test"))
00109 self.assertEqual(val, 100)
00110
00111
00112
00113 self.assertRaises(SDOPackage.InvalidParameter, self.org.get_organization_property_value, "aaa" )
00114 self.assertRaises(SDOPackage.InvalidParameter, self.org.get_organization_property_value, None )
00115
00116 self.assertRaises(SDOPackage.InvalidParameter, self.org.set_organization_property_value, None, "aaa" )
00117
00118
00119 def test_get_owner(self):
00120 rtc = TestComp(OpenRTM_aist.Manager.instance())
00121 self.org.set_owner(rtc.getObjRef())
00122 self.assertNotEqual(self.org.get_owner(),None)
00123 self.assertRaises(SDOPackage.InvalidParameter, self.org.set_owner, None )
00124
00125 class sdo_test(OpenRTM_aist.RTObject_impl):
00126 def __init__(self):
00127 self._orb = CORBA.ORB_init()
00128 self._poa = self._orb.resolve_initial_references("RootPOA")
00129 OpenRTM_aist.RTObject_impl.__init__(self, orb=self._orb, poa=self._poa)
00130
00131
00132 def test_get_members(self):
00133
00134
00135 self.assertEqual(self.org.get_members(),[])
00136 member = self.sdo_test()
00137 member.setInstanceName("test0")
00138 self.org.set_members([member])
00139 member = self.sdo_test()
00140 member.setInstanceName("test1")
00141 self.org.add_members([member])
00142 self.assertEqual(len(self.org.get_members()),2)
00143 self.org.remove_member("test1")
00144 self.assertEqual(len(self.org.get_members()),1)
00145
00146 self.assertRaises(SDOPackage.InvalidParameter, self.org.set_members, None )
00147 self.assertRaises(SDOPackage.InvalidParameter, self.org.add_members, None )
00148 self.assertRaises(SDOPackage.InvalidParameter, self.org.remove_member, None )
00149
00150
00151 def test_get_dependency(self):
00152 self.org.set_dependency(SDOPackage.OWN)
00153 self.assertEqual(self.org.get_dependency(), SDOPackage.OWN)
00154
00155 self.assertRaises(SDOPackage.InvalidParameter, self.org.set_dependency, None )
00156
00157
00158
00159 if __name__ == '__main__':
00160 unittest.main()