00001
00002
00003
00004
00005 '''rtshell
00006
00007 Copyright (C) 2009-2014
00008 Geoffrey Biggs
00009 RT-Synthesis Research Group
00010 Intelligent Systems Research Institute,
00011 National Institute of Advanced Industrial Science and Technology (AIST),
00012 Japan
00013 All rights reserved.
00014 Licensed under the Eclipse Public License -v 1.0 (EPL)
00015 http://www.opensource.org/licenses/eclipse-1.0.txt
00016
00017 Pickle-based log.
00018
00019 '''
00020
00021
00022 import copy
00023 import os
00024 import pickle
00025 import traceback
00026
00027 import ilog
00028
00029
00030
00031
00032
00033 class CurPos(object):
00034 def __init__(self, index=0, timestamp=0, prev_pos=0, cache=0, file_pos=0):
00035 super(CurPos, self).__init__()
00036 self.index = index
00037 self.ts = timestamp
00038 self.prev = prev_pos
00039 self.cache = cache
00040 self.fp = file_pos
00041
00042 def __str__(self):
00043 return 'Index: {0}, timestamp: {1}, previous position: {2}, cache '\
00044 'position: {3}, file position: {4}'.format(self.index, self.ts,
00045 self.prev, self.cache, self.fp)
00046
00047
00048
00049
00050
00051
00052
00053
00054
00055
00056
00057 class SimplePickleLog(ilog.Log):
00058
00059 INDEX = 0
00060 TS = 1
00061 DATA = 2
00062 FP = 3
00063 PREV = 4
00064
00065 BUFFER_SIZE = 256
00066
00067 def __init__(self, filename='', *args, **kwargs):
00068 self._is_open = False
00069 self._fn = filename
00070 self._cur_pos = CurPos()
00071 self._start = None
00072 self._end = None
00073 self._next = None
00074 self._write_ind = 0
00075 self._prev_pos = 0
00076 super(SimplePickleLog, self).__init__(*args, **kwargs)
00077
00078 def __str__(self):
00079 return 'SimplePickleLog({0}, {1}) at position {2}.'.format(self._fn,
00080 self._mode, self._cur_pos)
00081
00082 def write(self, timestamp, data):
00083 val = (self._write_ind, timestamp, data, self._file.tell(), self._prev_pos)
00084
00085 self._cur_pos.ts = timestamp
00086 self._end = copy.copy(self._cur_pos)
00087
00088 self._prev_pos = self._file.tell()
00089 self._write(val)
00090
00091 self._cur_pos.index = val[self.INDEX] + 1
00092 self._cur_pos.ts = -1
00093 self._cur_pos.prev = self._prev_pos
00094 self._cur_pos.cache = self._prev_pos
00095 self._cur_pos.fp = self._file.tell()
00096 self._write_ind += 1
00097 self._vb_print('Wrote entry at ({0}, {1}, {2}, {3}).'.format(
00098 val[self.INDEX], val[self.TS], val[self.FP], val[self.PREV]))
00099
00100 def read(self, timestamp=None, number=None):
00101 if number is not None:
00102 return self._read_number(number)
00103 elif timestamp is not None:
00104 return self._read_to_timestamp(timestamp)
00105 else:
00106 return self._read_single_entry()
00107
00108 def rewind(self):
00109 self._vb_print('Rewinding log from position {0}.'.format(
00110 self._cur_pos))
00111 if self._mode == 'r':
00112 self._file.seek(0)
00113 else:
00114 self._file.truncate()
00115 self._write_ind = 0
00116 self._init_log()
00117
00118 def seek(self, timestamp=None, index=None):
00119 self._vb_print('Seeking log from position {0}.'.format(self._cur_pos))
00120 if index is not None:
00121 self._seek_to_index(index)
00122 elif timestamp is not None:
00123 self._seek_to_timestamp(timestamp)
00124
00125 self._vb_print('New current position: {0}.'.format(self._cur_pos))
00126
00127 def _backup_one(self):
00128 '''Reverses in the log one entry.'''
00129 self._vb_print('Backing up one entry from {0}.'.format(self._cur_pos))
00130 if self._cur_pos.index == 0:
00131
00132 self._vb_print('Backup already at start.')
00133 return
00134 else:
00135 self._next = None
00136 target = self._cur_pos.prev
00137
00138 self._file.seek(target)
00139
00140 self._next = self._read()
00141 self._update_cur_pos(self._next)
00142 self._vb_print('New current position: {0}.'.format(self._cur_pos))
00143
00144 def _close(self):
00145 if not self._is_open:
00146 return
00147 if self._mode == 'w':
00148
00149 self._file.seek(0)
00150 self._file.seek(self._buf_start)
00151 self._write(self._end)
00152 self._vb_print('Wrote end pointer: {0}'.format(self._end))
00153 self._file.close()
00154 self._is_open = False
00155 self._start = None
00156 self._end = None
00157 self._vb_print('Closed file.')
00158
00159 def _eof(self):
00160 return self._next is None
00161
00162 def _get_cur_pos(self):
00163 self._vb_print('Current position: {0}'.format(self._cur_pos))
00164 return self._cur_pos.index, self._cur_pos.ts
00165
00166 def _get_start(self):
00167 if self._start is None:
00168 self._set_start()
00169 self._vb_print('Start position: {0}'.format(self._start))
00170 return (self._start.index, self._start.ts)
00171
00172 def _get_end(self):
00173 self._vb_print('End position: {0}'.format(self._end))
00174 return (self._end.index, self._end.ts)
00175
00176 def _init_log(self):
00177 if self._mode == 'r':
00178 self._vb_print('Initialising log for reading.')
00179
00180 self._meta = self._read()
00181 pos = self._file.tell()
00182
00183 self._end = self._read()
00184
00185 self._file.seek(pos + self.BUFFER_SIZE)
00186 self._vb_print('Read end position: {0}'.format(self._end))
00187
00188 self._set_start()
00189 self._cur_pos = copy.copy(self._start)
00190
00191 self._next = self._read()
00192 else:
00193 self._vb_print('Initialising log for writing.')
00194
00195 self._write(self._meta)
00196 self._vb_print('Wrote meta data of length {0}'.format(
00197 self._file.tell()))
00198 self._buf_start = self._file.tell()
00199
00200 self._file.write(''.ljust(self.BUFFER_SIZE))
00201 self._vb_print('Wrote buffer of length {0} at position {1}'.format(
00202 self.BUFFER_SIZE, self._buf_start))
00203 self._write_ind = 0
00204 self._prev_pos = 0
00205 self._cur_pos = CurPos(file_pos=self._file.tell())
00206 self._vb_print('First entry will be written at {0}'.format(
00207 self._cur_pos))
00208
00209 def _open(self):
00210 if self._is_open:
00211 return
00212 if self._mode == 'r':
00213 flags = 'rb'
00214 elif self._mode == 'w':
00215 flags = 'wb'
00216 else:
00217 raise NotImplementedError
00218 self._file = open(self._fn, flags)
00219 self._init_log()
00220 self._is_open = True
00221 self._vb_print('Opened file {0} in mode {1}.'.format(self._fn,
00222 self._mode))
00223
00224 def _read(self):
00225 '''Read a single entry from the log.'''
00226 self._vb_print('Reading one data block at {0}.'.format(
00227 self._file.tell()))
00228 try:
00229 data = pickle.load(self._file)
00230 except EOFError:
00231 self._vb_print('End of log reached.')
00232 raise ilog.EndOfLogError
00233 return data
00234
00235 def _read_number(self, number):
00236 self._vb_print('Reading {0} entries.'.format(number))
00237 res = []
00238 if number < 0:
00239 raise ValueError
00240 if not self._next:
00241 self._vb_print('End of log before reading.')
00242 return []
00243 try:
00244 for ii in range(number):
00245 res.append((self._next[self.INDEX], self._next[self.TS],
00246 self._next[self.DATA]))
00247 self._next = self._read()
00248 if not self._next:
00249 self._set_eof_pos()
00250 self._vb_print('End of log during reading, current '\
00251 'position is {1}.'.format(self._cur_pos))
00252 break
00253 self._update_cur_pos(self._next)
00254 self._vb_print('Read entry {0} of {1}, current position '\
00255 'is {2}.'.format(ii + 1, number, self._cur_pos))
00256 except ilog.EndOfLogError:
00257 self._set_eof_pos()
00258 self._next = None
00259 self._vb_print('End of log while reading, current '\
00260 'position is {0}.'.format(self._cur_pos))
00261 self._vb_print('Finished reading; current position is ' \
00262 '{0}.'.format(self._cur_pos))
00263 return res
00264
00265 def _read_to_timestamp(self, timestamp):
00266 self._vb_print('Reading until time stamp {0}.'.format(timestamp))
00267 res = []
00268 if timestamp < 0:
00269 raise ValueError
00270 if not self._next:
00271 self._vb_print('End of log before reading.')
00272 return []
00273 if self._cur_pos.ts > timestamp:
00274
00275 self._vb_print('Current position is beyond the time limit.')
00276 return []
00277 try:
00278 while self._next[self.TS] <= timestamp:
00279 res.append((self._next[self.INDEX], self._next[self.TS],
00280 self._next[self.DATA]))
00281 self._next = self._read()
00282 if not self._next:
00283 self._set_eof_pos()
00284 self._vb_print('End of log during reading, current '\
00285 'position is {1}.'.format(self._cur_pos))
00286 break
00287 self._update_cur_pos(self._next)
00288 self._vb_print('Read entry at time index {0}, current '\
00289 'position is {1}.'.format(res[-1][1], self._cur_pos))
00290 except ilog.EndOfLogError:
00291 self._set_eof_pos()
00292 self._next = None
00293 self._vb_print('End of log while reading, current '\
00294 'position is {0}.'.format(self._cur_pos))
00295 self._vb_print('Finished reading; current position is ' \
00296 '{0}.'.format(self._cur_pos))
00297 return res
00298
00299 def _read_single_entry(self):
00300 self._vb_print('Reading a single entry.')
00301 if not self._next:
00302 self._vb_print('End of log before reading.')
00303 return []
00304 else:
00305 res = [(self._next[self.INDEX], self._next[self.TS],
00306 self._next[self.DATA])]
00307 try:
00308 self._next = self._read()
00309 except ilog.EndOfLogError:
00310 self._next = None
00311 if not self._next:
00312 self._set_eof_pos()
00313 self._vb_print('End of log during reading, current '\
00314 'position is {0}.'.format(self._cur_pos))
00315 else:
00316 self._update_cur_pos(self._next)
00317 self._vb_print('Read entry, current position is ' \
00318 '{0}.'.format(self._cur_pos))
00319 self._vb_print('Cached next entry is {0}'.format(self._next))
00320 return res
00321
00322 def _seek_to_index(self, ind):
00323 '''Seeks forward or backward in the log to find the given index.'''
00324 if ind == self._cur_pos.index:
00325 self._vb_print('Seek by index: already at destination.')
00326 return
00327 if ind < 0:
00328 raise ilog.InvalidIndexError
00329 elif ind < self._cur_pos.index:
00330
00331
00332
00333
00334 self._vb_print('Rewinding to index {0}.'.format(ind))
00335 while self._cur_pos.index > ind and self._cur_pos.index > 0:
00336 self._backup_one()
00337 else:
00338
00339 self._vb_print('Fast-forwarding to index {0}.'.format(ind))
00340 while self._cur_pos.index < ind:
00341 if not self.read():
00342 break
00343 self._vb_print('New current position is {0}.'.format(self._cur_pos))
00344
00345 def _seek_to_timestamp(self, ts):
00346 '''Seeks forward or backward in the log to find the given timestamp.'''
00347 if ts == self._cur_pos.ts and not self.eof:
00348 self._vb_print('Seek by timestamp: already at destination.')
00349 return
00350 elif ts < self._cur_pos.ts or self.eof:
00351
00352 self._vb_print('Rewinding to timestamp {0}.'.format(ts))
00353 while (self._cur_pos.ts > ts and self._cur_pos.index > 0) or \
00354 self.eof:
00355 self._backup_one()
00356
00357 if self._cur_pos.ts < ts:
00358 self.read()
00359 else:
00360 self._vb_print('Fast-forwarding to timestamp {0}.'.format(ts))
00361
00362 while self._cur_pos.ts < ts:
00363 if not self.read():
00364 break
00365 self._vb_print('New current position is {0}.'.format(self._cur_pos))
00366
00367 def _set_eof_pos(self):
00368 '''Sets the current position to the end-of-file value.'''
00369 self._vb_print('Setting EOF at file position {0}, prev cur pos '\
00370 '{1}'.format(self._file.tell(), self._cur_pos))
00371 self._cur_pos.index += 1
00372
00373 self._cur_pos.prev = self._cur_pos.cache
00374 self._cur_pos.cache = 0
00375 self._cur_pos.fp = self._file.tell()
00376
00377 def _set_start(self):
00378
00379 current = self._file.tell()
00380
00381 self._file.seek(0)
00382
00383 self._read()
00384
00385 self._file.seek(self.BUFFER_SIZE, os.SEEK_CUR)
00386
00387 pos = self._file.tell()
00388 entry = self._read()
00389 self._start = CurPos(entry[self.INDEX], entry[self.TS],
00390 entry[self.PREV], pos, self._file.tell())
00391 self._file.seek(current)
00392 self._vb_print('Measured start position: {0}'.format(self._start))
00393
00394 def _update_cur_pos(self, val):
00395 '''Updates the current pos from a data entry.'''
00396 self._cur_pos.index = val[self.INDEX]
00397 self._cur_pos.ts = val[self.TS]
00398 self._cur_pos.prev = val[self.PREV]
00399 self._cur_pos.cache = self._cur_pos.fp
00400 self._cur_pos.fp = self._file.tell()
00401
00402 def _write(self, data):
00403 '''Pickle some data and write it to the file.'''
00404 self._vb_print('Writing one data block.')
00405 pickle.dump(data, self._file, pickle.HIGHEST_PROTOCOL)
00406