Source code for tests.test_node_proactive

#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# License: MIT
#

from __future__ import absolute_import, division, print_function

import time

"""
Testing the node

This module can however be run directly from a python virtual env
where pyros_setup and pyros_utils have been installed and configured
pyros_setup will take care of setting up this process environment for ROS.
"""

##############################################################################
# Imports
##############################################################################

import sys
import unittest
import tempfile

# Doing ROS setup from code if required
try:
    import rospy
except ImportError:
    import pyros_setup
    pyros_setup.configurable_import().configure().activate()
    import rospy

import rospy
import rostest
import roslaunch
import ros1_template_msgs.msg as ros1_template_msgs
import ros1_template_msgs.srv as ros1_template_srvs

from pyros_utils import rostest_nose


##############################################################################
# Test Class
##############################################################################

# Note: Here we want to test the logger
# Not the terminal output, since this will depend on the top level launcher)

httpbin_proc = None


# This should have the same effect as the <name>.test file for rostest.
# Should be used only by nose ( or other python test tool )
[docs]def setup_module(): if not rostest_nose.is_rostest_enabled(): rostest_nose.rostest_nose_setup_module() # rostest does this for you launch = roslaunch.scriptapi.ROSLaunch() launch.start() # Same as <node pkg="ros1_pip_pytemplate" type="node.py" name="httpbin"> # <param name="base_url" value="http://httpbin.org"/> # </node> # Ref : http://docs.ros.org/indigo/api/roslaunch/html/index.html rospy.set_param('/httpbin_proactive/base_url', "http://httpbin.org") httpbin = roslaunch.core.Node('ros1_pip_pytemplate', 'node_proactive.py', name='httpbin_proactive') httpbin_proc = launch.launch(httpbin) assert httpbin_proc.is_alive()
[docs]def teardown_module(): if not rostest_nose.is_rostest_enabled(): rostest_nose.rostest_nose_teardown_module() if httpbin_proc and httpbin_proc.is_alive(): httpbin_proc.terminate()
[docs]class TestHttpbinPub(unittest.TestCase): @classmethod
[docs] def setUpClass(cls): # WE DO NEED A NODE for using topics rospy.init_node('test_httpbin') # CAREFUL : this should be done only once per PROCESS # Here we enforce TEST RUN 1<->1 MODULE 1<->1 PROCESS. ROStest style.
[docs] def test_httpbin_get_arg_echo(self): self.latest_args = None def get_cb(data): self.latest_args = {a.key: a.value for a in data.args} self.get_sub = rospy.Subscriber('/httpbin_proactive/get', ros1_template_msgs.Args, get_cb) while self.get_sub.get_num_connections() < 1: time.sleep(0.1) # No message at first (no param/arg set) print(self.latest_args) self.assertEqual(self.latest_args, None) # setting arguments from params rospy.set_param('/httpbin_proactive/args', {'arg1': 42}) # waiting for first message now = time.time() while time.time() - now < 10 and self.latest_args == None: time.sleep(0.1) print(self.latest_args) self.assertEqual(self.latest_args, {'arg1': '42'}) # changing argument from params rospy.set_param('/httpbin_proactive/args/arg2', 77) # waiting for argument echo now = time.time() while time.time() - now < 10 and self.latest_args == {'arg1': '42'}: time.sleep(0.1) print(self.latest_args) self.assertEqual(self.latest_args, {'arg1': '42', 'arg2': '77'})
if __name__ == '__main__': print("ARGV : {0}".format(sys.argv)) # Note : Tests should be able to run with nosetests, or rostest ( which will launch nosetest here ) rostest_nose.rostest_or_nose_main('ros1_pip_pytemplate', 'test_httpbin', TestHttpbinPub, sys.argv)