MyServiceConsumer.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 # -*- coding: utf-8 -*-
00003 # -*- Python -*-
00004 
00005 import sys
00006 import string
00007 
00008 import RTC
00009 import SimpleService
00010 import OpenRTM_aist
00011 from omniORB import CORBA
00012 
# Component profile for MyServiceConsumer.
#
# Flat list of alternating key / value strings, closed by a single empty
# string.  It is handed to OpenRTM_aist.Properties(defaults_str=...) in
# MyServiceConsumerInit().
myserviceconsumer_spec = [
  "implementation_id", "MyServiceConsumer",
  "type_name", "MyServiceConsumer",
  "description", "MyService Consumer Sample component",
  "version", "1.0",
  "vendor", "Shinji Kurihara",
  "category", "example",
  "activity_type", "DataFlowComponent",
  "max_instance", "10",
  "language", "Python",
  "lang_type", "script",
  "",  # terminating entry
]
00024 
class echo_functor:
  """Callable that invokes echo() on a service object and stores the reply.

  An instance is handed to OpenRTM_aist.Async_tInvoker by
  MyServiceConsumer.onExecute(), which later reads the reply back from the
  shared result list.
  """

  def __init__(self, msg, result):
    """Remember the message to send and where to put the answer.

    msg    -- value passed to obj.echo().
    result -- mutable sequence; element 0 is overwritten with the value
              returned by obj.echo().
    """
    self._msg = msg
    self._result = result

  def __call__(self, obj):
    """Call obj.echo(msg) unless obj is a nil CORBA reference.

    Failures are reported on stderr instead of being silently discarded.
    The call stays best-effort: nothing is re-raised and the result slot
    keeps its previous value when the call fails.
    """
    try:
      if CORBA.is_nil(obj):
        print("No service connected.")
      else:
        self._result[0] = obj.echo(self._msg)
    except Exception as e:
      # Only ordinary errors are handled here; KeyboardInterrupt and
      # SystemExit are no longer swallowed as they were by a bare except.
      sys.stderr.write("echo() failed: %s\n" % (e,))
00039 
00040 
class MyServiceConsumer(OpenRTM_aist.DataFlowComponentBase):
  """Console driven consumer of the SimpleService.MyService interface.

  On every execution cycle the component prints a command menu, reads one
  line from standard input and forwards the command to the service object
  obtained from the "myservice0" consumer of the "MyService" CORBA port.
  """

  def __init__(self, manager):
    """Initialise the base component and the asynchronous echo state."""
    OpenRTM_aist.DataFlowComponentBase.__init__(self, manager)

    # Pending asynchronous echo() invocation, or None when none is running.
    self._async_echo = None
    # One-element list that echo_functor fills with the echo() reply.
    self._result = [None]

  def onInitialize(self):
    """Create the CORBA port and its consumer, then register the port.

    Returns RTC.RTC_OK.
    """
    # initialization of CORBA Port
    self._myServicePort = OpenRTM_aist.CorbaPort("MyService")

    # initialization of Consumer
    self._myservice0 = OpenRTM_aist.CorbaConsumer(interfaceType=SimpleService.MyService)

    # Set service consumers to Ports
    self._myServicePort.registerConsumer("myservice0", "MyService", self._myservice0)

    # Set CORBA Service Ports
    self.addPort(self._myServicePort)

    return RTC.RTC_OK

  def onExecute(self, ec_id):
    """Prompt for one command on stdin and run it against the service.

    ec_id is not used by this method.

    Always returns RTC.RTC_OK.  An empty line (or end of input), an unknown
    command and a non-numeric set_value argument all produce the
    "Invalid command or argument(s)." message instead of raising.
    """
    print("\n")
    print("Command list: ")
    print(" echo [msg]       : echo message.")
    print(" set_value [value]: set value.")
    print(" get_value        : get current value.")
    print(" get_echo_history : get input messsage history.")
    print(" get_value_history: get input value history.")
    # The prompt has no trailing newline, so flush it explicitly to make it
    # visible before the blocking read below.
    sys.stdout.write("> ")
    sys.stdout.flush()

    # split() without arguments drops all surrounding whitespace, including
    # the trailing newline, and yields [] for a blank line or end of input.
    argv = sys.stdin.readline().split()

    # Report a previously started echo() once it has completed.
    if self._async_echo and self._async_echo.finished():
      print("%s %s" % ("echo() finished: ", self._result[0]))
      self._async_echo = None

    if not argv:
      print("Invalid command or argument(s).")
      return RTC.RTC_OK

    command = argv[0]
    has_argument = len(argv) > 1

    if command == "echo" and has_argument:
      if self._async_echo:
        print("echo() still invoking")
      else:
        # NOTE(review): Async_tInvoker presumably runs the functor in the
        # background; completion is polled via finished() on a later cycle.
        func = echo_functor(argv[1], self._result)
        self._async_echo = OpenRTM_aist.Async_tInvoker(self._myservice0._ptr(),
                                                       func)
        self._async_echo.invoke()
      return RTC.RTC_OK

    if command == "set_value" and has_argument:
      try:
        val = float(argv[1])
      except ValueError:
        # A malformed number is a user input error, not a component error.
        print("Invalid command or argument(s).")
        return RTC.RTC_OK
      self._myservice0._ptr().set_value(val)
      print("%s %s" % ("Set remote value: ", val))
      return RTC.RTC_OK

    if command == "get_value":
      retval = self._myservice0._ptr().get_value()
      print("%s %s" % ("Current remote value: ", retval))
      return RTC.RTC_OK

    if command == "get_echo_history":
      OpenRTM_aist.CORBA_SeqUtil.for_each(self._myservice0._ptr().get_echo_history(),
                                          self.seq_print())
      return RTC.RTC_OK

    if command == "get_value_history":
      OpenRTM_aist.CORBA_SeqUtil.for_each(self._myservice0._ptr().get_value_history(),
                                          self.seq_print())
      return RTC.RTC_OK

    print("Invalid command or argument(s).")

    return RTC.RTC_OK

  class seq_print:
    """Functor that prints each value it is called with, with a counter.

    Passed to CORBA_SeqUtil.for_each() as the callback for history
    sequences (presumably invoked once per element).
    """

    def __init__(self):
      """Start counting at zero."""
      self._cnt = 0

    def __call__(self, val):
      """Print "<count> :  <val>" and advance the counter."""
      print("%s %s %s" % (self._cnt, ": ", val))
      self._cnt += 1
00132 
00133 
def MyServiceConsumerInit(manager):
  """Register the MyServiceConsumer factory with the given manager.

  The component profile is built from myserviceconsumer_spec; instances are
  created from the MyServiceConsumer class and released through
  OpenRTM_aist.Delete.
  """
  component_profile = OpenRTM_aist.Properties(defaults_str=myserviceconsumer_spec)
  manager.registerFactory(component_profile, MyServiceConsumer, OpenRTM_aist.Delete)
00139 
00140 
def MyModuleInit(manager):
  """Register the MyServiceConsumer factory and create one component."""
  # The factory has to be registered before a component can be requested
  # by its name.
  MyServiceConsumerInit(manager)

  # Create a component; the returned object is not needed here.
  manager.createComponent("MyServiceConsumer")
00147 
00148 
def main():
  """Initialise, activate and run the OpenRTM-aist manager."""
  # Initialize the manager from the command line arguments.
  manager = OpenRTM_aist.Manager.init(sys.argv)

  # Set the module initialization procedure.
  # This procedure will be invoked in the activateManager() function.
  manager.setModuleInitProc(MyModuleInit)

  # Activate the manager and register to the naming service.
  manager.activateManager()

  # Run the manager in blocking mode; runManager(False) is the default.
  # For non-blocking mode use manager.runManager(True) instead.
  manager.runManager()


# Start the manager only when this file is executed as a script.
if __name__ == "__main__":
  main()


openrtm_aist_python
Author(s): Shinji Kurihara
autogenerated on Thu Aug 27 2015 14:17:28