test_lazy.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 ###############################################################################
00003 # Software License Agreement (BSD License)
00004 #
00005 #  Copyright (c) 2016, JSK Lab, University of Tokyo.
00006 #  All rights reserved.
00007 #
00008 #  Redistribution and use in source and binary forms, with or without
00009 #  modification, are permitted provided that the following conditions
00010 #  are met:
00011 #
00012 #   * Redistributions of source code must retain the above copyright
00013 #     notice, this list of conditions and the following disclaimer.
00014 #   * Redistributions in binary form must reproduce the above
00015 #     copyright notice, this list of conditions and the following
00016 #     disclaimer in the documentation and/o2r other materials provided
00017 #     with the distribution.
00018 #   * Neither the name of the JSK Lab nor the names of its
00019 #     contributors may be used to endorse or promote products derived
00020 #     from this software without specific prior written permission.
00021 #
00022 #  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
00023 #  "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
00024 #  LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
00025 #  FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
00026 #  COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
00027 #  INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
00028 #  BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
00029 #  LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
00030 #  CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
00031 #  LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
00032 #  ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00033 #  POSSIBILITY OF SUCH DAMAGE.
00034 ###############################################################################
00035 
00036 __author__ = 'Kentaro Wada <www.kentaro.wada@gmail.com>'
00037 
00038 import os
00039 import sys
00040 
00041 import unittest
00042 
00043 import rosgraph
00044 import rospy
00045 import rosmsg
00046 import roslib
00047 
00048 
00049 PKG = 'test_nodelet_topic_tools'
00050 NAME = 'test_lazy'
00051 
00052 
00053 class TestConnection(unittest.TestCase):
00054 
00055     def __init__(self, *args):
00056         super(TestConnection, self).__init__(*args)
00057         rospy.init_node(NAME)
00058         self.master = rosgraph.Master(NAME)
00059 
00060     def test_no_subscribers(self):
00061         check_connected_topics = rospy.get_param('~check_connected_topics')
00062         # Check assumed topics are not there
00063         _, subscriptions, _ = self.master.getSystemState()
00064         for check_topic in check_connected_topics:
00065             for topic, sub_node in subscriptions:
00066                 if topic == rospy.get_namespace() + check_topic:
00067                     raise ValueError('Found topic: {}'.format(check_topic))
00068 
00069     def test_subscriber_appears(self):
00070         topic_type = rospy.get_param('~input_topic_type')
00071         check_connected_topics = rospy.get_param('~check_connected_topics')
00072         wait_time = rospy.get_param('~wait_for_connection', 0)
00073         msg_class = roslib.message.get_message_class(topic_type)
00074         # Subscribe topic and bond connection
00075         sub = rospy.Subscriber('~input', msg_class,
00076                                self._cb_test_subscriber_appears)
00077         print('Waiting for connection for {} sec.'.format(wait_time))
00078         rospy.sleep(wait_time)
00079         # Check assumed topics are there
00080         _, subscriptions, _ = self.master.getSystemState()
00081         for check_topic in check_connected_topics:
00082             for topic, sub_node in subscriptions:
00083                 if topic == rospy.get_namespace() + check_topic:
00084                     break
00085             else:
00086                 raise ValueError('Not found topic: {}'.format(check_topic))
00087 
00088         sub.unregister()
00089         rospy.sleep(1)  # wait for disconnection
00090 
00091         # Check specified topics do not exist
00092         _, subscriptions, _ = self.master.getSystemState()
00093         for check_topic in check_connected_topics:
00094             for topic, sub_node in subscriptions:
00095                 if topic == rospy.get_namespace() + check_topic:
00096                     raise ValueError('Found topic: {}'.format(check_topic))
00097 
00098     def _cb_test_subscriber_appears(self, msg):
00099         pass
00100 
00101 
00102 if __name__ == "__main__":
00103     import rostest
00104     rostest.rosrun(PKG, NAME, TestConnection)


test_nodelet_topic_tools
Author(s): Radu Bogdan Rusu, Tully Foote
autogenerated on Sun Feb 17 2019 03:43:57