AutoTestOut.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 # -*- coding: utf-8 -*-
00003 # -*- Python -*-
00004 
00005 """
00006  \file AutoTestOut.py
00007  \brief ModuleDescription
00008  \date $Date$
00009 
00010 
00011 """
00012 import sys
00013 import string
00014 import time
00015 sys.path.append(".")
00016 
00017 # Import RTM module
00018 import RTC
00019 import OpenRTM_aist
00020 
00021 # Import Service implementation class
00022 # <rtc-template block="service_impl">
00023 
00024 # </rtc-template>
00025 
00026 # Import Service stub modules
00027 # <rtc-template block="consumer_import">
00028 import AutoTest, AutoTest__POA
00029 from omniORB import CORBA
00030 
00031 #import csv
00032 
00033 # </rtc-template>
00034 
00035 
00036 # This module's specification
00037 # <rtc-template block="module_spec">
# Component profile as a flat list of alternating keys and values,
# terminated by an empty string.  It is handed to OpenRTM_aist.Properties
# through the ``defaults_str`` argument in AutoTestOutInit().
AutoTestOut_spec = [
  # identity
  "implementation_id", "AutoTestOut",
  "type_name", "AutoTestOut",
  "description", "ModuleDescription",
  "version", "1.0.0",
  "vendor", "HarumiMiyamoto",
  "category", "example",
  # instantiation
  "activity_type", "STATIC",
  "max_instance", "1",
  # implementation language
  "language", "Python",
  "lang_type", "SCRIPT",
  # execution context
  "exec_cxt.periodic.rate", "1.0",
  # terminator
  "",
]
00050 
00051 
class AutoTestOut(OpenRTM_aist.DataFlowComponentBase):
  """Data-flow component that replays records from the 'original-data' file.

  While active, each onExecute() call reads three consecutive lines from the
  file: a single float, a space-separated row of floats, and a message
  string.  When all three are present and the "myservice0" consumer has an
  object reference, the float is written to the "out" port, the first five
  row values to the "seqout" port, and the message is passed to the
  consumer's echo() operation.
  """

  # Number of leading values taken from the sequence line of each record.
  _SEQ_LENGTH = 5

  def __init__(self, manager):
    """Initialise the base component with the given manager."""
    OpenRTM_aist.DataFlowComponentBase.__init__(self, manager)
    # Input file handle; None whenever no file is open, so that closing is
    # safe even if activation never succeeded.
    self._file = None

  def onInitialize(self):
    """Create and register the two out ports and the service port.

    Returns RTC.RTC_OK.
    """
    self._d_dp_vOut = RTC.TimedFloat(RTC.Time(0,0),0)
    self._d_dp_vSeqOut = RTC.TimedFloatSeq(RTC.Time(0,0),[])

    self._OutOut = OpenRTM_aist.OutPort("out", self._d_dp_vOut, OpenRTM_aist.RingBuffer(8))
    self._SeqOutOut = OpenRTM_aist.OutPort("seqout", self._d_dp_vSeqOut, OpenRTM_aist.RingBuffer(8))

    # Set OutPort buffers
    self.addOutPort("out", self._OutOut)
    self.addOutPort("seqout", self._SeqOutOut)

    self._MyServicePort = OpenRTM_aist.CorbaPort("MyService")
    self._myservice0_var = OpenRTM_aist.CorbaConsumer(interfaceType=AutoTest.MyService)

    # Set service consumers to Ports
    self._MyServicePort.registerConsumer("myservice0", "MyService", self._myservice0_var)

    # Set CORBA Service Ports
    self.addPort(self._MyServicePort)

    return RTC.RTC_OK

  def _close_file(self):
    """Close the input file if one is open and forget the handle."""
    if self._file is not None:
      self._file.close()
      self._file = None

  def onActivated(self, ec_id):
    """Open 'original-data' for reading.

    Returns RTC.RTC_OK on success, or RTC.RTC_ERROR (after printing a
    message) when the file cannot be opened.
    """
    # Drop any handle left over from an earlier activation so it cannot leak.
    self._close_file()
    try:
      self._file = open('original-data')
    except IOError:
      # Only I/O failures are treated as "cannot open"; anything else
      # (e.g. KeyboardInterrupt) propagates instead of being swallowed.
      # The parenthesised form prints the same text on Python 2 and 3.
      print("Can not open 'original-data' file.")
      return RTC.RTC_ERROR

    return RTC.RTC_OK

  def onDeactivated(self, ec_id):
    """Close the input file, if any, and return RTC.RTC_OK."""
    self._close_file()
    return RTC.RTC_OK

  def onExecute(self, ec_id):
    """Read one three-line record and publish it.

    The port data objects are updated as soon as their line has been read,
    but nothing is written to the ports or sent to the service unless all
    three lines are non-empty and the consumer holds an object reference.

    Raises ValueError if a numeric field is not a valid float and IndexError
    if the sequence line has fewer than five space-separated fields.
    Always returns RTC.RTC_OK otherwise.
    """
    # All three lines are consumed up front so the file position advances by
    # a whole record even when the record turns out to be incomplete.
    scalar_line = self._file.readline()
    seq_line = self._file.readline()
    message_line = self._file.readline()

    if not scalar_line:
      return RTC.RTC_OK
    self._d_dp_vOut.data = float(scalar_line)

    if not seq_line:
      return RTC.RTC_OK
    fields = seq_line.split(' ')
    # Index explicitly so that a short row still raises IndexError.
    self._d_dp_vSeqOut.data = [float(fields[i]) for i in range(self._SEQ_LENGTH)]

    if not message_line:
      return RTC.RTC_OK

    service = self._myservice0_var._ptr()
    if not service:
      return RTC.RTC_OK

    # Write out data
    self._OutOut.write()
    self._SeqOutOut.write()

    # invoking echo operation (its return value is not used)
    service.echo(message_line.rstrip('\r\n'))

    return RTC.RTC_OK
00126         
00127 
def AutoTestOutInit(manager):
  """Register the AutoTestOut factory with the given manager.

  The component profile is built from AutoTestOut_spec; AutoTestOut is the
  creation callable and OpenRTM_aist.Delete the deletion callable.
  """
  component_profile = OpenRTM_aist.Properties(defaults_str=AutoTestOut_spec)
  manager.registerFactory(component_profile, AutoTestOut, OpenRTM_aist.Delete)
00133 
def MyModuleInit(manager):
  """Module-initialisation procedure passed to the manager by main().

  Registers the AutoTestOut factory and then asks the manager to create one
  component named "AutoTestOut".
  """
  AutoTestOutInit(manager)

  # Create a component; the returned object is not needed here.
  manager.createComponent("AutoTestOut")
00139 
00140 
00141 
def main():
  """Initialise the OpenRTM manager from the command line and run it."""
  argv = sys.argv
  manager = OpenRTM_aist.Manager.init(len(argv), argv)
  # MyModuleInit registers the component factory and creates the instance.
  manager.setModuleInitProc(MyModuleInit)
  manager.activateManager()
  manager.runManager()


if __name__ == "__main__":
  main()
00151 


openrtm_aist_python
Author(s): Shinji Kurihara
autogenerated on Thu Aug 27 2015 14:17:28