test_subscription.py
Go to the documentation of this file.
00001 #! /usr/bin/env python
00002 import roslib; roslib.load_manifest('openni_camera')
00003 
00004 import rospy
00005 from sensor_msgs.msg import Image
00006 import os
00007 import subprocess
00008 import shutil
00009 import shlex
00010 import tempfile
00011 import signal
00012 import time
00013 
00014 def callback(data, args):
00015     args[0] = True
00016 
00017 rospy.init_node('test_sub', log_level=rospy.DEBUG)
00018 
00019 attempt = 0
00020 successes = 0
00021 failures = 0
00022 consecutive_failures = 0
00023 while not rospy.is_shutdown() and consecutive_failures < 5:
00024     # Bind received flag and use new topic each time to be very sure results
00025     # aren't affected by tardy callbacks
00026     received = [False]
00027     topic = "image_%d" % attempt
00028     attempt = attempt + 1
00029     sub = rospy.Subscriber(topic, Image, callback, callback_args=received)
00030 
00031     command = "rosrun openni_camera openni_node_unstable depth/image_raw:=%s" % topic
00032     tmp = tempfile.TemporaryFile(mode='r+')
00033     #tmp = open(os.devnull, 'w')
00034     p = subprocess.Popen(shlex.split(command), stdout=tmp, stderr=subprocess.STDOUT)
00035     start = time.time()
00036     end = start + 10.0
00037     while (not rospy.is_shutdown()) and (time.time() < end) and (not received[0]):
00038         time.sleep(0.1)
00039     duration = time.time() - start
00040 
00041     sub.unregister()
00042     p.send_signal(signal.SIGINT) # Try to stop normally
00043     end = time.time() + 5.0
00044     p.poll()
00045     while p.returncode is None and time.time() < end:
00046         time.sleep(0.1)
00047         p.poll()
00048     if p.returncode is None:
00049         print("Escalating to SIGTERM!")
00050         p.terminate()
00051         end = time.time() + 5.0
00052         p.poll()
00053         while p.returncode is None and time.time() < end:
00054             time.sleep(0.1)
00055             p.poll()
00056         if p.returncode is None:
00057             print("Escalating to SIGKILL!!")
00058             p.kill()
00059             p.wait()
00060 
00061     if not rospy.is_shutdown():
00062         if received[0]:
00063             rospy.loginfo("Succeeded in %.1fs", duration)
00064             successes = successes + 1
00065             consecutive_failures = 0
00066             status = "success"
00067         else:
00068             rospy.logerr("Failed!")
00069             failures = failures + 1
00070             consecutive_failures = consecutive_failures + 1
00071             status = "fail"
00072         # Write out log
00073         with open("%s_%05d.log" % (status, attempt), 'w') as f:
00074             tmp.seek(0)
00075             shutil.copyfileobj(tmp, f)
00076         # Move core dump if present
00077         try:
00078             core_name = "core_%d" % attempt
00079             os.rename("core", core_name)
00080             print("Core dump %s" % core_name)
00081         except OSError:
00082             pass
00083 
00084 print("Successes: %d" % successes)
00085 print("Failures: %d" % failures)


openni_camera
Author(s): Patrick Mihelich, Suat Gedikli, Radu Bogdan Rusu
autogenerated on Thu Jun 6 2019 20:16:13