test_ConnectorListener.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 # -*- Python -*-
3 
4 
17 
18 import sys
19 sys.path.insert(1,"../")
20 
21 import unittest
22 import OpenRTM_aist
23 from ConnectorListener import *
24 
25 import RTC, RTC__POA
26 import OpenRTM
27 
28 from omniORB import *
29 from omniORB import any
30 
32  def __init__(self, name):
33  self._name = name
34  self._data = None
35  return
36 
37  def __del__(self):
38  return
39 
40  def __call__(self, info, cdrdata):
41  data = OpenRTM_aist.ConnectorDataListenerT.__call__(self, info, cdrdata, RTC.TimedLong(RTC.Time(0,0),0))
42  print "------------------------------"
43  print "Listener: ", self._name
44  print "------------------------------"
45  self._data = data
46  return
47 
48  def get_data(self):
49  tmp = self._data
50  self._data = None
51  return tmp
52 
53 
54 
56  def __init__(self, name):
57  self._name = name
58  return
59 
60  def __del__(self):
61  return
62 
63  def __call__(self, info):
64  print "------------------------------"
65  print "Listener: ", self._name
66  print "------------------------------"
67  return
68 
69 
71  if type == OpenRTM_aist.ConnectorDataListenerType.ON_BUFFER_WRITE:
72  return "ON_BUFFER_WRITE"
73 
74  elif type == OpenRTM_aist.ConnectorDataListenerType.ON_BUFFER_FULL:
75  return "ON_BUFFER_FULL"
76 
77  elif type == OpenRTM_aist.ConnectorDataListenerType.ON_BUFFER_WRITE_TIMEOUT:
78  return "ON_BUFFER_WRITE_TIMEOUT"
79 
80  elif type == OpenRTM_aist.ConnectorDataListenerType.ON_BUFFER_OVERWRITE:
81  return "ON_BUFFER_OVERWRITE"
82 
83  elif type == OpenRTM_aist.ConnectorDataListenerType.ON_BUFFER_READ:
84  return "ON_BUFFER_READ"
85 
86  elif type == OpenRTM_aist.ConnectorDataListenerType.ON_SEND:
87  return "ON_SEND"
88 
89  elif type == OpenRTM_aist.ConnectorDataListenerType.ON_RECEIVED:
90  return "ON_RECEIVED"
91 
92  elif type == OpenRTM_aist.ConnectorDataListenerType.ON_RECEIVER_FULL:
93  return "ON_RECEIVER_FULL"
94 
95  elif type == OpenRTM_aist.ConnectorDataListenerType.ON_RECEIVER_TIMEOUT:
96  return "ON_RECEIVER_TIMEOUT"
97 
98  elif type == OpenRTM_aist.ConnectorDataListenerType.ON_RECEIVER_ERROR:
99  return "ON_RECEIVER_ERROR"
100 
101  else:
102  return "UNKNOWN"
103 
104 
106  if type == OpenRTM_aist.ConnectorListenerType.ON_BUFFER_EMPTY:
107  return "ON_BUFFER_EMPTY"
108 
109  elif type == OpenRTM_aist.ConnectorListenerType.ON_BUFFER_READ_TIMEOUT:
110  return "ON_BUFFER_READ_TIMEOUT"
111 
112  elif type == OpenRTM_aist.ConnectorListenerType.ON_SENDER_EMPTY:
113  return "ON_SENDER_EMPTY"
114 
115  elif type == OpenRTM_aist.ConnectorListenerType.ON_SENDER_TIMEOUT:
116  return "ON_SENDER_TIMEOUT"
117 
118  elif type == OpenRTM_aist.ConnectorListenerType.ON_SENDER_ERROR:
119  return "ON_SENDER_ERROR"
120 
121  else:
122  return "UNKNOWN"
123 
124 
125 class TestConnectorListener(unittest.TestCase):
126  def setUp(self):
127  self._connectorListeners = ConnectorListeners()
129  "id",
130  [],
132 
133  self._datalisteners = []
134  self._listeners = []
135 
136  for i in range(OpenRTM_aist.ConnectorDataListenerType.CONNECTOR_DATA_LISTENER_NUM):
137  self._datalisteners.append(DataListener(typeToStringDataListener(i)))
138 
139  for i in range(OpenRTM_aist.ConnectorListenerType.CONNECTOR_LISTENER_NUM):
140  self._listeners.append(Listener(typeToStringListener(i)))
141 
142  return
143 
144 
146  # add DataListener.
147  for i in range(OpenRTM_aist.ConnectorDataListenerType.CONNECTOR_DATA_LISTENER_NUM):
148  self._connectorListeners.connectorData_[i].addListener(self._datalisteners[i],True)
149 
150  # test for ConnectorDataListenerT.__call__() with little endian data.
151  for i in range(OpenRTM_aist.ConnectorDataListenerType.CONNECTOR_DATA_LISTENER_NUM):
152  data = RTC.TimedLong(RTC.Time(0,0),i)
153  cdr_data = cdrMarshal(any.to_any(data).typecode(), data, True) # little endian
154  self._connectorListeners.connectorData_[i].notify(self._info, cdr_data)
155  self.assertEqual(self._datalisteners[i].get_data().data, i)
156 
157 
158  # test for ConnectorDataListenerT.__call__() with big endian data.
159  info = OpenRTM_aist.ConnectorInfo("name",
160  "id",
161  [],
163  info.properties.setProperty("serializer.cdr.endian","big")
164 
165  for i in range(OpenRTM_aist.ConnectorDataListenerType.CONNECTOR_DATA_LISTENER_NUM):
166  data = RTC.TimedLong(RTC.Time(0,0),i)
167  cdr_data = cdrMarshal(any.to_any(data).typecode(), data, False) # big endian
168  self._connectorListeners.connectorData_[i].notify(info, cdr_data)
169  self.assertEqual(self._datalisteners[i].get_data().data, i)
170 
171 
172  # remove DataListener.
173  for i in range(OpenRTM_aist.ConnectorDataListenerType.CONNECTOR_DATA_LISTENER_NUM):
174  self._connectorListeners.connectorData_[i].removeListener(self._datalisteners[i])
175 
176  for i in range(OpenRTM_aist.ConnectorDataListenerType.CONNECTOR_DATA_LISTENER_NUM):
177  data = RTC.TimedLong(RTC.Time(0,0),i)
178  cdr_data = cdrMarshal(any.to_any(data).typecode(), data, True)
179  self._connectorListeners.connectorData_[i].notify(self._info, cdr_data)
180  # get_data() return None, because removeListener was called.
181  self.assertEqual(self._datalisteners[i].get_data(), None)
182 
183  return
184 
185 
187  # add Listener.
188  for i in range(OpenRTM_aist.ConnectorListenerType.CONNECTOR_LISTENER_NUM):
189  self._connectorListeners.connector_[i].addListener(self._listeners[i],True)
190 
191  # test for ConnectorListener.__call__()
192  for i in range(OpenRTM_aist.ConnectorListenerType.CONNECTOR_LISTENER_NUM):
193  self._connectorListeners.connector_[i].notify(self._info)
194 
195  # remove Listener.
196  for i in range(OpenRTM_aist.ConnectorListenerType.CONNECTOR_LISTENER_NUM):
197  self._connectorListeners.connector_[i].removeListener(self._listeners[i])
198 
199  for i in range(OpenRTM_aist.ConnectorListenerType.CONNECTOR_LISTENER_NUM):
200  self._connectorListeners.connector_[i].notify(self._info)
201 
202  return
203 
204 
205 
206 
207 
208 ############### test #################
209 if __name__ == '__main__':
210  unittest.main()
211 
The Properties class represents a persistent set of properties.
Definition: Properties.py:83
This class is abstract base class for listener classes that provides callbacks for various events in ...
This class is abstract base class for listener classes that provides callbacks for various events in ...


openrtm_aist_python
Author(s): Shinji Kurihara
autogenerated on Thu Jun 6 2019 19:11:34