00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030 from time import sleep, time
00031 import thread, os, sys
00032 from subprocess import Popen, PIPE
00033
debug = 1  # module-wide switch: any true value enables dprint() output


def dprint(s, *args):
    """
    prints a debug message, if the module level flag 'debug' is set
    s:    the message; printed unchanged when it is the only argument
    args: optional further values; they are converted with str() and
          appended to the message, separated by single blanks"""
    if not debug:
        return
    if args:
        # callers sometimes pass several values print-style;
        # fold them into one line instead of failing with a TypeError
        s = " ".join([str(part) for part in (s,) + args])
    print(s)
00039
class ActivityError(StandardError):
    """
    error raised to end an activity or to report how it ended
    errno: index into ActivityError.messages (as returned by addError)
    msg:   explicit message text; if empty, the text registered for errno
           is used"""

    # message texts indexed by error number; filled by addError()
    messages = []

    def __init__(self, errno=0, msg=None):
        self.errno = errno
        if msg:
            self.msg = msg
        else:
            try:
                self.msg = self.messages[errno]
            except (IndexError, TypeError):
                # an unregistered code must not turn into an IndexError
                # raised from inside the exception constructor
                self.msg = "unknown error code %r" % (errno,)
        # initialise the base class, so that e.args carries the message
        StandardError.__init__(self, self.msg)

    def __repr__(self):
        return self.msg

    def __str__(self):
        return self.msg
00055
def addError(msg = "error"):
    """
    registers a new error message
    msg: text of the message
    returns the error number, i.e. the index of the message in
    ActivityError.messages"""
    registry = ActivityError.messages
    code = len(registry)  # the new entry is stored at the current end
    registry.append(msg)
    return code
00059
# Built-in error codes. addError() appends the text to ActivityError.messages
# and returns its index, which serves as the errno of an ActivityError.
errUnknown = addError("unknown error")
errTimeout = addError("activity has timed out")  # raised by Async.checkTimeout() and Async.wait()
errStopped = addError("activity has been stopped")  # raised by check() and Async.wait()
00063
# Execution states of an activity, stored in Async.state
AS_RUNNING=0   # initial state, set by Async.__init__
AS_STOPPING=1  # stop() has been called; check() raises errStopped in this state
AS_ENDED=2     # the function has returned normally
AS_STOPPED=3   # the function was ended by an ActivityError with errStopped
AS_TIMEOUT=4   # the function was ended by an ActivityError with errTimeout
AS_FAILED = 5  # the function was ended by any other ActivityError
00070
def altWait(*activities):
    """
    waits until at least one of the activities has ended
    activities: the Async objects to wait for
    returns immediately, if one of the activities has already ended"""
    dprint("waiting for following activities")
    waitLock = thread.allocate_lock()
    for a in activities:
        dprint(" %s" % a.name)
        a.runLock.addWaitLock(waitLock)
        # An activity that has already ended leaves waitLock untouched, so
        # registering with a later, still running activity would lock it
        # again and block although the condition is already met.  The state
        # is set before runLock is unlocked, therefore this test after the
        # registration cannot miss an ending.
        if a.getState() not in (AS_RUNNING, AS_STOPPING):
            return
    waitLock.acquire()
00079
def altWaitCycle(*activities):
    """
    waits until the current cycle of at least one of the activities has ended
    activities: objects providing 'name' and 'cycleLock' (cyclic activities)"""
    dprint("waiting for following activities")
    gate = thread.allocate_lock()
    for activity in activities:
        dprint(" %s" % activity.name)
        # every activity gets the same lock, the first finished cycle opens it
        activity.cycleLock.addWaitLock(gate)
    # blocks until one of the cycle locks releases the shared lock
    gate.acquire()
00088
def check():
    """
    checks, if there is a request to stop the current activity
    raises ActivityError(errStopped), if the activity of the calling thread
    is in state AS_STOPPING; otherwise its timeout is checked
    does nothing, if the calling thread is not registered as an activity"""
    current = Async.activityTable.get(thread.get_ident())
    if current is None:
        return
    if current.state == AS_STOPPING:
        raise ActivityError(errStopped)
    current.checkTimeout()
00096
def csleep(duration):
    """
    sleeps for 'duration' seconds in slices of at most 0.1 seconds
    check() is called after every slice, so its exceptions end the sleep
    duration: time to sleep in seconds; nothing happens, if it is <= 0"""
    deadline = time() + duration
    remaining = duration
    while remaining > 0:
        sleep(min(remaining, 0.1))
        check()
        remaining = deadline - time()
00103
00104 class _MultiLock:
00105 def __init__(self):
00106 self.intLock = thread.allocate_lock()
00107 self.waitLocks = []
00108 self.locked = 1
00109
00110 def addWaitLock(self, waitLock):
00111 self.intLock.acquire()
00112 if self.locked:
00113 self.waitLocks.append(waitLock)
00114 if not waitLock.locked():
00115 waitLock.acquire()
00116 else:
00117 if waitLock.locked():
00118 waitLock.release()
00119 self.intLock.release()
00120
00121 def unlock(self):
00122 self.intLock.acquire()
00123 self.locked = 0
00124 for lock in self.waitLocks:
00125 try:
00126 lock.release()
00127 except thread.error:
00128 pass
00129 self.waitLocks = []
00130 self.intLock.release()
00131
00132 def unlockLock(self):
00133 self.intLock.acquire()
00134 for lock in self.waitLocks:
00135 lock.release()
00136 self.waitLocks = []
00137 self.intLock.release()
00138
00139 def wait(self):
00140 waitLock = thread.allocate_lock()
00141 self.addWaitLock(waitLock)
00142
00143 while waitLock.locked():
00144 check()
00145 sleep(0.1)
00146 waitLock.acquire()
00147
00148 def locked(self):
00149 return self.locked
00150
00151
class Async:
    "one shot activity"
    # maps the identifier of a thread to the activity running in it;
    # check() uses it to find the activity of the calling thread
    activityTable = {}

    def __init__(self, func, args, timeout=None, name=None):
        """
        func: function to be executed
        args: tuple of the arguments to be passed to the function
        timeout: time, after which the function is automatically stopped
        name: name of the activity (for debug)"""
        self.func = func

        if timeout:
            self.endTime = time() + timeout
        else:
            self.endTime = None

        if name:
            self.name = name
        else:
            self.name = "%s%s" % (func.__name__, args)

        self.runLock = _MultiLock()
        self.returnVal = None
        self.state = AS_RUNNING

        dprint("starting activity %s" % self.name)
        # The new thread enters itself into activityTable (see run).  Doing
        # it here, after the start, races with a function that ends at once:
        # the entry would be missing on removal and stay in the table forever.
        self.ident = thread.start_new_thread(self.run, (args,))

    def __repr__(self):
        return self.name

    def run(self, args):
        """
        body of the thread of the activity
        args: tuple of the arguments to be passed to the function
        executes the function, records return value and final state and
        finally unlocks runLock and removes the activity from activityTable"""
        ident = thread.get_ident()
        self.ident = ident
        Async.activityTable[ident] = self
        try:
            try:
                self.returnVal = self.func(*args)
                dprint("activity %s ended" % self.name)
                self.state = AS_ENDED
            except ActivityError:
                e = sys.exc_info()[1]
                if e.errno == errStopped:
                    dprint("activity %s stopped" % self.name)
                    self.state = AS_STOPPED
                elif e.errno == errTimeout:
                    dprint("activity %s timed out" % self.name)
                    self.state = AS_TIMEOUT
                else:
                    dprint("activity %s failed: %s" % (self.name, e.msg))
                    self.state = AS_FAILED
            except:
                # any other exception: record the failure and pass the
                # exception on, so that it is still reported for the thread
                dprint("activity %s failed" % self.name)
                self.state = AS_FAILED
                raise
        finally:
            # always wake up the waiters and unregister, otherwise wait()
            # would hang forever after an unexpected exception
            self.runLock.unlock()
            Async.activityTable.pop(ident, None)

    def getState(self):
        "returns the current execution state of the activity"
        return self.state

    def getReturnVal(self):
        """
        returns the return value of the function, after this has successfully
        ended"""
        return self.returnVal

    def stop(self):
        """
        advise the activity to stop itself
        the activity follows the advice at its next call of check();
        has no effect on an activity, which is not running any more"""
        dprint("stopping activity %s ..." % self.name)
        # do not overwrite the final state of an activity that has ended
        if self.state == AS_RUNNING:
            self.state = AS_STOPPING

    def wait(self):
        """
        wait, until the activity has ended
        returns the returnvalue of the executed function
        raises ActivityError, if the activity was stopped or has timed out"""
        dprint("waiting for activity %s ..." % self.name)
        self.runLock.wait()
        if self.state == AS_STOPPED:
            raise ActivityError(errStopped)
        elif self.state == AS_TIMEOUT:
            raise ActivityError(errTimeout)
        return self.returnVal

    def checkTimeout(self):
        "raises ActivityError(errTimeout), if the timeout of the activity has expired"
        if self.endTime:
            if time() > self.endTime:
                raise ActivityError(errTimeout)
00237
class AsyncCyclic(Async):
    "cyclic activity"
    def __init__(self, func, args, timeout=None, cycleTime=0, name=None):
        """
        func: function to be executed cyclically
        args: tuple of the arguments to be passed to the function
        timeout: time, after which the function is automatically stopped
        cycleTime: cycle time, in which the function is executed
        name: name of the activity (for debug)"""
        if not name:
            name = "cyclic %s%s" % (func.__name__, args)
        self.cyclicFunc = func
        self.cycleTime = cycleTime
        # a new _MultiLock is already locked; it must not be locked
        # explicitly here, as _MultiLock does not define a lock() method
        self.cycleLock = _MultiLock()
        self.lastReturnVal = None
        Async.__init__(self, self.runCyclic, (args,), timeout, name)

    def runCyclic(self, args):
        """
        executes the cyclic function in an endless loop
        args: tuple of the arguments to be passed to the function
        the loop is left by the ActivityError, which check() raises on a
        stop request or timeout"""
        while 1:
            self.lastReturnVal = self.cyclicFunc(*args)
            self.cycleLock.unlockLock()
            check()
            if self.cycleTime:
                # csleep keeps calling check(), so a stop request or timeout
                # is noticed during the pause and not only after it
                csleep(self.cycleTime)

    def waitCycle(self):
        """
        wait, until the current cycle has ended
        returns the return value of the function"""
        self.cycleLock.wait()
        return self.lastReturnVal

    def waitCond(self, cond=lambda x: x):
        """
        wait, until the return value of the function satisfies a condition
        cond: boolean function to be applied on the return value of the
              cyclic function
        returns the last return value (which satisfies the condition)"""
        while 1:
            val = self.waitCycle()
            if cond(val):
                # return the tested value itself; lastReturnVal may already
                # have been overwritten by the next cycle
                return val

    def getLastReturnVal(self):
        "returns the return value of the last successful cycle"
        return self.lastReturnVal
00284
00285
class AsyncProc(Async):
    """
    activity, which executes an external command as child process
    the return value of the activity is the tuple (stdout, stderr) with the
    complete output of the process"""

    # process id of the child; None as long as no process is running
    pid = None

    # myKill(self) kills the child process; the implementation is chosen
    # once, when the class is created
    if sys.platform == 'win32':
        if os.system("taskkill -? > nul") == 0:
            def myKill(self):
                "kills the process and its child processes with taskkill"
                if self.pid is not None:
                    os.system("taskkill /F /T /PID " + str(self.pid) + " >nul")
        else:
            print('Please install "taskkill"')

            def myKill(self):
                "no way to kill a process available, does nothing"
                pass
    else:
        def myKill(self):
            "kills the process with SIGKILL (forced, like taskkill /F)"
            import signal
            if self.pid is None:
                return
            try:
                os.kill(self.pid, signal.SIGKILL)
            except OSError:
                pass  # the process does not exist any more

    def __init__(self, command, procName=None, input=None, timeout=None, name=None):
        """
        command: command line; it is split at whitespace into program
                 and arguments
        procName: stored in self.procName, not used by this class
        input: data to be written to the standard input of the process
        timeout: time, after which the process is killed
        name: name of the activity (for debug), default is the command"""
        self.procName = procName
        if name is None:
            name = command
        Async.__init__(self, self.execute, (command, input), timeout, name)
        if timeout:
            thread.start_new_thread(self.killThread, (timeout,))

    def execute(self, command, input):
        """
        starts the process and waits for its end
        returns the tuple (stdout, stderr) of the outputs of the process
        raises ActivityError, if the process was stopped or has timed out"""
        dprint("execute %s %s" % (command, input))

        pp = Popen(command.split(), stdin=PIPE, stdout=PIPE, stderr=PIPE)
        self.pid = pp.pid
        dprint("Popen successful")

        if self.state != AS_RUNNING:
            # stop or timeout was requested before the process existed
            self.myKill()

        # communicate() feeds the input, closes stdin, reads both pipes at
        # the same time and waits for the process; reading stdout and stderr
        # one after the other can block forever once the other pipe is full
        out, err = pp.communicate(input)
        # the process is gone, its id must not be killed any more
        self.pid = None

        if self.state == AS_STOPPING:
            raise ActivityError(errStopped)
        elif self.state == AS_TIMEOUT:
            raise ActivityError(errTimeout)
        return out, err

    def stop(self):
        "kill process"
        stillRunning = self.state == AS_RUNNING
        Async.stop(self)
        if stillRunning:
            self.myKill()

    def killThread(self, timeout):
        """
        watchdog thread: kills the process, if it is still running after
        'timeout' seconds; ends without action, as soon as the activity
        leaves the state AS_RUNNING"""
        endTime = time() + timeout
        while time() < endTime:
            sleep(0.5)
            if self.state != AS_RUNNING:
                # ended or stopped: nothing to kill, the process id may
                # already belong to another process
                return
        self.state = AS_TIMEOUT
        self.myKill()