test_ConnectorListener.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 # -*- Python -*-
00003 
00004 ##
00005 # @file test_ConnectorListener.py
00006 # @brief test for connector listener class
00007 # @date $Date: 2010/01/06 $
00008 # @author Shinji Kurihara
00009 #
00010 # Copyright (C) 2010
00011 #     Task-intelligence Research Group,
00012 #     Intelligent Systems Research Institute,
00013 #     National Institute of
00014 #         Advanced Industrial Science and Technology (AIST), Japan
00015 #     All rights reserved.
00016 #
00017 
00018 import sys
00019 sys.path.insert(1,"../")
00020 
00021 import unittest
00022 import OpenRTM_aist
00023 from ConnectorListener import *
00024 
00025 import RTC, RTC__POA
00026 import OpenRTM
00027 
00028 from omniORB import *
00029 from omniORB import any
00030 
00031 class DataListener(OpenRTM_aist.ConnectorDataListenerT):
00032         def __init__(self, name):
00033                 self._name = name
00034                 self._data = None
00035                 return
00036 
00037         def __del__(self):
00038                 return
00039 
00040         def __call__(self, info, cdrdata):
00041                 data = OpenRTM_aist.ConnectorDataListenerT.__call__(self, info, cdrdata, RTC.TimedLong(RTC.Time(0,0),0))
00042                 print "------------------------------"
00043                 print "Listener:       ", self._name
00044                 print "------------------------------"
00045                 self._data = data
00046                 return
00047 
00048         def get_data(self):
00049                 tmp = self._data
00050                 self._data = None
00051                 return tmp
00052 
00053 
00054 
00055 class Listener(OpenRTM_aist.ConnectorListener):
00056         def __init__(self, name):
00057                 self._name = name
00058                 return
00059 
00060         def __del__(self):
00061                 return
00062 
00063         def __call__(self, info):
00064                 print "------------------------------"
00065                 print "Listener:       ", self._name
00066                 print "------------------------------"
00067                 return
00068 
00069 
00070 def typeToStringDataListener(type):
00071         if type == OpenRTM_aist.ConnectorDataListenerType.ON_BUFFER_WRITE:
00072                 return "ON_BUFFER_WRITE"
00073 
00074         elif type == OpenRTM_aist.ConnectorDataListenerType.ON_BUFFER_FULL:
00075                 return "ON_BUFFER_FULL"
00076 
00077         elif type == OpenRTM_aist.ConnectorDataListenerType.ON_BUFFER_WRITE_TIMEOUT:
00078                 return "ON_BUFFER_WRITE_TIMEOUT"
00079 
00080         elif type == OpenRTM_aist.ConnectorDataListenerType.ON_BUFFER_OVERWRITE:
00081                 return "ON_BUFFER_OVERWRITE"
00082 
00083         elif type == OpenRTM_aist.ConnectorDataListenerType.ON_BUFFER_READ:
00084                 return "ON_BUFFER_READ"
00085 
00086         elif type == OpenRTM_aist.ConnectorDataListenerType.ON_SEND:
00087                 return "ON_SEND"
00088 
00089         elif type == OpenRTM_aist.ConnectorDataListenerType.ON_RECEIVED:
00090                 return "ON_RECEIVED"
00091 
00092         elif type == OpenRTM_aist.ConnectorDataListenerType.ON_RECEIVER_FULL:
00093                 return "ON_RECEIVER_FULL"
00094 
00095         elif type == OpenRTM_aist.ConnectorDataListenerType.ON_RECEIVER_TIMEOUT:
00096                 return "ON_RECEIVER_TIMEOUT"
00097 
00098         elif type == OpenRTM_aist.ConnectorDataListenerType.ON_RECEIVER_ERROR:
00099                 return "ON_RECEIVER_ERROR"
00100 
00101         else:
00102                 return "UNKNOWN"
00103 
00104                 
00105 def typeToStringListener(type):
00106         if type == OpenRTM_aist.ConnectorListenerType.ON_BUFFER_EMPTY:
00107                 return "ON_BUFFER_EMPTY"
00108 
00109         elif type == OpenRTM_aist.ConnectorListenerType.ON_BUFFER_READ_TIMEOUT:
00110                 return "ON_BUFFER_READ_TIMEOUT"
00111 
00112         elif type == OpenRTM_aist.ConnectorListenerType.ON_SENDER_EMPTY:
00113                 return "ON_SENDER_EMPTY"
00114 
00115         elif type == OpenRTM_aist.ConnectorListenerType.ON_SENDER_TIMEOUT:
00116                 return "ON_SENDER_TIMEOUT"
00117 
00118         elif type == OpenRTM_aist.ConnectorListenerType.ON_SENDER_ERROR:
00119                 return "ON_SENDER_ERROR"
00120 
00121         else:
00122                 return "UNKNOWN"
00123 
00124                 
00125 class TestConnectorListener(unittest.TestCase):
00126         def setUp(self):
00127                 self._connectorListeners = ConnectorListeners()
00128                 self._info      = OpenRTM_aist.ConnectorInfo("name",
00129                                                              "id",
00130                                                              [],
00131                                                              OpenRTM_aist.Properties())
00132 
00133                 self._datalisteners = []
00134                 self._listeners = []
00135 
00136                 for i in range(OpenRTM_aist.ConnectorDataListenerType.CONNECTOR_DATA_LISTENER_NUM):
00137                         self._datalisteners.append(DataListener(typeToStringDataListener(i)))
00138 
00139                 for i in range(OpenRTM_aist.ConnectorListenerType.CONNECTOR_LISTENER_NUM):
00140                         self._listeners.append(Listener(typeToStringListener(i)))
00141 
00142                 return
00143 
00144 
00145         def test_ConnectorDataListenerT(self):
00146                 # add DataListener.
00147                 for i in range(OpenRTM_aist.ConnectorDataListenerType.CONNECTOR_DATA_LISTENER_NUM):
00148                         self._connectorListeners.connectorData_[i].addListener(self._datalisteners[i],True)
00149 
00150                 # test for ConnectorDataListenerT.__call__() with little endian data.
00151                 for i in range(OpenRTM_aist.ConnectorDataListenerType.CONNECTOR_DATA_LISTENER_NUM):
00152                         data = RTC.TimedLong(RTC.Time(0,0),i)
00153                         cdr_data = cdrMarshal(any.to_any(data).typecode(), data, True) # little endian
00154                         self._connectorListeners.connectorData_[i].notify(self._info, cdr_data)
00155                         self.assertEqual(self._datalisteners[i].get_data().data, i)
00156                         
00157 
00158                 # test for ConnectorDataListenerT.__call__() with big endian data.
00159                 info = OpenRTM_aist.ConnectorInfo("name",
00160                                                   "id",
00161                                                   [],
00162                                                   OpenRTM_aist.Properties())
00163                 info.properties.setProperty("serializer.cdr.endian","big")
00164 
00165                 for i in range(OpenRTM_aist.ConnectorDataListenerType.CONNECTOR_DATA_LISTENER_NUM):
00166                         data = RTC.TimedLong(RTC.Time(0,0),i)
00167                         cdr_data = cdrMarshal(any.to_any(data).typecode(), data, False) # big endian
00168                         self._connectorListeners.connectorData_[i].notify(info, cdr_data)
00169                         self.assertEqual(self._datalisteners[i].get_data().data, i)
00170                         
00171 
00172                 # remove DataListener.
00173                 for i in range(OpenRTM_aist.ConnectorDataListenerType.CONNECTOR_DATA_LISTENER_NUM):
00174                         self._connectorListeners.connectorData_[i].removeListener(self._datalisteners[i])
00175 
00176                 for i in range(OpenRTM_aist.ConnectorDataListenerType.CONNECTOR_DATA_LISTENER_NUM):
00177                         data = RTC.TimedLong(RTC.Time(0,0),i)
00178                         cdr_data = cdrMarshal(any.to_any(data).typecode(), data, True)
00179                         self._connectorListeners.connectorData_[i].notify(self._info, cdr_data)
00180                         # get_data() return None, because removeListener was called.
00181                         self.assertEqual(self._datalisteners[i].get_data(), None)
00182                         
00183                 return
00184 
00185 
00186         def test_ConnectorListener(self):
00187                 # add Listener.
00188                 for i in range(OpenRTM_aist.ConnectorListenerType.CONNECTOR_LISTENER_NUM):
00189                         self._connectorListeners.connector_[i].addListener(self._listeners[i],True)
00190 
00191                 # test for ConnectorListener.__call__()
00192                 for i in range(OpenRTM_aist.ConnectorListenerType.CONNECTOR_LISTENER_NUM):
00193                         self._connectorListeners.connector_[i].notify(self._info)
00194                         
00195                 # remove Listener.
00196                 for i in range(OpenRTM_aist.ConnectorListenerType.CONNECTOR_LISTENER_NUM):
00197                         self._connectorListeners.connector_[i].removeListener(self._listeners[i])
00198 
00199                 for i in range(OpenRTM_aist.ConnectorListenerType.CONNECTOR_LISTENER_NUM):
00200                         self._connectorListeners.connector_[i].notify(self._info)
00201                         
00202                 return
00203 
00204 
00205                 
00206 
00207 
00208 ############### test #################
00209 if __name__ == '__main__':
00210         unittest.main()
00211 


openrtm_aist_python
Author(s): Shinji Kurihara
autogenerated on Thu Aug 27 2015 14:17:28