AutoTestIn.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 # -*- coding:utf-8 -*-
3 
4 
9 
10 import sys
11 import string
12 import time
13 sys.path.append(".")
14 
15 # Import RTM module
16 import RTC
17 import OpenRTM_aist
18 import math
19 import AutoTest, AutoTest__POA
20 import os.path
21 
22 # </rtc-template>
23 
24 # Import Service stub modules
25 # <rtc-template block="consumer_import">
26 # </rtc-template>
27 
28 
29 # This module's spesification
30 # <rtc-template block="module_spec">
31 AutoTestIn_spec = ["implementation_id", "AutoTestIn",
32  "type_name", "AutoTestIn",
33  "description", "ModuleDescription",
34  "version", "1.0.0",
35  "vendor", "HarumiMiyamoto",
36  "category", "example",
37  "activity_type", "STATIC",
38  "max_instance", "1",
39  "language", "Python",
40  "lang_type", "SCRIPT",
41  "exec_cxt.periodic.rate", "1.0",
42  ""]
43 # </rtc-template>
44 
45 
46 # Class implementing IDL interface MyService(MyService.idl)
47 class MyServiceSVC_impl(AutoTest__POA.MyService):
48  def __init__(self):
49  self._echoList = []
50  self._valueList = []
51  self._value = 0
52  self.__echo_msg= ""
53  self._isNew = False
54 
55  def __del__(self):
56  pass
57 
58  def echo(self, msg):
59  OpenRTM_aist.CORBA_SeqUtil.push_back(self._echoList, msg)
60  self.__echo_msg = msg
61  if self._isNew:
62  #print "echo's message was overwritten !!!"
63  pass
64  self._isNew = True
65  return msg
66 
67  def get_echo(self):
68  if self._isNew:
69  self._isNew = False
70  echomsg = self.__echo_msg
71  return echomsg
72 
73  return ""
74 
75  def reset_message(self):
76  self._isNew = False
77  self.__echo_msg = ""
78 
80 
81  """
82  \class AutoTestIn
83  \brief ModuleDescription
84 
85  """
86  def __init__(self, manager):
87  """
88  \brief constructor
89  \param manager Maneger Object
90  """
91  OpenRTM_aist.DataFlowComponentBase.__init__(self, manager)
92  self._cnt=0
93  self._flag=0
94 
95  def onInitialize(self):
96  self._d_dp_vIn = RTC.TimedFloat(RTC.Time(0,0),0)
97  self._d_dp_vSeqIn = RTC.TimedFloatSeq(RTC.Time(0,0),[])
98 
101 
102 
103  # Set InPort buffers
104  self.addInPort("in",self._InIn)
105  self.addInPort("seqin",self._SeqInIn)
106 
107  # Set OutPort buffers
109  #self._myservice0_var = MyService_i()
110 
111  # initialization of Provider
113 
114  # Set service provider to Ports
115  self._MyServicePort.registerProvider("myservice0", "MyService", self._myservice0_var)
116 
117  # Set CORBA Service Ports
118  self.addPort(self._MyServicePort)
119 
120  return RTC.RTC_OK
121 
122 
123  def onActivated(self, ec_id):
124  self._file=open('received-data','w')
125  self._msg = ""
126  return RTC.RTC_OK
127 
128  def onDeactivated(self, ec_id):
129  self._file.close()
130  self._myservice0_var.reset_message()
131  return RTC.RTC_OK
132 
133  def onExecute(self, ec_id):
134  if not self._msg:
135  self._msg = self._myservice0_var.get_echo()
136  if self._msg:
137  self._msg += "\n"
138 
139  if self._InIn.isNew() and self._SeqInIn.isNew() and self._msg:
140  floatdata = self._InIn.read()
141  fdata = "%.6f\n" % floatdata.data
142  print "fdata:", fdata
143  self._file.write(fdata)
144 
145  seqfloatdata = self._SeqInIn.read()
146 
147  t_sdata = tuple(d for d in seqfloatdata.data)
148  sdata = "%.6f %.6f %.6f %.6f %.6f\n" % t_sdata
149 
150  print "sdata: ", sdata
151  self._file.write(sdata)
152 
153  self._file.write(self._msg)
154  self._msg = ""
155 
156  return RTC.RTC_OK
157  else:
158  return RTC.RTC_OK
159 
160 
161 
162 
163 def AutoTestInInit(manager):
164  profile = OpenRTM_aist.Properties(defaults_str=AutoTestIn_spec)
165  manager.registerFactory(profile,
166  AutoTestIn,
167  OpenRTM_aist.Delete)
168 
169 def MyModuleInit(manager):
170  AutoTestInInit(manager)
171 
172  # Create a component
173  comp = manager.createComponent("AutoTestIn")
174 
175 
176 
177 def main():
178  mgr = OpenRTM_aist.Manager.init(len(sys.argv), sys.argv)
179  mgr.setModuleInitProc(MyModuleInit)
180  mgr.activateManager()
181  mgr.runManager()
182 
183 if __name__ == "__main__":
184  main()
185 
RT Conponent CORBA service/consumer Port.
Definition: CorbaPort.py:604
def push_back(seq, elem)
Push the new element back to the CORBA sequence.
The Properties class represents a persistent set of properties.
Definition: Properties.py:83
def addInPort(self, name, inport)
Definition: RTObject.py:2721
InPort template class.
Definition: InPort.py:58


openrtm_aist_python
Author(s): Shinji Kurihara
autogenerated on Mon Feb 28 2022 23:01:06