transports.py
Go to the documentation of this file.
00001 #! /usr/bin/env python -m
00002 # -*- coding: utf-8 -*-
00003 #     _____
00004 #    /  _  \
00005 #   / _/ \  \
00006 #  / / \_/   \
00007 # /  \_/  _   \  ___  _    ___   ___   ____   ____   ___   _____  _   _
00008 # \  / \_/ \  / /  _\| |  | __| / _ \ | ┌┐ \ | ┌┐ \ / _ \ |_   _|| | | |
00009 #  \ \_/ \_/ /  | |  | |  | └─┐| |_| || └┘ / | └┘_/| |_| |  | |  | └─┘ |
00010 #   \  \_/  /   | |_ | |_ | ┌─┘|  _  || |\ \ | |   |  _  |  | |  | ┌─┐ |
00011 #    \_____/    \___/|___||___||_| |_||_| \_\|_|   |_| |_|  |_|  |_| |_|
00012 #            ROBOTICS™
00013 #
00014 #  File: transports.py
00015 #  Desc: Horizon Message Transport Controllers
00016 #  
00017 #  Copyright © 2010 Clearpath Robotics, Inc. 
00018 #  All Rights Reserved
00019 # 
00020 #  Redistribution and use in source and binary forms, with or without
00021 #  modification, are permitted provided that the following conditions are met:
00022 #      * Redistributions of source code must retain the above copyright
00023 #        notice, this list of conditions and the following disclaimer.
00024 #      * Redistributions in binary form must reproduce the above copyright
00025 #        notice, this list of conditions and the following disclaimer in the
00026 #        documentation and/or other materials provided with the distribution.
00027 #      * Neither the name of Clearpath Robotics, Inc. nor the
00028 #        names of its contributors may be used to endorse or promote products
00029 #        derived from this software without specific prior written permission.
00030 # 
00031 #  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 
00032 #  AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 
00033 #  IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 
00034 #  ARE DISCLAIMED. IN NO EVENT SHALL CLEARPATH ROBOTICS, INC. BE LIABLE FOR ANY
00035 #  DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
00036 #  (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
00037 #  LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
00038 #  ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
00039 #  (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
00040 #  SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
00041 #
00042 #  Please send comments, questions, or patches to code@clearpathrobotics.com
00043 #
00044 
00045 
00046 
00047 
00048 ################################################################################
00049 # Script
00050 
00051 
00052 
00053 # Check if run as a script
00054 if __name__ == "__main__":
00055     
00056     # Warn of Module ONLY status
00057     print ("ERROR: clearpath.horizon.transports is a module and can NOT be run"\
00058            " as a script!\nFor a command-line interface demo of Horizon, run:"\
00059            "\n  python -m clearpath.horizon.demo\n"\
00060            "For Horizon message forwarding, run:\n"\
00061            "  python -m clearpath.horizon.forward")
00062 
00063     # Exit Error
00064     import sys
00065     sys.exit(1)
00066 
00067 
00068 
00069 
00070 ################################################################################
00071 # Module
00072 
00073 
00074 
00075  ## @package clearpath.horizon.transports 
00076 #  Horizon Transports Python Module
00077 # 
00078 #  Horizon Message Transport Controllers                                      \n
00079 #  Supported Horizon version(s): 0.1 - 1.0                                    \n
00080 #                                                                             \n
00081 #  Supported Transports:
00082 #  - Serial (RS-232)
00083 #
00084 #  @author     Ryan Gariepy
00085 #  @author     Malcolm Robert
00086 #  @date       18/01/10
00087 #  @todo       Would be nice to support version translation in IP Servers, 
00088 #              however, that would require additional work to detect the
00089 #              version that clients are using which is currently not supported
00090 #              by the current framework.
00091 #  @todo       FIX encryption
00092 #  @req        clearpath.utils                                                \n
00093 #              clearpath.horizon.messages                                     \n
00094 #              clearpath.horizon.payloads                                     \n
00095 #              pySerial [http://pyserial.sourceforge.net/]                    \n
00096 #              -- for Serial
00097 #  @version    1.0
00098 #
00099 #  @section USE
00100 #
00101 #  The intended purpose of this module is to provide functionality to send
00102 #  and receive Horizon messages over the various transports with minimal
00103 #  knowledge of messages and no knowledge of message payloads. Further, as
00104 #  some transports support a one-to-many connection scheme, this module also
00105 #  provides message routing. Note that as this module implements the transports
00106 #  as defined in the Horizon specification, there is no support for version
00107 #  translation. If version translation is desired, refer to the module 
00108 #  clearpath.horizon.forward.
00109 #
00110 #  @section HISTORY
00111 #
00112 #  Versions 0.1 - 0.3 {Ryan Gariepy}
00113 #  - Initial Creation as protocol.py
00114 #
00115 #  Version 0.4 {Malcolm Robert}
00116 #  - Moved to horizon_protocol.py
00117 #  - Added protocol abstraction
00118 #  - Added TCP/IP
00119 #  - Added UDP
00120 #  - Added logging
00121 #  - Added version support
00122 #  - Added Doxygen documentation
00123 #  - Changed version scheme to match Horizon doc
00124 #  - Horizon support for v0.4
00125 #
00126 #  Version 0.5
00127 #  - Horizon support for v0.5
00128 #
00129 #  Version 0.6
00130 #  - Moved to protocol.py
00131 #  - Horizon support for v0.6
00132 #  - Improved search for header in read
00133 #  - Added TCP encryption
00134 #
00135 #  Version 0.7
00136 #  - Moved to transport.py
00137 #  - Added Server transports
00138 #  - Added IPv6 support
00139 #  - Horizon support for v 0.1 - 0.7
00140 #  - Python 2.6+ & 3.x compatible
00141 #
00142 #  Version 0.8
00143 #  - Horizon support for v 0.1 - 0.8
00144 #
00145 #  Version 1.0
00146 #  - Added one-to-many support
00147 #  - Horizon support for v 0.1 - 1.0
00148 #
00149 #  @section License
00150 #  @copydoc public_license
00151 #
00152 #  @defgroup hardware Hardware
00153 #  @ingroup doc
00154 #  
00155 #  @copydoc serial
00156 #
00157 #  If higher bandwidth communication is required, the protocol may also be 
00158 #  packaged verbatim within TCP/IP or UDP packets if desired, with one protocol 
00159 #  message per packet. The default Horizon port for TCP/UDP is 7284.
00160 #
00161 #  @copydoc tcp
00162 #
00163 #  @copydoc udp
00164 #
00165 """Horizon Message Transport Controllers
00166 
00167    Copyright © 2010 Clearpath Robotics, Inc.
00168    All rights reserved
00169    
00170    Created: 17/03/10
00171    Authors: Ryan Gariepy & Malcolm Robert
00172    Version: 1.0
00173    """
00174 
00175 
00176 # Required Clearpath Modules
00177 from .. import utils            # Clearpath Utilities
00178 from .  import messages         # Horizon Protocol Message Definition
00179 from .  import codes            # Horizon Protocol Versions 
00180 
00181 # Required Python Modules
00182 import logging                  # Logging Utilities
00183 import socket                   # UDP Port Control
00184 import sys                      # Python Interpreter Functionality
00185 import threading                # Python Thread Support
00186 import time                     # System Date & Time
00187 import collections              # deque
00188 import math
00189 
00190 # Version Dependent Modules
00191 try:
00192     import serial               # Serial Port Control
00193 except ImportError:
00194     pass
00195 
00196 MessageRecord = collections.namedtuple('MessageRecord', 'message, expiry')
00197 
00198 
00199 # Module Support
00200 ## Module Version
00201 __version__  = "1.0"
00202 """Module Version"""
00203 ## SVN Code Revision
00204 __revision__ = "$Revision: 898 $"
00205 """ SVN Code Revision"""
00206 
00207 
00208 ## Message Log
00209 logger = logging.getLogger('clearpath.horizon.transports')
00210 """Horizon Transports Module Log"""
00211 logger.setLevel(logging.NOTSET)
00212 logger.addHandler(utils.NullLoggingHandler())
00213 logger.propagate = False
00214 logger.debug("Loading clearpath.horizon.transports ...")              
00215 
00216 
00217 
00218 
00219 ################################################################################
00220 # Horizon Transport Controller
00221 
00222 
00223 
00224 ## Transport
00225 #
00226 class Transport(object):
00227     """Horizon Transport Base Class"""
00228     pass
00229 
00230 
00231 ## Horizon Serial Controller
00232 #
00233 #  Provides a method to send and receive messages over RS-232.                \n
00234 #  Guarantees order of arrival and arrival.                                   \n
00235 #  Low Bandwidth use only.
00236 #
00237 #  @req        pySerial [http://pyserial.sourceforge.net/]
00238 #  @since 0.1
00239 #
00240 #  @pydoc
00241 class Serial(Transport):
00242     """Horizon Transport Controller - Serial Device"""
00243     
00244     def __init__(self, port = None, store_timeout = 0, receive_callback = None):
00245         """Create A Horizon Serial Transport"""
00246 
00247         # Dependency check
00248         try:
00249             serial
00250         except NameError:
00251             logger.error("%s: Cannot create Horizon Serial Transport without"\
00252                          "pySerial!" % self.__class__.__name__)
00253             raise utils.TransportError ("pySerial not found!")
00254     
00255         if port == None: 
00256             raise utils.TransportError \
00257                 ("Serial transport creation failed: port not specified!\n")
00258 
00259         # Class Variables
00260         self.port = port
00261         self._serial = None    
00262         self._opened = False
00263         self.store_timeout = store_timeout
00264         self.receive_callback = receive_callback
00265         self.serial_write_lock = threading.Lock()
00266 
00267         # Initialization
00268         try:
00269             self._serial = serial.Serial()
00270             self._serial.port = port
00271             self._serial.baudrate = 115200
00272             self._serial.timeout = 0
00273     
00274         # Creation failed
00275         except serial.SerialException as ex:
00276             raise utils.TransportError \
00277                 ("Serial Transport creation failed!\n" + str(ex))
00278       
00279 
00280     @classmethod
00281     def autodetect(cls, **kwargs):
00282         ports = utils.list_serial_ports()
00283         logger.info("%s: Attempting autodetect with %s." % 
00284                     (cls.__name__, ' '.join(ports)))
00285 
00286         for trynum in range(5):
00287             logger.info("%s: Autodetect try #%d." %  
00288                         (cls.__name__, trynum + 1))
00289             for port in ports:
00290                 kwargs['port'] = port
00291                 transport = cls(**kwargs)
00292                 try:
00293                     transport.open()
00294                     return transport
00295                 except utils.TransportError:
00296                     # Not this one, move on.
00297                     pass
00298 
00299         raise utils.TransportError("Unable to autodetect a serial Horizon device.")
00300             
00301 
00302     def __str__(self):
00303         """Return the transport device name."""
00304         return self.port
00305     
00306 
00307     def open(self):
00308         if (not self._opened):
00309             logger.debug("%s: Beginning transport opening for %s." % 
00310                          (self.__class__.__name__, self._serial.portstr))
00311 
00312             try:
00313                 self._serial.open()
00314                 if not self._serial.isOpen():
00315                     raise serial.SerialException
00316             except serial.SerialException:
00317                 logger.debug("%s: Transport opening failed." % self.__class__.__name__)
00318                 raise utils.TransportError("Serial Port opening failed.")
00319 
00320             self._opened = True
00321             self.receiver = self.Receiver(self._serial, self.store_timeout, self.receive_callback)
00322             self.receiver.start()
00323             time.sleep(0.1)
00324  
00325             logger.debug("%s: Sending ping request." % (self.__class__.__name__))
00326             message = messages.Message.request('echo') 
00327             try: 
00328                 self.send_message(message)
00329             except utils.TransportError as ex:
00330                 # Must catch this for the sake of closing the serial port and also
00331                 # killing the receiver thread.
00332                 self.close()
00333                 raise utils.TransportError("Serial Port message send failed.\n" + str(ex))
00334   
00335             logger.debug("%s: Ping request sent." % (self.__class__.__name__))
00336 
00337             for sleeping in range(5):
00338                 time.sleep(0.1)
00339                 waiting = self.receiver.get_waiting()
00340                 for message in waiting:
00341                     logger.debug("%s: Message received." % (self.__class__.__name__))
00342                     if codes.names[message.code] == 'echo':
00343                         # Success
00344                         logger.debug("%s: Transport opened." % self.__class__.__name__)
00345                         return
00346   
00347             logger.debug("%s: No response to ping request." % self.__class__.__name__)
00348             self.close()
00349             raise utils.TransportError("Could not communicate with Clearpath platform.")
00350 
00351 
00352     def close(self):
00353         logger.debug("%s: Beginning transport closing for %s." %
00354                 (self.__class__.__name__, self._serial.portstr))
00355         self.receiver.stop()
00356         self.receiver.join()
00357         self._serial.close()
00358         self._opened = False
00359         logger.debug("%s: Transport closed." % self.__class__.__name__)
00360 
00361     
00362     def is_open(self):
00363         return self._opened;
00364 
00365 
00366     def send_message(self, message):
00367         """Serial Transport Device Send Horizon Message"""
00368         self.send_raw(utils.to_bytes(message.data()))
00369 
00370 
00371     def send_raw(self, raw):
00372         if not self._opened:
00373             raise utils.TransportError ("Cannot send while closed!")
00374 
00375         try:
00376             with self.serial_write_lock:
00377                 try:
00378                     getattr(serial, "serial_for_url")
00379                     sent = self._serial.write(raw)
00380                     if sent == None:
00381                         raise utils.TransportError ("Write Failed!")
00382 
00383                 except AttributeError:
00384                     if sys.version_info[0] > 2:
00385                         self._serial.write(list(map(chr, raw)))
00386                     else:
00387                         self._serial.write(raw)
00388                     sent = len(raw)
00389  
00390                 if sent < len(raw):
00391                     raise utils.TransportError ("Write Incomplete!")
00392                 
00393         # Send Failed
00394         except serial.SerialException as ex:
00395             raise utils.TransportError \
00396                     ("Serial Message send failed!\n" + str(ex))
00397 
00398 
00399     class Receiver(threading.Thread):
00400 
00401         def __init__(self, serial, store_timeout, callback):
00402             threading.Thread.__init__(self, name = 'clearpath.horizon.transports.Serial.Receiver')
00403             self._running = False
00404             self._buffer = []
00405             self._serial = serial
00406             self._callback = callback
00407             self.start_time = time.time()
00408             self.store_timeout = store_timeout
00409 
00410             # Received Messages
00411             self._acks_lock = threading.Lock()
00412             self._acks = {}  # Format: {timestamp: (MessageRecord)}
00413             self._received_lock = threading.Lock()
00414             self._received = collections.deque()  # Format: MessageRecord
00415 
00416             # self.daemon = True
00417 
00418 
00419         def stop(self):
00420             self._running = False
00421 
00422 
00423         # Only for internal use by Receiver methods. These timestamps have nothing to
00424         # do with platform time or API time.
00425         def _timestamp(self):
00426             return math.floor((time.time() - self.start_time) * 1000)
00427 
00428 
00429         def get_waiting(self, code=0x0000):
00430             """Horizon Protocol Get Waiting Messages"""
00431             msgs = []
00432             skip = collections.deque()
00433             with self._received_lock:
00434                 try:
00435                     while True:
00436                         message_record = self._received.popleft()
00437                         if code == 0 or message_record.message.code == code:
00438                             msgs.append(message_record.message)
00439                         else:
00440                             skip.append(message_record) 
00441                 except IndexError:
00442                     # No more items in the list
00443                     pass
00444 
00445                 # All the ones we didn't return throw back in the received bin
00446                 self._received = skip
00447             return msgs
00448 
00449 
00450         def run(self):
00451             logger.debug("%s: Entering receive loop for %s." % 
00452                          (self.__class__.__name__, self._serial.portstr))
00453             self._running = True
00454             
00455             while self._running:
00456                 # Check for message
00457                 try:
00458                     message = None
00459                     message = self._get_message()
00460                     if message != None:
00461                         logger.debug("%s: received message:\n %s" % \
00462                                  (self.__class__.__name__, str(message)))
00463                 except IOError as ex:  # TransportError
00464                     # Silently swallow         
00465                     logger.warning(
00466                         "%s: IO error in attempting to retrieve message:\n%s" %\
00467                             (self.__class__.__name__, ex))
00468                 except ValueError as ex:  # ChecksumError
00469                     # Silently swallow         
00470                     logger.info(
00471                         "%s: Value error in received message:\n%s" %\
00472                             (self.__class__.__name__, ex))
00473                     
00474                 # If it's a good message with a payload, handle it.
00475                 if message != None and message.payload != None:
00476                     if message.payload.__class__ == codes.payloads.Ack:
00477                         # Mark the ack as received. It will be up to the main
00478                         # thread to examine this for status and throw any necessary
00479                         # exceptions.
00480                         with self._acks_lock:
00481                             # It's keyed to the message's baked-in timestamp, because that's how
00482                             # acks are matched. But it's also stored beside the current time per
00483                             # this computer, for the purposes of expiring it.
00484                             self._acks[message.timestamp] = MessageRecord(message = message, 
00485                                                                           expiry = self._timestamp() + 
00486                                                                                    self.store_timeout)
00487 
00488                     elif self._callback != None and self._callback(message):
00489                         # Okay, handled by asyncronous handlers.
00490                         pass
00491 
00492                     else:
00493                         # Unhandled. Store it for synchronous access.
00494                         with self._received_lock:
00495                             self._received.append(MessageRecord(message = message, 
00496                                                                 expiry = self._timestamp() + 
00497                                                                          self.store_timeout))
00498 
00499                 # Check timeouts
00500                 current = self._timestamp()
00501                 with self._received_lock:  # stored messages
00502                     try:
00503                         while current > self._received[0].expiry:
00504                             self._received.popleft()
00505                     except IndexError:
00506                         # No more items in list.
00507                         pass
00508                 with self._acks_lock:  # stored acks
00509                     for ts, message_record in list(self._acks.items()): 
00510                         if current > message_record.expiry:
00511                             del self._acks[ts]
00512 
00513                 # Release Processor
00514                 time.sleep(0.001)
00515             
00516             logger.debug("%s: Exiting receive loop for %s." % 
00517                          (self.__class__.__name__, self._serial.portstr))
00518  
00519         def has_ack(self, timestamp):
00520             with self._acks_lock:
00521                 if timestamp in self._acks:
00522                     ack = self._acks[timestamp][0]
00523                     del self._acks[timestamp]
00524                     return ack
00525                 else:
00526                     return False
00527 
00528 
00529         def _get_message(self):
00530             read = 0
00531 
00532             # read as much as possible without blocking (as timeout = 0)
00533             chars = self._serial.read(1000)
00534 
00535             if len(chars) > 0:
00536                 try:
00537                     getattr(serial, "serial_for_url")
00538                     if sys.version_info[0] > 2:
00539                         self._buffer += chars
00540                     else:
00541                         self._buffer += list(map(ord,chars))
00542                 except AttributeError:
00543                     self._buffer += list(map(ord,chars))
00544 
00545             # Discard bytes from the buffer until we find a 
00546             # SOH character followed by two characters which are
00547             # complements, representing the length.
00548             disc = []
00549             while(len(self._buffer) > 3 and (
00550                     self._buffer[0] != messages.Message.SOH or
00551                     self._buffer[1] != 0xFF & (~self._buffer[2]) or
00552                     self._buffer[1] == 0)):
00553                 disc.append(self._buffer.pop(0))
00554 
00555             if len(disc) > 0:
00556                 logger.info("%s: Discarded %d bytes:\n%s" % (
00557                         self.__class__.__name__, len(disc), 
00558                         ' '.join(map(utils.hex,disc))))
00559 
00560             if len(self._buffer) < 3:
00561                 # Not enough data in the buffer read a SOH + LEN + ~LEN.
00562                 return None
00563      
00564             length = self._buffer[1] + 3
00565         
00566             # Now know message length. Is there enough in the buffer yet to read
00567             # the whole message? If not, wait.
00568             if len(self._buffer) < length:
00569                 return None
00570         
00571             # Create and return the Message
00572             raw = self._buffer[0:length]
00573             self._buffer = self._buffer[length:]
00574             logger.info("%s: Message of %d bytes found:\n%s" % (
00575                     self.__class__.__name__, len(raw), 
00576                     ' '.join(map(utils.hex,raw))))
00577 
00578             # This will throw errors if has a bad checksum, for example,
00579             # which will bubble up to the run() function.
00580             return messages.Message.parse(raw)
00581 
00582        
00583     logger.debug("... clearpath.horizon.transports loaded.")
00584     


clearpath_base
Author(s): Mike Purvis
autogenerated on Sat Dec 28 2013 16:50:48