Go to the documentation of this file.00001
00002
00003 PKG = 'hrpsys_ros_bridge'
00004 import roslib; roslib.load_manifest(PKG)
00005
00006 import unittest
00007
00008
00009
00010 import sys, time, signal, os
00011 import RTC
00012 import OpenRTM_aist
00013
00014 global mgr
00015
00016 class TestComp(OpenRTM_aist.DataFlowComponentBase):
00017 def __init__ (self, manager):
00018 OpenRTM_aist.DataFlowComponentBase.__init__(self, manager)
00019 return
00020
00021 def onInitialize(self):
00022 port = "Short"
00023 self.inport = OpenRTM_aist.InPort(port, RTC.TimedShort)
00024 self.registerInPort(port, self.inport)
00025 self.data_received = 0
00026 self.data_previous = 0
00027
00028 return RTC.RTC_OK
00029
00030 def onExecute(self, ec_id):
00031 if self.inport.isNew():
00032 data = self.inport.read()
00033 print data
00034 if ( self.data_received > 0 ) :
00035 print "prev data = ", self.data_previous, ", current data = ", data.data
00036 assert data.data == self.data_previous + 1
00037 self.data_previous=data.data
00038 self.data_received += 1
00039 print signal.SIGINT, os.getpid()
00040 if ( self.data_received > 10 ) :
00041 os.kill(os.getpid(), signal.SIGINT)
00042
00043 return RTC.RTC_OK
00044
00045 module_spec = ["implementation_id", "test_seqio",
00046 "type_name", "test_seqio",
00047 "description", "Dataport ROS bridge component",
00048 "version", "1.0",
00049 "vendor", "Kei Okada",
00050 "category", "example",
00051 "activity_type", "DataFlowComponent",
00052 "max_instance", "10",
00053 "language", "Python",
00054 "lang_type", "script",
00055 ""]
00056
00057 def TestInit(manager):
00058 profile = OpenRTM_aist.Properties(defaults_str=module_spec)
00059 print profile
00060 manager.registerFactory(profile,
00061 TestComp,
00062 OpenRTM_aist.Delete)
00063 comp = manager.createComponent("test_seqio")
00064
00065 class TestSeqIO(unittest.TestCase):
00066 def test_SeqIo(self):
00067 mgr = OpenRTM_aist.Manager.init(sys.argv)
00068
00069 mgr.setModuleInitProc(TestInit)
00070 mgr.activateManager()
00071 mgr.runManager()
00072
00073
00074
00075
00076 import rosunit
00077 rosunit.unitrun(PKG, 'test_hronx_ros_bridge', TestSeqIO)
00078
00079
00080
00081
00082
00083