AutoTestIn.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 # -*- coding:utf-8 -*-
3 
4 
9 
10 import sys
11 import string
12 import time
13 sys.path.append(".")
14 
15 # Import RTM module
16 import RTC
17 import OpenRTM_aist
18 import math
19 import AutoTest, AutoTest__POA
20 import os.path
21 
22 # </rtc-template>
23 
24 # Import Service stub modules
25 # <rtc-template block="consumer_import">
26 # </rtc-template>
27 
28 
29 # This module's spesification
30 # <rtc-template block="module_spec">
31 AutoTestIn_spec = ["implementation_id", "AutoTestIn",
32  "type_name", "AutoTestIn",
33  "description", "ModuleDescription",
34  "version", "1.0.0",
35  "vendor", "HarumiMiyamoto",
36  "category", "example",
37  "activity_type", "STATIC",
38  "max_instance", "1",
39  "language", "Python",
40  "lang_type", "SCRIPT",
41  "exec_cxt.periodic.rate", "1.0",
42  ""]
43 # </rtc-template>
44 
45 
46 # Class implementing IDL interface MyService(MyService.idl)
47 class MyServiceSVC_impl(AutoTest__POA.MyService):
48  def __init__(self):
49  self._echoList = []
50  self._valueList = []
51  self._value = 0
52  self.__echo_msg= ""
53  self._isNew = False
54 
55  def __del__(self):
56  pass
57 
58  def echo(self, msg):
59  OpenRTM_aist.CORBA_SeqUtil.push_back(self._echoList, msg)
60  self.__echo_msg = msg
61  if self._isNew:
62  #print "echo's message was overwritten !!!"
63  pass
64  self._isNew = True
65  return msg
66 
67  def get_echo(self):
68  if self._isNew:
69  self._isNew = False
70  echomsg = self.__echo_msg
71  return echomsg
72 
73  return ""
74 
75  def reset_message(self):
76  self._isNew = False
77  self.__echo_msg = ""
78 
80 
81  """
82  \class AutoTestIn
83  \brief ModuleDescription
84 
85  """
86  def __init__(self, manager):
87  """
88  \brief constructor
89  \param manager Maneger Object
90  """
91  OpenRTM_aist.DataFlowComponentBase.__init__(self, manager)
92  self._cnt=0
93  self._flag=0
94 
95  def onInitialize(self):
96  self._d_dp_vIn = RTC.TimedFloat(RTC.Time(0,0),0)
97  self._d_dp_vSeqIn = RTC.TimedFloatSeq(RTC.Time(0,0),[])
98 
101 
102 
103  # Set InPort buffers
104  self.addInPort("in",self._InIn)
105  self.addInPort("seqin",self._SeqInIn)
106 
107  # Set OutPort buffers
109  #self._myservice0_var = MyService_i()
110 
111  # initialization of Provider
113 
114  # Set service provider to Ports
115  self._MyServicePort.registerProvider("myservice0", "MyService", self._myservice0_var)
116 
117  # Set CORBA Service Ports
118  self.addPort(self._MyServicePort)
119 
120  return RTC.RTC_OK
121 
122 
123  def onActivated(self, ec_id):
124  self._file=open('received-data','w')
125  self._msg = ""
126  return RTC.RTC_OK
127 
128  def onDeactivated(self, ec_id):
129  self._file.close()
130  self._myservice0_var.reset_message()
131  return RTC.RTC_OK
132 
133  def onExecute(self, ec_id):
134  if not self._msg:
135  self._msg = self._myservice0_var.get_echo()
136  if self._msg:
137  self._msg += "\n"
138 
139  if self._InIn.isNew() and self._SeqInIn.isNew() and self._msg:
140  floatdata = self._InIn.read()
141  fdata = "%.6f\n" % floatdata.data
142  print("fdata:", fdata)
143  self._file.write(fdata)
144 
145  seqfloatdata = self._SeqInIn.read()
146 
147  t_sdata = tuple(d for d in seqfloatdata.data)
148  sdata = "%.6f %.6f %.6f %.6f %.6f\n" % t_sdata
149 
150  print("sdata: ", sdata)
151  self._file.write(sdata)
152 
153  self._file.write(self._msg)
154  self._msg = ""
155 
156  return RTC.RTC_OK
157  else:
158  return RTC.RTC_OK
159 
160 
161 
162 
163 def AutoTestInInit(manager):
164  profile = OpenRTM_aist.Properties(defaults_str=AutoTestIn_spec)
165  manager.registerFactory(profile,
166  AutoTestIn,
167  OpenRTM_aist.Delete)
168 
169 def MyModuleInit(manager):
170  AutoTestInInit(manager)
171 
172  # Create a component
173  comp = manager.createComponent("AutoTestIn")
174 
175 
176 
177 def main():
178  mgr = OpenRTM_aist.Manager.init(len(sys.argv), sys.argv)
179  mgr.setModuleInitProc(MyModuleInit)
180  mgr.activateManager()
181  mgr.runManager()
182 
183 if __name__ == "__main__":
184  main()
185 
OpenRTM_aist.examples.AutoTest.AutoTestIn.MyServiceSVC_impl.echo
def echo(self, msg)
Definition: AutoTestIn.py:58
OpenRTM_aist.examples.AutoTest.AutoTestIn.AutoTestIn._msg
_msg
Definition: AutoTestIn.py:125
OpenRTM_aist.examples.AutoTest.AutoTestIn.AutoTestIn._flag
_flag
Definition: AutoTestIn.py:93
OpenRTM_aist.examples.AutoTest.AutoTestIn.AutoTestIn.__init__
def __init__(self, manager)
Definition: AutoTestIn.py:86
OpenRTM_aist.examples.AutoTest.AutoTestIn.AutoTestIn.onExecute
def onExecute(self, ec_id)
Definition: AutoTestIn.py:133
OpenRTM_aist.RTObject.RTObject_impl.addPort
def addPort(self, port)
Definition: RTObject.py:2667
OpenRTM_aist.examples.AutoTest.AutoTestIn.AutoTestIn
Definition: AutoTestIn.py:79
OpenRTM_aist.examples.AutoTest.AutoTestIn.AutoTestIn.onDeactivated
def onDeactivated(self, ec_id)
Definition: AutoTestIn.py:128
OpenRTM_aist.CORBA_SeqUtil.push_back
def push_back(seq, elem)
Push the new element back to the CORBA sequence.
Definition: CORBA_SeqUtil.py:113
OpenRTM_aist.examples.AutoTest.AutoTestIn.AutoTestIn._file
_file
Definition: AutoTestIn.py:124
OpenRTM_aist.examples.AutoTest.AutoTestIn.main
def main()
Definition: AutoTestIn.py:177
OpenRTM_aist.examples.AutoTest.AutoTestIn.MyServiceSVC_impl.__echo_msg
__echo_msg
Definition: AutoTestIn.py:52
OpenRTM_aist.examples.AutoTest.AutoTestIn.AutoTestIn._d_dp_vSeqIn
_d_dp_vSeqIn
Definition: AutoTestIn.py:97
OpenRTM_aist.RingBuffer.RingBuffer
Definition: RingBuffer.py:41
OpenRTM_aist.examples.AutoTest.AutoTestIn.AutoTestIn._SeqInIn
_SeqInIn
Definition: AutoTestIn.py:100
OpenRTM_aist.examples.AutoTest.AutoTestIn.AutoTestIn._InIn
_InIn
Definition: AutoTestIn.py:99
OpenRTM_aist.examples.AutoTest.AutoTestIn.AutoTestInInit
def AutoTestInInit(manager)
Definition: AutoTestIn.py:163
OpenRTM_aist.examples.AutoTest.AutoTestIn.AutoTestIn.onInitialize
def onInitialize(self)
Definition: AutoTestIn.py:95
OpenRTM_aist.examples.AutoTest.AutoTestIn.AutoTestIn.onActivated
def onActivated(self, ec_id)
Definition: AutoTestIn.py:123
OpenRTM_aist.DataFlowComponentBase.DataFlowComponentBase
DataFlowComponentBase class.
Definition: DataFlowComponentBase.py:35
OpenRTM_aist.examples.AutoTest.AutoTestIn.MyServiceSVC_impl
Definition: AutoTestIn.py:47
OpenRTM_aist.examples.AutoTest.AutoTestIn.AutoTestIn._d_dp_vIn
_d_dp_vIn
Definition: AutoTestIn.py:96
OpenRTM_aist.examples.AutoTest.AutoTestIn.MyServiceSVC_impl._echoList
_echoList
Definition: AutoTestIn.py:49
OpenRTM_aist.examples.AutoTest.AutoTestIn.MyServiceSVC_impl._valueList
_valueList
Definition: AutoTestIn.py:50
OpenRTM_aist.examples.AutoTest.AutoTestIn.AutoTestIn._MyServicePort
_MyServicePort
Definition: AutoTestIn.py:108
OpenRTM_aist.InPort.InPort
InPort template class.
Definition: InPort.py:58
OpenRTM_aist.examples.AutoTest.AutoTestIn.AutoTestIn._myservice0_var
_myservice0_var
Definition: AutoTestIn.py:112
OpenRTM_aist.examples.AutoTest.AutoTestIn.MyServiceSVC_impl.reset_message
def reset_message(self)
Definition: AutoTestIn.py:75
OpenRTM_aist.Properties.Properties
Definition: Properties.py:83
OpenRTM_aist.examples.AutoTest.AutoTestIn.MyServiceSVC_impl.__del__
def __del__(self)
Definition: AutoTestIn.py:55
OpenRTM_aist.examples.AutoTest.AutoTestIn.MyServiceSVC_impl._isNew
_isNew
Definition: AutoTestIn.py:53
OpenRTM_aist.examples.AutoTest.AutoTestIn.AutoTestIn._cnt
_cnt
Definition: AutoTestIn.py:92
OpenRTM_aist.RTObject.RTObject_impl.addInPort
def addInPort(self, name, inport)
Definition: RTObject.py:2721
OpenRTM_aist.examples.AutoTest.AutoTestIn.MyModuleInit
def MyModuleInit(manager)
Definition: AutoTestIn.py:169
OpenRTM_aist.examples.AutoTest.AutoTestIn.MyServiceSVC_impl._value
_value
Definition: AutoTestIn.py:51
OpenRTM_aist.examples.AutoTest.AutoTestIn.MyServiceSVC_impl.__init__
def __init__(self)
Definition: AutoTestIn.py:48
OpenRTM_aist.examples.AutoTest.AutoTestIn.MyServiceSVC_impl.get_echo
def get_echo(self)
Definition: AutoTestIn.py:67
OpenRTM_aist.CorbaPort.CorbaPort
RT Conponent CORBA service/consumer Port.
Definition: CorbaPort.py:604


openrtm_aist_python
Author(s): Shinji Kurihara
autogenerated on Mon Apr 21 2025 02:45:06