Go to the documentation of this file.
00001
00002
00003
00004
00005 """
00006 \file AutoTestOut.py
00007 \brief ModuleDescription
00008 \date $Date$
00009
00010
00011 """
00012 import sys
00013 import string
00014 import time
00015 sys.path.append(".")
00016
00017
00018 import RTC
00019 import OpenRTM_aist
00020
00021
00022
00023
00024
00025
00026
00027
00028 import AutoTest, AutoTest__POA
00029 from omniORB import CORBA
00030
00031
00032
00033
00034
00035
00036
00037
# Component profile for AutoTestOut, passed as ``defaults_str`` to
# OpenRTM_aist.Properties in AutoTestOutInit.  The list is flat: each key is
# immediately followed by its value, and the final entry is an empty string.
AutoTestOut_spec = [
    "implementation_id", "AutoTestOut",
    "type_name", "AutoTestOut",
    "description", "ModuleDescription",
    "version", "1.0.0",
    "vendor", "HarumiMiyamoto",
    "category", "example",
    "activity_type", "STATIC",
    "max_instance", "1",
    "language", "Python",
    "lang_type", "SCRIPT",
    "exec_cxt.periodic.rate", "1.0",
    "",
]
00050
00051
class AutoTestOut(OpenRTM_aist.DataFlowComponentBase):
    """Data-flow component that replays the file 'original-data'.

    The component owns two OutPorts ("out" carrying RTC.TimedFloat and
    "seqout" carrying RTC.TimedFloatSeq) and one CORBA port "MyService"
    holding a consumer of AutoTest.MyService.

    The data file is read three lines per execution cycle:

    1. a single float, stored as the data of the "out" port;
    2. whitespace-separated floats, of which the first five are stored as
       the data of the "seqout" port;
    3. a text line that is passed (without its line ending) to the
       ``echo`` operation of the connected service.
    """

    # Number of values taken from the sequence line of each record.
    _SEQ_LENGTH = 5

    def __init__(self, manager):
        """Initialise the base component.

        manager -- forwarded unchanged to DataFlowComponentBase.__init__.
        """
        OpenRTM_aist.DataFlowComponentBase.__init__(self, manager)
        # Data file handle: opened in onActivated, closed in onDeactivated.
        # Initialised here so onDeactivated never hits a missing attribute.
        self._file = None

    def onInitialize(self):
        """Create the data holders, the two OutPorts and the service port.

        Returns RTC.RTC_OK.
        """
        # Data holders bound to the OutPorts below.
        self._d_dp_vOut = RTC.TimedFloat(RTC.Time(0, 0), 0)
        self._d_dp_vSeqOut = RTC.TimedFloatSeq(RTC.Time(0, 0), [])

        self._OutOut = OpenRTM_aist.OutPort(
            "out", self._d_dp_vOut, OpenRTM_aist.RingBuffer(8))
        self._SeqOutOut = OpenRTM_aist.OutPort(
            "seqout", self._d_dp_vSeqOut, OpenRTM_aist.RingBuffer(8))

        self.addOutPort("out", self._OutOut)
        self.addOutPort("seqout", self._SeqOutOut)

        # Service port with a consumer for the AutoTest.MyService interface.
        self._MyServicePort = OpenRTM_aist.CorbaPort("MyService")
        self._myservice0_var = OpenRTM_aist.CorbaConsumer(
            interfaceType=AutoTest.MyService)
        self._MyServicePort.registerConsumer(
            "myservice0", "MyService", self._myservice0_var)
        self.addPort(self._MyServicePort)

        return RTC.RTC_OK

    def onActivated(self, ec_id):
        """Open 'original-data' for reading.

        ec_id -- execution context id (unused here).

        Returns RTC.RTC_OK on success, or RTC.RTC_ERROR after printing a
        message when the file cannot be opened.
        """
        try:
            self._file = open('original-data')
        except IOError:
            # Only I/O failures are expected from open(); anything else
            # (e.g. KeyboardInterrupt) must not be swallowed.
            # print(...) with one argument behaves the same on Python 2 and 3.
            print("Can not open 'original-data' file.")
            return RTC.RTC_ERROR

        return RTC.RTC_OK

    def onDeactivated(self, ec_id):
        """Close the data file if it is open.

        ec_id -- execution context id (unused here).

        Returns RTC.RTC_OK.
        """
        if self._file is not None:
            self._file.close()
            self._file = None
        return RTC.RTC_OK

    def onExecute(self, ec_id):
        """Read the next three-line record and publish it.

        ec_id -- execution context id (unused here).

        The port data is updated for every line that could be read.  The
        two ports are written and ``echo`` is invoked only when the third
        line is present and the service consumer holds an object
        reference; otherwise the record is consumed without being sent.

        Returns RTC.RTC_OK.  float() errors on malformed numbers and an
        IndexError on a sequence line with fewer than five values are not
        caught here.
        """
        value_line = self._file.readline()
        seq_line = self._file.readline()
        message_line = self._file.readline()

        if value_line:
            self._d_dp_vOut.data = float(value_line)

        if seq_line:
            # split() without a separator tolerates repeated blanks, tabs
            # and the trailing newline.
            fields = seq_line.split()
            self._d_dp_vSeqOut.data = [
                float(fields[i]) for i in range(self._SEQ_LENGTH)
            ]

        if message_line:
            # Fetch the reference once so the object that was checked is
            # the object that is called.
            service = self._myservice0_var._ptr()
            if service:
                self._OutOut.write()
                self._SeqOutOut.write()
                # The echo result is not used.
                service.echo(message_line.rstrip('\r\n'))

        return RTC.RTC_OK
00126
00127
def AutoTestOutInit(manager):
    """Register the AutoTestOut component factory with ``manager``.

    The factory is described by a Properties object built from
    AutoTestOut_spec, creates instances of the AutoTestOut class and uses
    OpenRTM_aist.Delete as its delete function.
    """
    factory_profile = OpenRTM_aist.Properties(defaults_str=AutoTestOut_spec)
    manager.registerFactory(factory_profile, AutoTestOut, OpenRTM_aist.Delete)
00133
def MyModuleInit(manager):
    """Register the AutoTestOut factory and create one component from it.

    manager -- the manager object on which the factory is registered and
               createComponent("AutoTestOut") is called.

    A failed creation is reported on stderr; start-up is not aborted.
    """
    AutoTestOutInit(manager)

    component = manager.createComponent("AutoTestOut")
    # NOTE(review): createComponent is expected to return None when the
    # component cannot be created -- confirm against the OpenRTM-aist
    # Manager documentation.
    if component is None:
        sys.stderr.write("Failed to create component 'AutoTestOut'.\n")
00139
00140
00141
def main():
    """Start the OpenRTM manager for this component.

    The manager is initialised with the process arguments, MyModuleInit is
    installed as its module-init procedure, and the manager is then
    activated and run.
    """
    argv = sys.argv
    manager = OpenRTM_aist.Manager.init(len(argv), argv)
    manager.setModuleInitProc(MyModuleInit)
    manager.activateManager()
    manager.runManager()


# Run the component only when this file is executed as a script.
if __name__ == "__main__":
    main()
00151