AutoTestOut.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3 # -*- Python -*-
4 
5 """
6  \file AutoTestOut.py
7  \brief ModuleDescription
8  \date $Date$
9 
10 
11 """
12 import sys
13 import string
14 import time
15 sys.path.append(".")
16 
17 # Import RTM module
18 import RTC
19 import OpenRTM_aist
20 
21 # Import Service implementation class
22 # <rtc-template block="service_impl">
23 
24 # </rtc-template>
25 
26 # Import Service stub modules
27 # <rtc-template block="consumer_import">
28 import AutoTest, AutoTest__POA
29 from omniORB import CORBA
30 
31 #import csv
32 
33 # </rtc-template>
34 
35 
36 # This module's specification
37 # <rtc-template block="module_spec">
# Component profile as a flat list of alternating keys and values.
# NOTE(review): the trailing empty string presumably terminates the list for
# OpenRTM_aist.Properties(defaults_str=...) -- confirm before removing it.
AutoTestOut_spec = [
    "implementation_id", "AutoTestOut",
    "type_name", "AutoTestOut",
    "description", "ModuleDescription",
    "version", "1.0.0",
    "vendor", "HarumiMiyamoto",
    "category", "example",
    "activity_type", "STATIC",
    "max_instance", "1",
    "language", "Python",
    "lang_type", "SCRIPT",
    "exec_cxt.periodic.rate", "1.0",
    "",
]
50 
51 
class AutoTestOut(OpenRTM_aist.DataFlowComponentBase):
    r"""
    \class AutoTestOut
    \brief Replays records from the 'original-data' file.

    Each call to onExecute() consumes three lines of the file:

      1. a single float, stored in the data object of the "out" port;
      2. whitespace-separated floats, the first five of which are stored in
         the data object of the "seqout" port;
      3. a message, sent through the "myservice0" consumer's echo()
         operation.

    The two ports are written and echo() is invoked only when the third line
    is non-empty and the service consumer holds an object reference.
    """

    # Number of values taken from the sequence line of every record.
    _SEQ_LENGTH = 5

    def __init__(self, manager):
        """Initialise the base component; no data file is open yet."""
        OpenRTM_aist.DataFlowComponentBase.__init__(self, manager)
        # Opened in onActivated(), closed in onDeactivated().
        self._file = None

    def onInitialize(self):
        """Create the data objects, the two OutPorts and the service port.

        Returns:
            RTC.RTC_OK.
        """
        self._d_dp_vOut = RTC.TimedFloat(RTC.Time(0, 0), 0)
        self._d_dp_vSeqOut = RTC.TimedFloatSeq(RTC.Time(0, 0), [])

        # NOTE(review): the statements creating _OutOut, _SeqOutOut and
        # _MyServicePort were missing from the listing this was recovered
        # from; they are reconstructed from the usual OpenRTM-aist pattern.
        # Confirm the constructor arguments (e.g. an explicit ring buffer)
        # against the upstream file.
        self._OutOut = OpenRTM_aist.OutPort("out", self._d_dp_vOut)
        self._SeqOutOut = OpenRTM_aist.OutPort("seqout", self._d_dp_vSeqOut)

        # Set OutPort buffers
        self.addOutPort("out", self._OutOut)
        self.addOutPort("seqout", self._SeqOutOut)

        self._MyServicePort = OpenRTM_aist.CorbaPort("MyService")
        self._myservice0_var = OpenRTM_aist.CorbaConsumer(
            interfaceType=AutoTest.MyService)

        # Set service consumers to Ports
        self._MyServicePort.registerConsumer(
            "myservice0", "MyService", self._myservice0_var)

        # Set CORBA Service Ports
        self.addPort(self._MyServicePort)

        return RTC.RTC_OK

    def onActivated(self, ec_id):
        """Open the 'original-data' file for reading.

        Args:
            ec_id: value supplied by the caller; not used here.

        Returns:
            RTC.RTC_OK on success, RTC.RTC_ERROR if the file cannot be opened.
        """
        try:
            self._file = open('original-data')
        except IOError:
            # Parenthesised single-argument print behaves the same on
            # Python 2 and Python 3.
            print("Can not open 'original-data' file.")
            return RTC.RTC_ERROR

        return RTC.RTC_OK

    def onDeactivated(self, ec_id):
        """Close the data file if it is open.

        Args:
            ec_id: value supplied by the caller; not used here.

        Returns:
            RTC.RTC_OK.
        """
        # Guard against deactivation after a failed (or missing) activation.
        if self._file is not None:
            self._file.close()
            self._file = None
        return RTC.RTC_OK

    def onExecute(self, ec_id):
        """Read the next three-line record and publish it.

        Args:
            ec_id: value supplied by the caller; not used here.

        Returns:
            RTC.RTC_OK normally (including at end of file, where nothing is
            sent); RTC.RTC_ERROR if a numeric line cannot be parsed.
        """
        scalar_line = self._file.readline()
        seq_line = self._file.readline()
        message_line = self._file.readline()

        try:
            if scalar_line:
                self._d_dp_vOut.data = float(scalar_line)
            if seq_line:
                self._d_dp_vSeqOut.data = self._parse_sequence(seq_line)
        except ValueError as err:
            # Report malformed data the same way onActivated reports a
            # missing file instead of letting the exception escape.
            print("Invalid record in 'original-data' file: %s" % err)
            return RTC.RTC_ERROR

        if message_line:
            provider = self._myservice0_var._ptr()
            if provider:
                # Write out data
                self._OutOut.write()
                self._SeqOutOut.write()

                # invoking echo operation
                provider.echo(message_line.rstrip('\r\n'))

        return RTC.RTC_OK

    @classmethod
    def _parse_sequence(cls, line):
        """Return the first _SEQ_LENGTH floats found on *line*.

        Splits on any whitespace, so repeated spaces and the trailing newline
        are harmless.

        Raises:
            ValueError: if the line has fewer than _SEQ_LENGTH tokens or a
                token is not a valid float.
        """
        tokens = line.split()
        if len(tokens) < cls._SEQ_LENGTH:
            raise ValueError("expected %d values, got %d"
                             % (cls._SEQ_LENGTH, len(tokens)))
        return [float(token) for token in tokens[:cls._SEQ_LENGTH]]
126 
127 
def AutoTestOutInit(manager):
    """Register the AutoTestOut factory with *manager*.

    Builds an OpenRTM_aist.Properties object from AutoTestOut_spec and hands
    it to manager.registerFactory() together with the AutoTestOut class and
    OpenRTM_aist.Delete.
    """
    component_profile = OpenRTM_aist.Properties(defaults_str=AutoTestOut_spec)
    manager.registerFactory(component_profile, AutoTestOut, OpenRTM_aist.Delete)
133 
def MyModuleInit(manager):
    """Register the AutoTestOut factory and create one component from it.

    Args:
        manager: object providing registerFactory() and createComponent().
    """
    AutoTestOutInit(manager)

    # Create a component. The returned object is not needed here, so it is
    # no longer bound to an unused local.
    manager.createComponent("AutoTestOut")
139 
140 
141 
def main():
    """Initialise the OpenRTM manager from the command line and run it.

    MyModuleInit is installed as the module initialisation procedure before
    the manager is activated and run.
    """
    manager = OpenRTM_aist.Manager.init(len(sys.argv), sys.argv)
    manager.setModuleInitProc(MyModuleInit)
    manager.activateManager()
    manager.runManager()


if __name__ == "__main__":
    main()
151 
RT Component CORBA service/consumer Port.
Definition: CorbaPort.py:604
The Properties class represents a persistent set of properties.
Definition: Properties.py:83
def addOutPort(self, name, outport)
Definition: RTObject.py:2765
def echo(args)
Definition: hoge.py:20


openrtm_aist_python
Author(s): Shinji Kurihara
autogenerated on Mon Feb 28 2022 23:01:06