AutoTestIn.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 # -*- coding:utf-8 -*-
00003 
00004 ##
00005 # @file AutoTestIn.py
00006 # @brief ModuleDescription
00007 # @date $Date$
00008 #
00009 
00010 import sys
00011 import string
00012 import time
00013 sys.path.append(".")
00014 
00015 # Import RTM module
00016 import RTC
00017 import OpenRTM_aist
00018 import math
00019 import AutoTest, AutoTest__POA
00020 import os.path
00021 
00022 # </rtc-template>
00023 
00024 # Import Service stub modules
00025 # <rtc-template block="consumer_import">
00026 # </rtc-template>
00027 
00028 
00029 # This module's specification
00030 # <rtc-template block="module_spec">
# Component profile handed to OpenRTM_aist.Properties(defaults_str=...) in
# AutoTestInInit(). The list is flat: each key is immediately followed by its
# value, and the list ends with a single empty string.
AutoTestIn_spec = [
  "implementation_id", "AutoTestIn",
  "type_name", "AutoTestIn",
  "description", "ModuleDescription",
  "version", "1.0.0",
  "vendor", "HarumiMiyamoto",
  "category", "example",
  "activity_type", "STATIC",
  "max_instance", "1",
  "language", "Python",
  "lang_type", "SCRIPT",
  "exec_cxt.periodic.rate", "1.0",
  "",
]
00043 # </rtc-template>
00044 
00045 
00046 # Class implementing IDL interface MyService(MyService.idl)
class MyServiceSVC_impl(AutoTest__POA.MyService):
  """
  @brief Provider-side implementation of the MyService interface.

  Keeps the most recent string passed to echo() together with a flag
  telling whether that string has already been handed out by get_echo().
  """

  def __init__(self):
    # Every message ever received through echo(), in arrival order.
    self._echoList = []
    self._valueList = []
    self._value = 0
    # Latest message and its "not yet consumed" marker.
    self.__echo_msg = ""
    self._isNew = False

  def __del__(self):
    pass

  def echo(self, msg):
    """
    @brief Record msg as the latest message and return it unchanged.
    @param msg message to store
    @return the same msg that was passed in
    """
    OpenRTM_aist.CORBA_SeqUtil.push_back(self._echoList, msg)
    # A message that was stored earlier but never fetched through
    # get_echo() is replaced here without any notification.
    self.__echo_msg = msg
    self._isNew = True
    return msg

  def get_echo(self):
    """
    @brief Fetch the latest message once.
    @return the stored message if it has not been fetched since the last
            echo() call, otherwise an empty string
    """
    if not self._isNew:
      return ""

    self._isNew = False
    return self.__echo_msg

  def reset_message(self):
    """
    @brief Forget the stored message and clear the "new" marker.
    """
    self.__echo_msg = ""
    self._isNew = False
00078 
class AutoTestIn(OpenRTM_aist.DataFlowComponentBase):
  """
  @class AutoTestIn
  @brief Receiving component that logs incoming data to a file.

  The component owns two InPorts ("in" and "seqin") and a "MyService"
  CORBA port backed by MyServiceSVC_impl. While active, each time both
  InPorts have new data and an echo message is pending, one record made of
  the float value, the float sequence and the message is appended to the
  file 'received-data'.
  """

  def __init__(self, manager):
    """
    @brief constructor
    @param manager Manager Object
    """
    OpenRTM_aist.DataFlowComponentBase.__init__(self, manager)
    self._cnt = 0
    self._flag = 0
    # Defined up front so that the callbacks below never hit a missing
    # attribute, whatever order they are invoked in.
    self._file = None
    self._msg = ""

  def onInitialize(self):
    """
    @brief Create the InPorts and the MyService provider port and register
           them with this component.
    @return RTC.RTC_OK
    """
    self._d_dp_vIn = RTC.TimedFloat(RTC.Time(0,0),0)
    self._d_dp_vSeqIn = RTC.TimedFloatSeq(RTC.Time(0,0),[])

    self._InIn = OpenRTM_aist.InPort("in", self._d_dp_vIn, OpenRTM_aist.RingBuffer(8))
    self._SeqInIn = OpenRTM_aist.InPort("seqin", self._d_dp_vSeqIn, OpenRTM_aist.RingBuffer(8))

    # Register the InPorts
    self.addInPort("in",self._InIn)
    self.addInPort("seqin",self._SeqInIn)

    # Create the CORBA service port
    self._MyServicePort = OpenRTM_aist.CorbaPort("MyService")

    # initialization of Provider
    self._myservice0_var = MyServiceSVC_impl()

    # Set service provider to Ports
    self._MyServicePort.registerProvider("myservice0", "MyService", self._myservice0_var)

    # Set CORBA Service Ports
    self.addPort(self._MyServicePort)

    return RTC.RTC_OK

  def onActivated(self, ec_id):
    """
    @brief Open (and truncate) the output file and clear the pending message.
    @param ec_id execution context id (unused here)
    @return RTC.RTC_OK
    """
    self._file = open('received-data', 'w')
    self._msg = ""
    return RTC.RTC_OK

  def onDeactivated(self, ec_id):
    """
    @brief Close the output file and reset the service provider's message.
    @param ec_id execution context id (unused here)
    @return RTC.RTC_OK
    """
    # The file is missing if activation never got as far as opening it.
    if self._file is not None:
      self._file.close()
      self._file = None
    self._myservice0_var.reset_message()
    return RTC.RTC_OK

  def onExecute(self, ec_id):
    """
    @brief Write one record once float, sequence and message are all ready.

    A record is three lines: the float formatted with "%.6f", the sequence
    elements formatted with "%.6f" and separated by single spaces, and the
    echo message. Nothing is read from the ports until all three inputs
    are available.

    @param ec_id execution context id (unused here)
    @return RTC.RTC_OK
    """
    # Hold on to a fetched message until it has been written, because
    # get_echo() hands each message out only once.
    if not self._msg:
      self._msg = self._myservice0_var.get_echo()
      if self._msg:
        self._msg += "\n"

    if not (self._InIn.isNew() and self._SeqInIn.isNew() and self._msg):
      return RTC.RTC_OK

    floatdata = self._InIn.read()
    fdata = "%.6f\n" % floatdata.data
    # sys.stdout.write reproduces the output of the former
    # `print "fdata:", fdata` statement and parses on Python 2 and 3.
    sys.stdout.write("fdata: %s\n" % fdata)
    self._file.write(fdata)

    seqfloatdata = self._SeqInIn.read()
    # Format element by element so that a sequence of any length is
    # accepted; a fixed five-slot format string would raise TypeError for
    # any other length.
    sdata = " ".join("%.6f" % d for d in seqfloatdata.data) + "\n"
    sys.stdout.write("sdata:  %s\n" % sdata)
    self._file.write(sdata)

    self._file.write(self._msg)
    self._msg = ""

    return RTC.RTC_OK
00159   
00160 
00161 
00162 
def AutoTestInInit(manager):
  """
  @brief Register the AutoTestIn component factory with the manager.
  @param manager manager object offering registerFactory()
  """
  # Build the component profile from the module-level specification list.
  spec_properties = OpenRTM_aist.Properties(defaults_str=AutoTestIn_spec)
  manager.registerFactory(spec_properties, AutoTestIn, OpenRTM_aist.Delete)
00168 
def MyModuleInit(manager):
  """
  @brief Module initialisation procedure: register the factory, then
         create one AutoTestIn component.
  @param manager manager object passed through to AutoTestInInit()
  """
  AutoTestInInit(manager)

  # Create a component; the returned object is not used here.
  manager.createComponent("AutoTestIn")
00174 
00175 
00176 
def main():
  """
  @brief Initialise the manager from the command line, install
         MyModuleInit as its module init procedure, then activate and
         run the manager.
  """
  argv = sys.argv
  manager = OpenRTM_aist.Manager.init(len(argv), argv)
  manager.setModuleInitProc(MyModuleInit)
  manager.activateManager()
  manager.runManager()


if __name__ == "__main__":
  main()
00185 


openrtm_aist_python
Author(s): Shinji Kurihara
autogenerated on Thu Aug 27 2015 14:17:28