test-seqio.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 
00003 PKG = 'hrpsys_ros_bridge'
00004 import roslib; roslib.load_manifest(PKG)
00005 
00006 import unittest
00007 
00008 
00009 # rtm modules
00010 import sys, time, signal, os
00011 import RTC
00012 import OpenRTM_aist
00013 
00014 global mgr
00015 
00016 class TestComp(OpenRTM_aist.DataFlowComponentBase):
00017     def __init__ (self, manager):
00018         OpenRTM_aist.DataFlowComponentBase.__init__(self, manager)
00019         return
00020 
00021     def onInitialize(self):
00022         port = "Short"
00023         self.inport = OpenRTM_aist.InPort(port, RTC.TimedShort)
00024         self.registerInPort(port, self.inport)
00025         self.data_received = 0
00026         self.data_previous = 0
00027 
00028         return RTC.RTC_OK
00029 
00030     def onExecute(self, ec_id):
00031         if self.inport.isNew():
00032             data = self.inport.read()
00033             print data
00034             if ( self.data_received > 0 ) :
00035                 print "prev data = ", self.data_previous, ", current data = ", data.data
00036                 assert data.data == self.data_previous + 1
00037             self.data_previous=data.data
00038             self.data_received += 1
00039             print signal.SIGINT, os.getpid()
00040             if ( self.data_received > 10 ) :
00041                 os.kill(os.getpid(), signal.SIGINT)
00042 
00043         return RTC.RTC_OK
00044 
00045 module_spec = ["implementation_id", "test_seqio",
00046                "type_name",         "test_seqio",
00047                "description",       "Dataport ROS bridge component",
00048                "version",           "1.0",
00049                "vendor",            "Kei Okada",
00050                "category",          "example",
00051                "activity_type",     "DataFlowComponent",
00052                "max_instance",      "10",
00053                "language",          "Python",
00054                "lang_type",         "script",
00055                ""]
00056 
00057 def TestInit(manager):
00058     profile = OpenRTM_aist.Properties(defaults_str=module_spec)
00059     print  profile
00060     manager.registerFactory(profile,
00061                             TestComp,
00062                             OpenRTM_aist.Delete)
00063     comp = manager.createComponent("test_seqio")
00064 
00065 class TestSeqIO(unittest.TestCase):
00066     def test_SeqIo(self):
00067         mgr = OpenRTM_aist.Manager.init(sys.argv)
00068         # signal.signal(signal.SIGINT, handler)
00069         mgr.setModuleInitProc(TestInit)
00070         mgr.activateManager()
00071         mgr.runManager()
00072 
00073 #suite = unittest.TestSuite()
00074 #suite.addTest(unittest.FunctionTestCase(test_seqio))
00075 #unittest.TextTestRunner(verbosity=2).run(suite)
00076 import rosunit
00077 rosunit.unitrun(PKG, 'test_hronx_ros_bridge', TestSeqIO)
00078 
00079 
00080 
00081 
00082 
00083 
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Properties Friends Defines


openrtm
Author(s): AIST, ros package is maintained by Kei Okada
autogenerated on Tue Jul 23 2013 11:48:21