MyServiceConsumer.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3 # -*- Python -*-
4 
5 import sys
6 import string
7 
8 import RTC
9 import SimpleService
10 import OpenRTM_aist
11 from omniORB import CORBA
12 
# Component profile handed to OpenRTM_aist.Properties as ``defaults_str``:
# a flat list of alternating keys and values, closed by an empty string.
myserviceconsumer_spec = [
    token
    for key, value in (
        ("implementation_id", "MyServiceConsumer"),
        ("type_name", "MyServiceConsumer"),
        ("description", "MyService Consumer Sample component"),
        ("version", "1.0"),
        ("vendor", "Shinji Kurihara"),
        ("category", "example"),
        ("activity_type", "DataFlowComponent"),
        ("max_instance", "10"),
        ("language", "Python"),
        ("lang_type", "script"),
    )
    for token in (key, value)
] + [""]
24 
class echo_functor:
    """Callable that invokes ``echo()`` on a service object and keeps the reply.

    An instance is handed to ``OpenRTM_aist.Async_tInvoker`` by the consumer
    component, which later reads the reply back out of ``result[0]``.
    """

    def __init__(self, msg, result):
        """Remember the message to send and where to store the reply.

        msg    -- argument passed to the remote ``echo()`` call.
        result -- mutable sequence; element 0 is overwritten with the reply.
        """
        self._msg = msg
        self._result = result

    def __call__(self, obj):
        """Call ``obj.echo(msg)`` unless *obj* is a nil CORBA reference.

        Failures are reported and otherwise ignored (best effort), so
        ``result[0]`` keeps its previous value when the call does not succeed.
        """
        try:
            if CORBA.is_nil(obj):
                print("No service connected.")
            else:
                self._result[0] = obj.echo(self._msg)
        except Exception as exc:
            # Only Exception is caught so that SystemExit / KeyboardInterrupt
            # still propagate; the failure is reported instead of being
            # dropped silently.
            print("echo() failed: ", exc)
39 
40 
class MyServiceConsumer(OpenRTM_aist.DataFlowComponentBase):
    """Sample component that drives a remote ``MyService`` from the console.

    Each ``onExecute`` call prints a command menu, reads one line from
    standard input and forwards the command to the connected service through
    the ``myservice0`` consumer.
    """

    # constructor
    def __init__(self, manager):
        """Initialise the base component and the asynchronous-echo state.

        manager -- passed straight through to ``DataFlowComponentBase``.
        """
        OpenRTM_aist.DataFlowComponentBase.__init__(self, manager)

        # Invoker of the echo() call currently in flight, or None when idle.
        self._async_echo = None
        # One-element list that echo_functor fills with the echo() reply.
        self._result = [None]

    def onInitialize(self):
        """Create the service port and consumer and register them.

        Returns ``RTC.RTC_OK``.
        """
        # initialization of CORBA Port
        # NOTE(review): this assignment is absent from the listing although
        # the attribute is used below; the port name "MyService" is assumed
        # -- confirm against the upstream sample.
        self._myServicePort = OpenRTM_aist.CorbaPort("MyService")

        # initialization of Consumer
        self._myservice0 = OpenRTM_aist.CorbaConsumer(
            interfaceType=SimpleService.MyService)

        # Set service consumers to Ports
        self._myServicePort.registerConsumer(
            "myservice0", "MyService", self._myservice0)

        # Set CORBA Service Ports
        self.addPort(self._myServicePort)

        return RTC.RTC_OK

    # The execution action that is invoked periodically
    def onExecute(self, ec_id):
        """Prompt for one command, run it against the service, return RTC_OK.

        ec_id -- execution context id; not used by this method.

        Blank input and malformed arguments are answered with
        "Invalid command or argument(s)." instead of raising.
        """
        print("\n")
        print("Command list: ")
        print(" echo [msg] : echo message.")
        print(" set_value [value]: set value.")
        print(" get_value : get current value.")
        print(" get_echo_history : get input messsage history.")
        print(" get_value_history: get input value history.")
        # The prompt has no trailing newline, so flush it explicitly before
        # blocking on stdin.
        print("> ", end=' ', flush=True)

        # str.split() without arguments also strips the trailing newline and
        # returns [] for a blank line.
        argv = sys.stdin.readline().split()
        command = argv[0] if argv else ""
        has_arg = len(argv) > 1

        # Report a previously started echo() once it has completed.
        if self._async_echo and self._async_echo.finished():
            print("echo() finished: ", self._result[0])
            self._async_echo = None

        if command == "echo" and has_arg:
            if not self._async_echo:
                func = echo_functor(argv[1], self._result)
                self._async_echo = OpenRTM_aist.Async_tInvoker(
                    self._myservice0._ptr(), func)
                self._async_echo.invoke()
            else:
                print("echo() still invoking")

            return RTC.RTC_OK

        if command == "set_value" and has_arg:
            try:
                val = float(argv[1])
            except ValueError:
                # A non-numeric value is a user typo, not a component error.
                print("Invalid command or argument(s).")
                return RTC.RTC_OK
            self._myservice0._ptr().set_value(val)
            print("Set remote value: ", val)
            return RTC.RTC_OK

        if command == "get_value":
            retval = self._myservice0._ptr().get_value()
            print("Current remote value: ", retval)
            return RTC.RTC_OK

        if command == "get_echo_history":
            OpenRTM_aist.CORBA_SeqUtil.for_each(
                self._myservice0._ptr().get_echo_history(),
                self.seq_print())
            return RTC.RTC_OK

        if command == "get_value_history":
            OpenRTM_aist.CORBA_SeqUtil.for_each(
                self._myservice0._ptr().get_value_history(),
                self.seq_print())
            return RTC.RTC_OK

        print("Invalid command or argument(s).")

        return RTC.RTC_OK

    # functor class to print sequence data
    class seq_print:
        """Callable that prints each value it receives with a running index."""

        def __init__(self):
            # Index printed in front of the next value.
            self._cnt = 0

        def __call__(self, val):
            """Print ``<index> :  <val>`` and advance the index."""
            print(self._cnt, ": ", val)
            self._cnt += 1
132 
133 
def MyServiceConsumerInit(manager):
    """Register the MyServiceConsumer factory with *manager*.

    The component profile is built from ``myserviceconsumer_spec``;
    ``OpenRTM_aist.Delete`` is registered as the matching delete function.
    """
    profile = OpenRTM_aist.Properties(defaults_str=myserviceconsumer_spec)
    manager.registerFactory(profile,
                            MyServiceConsumer,
                            OpenRTM_aist.Delete)
139 
140 
def MyModuleInit(manager):
    """Module initialisation hook: register the factory, then instantiate.

    manager -- object providing ``createComponent``; it is also passed to
               ``MyServiceConsumerInit``.
    """
    MyServiceConsumerInit(manager)

    # Create a component (the returned instance is not needed here).
    manager.createComponent("MyServiceConsumer")
147 
148 
def main():
    """Initialise the manager from the command line and run it (blocking)."""
    # Initialize manager
    manager = OpenRTM_aist.Manager.init(sys.argv)

    # Set the module initialization procedure.
    # This procedure will be invoked in the activateManager() function.
    manager.setModuleInitProc(MyModuleInit)

    # Activate manager and register to naming service
    manager.activateManager()

    # Run the manager in blocking mode; runManager(False) is the default.
    # For non-blocking mode call manager.runManager(True) instead.
    manager.runManager()


if __name__ == "__main__":
    main()
OpenRTM_aist.examples.SimpleService.MyServiceConsumer.echo_functor.__init__
def __init__(self, msg, result)
Definition: MyServiceConsumer.py:26
OpenRTM_aist.examples.SimpleService.MyServiceConsumer.MyServiceConsumer.seq_print.__init__
def __init__(self)
Definition: MyServiceConsumer.py:124
OpenRTM_aist.examples.SimpleService.MyServiceConsumer.MyServiceConsumer
Definition: MyServiceConsumer.py:41
OpenRTM_aist.RTObject.RTObject_impl.addPort
def addPort(self, port)
Definition: RTObject.py:2667
OpenRTM_aist.examples.SimpleService.MyServiceConsumer.echo_functor.__call__
def __call__(self, obj)
Definition: MyServiceConsumer.py:31
OpenRTM_aist.examples.SimpleService.MyServiceConsumer.MyServiceConsumer.onExecute
def onExecute(self, ec_id)
Definition: MyServiceConsumer.py:66
OpenRTM_aist.examples.SimpleService.MyServiceConsumer.MyModuleInit
def MyModuleInit(manager)
Definition: MyServiceConsumer.py:141
OpenRTM_aist.examples.SimpleService.MyServiceConsumer.MyServiceConsumer.seq_print._cnt
_cnt
Definition: MyServiceConsumer.py:125
OpenRTM_aist.examples.SimpleService.MyServiceConsumer.MyServiceConsumer._myServicePort
_myServicePort
Definition: MyServiceConsumer.py:52
OpenRTM_aist.CORBA_SeqUtil.for_each
def for_each(seq, f)
Apply the functor to all CORBA sequence elements.
Definition: CORBA_SeqUtil.py:47
OpenRTM_aist.examples.SimpleService.MyServiceConsumer.MyServiceConsumerInit
def MyServiceConsumerInit(manager)
Definition: MyServiceConsumer.py:134
OpenRTM_aist.examples.SimpleService.MyServiceConsumer.MyServiceConsumer.onInitialize
def onInitialize(self)
Definition: MyServiceConsumer.py:50
OpenRTM_aist.examples.SimpleService.MyServiceConsumer.echo_functor._result
_result
Definition: MyServiceConsumer.py:28
OpenRTM_aist.examples.SimpleService.MyServiceConsumer.MyServiceConsumer.seq_print
Definition: MyServiceConsumer.py:123
OpenRTM_aist.DataFlowComponentBase.DataFlowComponentBase
DataFlowComponentBase class.
Definition: DataFlowComponentBase.py:35
OpenRTM_aist.CorbaConsumer.CorbaConsumer
Definition: CorbaConsumer.py:187
OpenRTM_aist.examples.SimpleService.MyServiceConsumer.MyServiceConsumer._result
_result
Definition: MyServiceConsumer.py:47
OpenRTM_aist.examples.SimpleService.MyServiceConsumer.main
def main()
Definition: MyServiceConsumer.py:149
OpenRTM_aist.examples.SimpleService.MyServiceConsumer.MyServiceConsumer.seq_print.__call__
def __call__(self, val)
Definition: MyServiceConsumer.py:128
OpenRTM_aist.examples.SimpleService.MyServiceConsumer.echo_functor._msg
_msg
Definition: MyServiceConsumer.py:27
OpenRTM_aist.examples.SimpleService.MyServiceConsumer.echo_functor
Definition: MyServiceConsumer.py:25
OpenRTM_aist.Properties.Properties
Definition: Properties.py:83
OpenRTM_aist.examples.SimpleService.MyServiceConsumer.MyServiceConsumer._async_echo
_async_echo
Definition: MyServiceConsumer.py:46
OpenRTM_aist.examples.SimpleService.MyServiceConsumer.MyServiceConsumer.__init__
def __init__(self, manager)
Constructor.
Definition: MyServiceConsumer.py:43
OpenRTM_aist.examples.SimpleService.MyServiceConsumer.MyServiceConsumer._myservice0
_myservice0
Definition: MyServiceConsumer.py:55
OpenRTM_aist.CorbaPort.CorbaPort
RT Component CORBA service/consumer Port.
Definition: CorbaPort.py:604


openrtm_aist_python
Author(s): Shinji Kurihara
autogenerated on Mon Apr 21 2025 02:45:06