Source code for master_discovery_fkie.filter_interface

# Software License Agreement (BSD License)
#
# Copyright (c) 2012, Fraunhofer FKIE/US, Alexander Tiderko
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
#
#  * Redistributions of source code must retain the above copyright
#    notice, this list of conditions and the following disclaimer.
#  * Redistributions in binary form must reproduce the above
#    copyright notice, this list of conditions and the following
#    disclaimer in the documentation and/or other materials provided
#    with the distribution.
#  * Neither the name of Fraunhofer nor the names of its
#    contributors may be used to endorse or promote products derived
#    from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
# COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.

import re

import rospy

from .common import create_pattern, gen_pattern, is_empty_pattern, EMPTY_PATTERN
from .common import resolve_url, read_interface


[docs]class FilterInterface(object): ''' The class represents a filter used by sync node to filter nodes, topics or services. The filter is based on regular expressions. The filter object can be converted to a tuple of strings and then passed to the XML-RPC method to get a filtered ROS master state. After creation of the filter object you must call :mod:`master_discovery_fkie.filter_interface.FilterInterface.load()` or :mod:`master_discovery_fkie.filter_interface.FilterInterface.from_list()`. Otherwise the object is invalid and the test methods return always `False`. ''' def __init__(self): self.is_valid = False self._re_do_not_sync = EMPTY_PATTERN self._re_do_not_sync_from_list = EMPTY_PATTERN
[docs] def load(self, mastername='', ignore_nodes=[], sync_nodes=[], ignore_topics=[], sync_topics=[], ignore_srv=[], sync_srv=[], ignore_type=[], ignore_publishers=[], ignore_subscribers=[], do_not_sync=[]): ''' Reads the parameter and creates the pattern using :mod:`master_discovery_fkie.common.create_pattern()` ''' self.__interface_file = interface_file = resolve_url(rospy.get_param('~interface_url', '')) self.__mastername = mastername self.__data = data = read_interface(interface_file) if interface_file else {} # set the pattern for ignore and sync lists self._re_ignore_nodes = create_pattern('ignore_nodes', data, interface_file, ignore_nodes, mastername) self._re_sync_nodes = create_pattern('sync_nodes', data, interface_file, sync_nodes, mastername) self._re_ignore_topics = create_pattern('ignore_topics', data, interface_file, ignore_topics, mastername) self._re_sync_topics = create_pattern('sync_topics', data, interface_file, sync_topics, mastername) self._re_ignore_services = create_pattern('ignore_services', data, interface_file, ignore_srv, mastername) self._re_sync_services = create_pattern('sync_services', data, interface_file, sync_srv, mastername) self._re_ignore_type = create_pattern('ignore_type', data, interface_file, ignore_type, mastername) self._re_ignore_publishers = create_pattern('ignore_publishers', data, interface_file, ignore_publishers, mastername) self._re_ignore_subscribers = create_pattern('ignore_subscribers', data, interface_file, ignore_subscribers, mastername) self._sync_remote_nodes = False if interface_file: if 'sync_remote_nodes' in data: self._sync_remote_nodes = data['sync_remote_nodes'] elif rospy.has_param('~sync_remote_nodes'): self._sync_remote_nodes = rospy.get_param('~sync_remote_nodes') if do_not_sync: self._re_do_not_sync = gen_pattern(do_not_sync, 'do_not_sync') else: self.read_do_not_sync() self.is_valid = True
[docs] def read_do_not_sync(self): _do_not_sync = rospy.get_param('do_not_sync', []) if isinstance(_do_not_sync, (str, unicode)): # create a list from string _do_not_sync = _do_not_sync.strip('[').rstrip(']').replace(' ', ',').split(',') # remove empty values _do_not_sync = [val for val in _do_not_sync if val] self._re_do_not_sync = gen_pattern(_do_not_sync, 'do_not_sync', print_info=False)
[docs] def update_sync_topics_pattern(self, topics=[]): ''' Updates the sync topics pattern. :param topics: the list of topics to sync. :type topics: list of strings ''' self._re_sync_topics = create_pattern('sync_topics', self.__data, self.__interface_file, topics, self.__mastername)
[docs] def sync_remote_nodes(self): ''' Returns the value stored in `sync_remote_nodes` parameter. ''' if not self.is_valid: return False return self._sync_remote_nodes
[docs] def is_ignored_node(self, node): ''' Searches the given node in `ignore_nodes` and `sync_nodes` lists. :param node: the name of the node to test. :type node: str :return: `True`, if the node was found in the `ignore_nodes` list or the `sync_nodes` is not empty. :note: If the filter object is not initialized by load() or from_list() the returned value is `False` ''' if not self.is_valid: return False if self.do_not_sync(node): return True if self._re_sync_nodes.match(node): return False # there are no sync nodes defined => return False return not is_empty_pattern(self._re_sync_nodes)
[docs] def is_ignored_topic(self, node, topic, topictype): ''' NOTE: This function is deprecated. Please use `is_ignored_subscriber` and `is_ignored_publisher` instead Searches firstly in ignore lists `ignore_type`, `ignore_nodes` and `ignore_topics`. Then in `sync_nodes` or `sync_topics`. :param node: the name of the node published or subscribed the topic. :type node: str :param topic: the name of the topic to test. :type topic: str :param topictype: the type of the topic. (e.g. the synchronization of bond/Status terminate the nodelets) :type topictype: str :return: `True`, if the values are found in `ignore_type`, `ignore_nodes` or `ignore_topics`. If `sync_nodes` or `sync_topics` is empty `True` will be returned, too. :note: If the filter object is not initialized by load() or from_list() the returned value is `False` ''' rospy.logwarn("Call to deprecated method 'is_ignored_topic'. Please use" "'is_ignored_subscriber' and 'is_ignored_publisher' instead") self._is_ignored_topic(node, topic, topictype)
def _is_ignored_topic(self, node, topic, topictype): if not self.is_valid: return False if self.do_not_sync([node, topic, topictype]): return True # do not sync the bond message of the nodelets!! if self._re_ignore_type.match(topictype): return True if self._re_ignore_nodes.match(node): return True if self._re_ignore_topics.match(topic): return True if self._re_sync_nodes.match(node): return False if self._re_sync_topics.match(topic): return False # there are no sync nodes and topic lists defined => return False (=>sync the given topic) return not is_empty_pattern(self._re_sync_nodes) or not is_empty_pattern(self._re_sync_topics)
[docs] def is_ignored_subscriber(self, node, topic, topictype): ''' Searches first in the `ignore_subscribers` ignore list Then in ignore lists `ignore_type`, `ignore_nodes` and `ignore_topics`. Finally in `sync_nodes` or `sync_topics`. :param node: the name of the node published or subscribed the topic. :type node: str :param topic: the name of the topic to test. :type topic: str :param topictype: the type of the topic. (e.g. the synchronization of bond/Status terminate the nodelets) :type topictype: str :return: `True`, if the values are found in `ignore_type`, `ignore_nodes` or `ignore_topics`. If `sync_nodes` or `sync_topics` is empty `True` will be returned, too. :note: If the filter object is not initialized by load() or from_list() the returned value is `False` ''' if self.do_not_sync([node, topic, topictype]): return True return self._re_ignore_subscribers.match(topic) or self._is_ignored_topic(node, topic, topictype)
[docs] def is_ignored_publisher(self, node, topic, topictype): ''' Searches first in the `ignore_publishers` ignore list Then in ignore lists `ignore_type`, `ignore_nodes` and `ignore_topics`. Finally in `sync_nodes` or `sync_topics`. :param node: the name of the node published or subscribed the topic. :type node: str :param topic: the name of the topic to test. :type topic: str :param topictype: the type of the topic. (e.g. the synchronization of bond/Status terminate the nodelets) :type topictype: str :return: `True`, if the values are found in `ignore_type`, `ignore_nodes` or `ignore_topics`. If `sync_nodes` or `sync_topics` is empty `True` will be returned, too. :note: If the filter object is not initialized by load() or from_list() the returned value is `False` ''' if self.do_not_sync([node, topic, topictype]): return True return self._re_ignore_publishers.match(topic) or self._is_ignored_topic(node, topic, topictype)
[docs] def is_ignored_service(self, node, service): ''' Searches firstly in ignore lists `ignore_nodes` and `ignore_services`. Then in `sync_nodes` or `sync_services`. :param node: the name of the node provided the service. :type node: str :param topic: the name of the service to test. :type topic: str :return: `True`, if the values are found in `ignore_nodes` or `ignore_services`. If `sync_nodes` or `sync_services` is empty `True` will be returned, too. :note: If the filter object is not initialized by load() or from_list() the returned value is `False` ''' if not self.is_valid: return False if self.do_not_sync([node, service]): return True if self._re_ignore_nodes.match(node): return True if self._re_ignore_services.match(service.strip()): return True if self._re_sync_nodes.match(node): return False if self._re_sync_services.match(service): return False return not is_empty_pattern(self._re_sync_nodes) or not is_empty_pattern(self._re_sync_services)
[docs] def do_not_sync(self, name): if isinstance(name, list): for nval in name: if self._re_do_not_sync.match(nval) or self._re_do_not_sync_from_list.match(nval): return True elif self._re_do_not_sync.match(name) or self._re_do_not_sync_from_list.match(name): return True return False
[docs] def to_list(self): ''' :returns: the tuple of the all filter patterns. :: (sync_remote_nodes, ignore_nodes, sync_nodes, ignore_topics, sync_topics, ignore_services, sync_services, ignore_type, ignore_publishers, ignore_subscribers, do_not_sync) :rtype: `(bool, str, str, str, str, str, str, str, str, str, str)` ''' if not self.is_valid: return (False, '', '', '', '', '', '', '', '', '', '') return (self._sync_remote_nodes, _to_str(self._re_ignore_nodes), _to_str(self._re_sync_nodes), _to_str(self._re_ignore_topics), _to_str(self._re_sync_topics), _to_str(self._re_ignore_services), _to_str(self._re_sync_services), _to_str(self._re_ignore_type), _to_str(self._re_ignore_publishers), _to_str(self._re_ignore_subscribers), _to_str(self._re_do_not_sync))
@staticmethod
[docs] def from_list(l=None): ''' Reads the tuple of :: (sync_remote_nodes, ignore_nodes, sync_nodes, ignore_topics, sync_topics, ignore_services, sync_services, ignore_type, ignore_publishers, ignore_subscribers, do_not_sync)` with types `(bool, str, str, str, str, str, str, str, str)` and creates the `FilterInterface` object. :return: `FilterInterface` object or `None` on failure ''' try: result = FilterInterface() if l is None: l = (False, '', '', '', '', '', '', '', '', '', '') else: result.read_do_not_sync() result._sync_remote_nodes = bool(l[0]) result._re_ignore_nodes = _from_str(l[1]) result._re_sync_nodes = _from_str(l[2]) result._re_ignore_topics = _from_str(l[3]) result._re_sync_topics = _from_str(l[4]) result._re_ignore_services = _from_str(l[5]) result._re_sync_services = _from_str(l[6]) result._re_ignore_type = _from_str(l[7]) result._re_ignore_publishers = _from_str(l[8] if len(l) > 8 else '') result._re_ignore_subscribers = _from_str(l[9] if len(l) > 9 else '') result._re_do_not_sync_from_list = _from_str(l[10] if len(l) > 10 else '') result.is_valid = True return result except: import traceback print traceback.format_exc() return None
def _to_str(re_object): if is_empty_pattern(re_object): return '' return re_object.pattern def _from_str(msg): if msg: return re.compile(msg, re.I) else: return EMPTY_PATTERN