simpkl_log.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 # -*- Python -*-
00003 # -*- coding: utf-8 -*-
00004 
00005 '''rtshell
00006 
00007 Copyright (C) 2009-2014
00008     Geoffrey Biggs
00009     RT-Synthesis Research Group
00010     Intelligent Systems Research Institute,
00011     National Institute of Advanced Industrial Science and Technology (AIST),
00012     Japan
00013     All rights reserved.
00014 Licensed under the Eclipse Public License -v 1.0 (EPL)
00015 http://www.opensource.org/licenses/eclipse-1.0.txt
00016 
00017 Pickle-based log.
00018 
00019 '''
00020 
00021 
00022 import copy
00023 import os
00024 import pickle
00025 import traceback
00026 
00027 import ilog
00028 
00029 
00030 ###############################################################################
00031 ## Current position pointer
00032 
class CurPos(object):
    '''A position marker within a log.

    Attributes:
    index -- Index of the entry the marker refers to.
    ts -- Time stamp associated with the marker.
    prev -- Stored "previous position" value.
    cache -- Stored "cache position" value.
    fp -- Stored "file position" value.

    '''

    def __init__(self, index=0, timestamp=0, prev_pos=0, cache=0, file_pos=0):
        super(CurPos, self).__init__()
        self.index, self.ts = index, timestamp
        self.prev, self.cache, self.fp = prev_pos, cache, file_pos

    def __str__(self):
        # Human-readable summary of all five fields, used in verbose output.
        template = ('Index: {0}, timestamp: {1}, previous position: {2}, '
                    'cache position: {3}, file position: {4}')
        fields = (self.index, self.ts, self.prev, self.cache, self.fp)
        return template.format(*fields)
00046 
00047 
00048 ###############################################################################
00049 ## Simple pickle-based log object. Its support for the full log interface
00050 ## is rudimentary and slow (although writing and simple reading should be fast
00051 ## enough).
00052 ##
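## The simple pickle-based format is as follows (each item is pickled):
## 1. Port specification (the metadata block).
## 2. A reserved region of BUFFER_SIZE bytes; the end-position pointer is
##    written at its start when a log opened for writing is closed.
## 3. Data entries: (Index, Time stamp, Data, File position, Previous position)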
00056 
00057 class SimplePickleLog(ilog.Log):
00058     # Indices in data entries for bits of data
00059     INDEX = 0
00060     TS = 1
00061     DATA = 2
00062     FP = 3
00063     PREV = 4
00064     # Spare space at the start for pointers
00065     BUFFER_SIZE = 256
00066 
00067     def __init__(self, filename='', *args, **kwargs):
00068         self._is_open = False
00069         self._fn = filename
00070         self._cur_pos = CurPos()
00071         self._start = None
00072         self._end = None
00073         self._next = None
00074         self._write_ind = 0
00075         self._prev_pos = 0
00076         super(SimplePickleLog, self).__init__(*args, **kwargs)
00077 
00078     def __str__(self):
00079         return 'SimplePickleLog({0}, {1}) at position {2}.'.format(self._fn,
00080                 self._mode, self._cur_pos)
00081 
00082     def write(self, timestamp, data):
00083         val = (self._write_ind, timestamp, data, self._file.tell(), self._prev_pos)
00084         # Track the start of the last entry for later writing at the file start
00085         self._cur_pos.ts = timestamp
00086         self._end = copy.copy(self._cur_pos)
00087         # Record the new "previous" position before writing
00088         self._prev_pos = self._file.tell()
00089         self._write(val)
00090         # Update the current position to after the new final record
00091         self._cur_pos.index = val[self.INDEX] + 1
00092         self._cur_pos.ts = -1
00093         self._cur_pos.prev = self._prev_pos
00094         self._cur_pos.cache = self._prev_pos
00095         self._cur_pos.fp = self._file.tell()
00096         self._write_ind += 1
00097         self._vb_print('Wrote entry at ({0}, {1}, {2}, {3}).'.format(
00098             val[self.INDEX], val[self.TS], val[self.FP], val[self.PREV]))
00099 
00100     def read(self, timestamp=None, number=None):
00101         if number is not None:
00102             return self._read_number(number)
00103         elif timestamp is not None:
00104             return self._read_to_timestamp(timestamp)
00105         else:
00106             return self._read_single_entry()
00107 
00108     def rewind(self):
00109         self._vb_print('Rewinding log from position {0}.'.format(
00110                 self._cur_pos))
00111         if self._mode == 'r':
00112             self._file.seek(0)
00113         else:
00114             self._file.truncate()
00115         self._write_ind = 0
00116         self._init_log()
00117 
00118     def seek(self, timestamp=None, index=None):
00119         self._vb_print('Seeking log from position {0}.'.format(self._cur_pos))
00120         if index is not None:
00121             self._seek_to_index(index)
00122         elif timestamp is not None:
00123             self._seek_to_timestamp(timestamp)
00124         # Do nothing if neither is set
00125         self._vb_print('New current position: {0}.'.format(self._cur_pos))
00126 
00127     def _backup_one(self):
00128         '''Reverses in the log one entry.'''
00129         self._vb_print('Backing up one entry from {0}.'.format(self._cur_pos))
00130         if self._cur_pos.index == 0:
00131             # Already at the start
00132             self._vb_print('Backup already at start.')
00133             return
00134         else:
00135             self._next = None
00136             target = self._cur_pos.prev
00137             # Move back in the file one entry
00138             self._file.seek(target)
00139             # Update the next pointer
00140             self._next = self._read()
00141             self._update_cur_pos(self._next)
00142         self._vb_print('New current position: {0}.'.format(self._cur_pos))
00143 
00144     def _close(self):
00145         if not self._is_open:
00146             return
00147         if self._mode == 'w':
00148             # Go back to the beginning and write the end position
00149             self._file.seek(0)
00150             self._file.seek(self._buf_start) # Skip the meta data
00151             self._write(self._end)
00152             self._vb_print('Wrote end pointer: {0}'.format(self._end))
00153             self._file.close()
00154             self._is_open = False
00155             self._start = None
00156             self._end = None
00157             self._vb_print('Closed file.')
00158 
00159     def _eof(self):
00160         return self._next is None
00161 
00162     def _get_cur_pos(self):
00163         self._vb_print('Current position: {0}'.format(self._cur_pos))
00164         return self._cur_pos.index, self._cur_pos.ts
00165 
00166     def _get_start(self):
00167         if self._start is None:
00168             self._set_start()
00169         self._vb_print('Start position: {0}'.format(self._start))
00170         return (self._start.index, self._start.ts)
00171 
00172     def _get_end(self):
00173         self._vb_print('End position: {0}'.format(self._end))
00174         return (self._end.index, self._end.ts)
00175 
00176     def _init_log(self):
00177         if self._mode == 'r':
00178             self._vb_print('Initialising log for reading.')
00179             # Read out the metadata
00180             self._meta = self._read()
00181             pos = self._file.tell()
00182             # Read the end marker
00183             self._end = self._read()
00184             # Skip to the start of the data
00185             self._file.seek(pos + self.BUFFER_SIZE)
00186             self._vb_print('Read end position: {0}'.format(self._end))
00187             # Grab the position of the first entry and make it the current
00188             self._set_start()
00189             self._cur_pos = copy.copy(self._start)
00190             # Get the first entry
00191             self._next = self._read()
00192         else:
00193             self._vb_print('Initialising log for writing.')
00194             # Write the metadata
00195             self._write(self._meta)
00196             self._vb_print('Wrote meta data of length {0}'.format(
00197                 self._file.tell()))
00198             self._buf_start = self._file.tell()
00199             # Put some blank space to write the end marker
00200             self._file.write(''.ljust(self.BUFFER_SIZE))
00201             self._vb_print('Wrote buffer of length {0} at position {1}'.format(
00202                 self.BUFFER_SIZE, self._buf_start))
00203             self._write_ind = 0
00204             self._prev_pos = 0
00205             self._cur_pos = CurPos(file_pos=self._file.tell())
00206             self._vb_print('First entry will be written at {0}'.format(
00207                 self._cur_pos))
00208 
00209     def _open(self):
00210         if self._is_open:
00211             return
00212         if self._mode == 'r':
00213             flags = 'rb'
00214         elif self._mode == 'w':
00215             flags = 'wb'
00216         else:
00217             raise NotImplementedError
00218         self._file = open(self._fn, flags)
00219         self._init_log()
00220         self._is_open = True
00221         self._vb_print('Opened file {0} in mode {1}.'.format(self._fn,
00222             self._mode))
00223 
00224     def _read(self):
00225         '''Read a single entry from the log.'''
00226         self._vb_print('Reading one data block at {0}.'.format(
00227             self._file.tell()))
00228         try:
00229             data = pickle.load(self._file)
00230         except EOFError:
00231             self._vb_print('End of log reached.')
00232             raise ilog.EndOfLogError
00233         return data
00234 
00235     def _read_number(self, number):
00236         self._vb_print('Reading {0} entries.'.format(number))
00237         res = []
00238         if number < 0:
00239             raise ValueError
00240         if not self._next:
00241             self._vb_print('End of log before reading.')
00242             return []
00243         try:
00244             for ii in range(number):
00245                 res.append((self._next[self.INDEX], self._next[self.TS],
00246                     self._next[self.DATA]))
00247                 self._next = self._read()
00248                 if not self._next:
00249                     self._set_eof_pos()
00250                     self._vb_print('End of log during reading, current '\
00251                             'position is {1}.'.format(self._cur_pos))
00252                     break
00253                 self._update_cur_pos(self._next)
00254                 self._vb_print('Read entry {0} of {1}, current position '\
00255                         'is {2}.'.format(ii + 1, number, self._cur_pos))
00256         except ilog.EndOfLogError:
00257             self._set_eof_pos()
00258             self._next = None
00259             self._vb_print('End of log while reading, current '\
00260                     'position is {0}.'.format(self._cur_pos))
00261         self._vb_print('Finished reading; current position is ' \
00262                 '{0}.'.format(self._cur_pos))
00263         return res
00264 
00265     def _read_to_timestamp(self, timestamp):
00266         self._vb_print('Reading until time stamp {0}.'.format(timestamp))
00267         res = []
00268         if timestamp < 0:
00269             raise ValueError
00270         if not self._next:
00271             self._vb_print('End of log before reading.')
00272             return []
00273         if self._cur_pos.ts > timestamp:
00274             # The time limit is before the next item - nothing to read
00275             self._vb_print('Current position is beyond the time limit.')
00276             return []
00277         try:
00278             while self._next[self.TS] <= timestamp:
00279                 res.append((self._next[self.INDEX], self._next[self.TS],
00280                     self._next[self.DATA]))
00281                 self._next = self._read()
00282                 if not self._next:
00283                     self._set_eof_pos()
00284                     self._vb_print('End of log during reading, current '\
00285                             'position is {1}.'.format(self._cur_pos))
00286                     break
00287                 self._update_cur_pos(self._next)
00288                 self._vb_print('Read entry at time index {0}, current '\
00289                         'position is {1}.'.format(res[-1][1], self._cur_pos))
00290         except ilog.EndOfLogError:
00291             self._set_eof_pos()
00292             self._next = None
00293             self._vb_print('End of log while reading, current '\
00294                     'position is {0}.'.format(self._cur_pos))
00295         self._vb_print('Finished reading; current position is ' \
00296                 '{0}.'.format(self._cur_pos))
00297         return res
00298 
00299     def _read_single_entry(self):
00300         self._vb_print('Reading a single entry.')
00301         if not self._next:
00302             self._vb_print('End of log before reading.')
00303             return []
00304         else:
00305             res = [(self._next[self.INDEX], self._next[self.TS],
00306                 self._next[self.DATA])]
00307             try:
00308                 self._next = self._read()
00309             except ilog.EndOfLogError:
00310                 self._next = None
00311             if not self._next:
00312                 self._set_eof_pos()
00313                 self._vb_print('End of log during reading, current '\
00314                         'position is {0}.'.format(self._cur_pos))
00315             else:
00316                 self._update_cur_pos(self._next)
00317                 self._vb_print('Read entry, current position is ' \
00318                         '{0}.'.format(self._cur_pos))
00319             self._vb_print('Cached next entry is {0}'.format(self._next))
00320             return res
00321 
00322     def _seek_to_index(self, ind):
00323         '''Seeks forward or backward in the log to find the given index.'''
00324         if ind == self._cur_pos.index:
00325             self._vb_print('Seek by index: already at destination.')
00326             return
00327         if ind < 0:
00328             raise ilog.InvalidIndexError
00329         elif ind < self._cur_pos.index:
00330             # Rewind
00331             # TODO: Rewinding may be more efficient in many cases if done by
00332             # fast-forwarding from the start of the file rather than traversing
00333             # backwards.
00334             self._vb_print('Rewinding to index {0}.'.format(ind))
00335             while self._cur_pos.index > ind and self._cur_pos.index > 0:
00336                 self._backup_one()
00337         else:
00338             # Fast-forward
00339             self._vb_print('Fast-forwarding to index {0}.'.format(ind))
00340             while self._cur_pos.index < ind:
00341                 if not self.read():
00342                     break # EOF
00343         self._vb_print('New current position is {0}.'.format(self._cur_pos))
00344 
    def _seek_to_timestamp(self, ts):
        '''Seeks forward or backward in the log to find the given timestamp.

        ts -- Destination time stamp.

        If the current position already carries this time stamp (and the log
        is not at its end) nothing happens. If the destination lies behind
        the current position, or the log is at its end, the log is stepped
        backwards one entry at a time; otherwise entries are read forwards
        until the current time stamp is no longer less than ``ts`` or the log
        ends.

        '''
        # NOTE(review): self.eof is not defined in this class; presumably it
        # is provided by the base class on top of _eof() - confirm.
        if ts == self._cur_pos.ts and not self.eof:
            self._vb_print('Seek by timestamp: already at destination.')
            return
        elif ts < self._cur_pos.ts or self.eof:
            # Rewind
            self._vb_print('Rewinding to timestamp {0}.'.format(ts))
            # Keep backing up while still past the destination (and not yet
            # at index 0), or while there is no cached entry at all
            while (self._cur_pos.ts > ts and self._cur_pos.index > 0) or \
                    self.eof:
                self._backup_one()
            # Need to move one forward again, unless have hit the beginning
            # (the loop may stop on an entry stamped before the destination)
            if self._cur_pos.ts < ts:
                self.read()
        else:
            self._vb_print('Fast-forwarding to timestamp {0}.'.format(ts))
            # Fast-forward
            while self._cur_pos.ts < ts:
                # read() returns an empty list once the log is exhausted
                if not self.read():
                    break # EOF
        self._vb_print('New current position is {0}.'.format(self._cur_pos))
00366 
00367     def _set_eof_pos(self):
00368         '''Sets the current position to the end-of-file value.'''
00369         self._vb_print('Setting EOF at file position {0}, prev cur pos '\
00370                 '{1}'.format(self._file.tell(), self._cur_pos))
00371         self._cur_pos.index += 1 # The "next" index
00372         # Don't touch the time stamp (indicates the end time of the file)
00373         self._cur_pos.prev = self._cur_pos.cache # This is the final entry
00374         self._cur_pos.cache = 0 # No valid entry at current file position
00375         self._cur_pos.fp = self._file.tell() # This is the end of the file
00376 
00377     def _set_start(self):
00378         # Save the current position
00379         current = self._file.tell()
00380         # Move to the start
00381         self._file.seek(0)
00382         # Skip the metadata block
00383         self._read()
00384         # Skip the buffer
00385         self._file.seek(self.BUFFER_SIZE, os.SEEK_CUR)
00386         # Read the first entry
00387         pos = self._file.tell()
00388         entry = self._read()
00389         self._start = CurPos(entry[self.INDEX], entry[self.TS],
00390                 entry[self.PREV], pos, self._file.tell())
00391         self._file.seek(current)
00392         self._vb_print('Measured start position: {0}'.format(self._start))
00393 
00394     def _update_cur_pos(self, val):
00395         '''Updates the current pos from a data entry.'''
00396         self._cur_pos.index = val[self.INDEX]
00397         self._cur_pos.ts = val[self.TS]
00398         self._cur_pos.prev = val[self.PREV]
00399         self._cur_pos.cache = self._cur_pos.fp
00400         self._cur_pos.fp = self._file.tell()
00401 
00402     def _write(self, data):
00403         '''Pickle some data and write it to the file.'''
00404         self._vb_print('Writing one data block.')
00405         pickle.dump(data, self._file, pickle.HIGHEST_PROTOCOL)
00406 


rtshell
Author(s): Geoffrey Biggs
autogenerated on Fri Aug 28 2015 12:55:12