test_subscription.py
Go to the documentation of this file.
1 #! /usr/bin/env python
2 import roslib; roslib.load_manifest('openni_camera')
3 
4 import rospy
5 from sensor_msgs.msg import Image
6 import os
7 import subprocess
8 import shutil
9 import shlex
10 import tempfile
11 import signal
12 import time
13 
14 def callback(data, args):
15  args[0] = True
16 
17 rospy.init_node('test_sub', log_level=rospy.DEBUG)
18 
19 attempt = 0
20 successes = 0
21 failures = 0
22 consecutive_failures = 0
23 while not rospy.is_shutdown() and consecutive_failures < 5:
24  # Bind received flag and use new topic each time to be very sure results
25  # aren't affected by tardy callbacks
26  received = [False]
27  topic = "image_%d" % attempt
28  attempt = attempt + 1
29  sub = rospy.Subscriber(topic, Image, callback, callback_args=received)
30 
31  command = "rosrun openni_camera openni_node_unstable depth/image_raw:=%s" % topic
32  tmp = tempfile.TemporaryFile(mode='r+')
33  #tmp = open(os.devnull, 'w')
34  p = subprocess.Popen(shlex.split(command), stdout=tmp, stderr=subprocess.STDOUT)
35  start = time.time()
36  end = start + 10.0
37  while (not rospy.is_shutdown()) and (time.time() < end) and (not received[0]):
38  time.sleep(0.1)
39  duration = time.time() - start
40 
41  sub.unregister()
42  p.send_signal(signal.SIGINT) # Try to stop normally
43  end = time.time() + 5.0
44  p.poll()
45  while p.returncode is None and time.time() < end:
46  time.sleep(0.1)
47  p.poll()
48  if p.returncode is None:
49  print("Escalating to SIGTERM!")
50  p.terminate()
51  end = time.time() + 5.0
52  p.poll()
53  while p.returncode is None and time.time() < end:
54  time.sleep(0.1)
55  p.poll()
56  if p.returncode is None:
57  print("Escalating to SIGKILL!!")
58  p.kill()
59  p.wait()
60 
61  if not rospy.is_shutdown():
62  if received[0]:
63  rospy.loginfo("Succeeded in %.1fs", duration)
64  successes = successes + 1
65  consecutive_failures = 0
66  status = "success"
67  else:
68  rospy.logerr("Failed!")
69  failures = failures + 1
70  consecutive_failures = consecutive_failures + 1
71  status = "fail"
72  # Write out log
73  with open("%s_%05d.log" % (status, attempt), 'w') as f:
74  tmp.seek(0)
75  shutil.copyfileobj(tmp, f)
76  # Move core dump if present
77  try:
78  core_name = "core_%d" % attempt
79  os.rename("core", core_name)
80  print("Core dump %s" % core_name)
81  except OSError:
82  pass
83 
84 print("Successes: %d" % successes)
85 print("Failures: %d" % failures)
test_subscription.callback
def callback(data, args)
Definition: test_subscription.py:14


openni_camera
Author(s): Patrick Mihelich, Suat Gedikli, Radu Bogdan Rusu
autogenerated on Thu Apr 21 2022 02:37:23