MyServiceConsumer.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3 # -*- Python -*-
4 
5 import sys
6 import string
7 
8 import RTC
9 import SimpleService
10 import OpenRTM_aist
11 from omniORB import CORBA
12 
13 myserviceconsumer_spec = ["implementation_id", "MyServiceConsumer",
14  "type_name", "MyServiceConsumer",
15  "description", "MyService Consumer Sample component",
16  "version", "1.0",
17  "vendor", "Shinji Kurihara",
18  "category", "example",
19  "activity_type", "DataFlowComponent",
20  "max_instance", "10",
21  "language", "Python",
22  "lang_type", "script",
23  ""]
24 
26  def __init__(self, msg, result):
27  self._msg = msg
28  self._result = result
29  return
30 
31  def __call__(self, obj):
32  try:
33  if CORBA.is_nil(obj):
34  print "No service connected."
35  else:
36  self._result[0] = obj.echo(self._msg)
37  except:
38  pass
39 
40 
42  # constructor
43  def __init__(self, manager):
44  OpenRTM_aist.DataFlowComponentBase.__init__(self, manager)
45 
46  self._async_echo = None
47  self._result = [None]
48  return
49 
50  def onInitialize(self):
51  # initialization of CORBA Port
53 
54  # initialization of Consumer
55  self._myservice0 = OpenRTM_aist.CorbaConsumer(interfaceType=SimpleService.MyService)
56 
57  # Set service consumers to Ports
58  self._myServicePort.registerConsumer("myservice0", "MyService", self._myservice0)
59 
60  # Set CORBA Service Ports
61  self.addPort(self._myServicePort)
62 
63  return RTC.RTC_OK
64 
65  # The execution action that is invoked periodically
66  def onExecute(self, ec_id):
67  print "\n"
68  print "Command list: "
69  print " echo [msg] : echo message."
70  print " set_value [value]: set value."
71  print " get_value : get current value."
72  print " get_echo_history : get input messsage history."
73  print " get_value_history: get input value history."
74  print "> ",
75 
76  args = str(sys.stdin.readline())
77  argv = string.split(args)
78  argv[-1] = argv[-1].rstrip("\n")
79 
80  if self._async_echo and self._async_echo.finished():
81  print "echo() finished: ", self._result[0]
82  self._async_echo = None
83 
84  if argv[0] == "echo" and len(argv) > 1:
85  if not self._async_echo:
86  retmsg = ""
87  func = echo_functor(argv[1],self._result)
88  self._async_echo = OpenRTM_aist.Async_tInvoker(self._myservice0._ptr(),
89  func)
90  self._async_echo.invoke()
91  else:
92  print "echo() still invoking"
93 
94  return RTC.RTC_OK
95 
96  if argv[0] == "set_value" and len(argv) > 1:
97  val = float(argv[1])
98  self._myservice0._ptr().set_value(val)
99  print "Set remote value: ", val
100  return RTC.RTC_OK
101 
102  if argv[0] == "get_value":
103  retval = self._myservice0._ptr().get_value()
104  print "Current remote value: ", retval
105  return RTC.RTC_OK;
106 
107  if argv[0] == "get_echo_history":
108  OpenRTM_aist.CORBA_SeqUtil.for_each(self._myservice0._ptr().get_echo_history(),
109  self.seq_print())
110  return RTC.RTC_OK
111 
112  if argv[0] == "get_value_history":
113  OpenRTM_aist.CORBA_SeqUtil.for_each(self._myservice0._ptr().get_value_history(),
114  self.seq_print())
115  return RTC.RTC_OK
116 
117  print "Invalid command or argument(s)."
118 
119  return RTC.RTC_OK
120 
121 
122  # functor class to print sequence data
123  class seq_print:
124  def __init__(self):
125  self._cnt = 0
126  return
127 
128  def __call__(self, val):
129  print self._cnt, ": ", val
130  self._cnt += 1
131  return
132 
133 
135  profile = OpenRTM_aist.Properties(defaults_str=myserviceconsumer_spec)
136  manager.registerFactory(profile,
137  MyServiceConsumer,
138  OpenRTM_aist.Delete)
139 
140 
141 def MyModuleInit(manager):
142  MyServiceConsumerInit(manager)
143 
144  # Create a component
145  comp = manager.createComponent("MyServiceConsumer")
146  return
147 
148 
149 def main():
150  # Initialize manager
151  mgr = OpenRTM_aist.Manager.init(sys.argv)
152 
153  # Set module initialization proceduer
154  # This procedure will be invoked in activateManager() function.
155  mgr.setModuleInitProc(MyModuleInit)
156 
157  # Activate manager and register to naming service
158  mgr.activateManager()
159 
160  # run the manager in blocking mode
161  # runManager(False) is the default
162  mgr.runManager()
163 
164  # If you want to run the manager in non-blocking mode, do like this
165  # mgr.runManager(True)
166  return
167 
168 
169 if __name__ == "__main__":
170  main()
RT Conponent CORBA service/consumer Port.
Definition: CorbaPort.py:604
def for_each(seq, f)
Apply the functor to all CORBA sequence elements.
The Properties class represents a persistent set of properties.
Definition: Properties.py:83


openrtm_aist_python
Author(s): Shinji Kurihara
autogenerated on Mon Feb 28 2022 23:01:06