Go to the documentation of this file.00001 
00002 
00003 
00004 
00005 
00006 
00007 
00008 
00009 
00010 
00011 
00012 
00013 
00014 
00015 
00016 
00017 
00018 
00019 
00020 
00021 
00022 
00023 
00024 
00025 
00026 
00027 
00028 
00029 
00030 
00031 
00032 
00033 
00034 
00035 
00036 
00037 
00038 
00039 
00040 
00041 
00042 
00043 
00044 
00045 
00046 
00047 
00048 
00049 
00050 
00051 
00052 
00053 
00054 if __name__ == "__main__":
00055     
00056     
00057     print ("ERROR: clearpath.horizon.transports is a module and can NOT be run"\
00058            " as a script!\nFor a command-line interface demo of Horizon, run:"\
00059            "\n  python -m clearpath.horizon.demo\n"\
00060            "For Horizon message forwarding, run:\n"\
00061            "  python -m clearpath.horizon.forward")
00062 
00063     
00064     import sys
00065     sys.exit(1)
00066 
00067 
00068 
00069 
00070 
00071 
00072 
00073 
00074 
00075  
00076 
00077 
00078 
00079 
00080 
00081 
00082 
00083 
00084 
00085 
00086 
00087 
00088 
00089 
00090 
00091 
00092 
00093 
00094 
00095 
00096 
00097 
00098 
00099 
00100 
00101 
00102 
00103 
00104 
00105 
00106 
00107 
00108 
00109 
00110 
00111 
00112 
00113 
00114 
00115 
00116 
00117 
00118 
00119 
00120 
00121 
00122 
00123 
00124 
00125 
00126 
00127 
00128 
00129 
00130 
00131 
00132 
00133 
00134 
00135 
00136 
00137 
00138 
00139 
00140 
00141 
00142 
00143 
00144 
00145 
00146 
00147 
00148 
00149 
00150 
00151 
00152 
00153 
00154 
00155 
00156 
00157 
00158 
00159 
00160 
00161 
00162 
00163 
00164 
00165 """Horizon Message Transport Controllers
00166 
00167    Copyright © 2010 Clearpath Robotics, Inc.
00168    All rights reserved
00169    
00170    Created: 17/03/10
00171    Authors: Ryan Gariepy & Malcolm Robert
00172    Version: 1.0
00173    """
00174 
00175 
00176 
00177 from .. import utils            
00178 from .  import messages         
00179 from .  import codes            
00180 
00181 
00182 import logging                  
00183 import socket                   
00184 import sys                      
00185 import threading                
00186 import time                     
00187 import collections              
00188 import math
00189 
00190 
00191 try:
00192     import serial               
00193 except ImportError:
00194     pass
00195 
00196 MessageRecord = collections.namedtuple('MessageRecord', 'message, expiry')
00197 
00198 
00199 
00200 
00201 __version__  = "1.0"
00202 """Module Version"""
00203 
00204 __revision__ = "$Revision: 898 $"
00205 """ SVN Code Revision"""
00206 
00207 
00208 
00209 logger = logging.getLogger('clearpath.horizon.transports')
00210 """Horizon Transports Module Log"""
00211 logger.setLevel(logging.NOTSET)
00212 logger.addHandler(utils.NullLoggingHandler())
00213 logger.propagate = False
00214 logger.debug("Loading clearpath.horizon.transports ...")              
00215 
00216 
00217 
00218 
00219 
00220 
00221 
00222 
00223 
00224 
00225 
00226 class Transport(object):
00227     """Horizon Transport Base Class"""
00228     pass
00229 
00230 
00231 
00232 
00233 
00234 
00235 
00236 
00237 
00238 
00239 
00240 
00241 class Serial(Transport):
00242     """Horizon Transport Controller - Serial Device"""
00243     
00244     def __init__(self, port = None, store_timeout = 0, receive_callback = None):
00245         """Create A Horizon Serial Transport"""
00246 
00247         
00248         try:
00249             serial
00250         except NameError:
00251             logger.error("%s: Cannot create Horizon Serial Transport without"\
00252                          "pySerial!" % self.__class__.__name__)
00253             raise utils.TransportError ("pySerial not found!")
00254     
00255         if port == None: 
00256             raise utils.TransportError \
00257                 ("Serial transport creation failed: port not specified!\n")
00258 
00259         
00260         self.port = port
00261         self._serial = None    
00262         self._opened = False
00263         self.store_timeout = store_timeout
00264         self.receive_callback = receive_callback
00265         self.serial_write_lock = threading.Lock()
00266 
00267         
00268         try:
00269             self._serial = serial.Serial()
00270             self._serial.port = port
00271             self._serial.baudrate = 115200
00272             self._serial.timeout = 0
00273     
00274         
00275         except serial.SerialException as ex:
00276             raise utils.TransportError \
00277                 ("Serial Transport creation failed!\n" + str(ex))
00278       
00279 
00280     @classmethod
00281     def autodetect(cls, **kwargs):
00282         ports = utils.list_serial_ports()
00283         logger.info("%s: Attempting autodetect with %s." % 
00284                     (cls.__name__, ' '.join(ports)))
00285 
00286         for trynum in range(5):
00287             logger.info("%s: Autodetect try #%d." %  
00288                         (cls.__name__, trynum + 1))
00289             for port in ports:
00290                 kwargs['port'] = port
00291                 transport = cls(**kwargs)
00292                 try:
00293                     transport.open()
00294                     return transport
00295                 except utils.TransportError:
00296                     
00297                     pass
00298 
00299         raise utils.TransportError("Unable to autodetect a serial Horizon device.")
00300             
00301 
00302     def __str__(self):
00303         """Return the transport device name."""
00304         return self.port
00305     
00306 
00307     def open(self):
00308         if (not self._opened):
00309             logger.debug("%s: Beginning transport opening for %s." % 
00310                          (self.__class__.__name__, self._serial.portstr))
00311 
00312             try:
00313                 self._serial.open()
00314                 if not self._serial.isOpen():
00315                     raise serial.SerialException
00316             except serial.SerialException:
00317                 logger.debug("%s: Transport opening failed." % self.__class__.__name__)
00318                 raise utils.TransportError("Serial Port opening failed.")
00319 
00320             self._opened = True
00321             self.receiver = self.Receiver(self._serial, self.store_timeout, self.receive_callback)
00322             self.receiver.start()
00323             time.sleep(0.1)
00324  
00325             logger.debug("%s: Sending ping request." % (self.__class__.__name__))
00326             message = messages.Message.request('echo') 
00327             try: 
00328                 self.send_message(message)
00329             except utils.TransportError as ex:
00330                 
00331                 
00332                 self.close()
00333                 raise utils.TransportError("Serial Port message send failed.\n" + str(ex))
00334   
00335             logger.debug("%s: Ping request sent." % (self.__class__.__name__))
00336 
00337             for sleeping in range(5):
00338                 time.sleep(0.1)
00339                 waiting = self.receiver.get_waiting()
00340                 for message in waiting:
00341                     logger.debug("%s: Message received." % (self.__class__.__name__))
00342                     if codes.names[message.code] == 'echo':
00343                         
00344                         logger.debug("%s: Transport opened." % self.__class__.__name__)
00345                         return
00346   
00347             logger.debug("%s: No response to ping request." % self.__class__.__name__)
00348             self.close()
00349             raise utils.TransportError("Could not communicate with Clearpath platform.")
00350 
00351 
00352     def close(self):
00353         logger.debug("%s: Beginning transport closing for %s." %
00354                 (self.__class__.__name__, self._serial.portstr))
00355         self.receiver.stop()
00356         self.receiver.join()
00357         self._serial.close()
00358         self._opened = False
00359         logger.debug("%s: Transport closed." % self.__class__.__name__)
00360 
00361     
00362     def is_open(self):
00363         return self._opened;
00364 
00365 
00366     def send_message(self, message):
00367         """Serial Transport Device Send Horizon Message"""
00368         self.send_raw(utils.to_bytes(message.data()))
00369 
00370 
00371     def send_raw(self, raw):
00372         if not self._opened:
00373             raise utils.TransportError ("Cannot send while closed!")
00374 
00375         try:
00376             with self.serial_write_lock:
00377                 try:
00378                     getattr(serial, "serial_for_url")
00379                     sent = self._serial.write(raw)
00380                     if sent == None:
00381                         raise utils.TransportError ("Write Failed!")
00382 
00383                 except AttributeError:
00384                     if sys.version_info[0] > 2:
00385                         self._serial.write(list(map(chr, raw)))
00386                     else:
00387                         self._serial.write(raw)
00388                     sent = len(raw)
00389  
00390                 if sent < len(raw):
00391                     raise utils.TransportError ("Write Incomplete!")
00392                 
00393         
00394         except serial.SerialException as ex:
00395             raise utils.TransportError \
00396                     ("Serial Message send failed!\n" + str(ex))
00397 
00398 
00399     class Receiver(threading.Thread):
00400 
00401         def __init__(self, serial, store_timeout, callback):
00402             threading.Thread.__init__(self, name = 'clearpath.horizon.transports.Serial.Receiver')
00403             self._running = False
00404             self._buffer = []
00405             self._serial = serial
00406             self._callback = callback
00407             self.start_time = time.time()
00408             self.store_timeout = store_timeout
00409 
00410             
00411             self._acks_lock = threading.Lock()
00412             self._acks = {}  
00413             self._received_lock = threading.Lock()
00414             self._received = collections.deque()  
00415 
00416             
00417 
00418 
00419         def stop(self):
00420             self._running = False
00421 
00422 
00423         
00424         
00425         def _timestamp(self):
00426             return math.floor((time.time() - self.start_time) * 1000)
00427 
00428 
00429         def get_waiting(self, code=0x0000):
00430             """Horizon Protocol Get Waiting Messages"""
00431             msgs = []
00432             skip = collections.deque()
00433             with self._received_lock:
00434                 try:
00435                     while True:
00436                         message_record = self._received.popleft()
00437                         if code == 0 or message_record.message.code == code:
00438                             msgs.append(message_record.message)
00439                         else:
00440                             skip.append(message_record) 
00441                 except IndexError:
00442                     
00443                     pass
00444 
00445                 
00446                 self._received = skip
00447             return msgs
00448 
00449 
00450         def run(self):
00451             logger.debug("%s: Entering receive loop for %s." % 
00452                          (self.__class__.__name__, self._serial.portstr))
00453             self._running = True
00454             
00455             while self._running:
00456                 
00457                 try:
00458                     message = None
00459                     message = self._get_message()
00460                     if message != None:
00461                         logger.debug("%s: received message:\n %s" % \
00462                                  (self.__class__.__name__, str(message)))
00463                 except IOError as ex:  
00464                     
00465                     logger.warning(
00466                         "%s: IO error in attempting to retrieve message:\n%s" %\
00467                             (self.__class__.__name__, ex))
00468                 except ValueError as ex:  
00469                     
00470                     logger.info(
00471                         "%s: Value error in received message:\n%s" %\
00472                             (self.__class__.__name__, ex))
00473                     
00474                 
00475                 if message != None and message.payload != None:
00476                     if message.payload.__class__ == codes.payloads.Ack:
00477                         
00478                         
00479                         
00480                         with self._acks_lock:
00481                             
00482                             
00483                             
00484                             self._acks[message.timestamp] = MessageRecord(message = message, 
00485                                                                           expiry = self._timestamp() + 
00486                                                                                    self.store_timeout)
00487 
00488                     elif self._callback != None and self._callback(message):
00489                         
00490                         pass
00491 
00492                     else:
00493                         
00494                         with self._received_lock:
00495                             self._received.append(MessageRecord(message = message, 
00496                                                                 expiry = self._timestamp() + 
00497                                                                          self.store_timeout))
00498 
00499                 
00500                 current = self._timestamp()
00501                 with self._received_lock:  
00502                     try:
00503                         while current > self._received[0].expiry:
00504                             self._received.popleft()
00505                     except IndexError:
00506                         
00507                         pass
00508                 with self._acks_lock:  
00509                     for ts, message_record in list(self._acks.items()): 
00510                         if current > message_record.expiry:
00511                             del self._acks[ts]
00512 
00513                 
00514                 time.sleep(0.001)
00515             
00516             logger.debug("%s: Exiting receive loop for %s." % 
00517                          (self.__class__.__name__, self._serial.portstr))
00518  
00519         def has_ack(self, timestamp):
00520             with self._acks_lock:
00521                 if timestamp in self._acks:
00522                     ack = self._acks[timestamp][0]
00523                     del self._acks[timestamp]
00524                     return ack
00525                 else:
00526                     return False
00527 
00528 
00529         def _get_message(self):
00530             read = 0
00531 
00532             
00533             chars = self._serial.read(1000)
00534 
00535             if len(chars) > 0:
00536                 try:
00537                     getattr(serial, "serial_for_url")
00538                     if sys.version_info[0] > 2:
00539                         self._buffer += chars
00540                     else:
00541                         self._buffer += list(map(ord,chars))
00542                 except AttributeError:
00543                     self._buffer += list(map(ord,chars))
00544 
00545             
00546             
00547             
00548             disc = []
00549             while(len(self._buffer) > 3 and (
00550                     self._buffer[0] != messages.Message.SOH or
00551                     self._buffer[1] != 0xFF & (~self._buffer[2]) or
00552                     self._buffer[1] == 0)):
00553                 disc.append(self._buffer.pop(0))
00554 
00555             if len(disc) > 0:
00556                 logger.info("%s: Discarded %d bytes:\n%s" % (
00557                         self.__class__.__name__, len(disc), 
00558                         ' '.join(map(utils.hex,disc))))
00559 
00560             if len(self._buffer) < 3:
00561                 
00562                 return None
00563      
00564             length = self._buffer[1] + 3
00565         
00566             
00567             
00568             if len(self._buffer) < length:
00569                 return None
00570         
00571             
00572             raw = self._buffer[0:length]
00573             self._buffer = self._buffer[length:]
00574             logger.info("%s: Message of %d bytes found:\n%s" % (
00575                     self.__class__.__name__, len(raw), 
00576                     ' '.join(map(utils.hex,raw))))
00577 
00578             
00579             
00580             return messages.Message.parse(raw)
00581 
00582        
00583     logger.debug("... clearpath.horizon.transports loaded.")
00584