COCTestRTC.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 # -*- coding: utf-8 -*-
00003 # -*- Python -*-
00004 
00005 import sys
00006 
00007 import RTC
00008 import OpenRTM_aist
00009 
# Component profile for COCTestRTC: a flat list of alternating keys and
# values, closed by an empty string.  COCTestRTCInit() hands it to
# OpenRTM_aist.Properties(defaults_str=...).
coctestrtc_spec = [
  "implementation_id", "COCTestRTC",
  "type_name", "COCTestRTC",
  "description", "Console input component",
  "version", "1.0",
  "vendor", "Shinji Kurihara",
  "category", "example",
  "activity_type", "DataFlowComponent",
  "max_instance", "10",
  "language", "Python",
  "lang_type", "script",
  "",
]
00021 
00022 
class DataListener(OpenRTM_aist.ConnectorDataListenerT):
  """Connector data listener that prints every notification it receives.

  Instances are registered on the OutPort in COCTestRTC.onInitialize(),
  one per ConnectorDataListenerType event; ``name`` is the label shown in
  the printed report.
  """

  def __init__(self, name):
    # Label identifying which event this listener was registered for.
    self._name = name

  def __del__(self):
    # Trace listener destruction on the console.
    print("%s %s" % ("dtor of ", self._name))

  def __call__(self, info, cdrdata):
    """Print the connector profile and the ``data`` field of the sample.

    The base class __call__ is given a TimedLong instance together with
    ``cdrdata`` and returns the object whose ``data`` attribute is printed
    (presumably the unmarshalled sample -- see the OpenRTM-aist docs).
    """
    sample_template = RTC.TimedLong(RTC.Time(0, 0), 0)
    decoded = OpenRTM_aist.ConnectorDataListenerT.__call__(
      self, info, cdrdata, sample_template)

    rule = "------------------------------"
    print(rule)
    print("%s %s" % ("Listener:       ", self._name))
    print("%s %s" % ("Profile::name:  ", info.name))
    print("%s %s" % ("Profile::id:    ", info.id))
    print("%s %s" % ("Data:           ", decoded.data))
    print(rule)
00038     
class ConnListener(OpenRTM_aist.ConnectorListener):
  """Connector listener that prints the connector profile when notified.

  Instances are registered on the OutPort in COCTestRTC.onInitialize()
  for the ON_CONNECT and ON_DISCONNECT events; ``name`` is the label shown
  in the printed report.
  """

  def __init__(self, name):
    # Label identifying which event this listener was registered for.
    self._name = name

  def __del__(self):
    # Trace listener destruction on the console.
    print("%s %s" % ("dtor of ", self._name))

  def __call__(self, info):
    """Print the listener label plus the name and id of ``info``."""
    rule = "------------------------------"
    print(rule)
    print("%s %s" % ("Listener:       ", self._name))
    print("%s %s" % ("Profile::name:  ", info.name))
    print("%s %s" % ("Profile::id:    ", info.id))
    print(rule)
00052 
00053 
class COCTestRTC(OpenRTM_aist.DataFlowComponentBase):
  """Component that reads integers from stdin and writes them to an OutPort.

  The OutPort "out" carries RTC.TimedLong samples.  A printing listener is
  attached for each connector data event and for connect/disconnect, so
  the connector callbacks can be observed on the console.
  """

  def __init__(self, manager):
    OpenRTM_aist.DataFlowComponentBase.__init__(self, manager)
    return

  def onInitialize(self):
    """Create the "out" port and attach all listeners.

    Returns:
      RTC.RTC_OK
    """
    self._data = RTC.TimedLong(RTC.Time(0,0),0)
    self._outport = OpenRTM_aist.OutPort("out", self._data)
    # Register the OutPort with this component under the name "out".
    self.addOutPort("out", self._outport)

    # One DataListener per data event; the event name doubles as the label
    # the listener prints.  Registration order matches the original
    # hand-written sequence of calls.
    data_events = ("ON_BUFFER_WRITE",
                   "ON_BUFFER_FULL",
                   "ON_BUFFER_WRITE_TIMEOUT",
                   "ON_BUFFER_OVERWRITE",
                   "ON_BUFFER_READ",
                   "ON_SEND",
                   "ON_RECEIVED",
                   "ON_RECEIVER_FULL",
                   "ON_RECEIVER_TIMEOUT",
                   "ON_RECEIVER_ERROR")
    for event in data_events:
      self._outport.addConnectorDataListener(
        getattr(OpenRTM_aist.ConnectorDataListenerType, event),
        DataListener(event))

    # Connection life-cycle events.
    for event in ("ON_CONNECT", "ON_DISCONNECT"):
      self._outport.addConnectorListener(
        getattr(OpenRTM_aist.ConnectorListenerType, event),
        ConnListener(event))

    return RTC.RTC_OK


  def onExecute(self, ec_id):
    """Prompt for one number on stdin and write it to the OutPort.

    Args:
      ec_id: execution context id (not used here).

    Returns:
      RTC.RTC_OK after a successful write, and also when the line typed is
      not a valid integer (the line is reported and skipped).
      RTC.RTC_ERROR when stdin has reached end-of-file, since no further
      input can arrive.
    """
    # Write the prompt without a newline and flush it so it is visible
    # before the call blocks on stdin.
    sys.stdout.write("Please input number: ")
    sys.stdout.flush()

    line = sys.stdin.readline()
    if not line:
      # readline() returns an empty string only at EOF.  Returning OK here
      # would just re-prompt on every cycle, so report an error instead.
      sys.stderr.write("COCTestRTC: end of input on stdin\n")
      return RTC.RTC_ERROR

    try:
      value = long(line)
    except ValueError:
      # A typo must not raise out of the callback; skip this cycle.
      print("Invalid number: %r" % line.strip())
      return RTC.RTC_OK

    self._data.data = value
    OpenRTM_aist.setTimestamp(self._data)
    print("%s %s" % ("Sending to subscriber: ", self._data.data))
    self._outport.write()
    return RTC.RTC_OK
00100 
00101 
def COCTestRTCInit(manager):
  """Register the COCTestRTC factory with ``manager``.

  Builds a Properties object from ``coctestrtc_spec`` and passes it to
  manager.registerFactory() together with the COCTestRTC class and
  OpenRTM_aist.Delete.
  """
  component_profile = OpenRTM_aist.Properties(defaults_str=coctestrtc_spec)
  manager.registerFactory(component_profile, COCTestRTC, OpenRTM_aist.Delete)
00107 
00108 
def MyModuleInit(manager):
  """Module initialization procedure: register the factory, create one instance.

  main() installs this function via setModuleInitProc().

  Args:
    manager: the manager object that registers the factory and creates
      the component.
  """
  COCTestRTCInit(manager)

  # Create a component
  comp = manager.createComponent("COCTestRTC")
  if comp is None:
    # NOTE(review): createComponent() is expected to return None when the
    # instance could not be created -- confirm against the OpenRTM-aist
    # Manager documentation.  Report it instead of failing silently.
    sys.stderr.write("COCTestRTC: failed to create component instance\n")
00114 
def main():
  """Initialize the OpenRTM-aist manager, install MyModuleInit and run it."""
  # Initialize the manager from the command-line arguments.
  manager = OpenRTM_aist.Manager.init(sys.argv)

  # Set the module initialization procedure; it is invoked from
  # activateManager().
  manager.setModuleInitProc(MyModuleInit)

  # Activate the manager and register to the naming service.
  manager.activateManager()

  # Run the manager in blocking mode (runManager(False) is the default).
  # For non-blocking mode, call manager.runManager(True) instead.
  manager.runManager()
00132 
# Start the manager only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
  main()


openrtm_aist_python
Author(s): Shinji Kurihara
autogenerated on Thu Aug 27 2015 14:17:28