test_ConnectorListener.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 # -*- Python -*-
3 
4 
17 
18 import sys
19 sys.path.insert(1,"../")
20 
21 import unittest
22 import OpenRTM_aist
23 from ConnectorListener import *
24 
25 import RTC, RTC__POA
26 import OpenRTM
27 
28 from omniORB import *
29 from omniORB import any
30 
32  def __init__(self, name):
33  self._name = name
34  self._data = None
35  return
36 
37  def __del__(self):
38  return
39 
40  def __call__(self, info, cdrdata):
41  data = OpenRTM_aist.ConnectorDataListenerT.__call__(self, info, cdrdata, RTC.TimedLong(RTC.Time(0,0),0))
42  print "------------------------------"
43  print "Listener: ", self._name
44  print "------------------------------"
45  self._data = data
46  return
47 
48  def get_data(self):
49  tmp = self._data
50  self._data = None
51  return tmp
52 
53 
54 
56  def __init__(self, name):
57  self._name = name
58  return
59 
60  def __del__(self):
61  return
62 
63  def __call__(self, info):
64  print "------------------------------"
65  print "Listener: ", self._name
66  print "------------------------------"
67  return
68 
69 
71  if type == OpenRTM_aist.ConnectorDataListenerType.ON_BUFFER_WRITE:
72  return "ON_BUFFER_WRITE"
73 
74  elif type == OpenRTM_aist.ConnectorDataListenerType.ON_BUFFER_FULL:
75  return "ON_BUFFER_FULL"
76 
77  elif type == OpenRTM_aist.ConnectorDataListenerType.ON_BUFFER_WRITE_TIMEOUT:
78  return "ON_BUFFER_WRITE_TIMEOUT"
79 
80  elif type == OpenRTM_aist.ConnectorDataListenerType.ON_BUFFER_OVERWRITE:
81  return "ON_BUFFER_OVERWRITE"
82 
83  elif type == OpenRTM_aist.ConnectorDataListenerType.ON_BUFFER_READ:
84  return "ON_BUFFER_READ"
85 
86  elif type == OpenRTM_aist.ConnectorDataListenerType.ON_SEND:
87  return "ON_SEND"
88 
89  elif type == OpenRTM_aist.ConnectorDataListenerType.ON_RECEIVED:
90  return "ON_RECEIVED"
91 
92  elif type == OpenRTM_aist.ConnectorDataListenerType.ON_RECEIVER_FULL:
93  return "ON_RECEIVER_FULL"
94 
95  elif type == OpenRTM_aist.ConnectorDataListenerType.ON_RECEIVER_TIMEOUT:
96  return "ON_RECEIVER_TIMEOUT"
97 
98  elif type == OpenRTM_aist.ConnectorDataListenerType.ON_RECEIVER_ERROR:
99  return "ON_RECEIVER_ERROR"
100 
101  else:
102  return "UNKNOWN"
103 
104 
106  if type == OpenRTM_aist.ConnectorListenerType.ON_BUFFER_EMPTY:
107  return "ON_BUFFER_EMPTY"
108 
109  elif type == OpenRTM_aist.ConnectorListenerType.ON_BUFFER_READ_TIMEOUT:
110  return "ON_BUFFER_READ_TIMEOUT"
111 
112  elif type == OpenRTM_aist.ConnectorListenerType.ON_SENDER_EMPTY:
113  return "ON_SENDER_EMPTY"
114 
115  elif type == OpenRTM_aist.ConnectorListenerType.ON_SENDER_TIMEOUT:
116  return "ON_SENDER_TIMEOUT"
117 
118  elif type == OpenRTM_aist.ConnectorListenerType.ON_SENDER_ERROR:
119  return "ON_SENDER_ERROR"
120 
121  else:
122  return "UNKNOWN"
123 
124 
125 class TestConnectorListener(unittest.TestCase):
126  def setUp(self):
127  self._connectorListeners = ConnectorListeners()
129  "id",
130  [],
132 
133  self._datalisteners = []
134  self._listeners = []
135 
136  for i in range(OpenRTM_aist.ConnectorDataListenerType.CONNECTOR_DATA_LISTENER_NUM):
138 
139  for i in range(OpenRTM_aist.ConnectorListenerType.CONNECTOR_LISTENER_NUM):
141 
142  return
143 
144 
146  # add DataListener.
147  for i in range(OpenRTM_aist.ConnectorDataListenerType.CONNECTOR_DATA_LISTENER_NUM):
148  self._connectorListeners.connectorData_[i].addListener(self._datalisteners[i],True)
149 
150  # test for ConnectorDataListenerT.__call__() with little endian data.
151  for i in range(OpenRTM_aist.ConnectorDataListenerType.CONNECTOR_DATA_LISTENER_NUM):
152  data = RTC.TimedLong(RTC.Time(0,0),i)
153  cdr_data = cdrMarshal(any.to_any(data).typecode(), data, True) # little endian
154  self._connectorListeners.connectorData_[i].notify(self._info, cdr_data)
155  self.assertEqual(self._datalisteners[i].get_data().data, i)
156 
157 
158  # test for ConnectorDataListenerT.__call__() with big endian data.
159  info = OpenRTM_aist.ConnectorInfo("name",
160  "id",
161  [],
163  info.properties.setProperty("serializer.cdr.endian","big")
164 
165  for i in range(OpenRTM_aist.ConnectorDataListenerType.CONNECTOR_DATA_LISTENER_NUM):
166  data = RTC.TimedLong(RTC.Time(0,0),i)
167  cdr_data = cdrMarshal(any.to_any(data).typecode(), data, False) # big endian
168  self._connectorListeners.connectorData_[i].notify(info, cdr_data)
169  self.assertEqual(self._datalisteners[i].get_data().data, i)
170 
171 
172  # remove DataListener.
173  for i in range(OpenRTM_aist.ConnectorDataListenerType.CONNECTOR_DATA_LISTENER_NUM):
174  self._connectorListeners.connectorData_[i].removeListener(self._datalisteners[i])
175 
176  for i in range(OpenRTM_aist.ConnectorDataListenerType.CONNECTOR_DATA_LISTENER_NUM):
177  data = RTC.TimedLong(RTC.Time(0,0),i)
178  cdr_data = cdrMarshal(any.to_any(data).typecode(), data, True)
179  self._connectorListeners.connectorData_[i].notify(self._info, cdr_data)
180  # get_data() return None, because removeListener was called.
181  self.assertEqual(self._datalisteners[i].get_data(), None)
182 
183  return
184 
185 
187  # add Listener.
188  for i in range(OpenRTM_aist.ConnectorListenerType.CONNECTOR_LISTENER_NUM):
189  self._connectorListeners.connector_[i].addListener(self._listeners[i],True)
190 
191  # test for ConnectorListener.__call__()
192  for i in range(OpenRTM_aist.ConnectorListenerType.CONNECTOR_LISTENER_NUM):
193  self._connectorListeners.connector_[i].notify(self._info)
194 
195  # remove Listener.
196  for i in range(OpenRTM_aist.ConnectorListenerType.CONNECTOR_LISTENER_NUM):
197  self._connectorListeners.connector_[i].removeListener(self._listeners[i])
198 
199  for i in range(OpenRTM_aist.ConnectorListenerType.CONNECTOR_LISTENER_NUM):
200  self._connectorListeners.connector_[i].notify(self._info)
201 
202  return
203 
204 
205 
206 
207 
208 
209 if __name__ == '__main__':
210  unittest.main()
211 
The Properties class represents a persistent set of properties.
Definition: Properties.py:83
This class is abstract base class for listener classes that provides callbacks for various events in ...
def append(dest, src)
Definition: NVUtil.py:386
This class is abstract base class for listener classes that provides callbacks for various events in ...


openrtm_aist_python
Author(s): Shinji Kurihara
autogenerated on Mon Feb 28 2022 23:01:06