11 import rocon_console.console
as console
26 Simple check for a ros master's availability without having to initialise 27 this process as a ros node. 29 :param str ros_master_uri: accepts a url for the ros master, else resorts to looking up the env variable. 30 :return: pid of the master or None 32 if ros_master_uri
is None:
34 ros_master_uri = os.environ[
"ROS_MASTER_URI"]
36 print(console.red +
"[ERROR] ROS_MASTER_URI not set....do try to give us something to work on!" + console.reset)
38 caller_id =
"/find_ros_master" +
"_" + uuid.uuid4().hex
39 rosgraph_master = rosgraph.Master(caller_id)
41 return rosgraph_master.getPid()
52 Spins up a loop which continuously checks for the availability of 55 def __init__(self, period=5, callback_function=None, ros_master_uri=None):
57 The callback should have the signature: 59 void callback_function(bool new_master_found) 61 :param int period: loop time for checking in seconds. 62 :param func callback_function: execute user code if the state changed 63 :param str ros_master_uri: the url of the ros master to lookup (if None, checks env). 65 if ros_master_uri
is None:
69 print(console.red +
"[ERROR] ROS_MASTER_URI not set....do try to give us something to work on!" + console.reset)
79 self._check_for_master_timer.start()
82 return True if self.
pid is not None else False 85 self._check_for_master_timer.cancel()
93 if self.pid
is not None:
96 self._callback_function(self.is_available())
98 if self.pid
is None or self.pid != new_pid:
100 self._callback_function(self.is_available())
102 self._check_for_master_timer = threading.Timer(self._sleep_period_seconds, self._wait_for_master)
103 self._check_for_master_timer.start()
112 print(console.green +
"Ros Master available at " + console.yellow + os.environ[
"ROS_MASTER_URI"] + console.green +
"." + console.reset)
114 print(console.red +
"ROS Master unavailable" + console.reset)
116 if __name__ ==
'__main__':
121 except KeyboardInterrupt:
122 ros_master.shutdown()
def _user_callback_test(available)
def __init__(self, period=5, callback_function=None, ros_master_uri=None)
def _wait_for_master(self)
def check_for_master(ros_master_uri=None)
Methods.
def _default_callback_function(self, new_master_found)