COCTestRTC.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3 # -*- Python -*-
4 
5 import sys
6 
7 import RTC
8 import OpenRTM_aist
9 
# Component profile written as a flat [key, value, key, value, ...] list.
# It is handed to OpenRTM_aist.Properties(defaults_str=...) by
# COCTestRTCInit.  The lone trailing "" is kept from the original
# (presumably the end-of-list marker Properties expects -- confirm).
coctestrtc_spec = [
    "implementation_id", "COCTestRTC",
    "type_name", "COCTestRTC",
    "description", "Console input component",
    "version", "1.0",
    "vendor", "Shinji Kurihara",
    "category", "example",
    "activity_type", "DataFlowComponent",
    "max_instance", "10",
    "language", "Python",
    "lang_type", "script",
    "",
]
21 
22 
class DataListener(OpenRTM_aist.ConnectorDataListenerT):
    """Connector data listener that prints every event it receives."""

    def __init__(self, name):
        """Store *name*, the label printed with each event."""
        self._name = name

    def __del__(self):
        """Print a message when the listener object is destroyed."""
        print("dtor of ", self._name)

    def __call__(self, info, cdrdata):
        """Decode the payload and print the connector profile and data.

        Args:
            info: connector profile; its ``name`` and ``id`` are printed.
            cdrdata: serialized payload, passed to the base class together
                with an ``RTC.TimedLong`` instance to obtain the decoded
                value.
        """
        data = OpenRTM_aist.ConnectorDataListenerT.__call__(
            self, info, cdrdata, RTC.TimedLong(RTC.Time(0, 0), 0))
        print("------------------------------")
        print("Listener: ", self._name)
        print("Profile::name: ", info.name)
        print("Profile::id: ", info.id)
        print("Data: ", data.data)
        print("------------------------------")
38 
class ConnListener(OpenRTM_aist.ConnectorListener):
    """Connector listener that prints the profile of each event."""

    def __init__(self, name):
        """Store *name*, the label printed with each event."""
        self._name = name

    def __del__(self):
        """Print a message when the listener object is destroyed."""
        print("dtor of ", self._name)

    def __call__(self, info):
        """Print this listener's label and the connector's name and id.

        Args:
            info: connector profile exposing ``name`` and ``id``.
        """
        print("------------------------------")
        print("Listener: ", self._name)
        print("Profile::name: ", info.name)
        print("Profile::id: ", info.id)
        print("------------------------------")
52 
53 
class COCTestRTC(OpenRTM_aist.DataFlowComponentBase):
    """Test component that reads integers from stdin and writes them to "out".

    Every connector data/connection event on the out port is echoed to
    stdout by DataListener / ConnListener instances.
    """

    # ConnectorDataListenerType members to observe, in registration order.
    # Each DataListener is labelled with the member's own name.
    _DATA_EVENTS = (
        "ON_BUFFER_WRITE",
        "ON_BUFFER_FULL",
        "ON_BUFFER_WRITE_TIMEOUT",
        "ON_BUFFER_OVERWRITE",
        "ON_BUFFER_READ",
        "ON_SEND",
        "ON_RECEIVED",
        "ON_RECEIVER_FULL",
        "ON_RECEIVER_TIMEOUT",
        "ON_RECEIVER_ERROR",
    )

    # ConnectorListenerType members to observe, in registration order.
    _CONN_EVENTS = (
        "ON_CONNECT",
        "ON_DISCONNECT",
    )

    def __init__(self, manager):
        """Initialise the base component with *manager*."""
        OpenRTM_aist.DataFlowComponentBase.__init__(self, manager)

    def onInitialize(self):
        """Create the "out" port and attach the printing listeners.

        Returns:
            RTC.RTC_OK.
        """
        self._data = RTC.TimedLong(RTC.Time(0, 0), 0)
        self._outport = OpenRTM_aist.OutPort("out", self._data)
        # Register the port with the component.
        self.addOutPort("out", self._outport)

        for event in self._DATA_EVENTS:
            self._outport.addConnectorDataListener(
                getattr(OpenRTM_aist.ConnectorDataListenerType, event),
                DataListener(event))

        for event in self._CONN_EVENTS:
            self._outport.addConnectorListener(
                getattr(OpenRTM_aist.ConnectorListenerType, event),
                ConnListener(event))

        return RTC.RTC_OK

    def onExecute(self, ec_id):
        """Read one integer from stdin and write it to the out port.

        Args:
            ec_id: execution context id (unused here).

        Returns:
            RTC.RTC_OK after a value was sent or an unparsable line was
            skipped; RTC.RTC_ERROR when stdin has reached end-of-file.
        """
        # The prompt has no trailing newline, so flush explicitly;
        # otherwise it may stay buffered while readline() blocks.
        print("Please input number: ", end=' ', flush=True)
        line = sys.stdin.readline()
        if not line:
            # readline() returns "" only at EOF; report an error rather
            # than re-prompting on an exhausted stream every cycle.
            print("No more input available on stdin.")
            return RTC.RTC_ERROR

        try:
            value = int(line)
        except ValueError:
            # A mistyped line should not raise out of the callback.
            print("Invalid input (expected an integer): ", line.strip())
            return RTC.RTC_OK

        self._data.data = value
        OpenRTM_aist.setTimestamp(self._data)
        print("Sending to subscriber: ", self._data.data)
        self._outport.write()
        return RTC.RTC_OK
100 
101 
def COCTestRTCInit(manager):
    """Register the COCTestRTC factory with *manager*.

    The profile is built from ``coctestrtc_spec``; ``OpenRTM_aist.Delete``
    is passed as the matching delete function.
    """
    profile = OpenRTM_aist.Properties(defaults_str=coctestrtc_spec)
    manager.registerFactory(profile, COCTestRTC, OpenRTM_aist.Delete)
107 
108 
def MyModuleInit(manager):
    """Register the COCTestRTC factory and create one component instance."""
    COCTestRTCInit(manager)

    # Create a component.  The returned object is not needed here, so it
    # is not bound to a name.
    manager.createComponent("COCTestRTC")
114 
def main():
    """Initialise the OpenRTM manager and run it in blocking mode."""
    # Initialize manager
    mgr = OpenRTM_aist.Manager.init(sys.argv)

    # Set module initialization procedure.
    # This procedure will be invoked in the activateManager() function.
    mgr.setModuleInitProc(MyModuleInit)

    # Activate manager and register to naming service
    mgr.activateManager()

    # Run the manager in blocking mode;
    # runManager(False) is the default.
    mgr.runManager()

    # To run the manager in non-blocking mode instead, call
    # mgr.runManager(True)
132 
# Run the component manager only when executed as a script.
if __name__ == "__main__":
    main()
OpenRTM_aist.ext.sdo.observer.COCTestRTC.COCTestRTC.onInitialize
def onInitialize(self)
Definition: COCTestRTC.py:59
OpenRTM_aist.ext.sdo.observer.COCTestRTC.COCTestRTC._data
_data
Definition: COCTestRTC.py:60
OpenRTM_aist.ext.sdo.observer.COCTestRTC.COCTestRTC
Definition: COCTestRTC.py:54
OpenRTM_aist.ConnectorListener.ConnectorListener
Definition: ConnectorListener.py:452
OpenRTM_aist.ext.sdo.observer.COCTestRTC.COCTestRTC._outport
_outport
Definition: COCTestRTC.py:61
OpenRTM_aist.ext.sdo.observer.COCTestRTC.DataListener.__init__
def __init__(self, name)
Definition: COCTestRTC.py:24
OpenRTM_aist.ext.sdo.observer.COCTestRTC.DataListener._name
_name
Definition: COCTestRTC.py:25
OpenRTM_aist.ext.sdo.observer.COCTestRTC.ConnListener._name
_name
Definition: COCTestRTC.py:41
OpenRTM_aist.ext.sdo.observer.COCTestRTC.ConnListener.__del__
def __del__(self)
Definition: COCTestRTC.py:43
OpenRTM_aist.ext.sdo.observer.COCTestRTC.main
def main()
Definition: COCTestRTC.py:115
OpenRTM_aist.ext.sdo.observer.COCTestRTC.COCTestRTC.onExecute
def onExecute(self, ec_id)
Definition: COCTestRTC.py:93
OpenRTM_aist.ext.sdo.observer.COCTestRTC.ConnListener.__call__
def __call__(self, info)
Definition: COCTestRTC.py:46
OpenRTM_aist.DataFlowComponentBase.DataFlowComponentBase
DataFlowComponentBase class.
Definition: DataFlowComponentBase.py:35
OpenRTM_aist.ext.sdo.observer.COCTestRTC.DataListener.__del__
def __del__(self)
Definition: COCTestRTC.py:27
OpenRTM_aist.ConnectorListener.ConnectorDataListenerT
Definition: ConnectorListener.py:256
OpenRTM_aist.ext.sdo.observer.COCTestRTC.DataListener
Definition: COCTestRTC.py:23
OpenRTM_aist.ext.sdo.observer.COCTestRTC.ConnListener.__init__
def __init__(self, name)
Definition: COCTestRTC.py:40
OpenRTM_aist.Properties.Properties
Definition: Properties.py:83
OpenRTM_aist.ext.sdo.observer.COCTestRTC.COCTestRTC.__init__
def __init__(self, manager)
Constructor.
Definition: COCTestRTC.py:55
OpenRTM_aist.OutPort.OutPort
Definition: OutPort.py:69
OpenRTM_aist.RTObject.RTObject_impl.addOutPort
def addOutPort(self, name, outport)
Definition: RTObject.py:2765
OpenRTM_aist.ext.sdo.observer.COCTestRTC.COCTestRTCInit
def COCTestRTCInit(manager)
Definition: COCTestRTC.py:102
OpenRTM_aist.ext.sdo.observer.COCTestRTC.MyModuleInit
def MyModuleInit(manager)
Definition: COCTestRTC.py:109
OpenRTM_aist.ext.sdo.observer.COCTestRTC.ConnListener
Definition: COCTestRTC.py:39
OpenRTM_aist.ext.sdo.observer.COCTestRTC.DataListener.__call__
def __call__(self, info, cdrdata)
Definition: COCTestRTC.py:30


openrtm_aist_python
Author(s): Shinji Kurihara
autogenerated on Mon Apr 21 2025 02:45:06