Source code for master_discovery_fkie.udp

# Software License Agreement (BSD License)
#
# Copyright (c) 2012, Fraunhofer FKIE/US, Alexander Tiderko
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
#
#  * Redistributions of source code must retain the above copyright
#    notice, this list of conditions and the following disclaimer.
#  * Redistributions in binary form must reproduce the above
#    copyright notice, this list of conditions and the following
#    disclaimer in the documentation and/or other materials provided
#    with the distribution.
#  * Neither the name of Fraunhofer nor the names of its
#    contributors may be used to endorse or promote products derived
#    from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
# COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.

import Queue
import array
import errno
import fcntl
import os
import platform
import roslib.network
import rospy
import socket
import struct
import threading

try:
    import netifaces
    _use_netifaces = True
except:
    _use_netifaces = False


SEND_ERRORS = {}


[docs]class QueueReceiveItem(): LOOPBACK = 'LOOPBACK' UNICAST = 'UNICAST' MULTICAST = 'MULTICAST' def __init__(self, msg, sender_addr, via='LOOPBACK'): self.msg = msg self.sender_addr = sender_addr self.via = via
[docs]class QueueSendItem(): def __init__(self, msg, destinations=[]): self.msg = msg self.destinations = destinations if not isinstance(destinations, list): self.destinations = [destinations]
[docs]class DiscoverSocket(socket.socket): ''' The DiscoverSocket class enables the send and receive UDP messages to a multicast group and unicast address. The unicast socket is only created if 'send_mcast' and 'listen_mcast' parameter are set to False or a specific interface is defined. :param port: the port to bind the socket :type port: int :param mgroup: the multicast group to join :type mgroup: str :param reuse: allows the reusing of the port :type reuse: boolean (Default: True) :param ttl: time to leave :type ttl: int (Default: 20) ''' def __init__(self, port, mgroup, ttl=20, send_mcast=True, listen_mcast=True): ''' Creates a socket, bind it to a given port and join to a given multicast group. IPv4 and IPv6 are supported. @param port: the port to bind the socket @type port: int @param mgroup: the multicast group to join @type mgroup: str @param ttl: time to leave @type ttl: int (Default: 20) @param send_mcast: send multicast messages @type send_mcast: bool (Default: True) @param listen_mcast: listen to the multicast group @type listen_mcast: bool (Default: True) ''' self.port = port self.receive_queue = Queue.Queue() self._send_queue = Queue.Queue() self._lock = threading.RLock() self.send_mcast = send_mcast self.listen_mcast = listen_mcast self.unicast_only = not (send_mcast or listen_mcast) self._closed = False self._locals = [ip for ifname, ip in DiscoverSocket.localifs()] self._locals.append('localhost') self.sock_5_error_printed = [] self.SOKET_ERRORS_NEEDS_RECONNECT = False # get the group and interface. Especially for definition like 226.0.0.0@192.168.101.10 # it also reads ~interface and ROS_IP self.mgroup, self.interface = DiscoverSocket.normalize_mgroup(mgroup, True) self.unicast_socket = None # get the AF_INET information for group to ensure that the address family # of group is the same as for interface addrinfo = UcastSocket.getaddrinfo(self.mgroup) self.interface_ip = '' if self.unicast_only: # inform about no braodcasting rospy.logwarn("Multicast disabled! This master is only by unicast reachable!") if self.interface: addrinfo = UcastSocket.getaddrinfo(self.interface, addrinfo[0]) if addrinfo is not None: self.interface_ip = addrinfo[4][0] self.unicast_socket = UcastSocket(self.interface_ip, port) elif self.unicast_only: self.unicast_socket = UcastSocket('', port) rospy.logdebug("mgroup: %s", self.mgroup) rospy.logdebug("interface : %s", self.interface) rospy.logdebug("inet: %s", addrinfo) socket.socket.__init__(self, addrinfo[0], socket.SOCK_DGRAM, socket.IPPROTO_UDP) if not self.unicast_only: rospy.loginfo("Create multicast socket at ('%s', %d)", self.mgroup, port) # initialize multicast socket # Allow multiple copies of this program on one machine if hasattr(socket, "SO_REUSEPORT"): try: self.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1) except: rospy.logwarn("SO_REUSEPORT failed: Protocol not available, some functions are not available.") # Set Time-to-live (optional) and loop count ttl_bin = struct.pack('@i', ttl) if addrinfo[0] == socket.AF_INET: # IPv4 self.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, ttl_bin) self.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_LOOP, 1) else: # IPv6 self.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_MULTICAST_HOPS, ttl_bin) self.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_MULTICAST_LOOP, 1) try: if self.listen_mcast: if addrinfo[0] == socket.AF_INET: # IPv4 # Create group_bin for de-register later # Set socket options for multicast specific interface or general if not self.interface_ip: self.group_bin = socket.inet_pton(socket.AF_INET, self.mgroup) + struct.pack('=I', socket.INADDR_ANY) self.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, self.group_bin) else: self.group_bin = socket.inet_aton(self.mgroup) + socket.inet_aton(self.interface_ip) self.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_IF, socket.inet_aton(self.interface_ip)) self.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, self.group_bin) else: # IPv6 # Create group_bin for de-register later # Set socket options for multicast self.group_bin = socket.inet_pton(addrinfo[0], self.mgroup) + struct.pack('@I', socket.INADDR_ANY) self.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_JOIN_GROUP, self.group_bing) except socket.error as errobj: msg = str(errobj) if errobj.errno in [errno.ENODEV]: msg = "socket.error[%d]: %s,\nis multicast route set? e.g. sudo route add -net 224.0.0.0 netmask 224.0.0.0 eth0" % (errobj.errno, msg) raise Exception(msg) # Bind to the port try: # bind to default interfaces if not unicast socket was created to_group = self.mgroup if self.unicast_socket is not None else '' self.bind((to_group, port)) except socket.error as errobj: msg = str(errobj) rospy.logfatal("Unable to bind multicast to interface: %s, check that it exists: %s", self.mgroup, msg) raise self.addrinfo = addrinfo if not self.unicast_only: # create a thread to handle the received multicast messages self._recv_thread = threading.Thread(target=self.recv_loop_multicast) self._recv_thread.start() if self.unicast_socket is not None: # create a thread to handle the received unicast messages self._recv_thread = threading.Thread(target=self.recv_loop_unicast) self._recv_thread.start() self._send_tread = threading.Thread(target=self._send_loop_from_queue) self._send_tread.start() @staticmethod
[docs] def normalize_mgroup(mgroup, getinterface=False): groupaddr, _, interface = mgroup.partition('@') if not getinterface: return groupaddr # Interface parameter takes precedence if not interface: interface = rospy.get_param('~interface', '') if interface: return groupaddr, interface # Otherwise, resolve interface with ROS_HOSTNAME or ROS_IP as per: # http://wiki.ros.org/ROS/EnvironmentVariables#ROS_IP.2BAC8-ROS_HOSTNAME if 'ROS_HOSTNAME' in os.environ: addr = socket.gethostbyname(os.environ['ROS_HOSTNAME']) if addr[:4] != '127.': # 127.x.y.z is loopback return groupaddr, addr if 'ROS_IP' in os.environ: return groupaddr, os.environ['ROS_IP'] # Cannot determine network interface return groupaddr, None
[docs] def close(self): ''' Unregister from the multicast group and close the socket. ''' self._closed = True # Use the stored group_bin to de-register if not self.unicast_only: if self.listen_mcast: if self.addrinfo[0] == socket.AF_INET: # IPv4 self.setsockopt(socket.IPPROTO_IP, socket.IP_DROP_MEMBERSHIP, self.group_bin) else: # IPv6 self.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_LEAVE_GROUP, self.group_bing) rospy.loginfo("Close multicast socket at ('%s', %s)", self.mgroup, self.port) self.sendto('', ('localhost', self.port)) socket.socket.close(self) # close the unicast socket if self.unicast_socket is not None: self.unicast_socket.close()
[docs] def send_queued(self, msg, destinations=[]): try: self._send_queue.put(QueueSendItem(msg, destinations), timeout=1) except Queue.Full as full: import traceback print traceback.format_exc() rospy.logwarn("Can't send message: %s" % full) except Exception as e: rospy.logwarn("Error while put message into queue: %s" % e)
def _get_next_queue_item(self): ''' Wait for next available QueueSendItem. This method cancel waiting on exit and return None. ''' while not self._closed: try: send_item = self._send_queue.get(timeout=1) return send_item except Queue.Empty: pass except Exception as e: rospy.logwarn("Error while get send item from queue: %s" % e) return None def _send_loop_from_queue(self): while not self._closed: send_item = self._get_next_queue_item() if send_item is not None: msg = send_item.msg if send_item.destinations: # send to given addresses for addr in send_item.destinations: try: if addr in self._locals: self.receive_queue.put(QueueReceiveItem(msg, (addr, self.port), QueueReceiveItem.LOOPBACK), timeout=1) elif self.unicast_socket is None: self.sendto(msg, (addr, self.getsockname()[1])) else: self.unicast_socket.send2addr(msg, addr) try: del SEND_ERRORS[addr] except: pass except socket.error as errobj: erro_msg = "Error while send to '%s': %s" % (addr, errobj) SEND_ERRORS[addr] = erro_msg # -2: Name or service not known if errobj.errno in [-5, -2]: if addr not in self.sock_5_error_printed: rospy.logwarn(erro_msg) self.sock_5_error_printed.append(addr) else: rospy.logwarn(erro_msg) if errobj.errno in [errno.ENETDOWN, errno.ENETUNREACH, errno.ENETRESET, errno]: self.SOKET_ERRORS_NEEDS_RECONNECT = True except Exception as e: erro_msg = "Send to robot host '%s' failed: %s" % (addr, e) rospy.logwarn(erro_msg) SEND_ERRORS[addr] = erro_msg else: # send a multicast message # simulate the reception of a message from local host addr = self.mgroup try: if not self.listen_mcast: self.receive_queue.put(QueueReceiveItem(msg, ('localhost', self.port), QueueReceiveItem.LOOPBACK), timeout=1) if self.unicast_only and self.unicast_socket: addr = self.unicast_socket.interface self.unicast_socket.send2addr(msg, self.unicast_socket.interface) elif self.send_mcast: # Send to the multicast group address as supplied # Default '226.0.0.0' self.sendto(msg, (self.mgroup, self.getsockname()[1])) try: del SEND_ERRORS[addr] except: pass except socket.error as errobj: erro_msg = "Error while send to '%s': %s" % (addr, errobj) SEND_ERRORS[addr] = erro_msg # -2: Name or service not known if errobj.errno in [-5, -2]: if addr not in self.sock_5_error_printed: rospy.logwarn(erro_msg) self.sock_5_error_printed.append(addr) else: rospy.logdebug(erro_msg) if errobj.errno in [errno.ENETDOWN, errno.ENETUNREACH, errno.ENETRESET]: self.SOKET_ERRORS_NEEDS_RECONNECT = True except Exception as e: erro_msg = "Send to robot host '%s' failed: %s" % (addr, e) rospy.logwarn(erro_msg) SEND_ERRORS[addr] = erro_msg
[docs] def hasEnabledMulticastIface(self): ''' Test all enabled interfaces for a MULTICAST flag. If no enabled interfaces has a multicast support, False will be returned. :return: ``True``, if any interfaces with multicast support are enabled. :rtype: bool ''' if platform.system() in ['Linux', 'FreeBSD']: SIOCGIFFLAGS = 0x8913 IFF_MULTICAST = 0x1000 # Supports multicast. IFF_UP = 0x1 # Interface is up. for (ifname, _) in DiscoverSocket.localifs(): args = (ifname + '\0' * 32)[:32] try: result = fcntl.ioctl(self.fileno(), SIOCGIFFLAGS, args) except IOError: return False flags, = struct.unpack('H', result[16:18]) if ((flags & IFF_MULTICAST) != 0) & ((flags & IFF_UP) != 0): return True return False else: return True
@staticmethod
[docs] def localifs(): ''' Used to get a list of the up interfaces and associated IP addresses on this machine (linux only). :return: List of interface tuples. Each tuple consists of ``(interface name, interface IP)`` :rtype: list of ``(str, str)`` ''' if _use_netifaces: # #addresses on multiple platforms (OS X, Unix, Windows) local_addrs = [] # see http://alastairs-place.net/netifaces/ for i in netifaces.interfaces(): try: local_addrs.extend([(i, d['addr']) for d in netifaces.ifaddresses(i)[netifaces.AF_INET]]) except KeyError: pass return local_addrs else: SIOCGIFCONF = 0x8912 MAXBYTES = 8096 arch = platform.architecture()[0] # I really don't know what to call these right now var1 = -1 var2 = -1 if arch == '32bit': var1 = 32 var2 = 32 elif arch == '64bit': var1 = 16 var2 = 40 else: raise OSError("Unknown architecture: %s" % arch) sockfd = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) names = array.array('B', '\0' * MAXBYTES) outbytes = struct.unpack('iL', fcntl.ioctl(sockfd.fileno(), SIOCGIFCONF, struct.pack('iL', MAXBYTES, names.buffer_info()[0]) ))[0] namestr = names.tostring() return [(namestr[i:i + var1].split('\0', 1)[0], socket.inet_ntoa(namestr[i + 20:i + 24])) for i in xrange(0, outbytes, var2)]
[docs] def recv_loop_multicast(self): ''' This method handles the received multicast messages. ''' while not rospy.is_shutdown() and not self._closed: try: (msg, address) = self.recvfrom(1024) if not self._closed: self.receive_queue.put(QueueReceiveItem(msg, address, QueueReceiveItem.MULTICAST), timeout=1) except socket.timeout: pass except Queue.Full as full_error: rospy.logwarn("Error while process recevied multicast message: %s", full_error) except socket.error: import traceback rospy.logwarn("socket error: %s", traceback.format_exc())
[docs] def recv_loop_unicast(self): ''' This method handles the received unicast messages. ''' if self.unicast_socket is not None: while not rospy.is_shutdown() and not self._closed: try: (msg, address) = self.unicast_socket.recvfrom(1024) if not self._closed: self.receive_queue.put(QueueReceiveItem(msg, address, QueueReceiveItem.UNICAST), timeout=1) except socket.timeout: pass except Queue.Full as full_error: rospy.logwarn("Error while process recevied unicast message: %s", full_error) except socket.error: import traceback rospy.logwarn("unicast socket error: %s", traceback.format_exc())
[docs]class UcastSocket(socket.socket): def __init__(self, interface, port): ''' Creates a socket, bind it to a given interface+port for unicast send/receive. IPv4 and IPv6 are supported. @param interface: The interface to bind to @type interface: str @param port: the port to bind the socket @type port: int ''' self.interface = interface self.port = port addrinfo = None # If interface isn't specified, try to find an non localhost interface to # get some info for binding. Otherwise use localhost if not self.interface: ifaces = roslib.network.get_local_addresses() self.interface = 'localhost' for iface in ifaces: if not (iface.startswith('127') or iface.startswith('::1')): self.interface = iface break rospy.loginfo("+ Bind to unicast socket @(%s:%s)", self.interface, port) addrinfo = UcastSocket.getaddrinfo(self.interface) else: # Otherwise get the address info for the interface specified. rospy.loginfo("+ Bind to specified unicast socket @(%s:%s)", self.interface, port) addrinfo = UcastSocket.getaddrinfo(self.interface) # Configure socket type socket.socket.__init__(self, addrinfo[0], socket.SOCK_DGRAM, socket.IPPROTO_UDP) # Allow multiple copies of this program on one machine # Required to receive unicast UDP if hasattr(socket, "SO_REUSEPORT"): try: self.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1) except: rospy.logwarn("SO_REUSEPORT failed: Protocol not available, some functions are not available.") # Bind to the port try: rospy.logdebug("Ucast bind to: (%s:%s)", addrinfo[4][0], port) self.bind((addrinfo[4][0], port)) except socket.error as errobj: msg = str(errobj) rospy.logfatal("Unable to bind unicast to interface: %s, check that it exists: %s", self.interface, msg) raise
[docs] def send2addr(self, msg, addr): ''' Sends the given message to the joined multicast group. Some errors on send will be ignored (``ENETRESET``, ``ENETDOWN``, ``ENETUNREACH``) :param msg: message to send :type msg: str :param addr: IPv4 or IPv6 address :type addr: str ''' try: self.sendto(msg, (addr, self.getsockname()[1])) except socket.error as errobj: msg = str(errobj) if errobj.errno in [-5]: if addr not in self.sock_5_error_printed: rospy.logwarn("socket.error[%d]: %s, addr: %s", errobj.errno, msg, addr) self.sock_5_error_printed.append(addr) elif errobj.errno in [errno.EINVAL, -2]: raise # Exception('Cannot send to `%s`, try to change the interface, message: %s' % (addr, msg)) elif errobj.errno not in [errno.ENETDOWN, errno.ENETUNREACH, errno.ENETRESET]: raise
[docs] def close(self): """ Cleanup and close the socket""" self.sendto('', (self.interface, self.port)) socket.socket.close(self)
@staticmethod
[docs] def getaddrinfo(addr, family=None): ''' :param addr: the addess to get the info for :param family: type of the address family (e.g. socket.AF_INET) ''' # get info about the IP version (4 or 6) addrinfos = socket.getaddrinfo(addr, None) addrinfo = None if family is None and len(addrinfos) > 0: addrinfo = addrinfos[0] elif family: for ainfo in addrinfos: if ainfo[0] == family: return ainfo return addrinfo