ros_masters.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 #
3 # License: BSD
4 # https://raw.github.com/stonier/rqt_wrapper/license/LICENSE
5 #
6 ##############################################################################
7 # Imports
8 ##############################################################################
9 
10 import os
11 import rocon_console.console as console
12 import rosgraph
13 import socket
14 import threading
15 import time
16 import uuid
17 
18 
19 ##############################################################################
20 # Methods
21 ##############################################################################
22 
23 
24 def check_for_master(ros_master_uri=None):
25  '''
26  Simple check for a ros master's availability without having to initialise
27  this process as a ros node.
28 
29  :param str ros_master_uri: accepts a url for the ros master, else resorts to looking up the env variable.
30  :return pid of the master or None
31  '''
32  if ros_master_uri is None:
33  try:
34  ros_master_uri = os.environ["ROS_MASTER_URI"]
35  except KeyError:
36  print(console.red + "[ERROR] ROS_MASTER_URI not set....do try to give us something to work on!" + console.reset)
37  return False
38  caller_id = "/find_ros_master" + "_" + uuid.uuid4().hex
39  rosgraph_master = rosgraph.Master(caller_id)
40  try:
41  return rosgraph_master.getPid()
42  except socket.error:
43  return None
44 
45 ##############################################################################
46 # Classes
47 ##############################################################################
48 
49 
50 class RosMasterMonitor(object):
51  '''
52  Spins up a loop which continuously checks for the availability of
53  the ros master.
54  '''
55  def __init__(self, period=5, callback_function=None, ros_master_uri=None):
56  '''
57  The callback should have the signature:
58 
59  void callback_function(bool new_master_found)
60 
61  :param int period: loop time for checking in seconds.
62  :param func callback_function: execute user code if the state changed
63  :param str ros_msater_uri: the url of the ros master to lookup (if None, checks env).
64  '''
65  if ros_master_uri is None:
66  try:
67  self._ros_master_uri = os.environ["ROS_MASTER_URI"]
68  except KeyError:
69  print(console.red + "[ERROR] ROS_MASTER_URI not set....do try to give us something to work on!" + console.reset)
70  return
71  else:
72  self._ros_master_uri = ros_master_uri
73  self._sleep_period_seconds = period
74  self._callback_function = callback_function if callback_function is not None else self._default_callback_function
75  # initialise it
77  self._callback_function(self.is_available())
78  self._check_for_master_timer = threading.Timer(period, self._wait_for_master)
79  self._check_for_master_timer.start()
80 
81  def is_available(self):
82  return True if self.pid is not None else False
83 
84  def shutdown(self):
85  self._check_for_master_timer.cancel()
86 
87  def _default_callback_function(self, new_master_found):
88  pass
89 
90  def _wait_for_master(self):
91  new_pid = check_for_master(self._ros_master_uri)
92  if new_pid is None:
93  if self.pid is not None:
94  # TODO : check for a timeout here and use callback function with None
95  self.pid = new_pid
96  self._callback_function(self.is_available())
97  else:
98  if self.pid is None or self.pid != new_pid:
99  self.pid = new_pid
100  self._callback_function(self.is_available())
101  # trigger a new check
102  self._check_for_master_timer = threading.Timer(self._sleep_period_seconds, self._wait_for_master)
103  self._check_for_master_timer.start()
104 
105 
106 ##############################################################################
107 # Testies
108 ##############################################################################
109 
110 def _user_callback_test(available):
111  if available:
112  print(console.green + "Ros Master available at " + console.yellow + os.environ["ROS_MASTER_URI"] + console.green + "." + console.reset)
113  else:
114  print(console.red + "ROS Master unavailable" + console.reset)
115 
116 if __name__ == '__main__':
117  ros_master = RosMasterMonitor(period=1, callback_function=_user_callback_test)
118  while(True):
119  try:
120  time.sleep(1)
121  except KeyboardInterrupt:
122  ros_master.shutdown()
123  break
def _user_callback_test(available)
Definition: ros_masters.py:110
def __init__(self, period=5, callback_function=None, ros_master_uri=None)
Definition: ros_masters.py:55
def check_for_master(ros_master_uri=None)
Methods.
Definition: ros_masters.py:24
def _default_callback_function(self, new_master_found)
Definition: ros_masters.py:87


rqt_wrapper
Author(s): Daniel Stonier
autogenerated on Mon Jun 10 2019 14:59:48