AutoTestOut.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3 # -*- Python -*-
4 
5 """
6  \file AutoTestOut.py
7  \brief ModuleDescription
8  \date $Date$
9 
10 
11 """
12 import sys
13 import string
14 import time
15 sys.path.append(".")
16 
17 # Import RTM module
18 import RTC
19 import OpenRTM_aist
20 
21 # Import Service implementation class
22 # <rtc-template block="service_impl">
23 
24 # </rtc-template>
25 
26 # Import Service stub modules
27 # <rtc-template block="consumer_import">
28 import AutoTest, AutoTest__POA
29 from omniORB import CORBA
30 
31 #import csv
32 
33 # </rtc-template>
34 
35 
36 # This module's specification
37 # <rtc-template block="module_spec">
# Component profile as a flat list of alternating keys and values, closed by
# an empty string.  AutoTestOutInit() passes it to OpenRTM_aist.Properties
# through the defaults_str argument.
AutoTestOut_spec = [
    "implementation_id", "AutoTestOut",
    "type_name", "AutoTestOut",
    "description", "ModuleDescription",
    "version", "1.0.0",
    "vendor", "HarumiMiyamoto",
    "category", "example",
    "activity_type", "STATIC",
    "max_instance", "1",
    "language", "Python",
    "lang_type", "SCRIPT",
    "exec_cxt.periodic.rate", "1.0",
    "",
]
50 
51 
53 
54  """
55  \class AutoTestOut
56  \brief ModuleDescription
57  """
def __init__(self, manager):
    """Initialise the component through the DataFlowComponentBase initialiser.

    manager -- forwarded unchanged to
               OpenRTM_aist.DataFlowComponentBase.__init__.
    """
    base_class = OpenRTM_aist.DataFlowComponentBase
    base_class.__init__(self, manager)
61 
def onInitialize(self):
    """Create the output data objects and register the ports.

    Returns RTC.RTC_OK.

    NOTE(review): self._OutOut, self._SeqOutOut and self._MyServicePort are
    used below, but the statements that create them are not part of this
    listing (original source lines 66, 67 and 73 are absent) -- confirm
    against the complete file.
    """
    # Data holders: a single float and a float sequence, each built with
    # RTC.Time(0, 0) and a zero / empty payload.
    self._d_dp_vOut = RTC.TimedFloat(RTC.Time(0, 0), 0)
    self._d_dp_vSeqOut = RTC.TimedFloatSeq(RTC.Time(0, 0), [])

    # Set OutPort buffers
    self.addOutPort("out", self._OutOut)
    self.addOutPort("seqout", self._SeqOutOut)

    # Consumer for the AutoTest.MyService interface.
    service_consumer = OpenRTM_aist.CorbaConsumer(
        interfaceType=AutoTest.MyService)
    self._myservice0_var = service_consumer

    # Set service consumers to Ports
    self._MyServicePort.registerConsumer(
        "myservice0", "MyService", service_consumer)

    # Set CORBA Service Ports
    self.addPort(self._MyServicePort)

    return RTC.RTC_OK
83 
def onActivated(self, ec_id):
    """Open the 'original-data' input file and keep it in self._file.

    ec_id -- execution-context id supplied by the caller; not used here.

    Returns RTC.RTC_OK when the file was opened, or RTC.RTC_ERROR (after
    printing a message) when opening it failed.
    """
    try:
        self._file = open('original-data')
    except (IOError, OSError):
        # Only I/O failures mean "cannot open".  A bare except would also
        # swallow KeyboardInterrupt/SystemExit and genuine programming
        # errors, hiding them behind this message.
        print("Can not open 'original-data' file.")
        return RTC.RTC_ERROR

    return RTC.RTC_OK
92 
def onDeactivated(self, ec_id):
    """Close the data file held in self._file and return RTC.RTC_OK.

    ec_id -- execution-context id supplied by the caller; not used here.
    """
    data_file = self._file
    data_file.close()
    return RTC.RTC_OK
96 
97 
def onExecute(self, ec_id):
    """Read one three-line record from self._file and publish it.

    Every call consumes three lines from the data file:
      1. one float, stored in self._d_dp_vOut.data;
      2. five whitespace-separated floats, stored in
         self._d_dp_vSeqOut.data;
      3. a message string passed to the service's echo() operation.

    The two out-ports are written and echo() is invoked only when the third
    line is non-empty and self._myservice0_var._ptr() is truthy.

    ec_id -- execution-context id supplied by the caller; not used here.

    Returns RTC.RTC_OK.
    Raises ValueError if a numeric field cannot be converted to float, and
    IndexError if the second line holds fewer than five fields.
    """
    scalar_line = self._file.readline()
    sequence_line = self._file.readline()
    message_line = self._file.readline()

    # readline() returns an empty string at end of file, so each of the
    # checks below simply skips its step once the data runs out.
    if scalar_line:
        self._d_dp_vOut.data = float(scalar_line)

    if sequence_line:
        # split() without an argument tolerates tabs and repeated blanks;
        # split(' ') produced empty tokens there and float('') failed.
        fields = sequence_line.split()
        # Index explicitly so a short line still raises instead of silently
        # publishing a truncated sequence.
        self._d_dp_vSeqOut.data = [float(fields[i]) for i in range(5)]

    if message_line:
        service = self._myservice0_var._ptr()
        if service:
            # Write out data
            self._OutOut.write()
            self._SeqOutOut.write()

            # invoking echo operation (its return value is not used)
            service.echo(message_line.rstrip('\r\n'))

    return RTC.RTC_OK
126 
127 
def AutoTestOutInit(manager):
    """Register the AutoTestOut component factory with *manager*.

    A Properties object is built from AutoTestOut_spec and handed to
    manager.registerFactory together with the AutoTestOut class and
    OpenRTM_aist.Delete.
    """
    component_profile = OpenRTM_aist.Properties(defaults_str=AutoTestOut_spec)
    manager.registerFactory(component_profile, AutoTestOut, OpenRTM_aist.Delete)
133 
def MyModuleInit(manager):
    """Register the AutoTestOut factory, then create one component from it."""
    AutoTestOutInit(manager)

    # Create a component (the returned object is not used further).
    manager.createComponent("AutoTestOut")
139 
140 
141 
def main():
    """Initialise the OpenRTM manager from sys.argv and run it.

    MyModuleInit is installed as the module-initialisation procedure before
    the manager is activated and run.
    """
    argv = sys.argv
    manager = OpenRTM_aist.Manager.init(len(argv), argv)
    manager.setModuleInitProc(MyModuleInit)
    manager.activateManager()
    manager.runManager()


if __name__ == "__main__":
    main()
151 
OpenRTM_aist.examples.AutoTest.AutoTestOut.AutoTestOut.onExecute
def onExecute(self, ec_id)
Definition: AutoTestOut.py:98
OpenRTM_aist.examples.AutoTest.AutoTestOut.AutoTestOut
Definition: AutoTestOut.py:52
OpenRTM_aist.examples.AutoTest.AutoTestOut.AutoTestOut.__init__
def __init__(self, manager)
Constructor.
Definition: AutoTestOut.py:58
OpenRTM_aist.RTObject.RTObject_impl.addPort
def addPort(self, port)
Definition: RTObject.py:2667
OpenRTM_aist.examples.AutoTest.AutoTestOut.main
def main()
Definition: AutoTestOut.py:142
OpenRTM_aist.examples.AutoTest.AutoTestOut.AutoTestOut._myservice0_var
_myservice0_var
Definition: AutoTestOut.py:74
OpenRTM_aist.RingBuffer.RingBuffer
Definition: RingBuffer.py:41
OpenRTM_aist.examples.AutoTest.AutoTestOut.AutoTestOut.onInitialize
def onInitialize(self)
Definition: AutoTestOut.py:62
OpenRTM_aist.examples.AutoTest.AutoTestOut.AutoTestOut.onDeactivated
def onDeactivated(self, ec_id)
Definition: AutoTestOut.py:93
OpenRTM_aist.examples.AutoTest.AutoTestOut.AutoTestOut.onActivated
def onActivated(self, ec_id)
Definition: AutoTestOut.py:84
OpenRTM_aist.examples.AutoTest.AutoTestOut.MyModuleInit
def MyModuleInit(manager)
Definition: AutoTestOut.py:134
OpenRTM_aist.DataFlowComponentBase.DataFlowComponentBase
DataFlowComponentBase class.
Definition: DataFlowComponentBase.py:35
OpenRTM_aist.CorbaConsumer.CorbaConsumer
Definition: CorbaConsumer.py:187
OpenRTM_aist.examples.AutoTest.AutoTestOut.AutoTestOutInit
def AutoTestOutInit(manager)
Definition: AutoTestOut.py:128
OpenRTM_aist.examples.AutoTest.AutoTestOut.AutoTestOut._d_dp_vSeqOut
_d_dp_vSeqOut
Definition: AutoTestOut.py:64
OpenRTM_aist.examples.AutoTest.AutoTestOut.AutoTestOut._SeqOutOut
_SeqOutOut
Definition: AutoTestOut.py:67
OpenRTM_aist.examples.AutoTest.AutoTestOut.AutoTestOut._OutOut
_OutOut
Definition: AutoTestOut.py:66
OpenRTM_aist.Properties.Properties
Definition: Properties.py:83
hoge.echo
def echo(*args)
Definition: hoge.py:20
OpenRTM_aist.OutPort.OutPort
Definition: OutPort.py:69
OpenRTM_aist.RTObject.RTObject_impl.addOutPort
def addOutPort(self, name, outport)
Definition: RTObject.py:2765
OpenRTM_aist.examples.AutoTest.AutoTestOut.AutoTestOut._MyServicePort
_MyServicePort
Definition: AutoTestOut.py:73
OpenRTM_aist.examples.AutoTest.AutoTestOut.AutoTestOut._file
_file
Definition: AutoTestOut.py:86
OpenRTM_aist.CorbaPort.CorbaPort
RT Component CORBA service/consumer Port.
Definition: CorbaPort.py:604
OpenRTM_aist.examples.AutoTest.AutoTestOut.AutoTestOut._d_dp_vOut
_d_dp_vOut
Definition: AutoTestOut.py:63


openrtm_aist_python
Author(s): Shinji Kurihara
autogenerated on Mon Apr 21 2025 02:45:06