MyServiceProvider.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3 # -*- Python -*-
4 
5 import sys
6 import string
7 import time
8 
9 import RTC
10 import SimpleService, SimpleService__POA
11 import OpenRTM_aist
12 
13 # Module specification
14 myserviceprovider_spec = ["implementation_id", "MyServiceProvider",
15  "type_name", "MyServiceProvider",
16  "description", "MyService Provider Sample component",
17  "version", "1.0",
18  "vendor", "Shinji Kurihara",
19  "category", "example",
20  "activity_type", "DataFlowComponent",
21  "max_instance", "10",
22  "language", "Python",
23  "lang_type", "script",
24  ""]
25 
26 # functor class to print sequence data
27 class seq_print:
28  def __init__(self):
29  self._cnt = 0
30  return
31 
32  def __call__(self, val):
33  print self._cnt, ": ", val
34  self._cnt += 1
35  return
36 
37 
38 # Class implementing IDL interface MyService(MyService.idl)
39 class MyServiceSVC_impl(SimpleService__POA.MyService):
40  def __init__(self):
41  self._echoList = []
42  self._valueList = []
43  self._value = 0
44  return
45 
46  def __del__(self):
47  pass
48 
49  def echo(self, msg):
50  OpenRTM_aist.CORBA_SeqUtil.push_back(self._echoList, msg)
51  print "MyService::echo() was called."
52  for i in range(10):
53  print "Message: ", msg
54  time.sleep(1)
55  print "MyService::echo() was finished."
56  return msg
57 
58  def get_echo_history(self):
59  print "MyService::get_echo_history() was called."
61  return self._echoList
62 
63  def set_value(self, value):
65  self._value = value
66  print "MyService::set_value() was called."
67  print "Current value: ", self._value
68  return
69 
70  def get_value(self):
71  print "MyService::get_value() was called."
72  print "Current value: ", self._value
73  return float(self._value)
74 
75  def get_value_history(self):
76  print "MyService::get_value_history() was called."
78 
79  return self._valueList
80 
81 
82 
84  def __init__(self, manager):
85  OpenRTM_aist.DataFlowComponentBase.__init__(self, manager)
86 
87  return
88 
89 
90  def onInitialize(self):
91  # initialization of CORBA Port
93 
94  # initialization of Provider
96 
97  # Set service providers to Ports
98  self._myServicePort.registerProvider("myservice0", "MyService", self._myservice0)
99 
100  # Set CORBA Service Ports
101  self.addPort(self._myServicePort)
102 
103  return RTC.RTC_OK
104 
105 
107  profile = OpenRTM_aist.Properties(defaults_str=myserviceprovider_spec)
108  manager.registerFactory(profile,
109  MyServiceProvider,
110  OpenRTM_aist.Delete)
111  return
112 
113 
114 def MyModuleInit(manager):
115  MyServiceProviderInit(manager)
116 
117  # Create a component
118  comp = manager.createComponent("MyServiceProvider")
119 
120  """
121  rtobj = manager.getPOA().servant_to_reference(comp)._narrow(RTC.RTObject)
122 
123  ecs = rtobj.get_execution_context_services()
124  ecs[0].activate_component(rtobj)
125  """
126  return
127 
128 
129 def main():
130  # Initialize manager
131  mgr = OpenRTM_aist.Manager.init(sys.argv)
132 
133  # Set module initialization proceduer
134  # This procedure will be invoked in activateManager() function.
135  mgr.setModuleInitProc(MyModuleInit)
136 
137  # Activate manager and register to naming service
138  mgr.activateManager()
139 
140  # run the manager in blocking mode
141  # runManager(False) is the default
142  mgr.runManager()
143 
144  # If you want to run the manager in non-blocking mode, do like this
145  # mgr.runManager(True)
146 
147 
148 if __name__ == "__main__":
149  main()
RT Conponent CORBA service/consumer Port.
Definition: CorbaPort.py:604
def push_back(seq, elem)
Push the new element back to the CORBA sequence.
def for_each(seq, f)
Apply the functor to all CORBA sequence elements.
The Properties class represents a persistent set of properties.
Definition: Properties.py:83


openrtm_aist_python
Author(s): Shinji Kurihara
autogenerated on Mon Feb 28 2022 23:01:06