Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018 import sys
00019 sys.path.insert(1,"../")
00020
00021 import unittest
00022 import OpenRTM_aist
00023 from ConnectorListener import *
00024
00025 import RTC, RTC__POA
00026 import OpenRTM
00027
00028 from omniORB import *
00029 from omniORB import any
00030
00031 class DataListener(OpenRTM_aist.ConnectorDataListenerT):
00032 def __init__(self, name):
00033 self._name = name
00034 self._data = None
00035 return
00036
00037 def __del__(self):
00038 return
00039
00040 def __call__(self, info, cdrdata):
00041 data = OpenRTM_aist.ConnectorDataListenerT.__call__(self, info, cdrdata, RTC.TimedLong(RTC.Time(0,0),0))
00042 print "------------------------------"
00043 print "Listener: ", self._name
00044 print "------------------------------"
00045 self._data = data
00046 return
00047
00048 def get_data(self):
00049 tmp = self._data
00050 self._data = None
00051 return tmp
00052
00053
00054
00055 class Listener(OpenRTM_aist.ConnectorListener):
00056 def __init__(self, name):
00057 self._name = name
00058 return
00059
00060 def __del__(self):
00061 return
00062
00063 def __call__(self, info):
00064 print "------------------------------"
00065 print "Listener: ", self._name
00066 print "------------------------------"
00067 return
00068
00069
00070 def typeToStringDataListener(type):
00071 if type == OpenRTM_aist.ConnectorDataListenerType.ON_BUFFER_WRITE:
00072 return "ON_BUFFER_WRITE"
00073
00074 elif type == OpenRTM_aist.ConnectorDataListenerType.ON_BUFFER_FULL:
00075 return "ON_BUFFER_FULL"
00076
00077 elif type == OpenRTM_aist.ConnectorDataListenerType.ON_BUFFER_WRITE_TIMEOUT:
00078 return "ON_BUFFER_WRITE_TIMEOUT"
00079
00080 elif type == OpenRTM_aist.ConnectorDataListenerType.ON_BUFFER_OVERWRITE:
00081 return "ON_BUFFER_OVERWRITE"
00082
00083 elif type == OpenRTM_aist.ConnectorDataListenerType.ON_BUFFER_READ:
00084 return "ON_BUFFER_READ"
00085
00086 elif type == OpenRTM_aist.ConnectorDataListenerType.ON_SEND:
00087 return "ON_SEND"
00088
00089 elif type == OpenRTM_aist.ConnectorDataListenerType.ON_RECEIVED:
00090 return "ON_RECEIVED"
00091
00092 elif type == OpenRTM_aist.ConnectorDataListenerType.ON_RECEIVER_FULL:
00093 return "ON_RECEIVER_FULL"
00094
00095 elif type == OpenRTM_aist.ConnectorDataListenerType.ON_RECEIVER_TIMEOUT:
00096 return "ON_RECEIVER_TIMEOUT"
00097
00098 elif type == OpenRTM_aist.ConnectorDataListenerType.ON_RECEIVER_ERROR:
00099 return "ON_RECEIVER_ERROR"
00100
00101 else:
00102 return "UNKNOWN"
00103
00104
00105 def typeToStringListener(type):
00106 if type == OpenRTM_aist.ConnectorListenerType.ON_BUFFER_EMPTY:
00107 return "ON_BUFFER_EMPTY"
00108
00109 elif type == OpenRTM_aist.ConnectorListenerType.ON_BUFFER_READ_TIMEOUT:
00110 return "ON_BUFFER_READ_TIMEOUT"
00111
00112 elif type == OpenRTM_aist.ConnectorListenerType.ON_SENDER_EMPTY:
00113 return "ON_SENDER_EMPTY"
00114
00115 elif type == OpenRTM_aist.ConnectorListenerType.ON_SENDER_TIMEOUT:
00116 return "ON_SENDER_TIMEOUT"
00117
00118 elif type == OpenRTM_aist.ConnectorListenerType.ON_SENDER_ERROR:
00119 return "ON_SENDER_ERROR"
00120
00121 else:
00122 return "UNKNOWN"
00123
00124
00125 class TestConnectorListener(unittest.TestCase):
00126 def setUp(self):
00127 self._connectorListeners = ConnectorListeners()
00128 self._info = OpenRTM_aist.ConnectorInfo("name",
00129 "id",
00130 [],
00131 OpenRTM_aist.Properties())
00132
00133 self._datalisteners = []
00134 self._listeners = []
00135
00136 for i in range(OpenRTM_aist.ConnectorDataListenerType.CONNECTOR_DATA_LISTENER_NUM):
00137 self._datalisteners.append(DataListener(typeToStringDataListener(i)))
00138
00139 for i in range(OpenRTM_aist.ConnectorListenerType.CONNECTOR_LISTENER_NUM):
00140 self._listeners.append(Listener(typeToStringListener(i)))
00141
00142 return
00143
00144
00145 def test_ConnectorDataListenerT(self):
00146
00147 for i in range(OpenRTM_aist.ConnectorDataListenerType.CONNECTOR_DATA_LISTENER_NUM):
00148 self._connectorListeners.connectorData_[i].addListener(self._datalisteners[i],True)
00149
00150
00151 for i in range(OpenRTM_aist.ConnectorDataListenerType.CONNECTOR_DATA_LISTENER_NUM):
00152 data = RTC.TimedLong(RTC.Time(0,0),i)
00153 cdr_data = cdrMarshal(any.to_any(data).typecode(), data, True)
00154 self._connectorListeners.connectorData_[i].notify(self._info, cdr_data)
00155 self.assertEqual(self._datalisteners[i].get_data().data, i)
00156
00157
00158
00159 info = OpenRTM_aist.ConnectorInfo("name",
00160 "id",
00161 [],
00162 OpenRTM_aist.Properties())
00163 info.properties.setProperty("serializer.cdr.endian","big")
00164
00165 for i in range(OpenRTM_aist.ConnectorDataListenerType.CONNECTOR_DATA_LISTENER_NUM):
00166 data = RTC.TimedLong(RTC.Time(0,0),i)
00167 cdr_data = cdrMarshal(any.to_any(data).typecode(), data, False)
00168 self._connectorListeners.connectorData_[i].notify(info, cdr_data)
00169 self.assertEqual(self._datalisteners[i].get_data().data, i)
00170
00171
00172
00173 for i in range(OpenRTM_aist.ConnectorDataListenerType.CONNECTOR_DATA_LISTENER_NUM):
00174 self._connectorListeners.connectorData_[i].removeListener(self._datalisteners[i])
00175
00176 for i in range(OpenRTM_aist.ConnectorDataListenerType.CONNECTOR_DATA_LISTENER_NUM):
00177 data = RTC.TimedLong(RTC.Time(0,0),i)
00178 cdr_data = cdrMarshal(any.to_any(data).typecode(), data, True)
00179 self._connectorListeners.connectorData_[i].notify(self._info, cdr_data)
00180
00181 self.assertEqual(self._datalisteners[i].get_data(), None)
00182
00183 return
00184
00185
00186 def test_ConnectorListener(self):
00187
00188 for i in range(OpenRTM_aist.ConnectorListenerType.CONNECTOR_LISTENER_NUM):
00189 self._connectorListeners.connector_[i].addListener(self._listeners[i],True)
00190
00191
00192 for i in range(OpenRTM_aist.ConnectorListenerType.CONNECTOR_LISTENER_NUM):
00193 self._connectorListeners.connector_[i].notify(self._info)
00194
00195
00196 for i in range(OpenRTM_aist.ConnectorListenerType.CONNECTOR_LISTENER_NUM):
00197 self._connectorListeners.connector_[i].removeListener(self._listeners[i])
00198
00199 for i in range(OpenRTM_aist.ConnectorListenerType.CONNECTOR_LISTENER_NUM):
00200 self._connectorListeners.connector_[i].notify(self._info)
00201
00202 return
00203
00204
00205
00206
00207
00208
00209 if __name__ == '__main__':
00210 unittest.main()
00211