$search
##########################################################################
#                                                                        #
#  GO - Eg(G)secution Framework for Robotics Applicati(O)ns              #
#                                                                        #
#  Release: 040707                                                       #
#                                                                        #
#  (C) Copyright 2005                 Fraunhofer Institute               #
#                                     Manufacturing Engineering          #
#       ---     /                     and Automation                     #
#      /   \                                                             #
#     =========                                                          #
#     |       |    Winfried Baum <wmb(at)ipa.fraunhofer.de>              #
#     =========<<  Christopher Parlitz <cip(at)ipa.fraunhofer.de>        #
#      \   /                                                             #
#       ---                                                              #
#                                                                        #
#  GO is free software; you can redistribute it and/or modify            #
#  it under the terms of the GNU General Public License as published by  #
#  the Free Software Foundation; either version 2 of the License, or     #
#  (at your option) any later version.                                   #
#                                                                        #
#  GO is distributed in the hope that it will be useful,                 #
#  but WITHOUT ANY WARRANTY; without even the implied warranty of        #
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the         #
#  GNU General Public License for more details.                          #
#                                                                        #
##########################################################################
"""Thread based activities: one-shot, cyclic and external-process.

An activity runs a function in its own thread.  The function cooperates
with stop requests and timeouts by calling check() (or csleep()) at
convenient points.  Runs on Python 2.6+ and Python 3.
"""

import os
import signal
import sys
import traceback
from subprocess import PIPE, Popen
from time import sleep, time

try:
    import thread
except ImportError:  # Python 3 renamed the low-level module
    import _thread as thread

try:
    _ErrorBase = StandardError
except NameError:  # Python 3 has no StandardError
    _ErrorBase = Exception

debug = 1


def dprint(s):
    """Print the debug message s if the module-level debug flag is set."""
    if debug:
        print(s)


class ActivityError(_ErrorBase):
    """Error raised inside or on behalf of an activity.

    errno indexes the class-level message table filled by addError();
    msg, when given and non-empty, overrides the table entry.
    """

    # table of standard messages, indexed by error number
    messages = []

    def __init__(self, errno=0, msg=None):
        self.errno = errno
        if msg:
            self.msg = msg
        else:
            self.msg = self.messages[errno]

    def __repr__(self):
        return self.msg

    def __str__(self):
        return self.msg


def addError(msg="error"):
    """Register msg as a new standard error and return its error number."""
    ActivityError.messages.append(msg)
    return len(ActivityError.messages) - 1


errUnknown = addError("unknown error")
errTimeout = addError("activity has timed out")
errStopped = addError("activity has been stopped")

# execution states of an activity
AS_RUNNING = 0
AS_STOPPING = 1
AS_ENDED = 2
AS_STOPPED = 3
AS_TIMEOUT = 4
AS_FAILED = 5


def _waitForAny(multiLocks):
    """Block until at least one of the given _MultiLock objects fires.

    Returns at once when one of them is already unlocked, so the result
    does not depend on the order of the arguments.
    """
    waitLock = thread.allocate_lock()
    for multiLock in multiLocks:
        multiLock.addWaitLock(waitLock)
        if not multiLock.locked():
            return
    # addWaitLock() left waitLock acquired; the first unlock releases it
    waitLock.acquire()


def altWait(*activities):
    "waits until at least one of the activities has ended"
    dprint("waiting for following activities")
    for a in activities:
        dprint("  %s" % a.name)
    _waitForAny([a.runLock for a in activities])


def altWaitCycle(*activities):
    "waits until the current cycle of at least one of the activities has ended"
    dprint("waiting for following activities")
    for a in activities:
        dprint("  %s" % a.name)
    _waitForAny([a.cycleLock for a in activities])


def check():
    """checks, if there is a request to stop the current activity

    Raises ActivityError(errStopped) when the activity running in the
    calling thread has been asked to stop, or ActivityError(errTimeout)
    when its timeout has expired.  Does nothing when the calling thread
    is not an activity.
    """
    a = Async.activityTable.get(thread.get_ident())
    if a is not None:
        if a.state == AS_STOPPING:
            raise ActivityError(errStopped)
        a.checkTimeout()


def csleep(duration):
    """Sleep for duration seconds, calling check() about every 0.1 s."""
    endTime = time() + duration
    while duration > 0:
        sleep(min(duration, 0.1))
        check()
        duration = endTime - time()


class _MultiLock:
    """A lock that several waiters can wait on at the same time.

    Every waiter registers its own plain lock through addWaitLock(); the
    registered locks are released together by unlock() or unlockLock().
    """

    def __init__(self):
        self.intLock = thread.allocate_lock()   # guards the fields below
        self.waitLocks = []
        # kept under a private name so that it does not shadow locked()
        self._locked = 1

    def lock(self):
        """Put the multi lock (back) into the locked state."""
        with self.intLock:
            self._locked = 1

    def addWaitLock(self, waitLock):
        """Register waitLock; it is left acquired while this lock is locked.

        When this lock is already unlocked, waitLock is released instead,
        so that a following acquire() on it does not block.
        """
        with self.intLock:
            if self._locked:
                self.waitLocks.append(waitLock)
                if not waitLock.locked():
                    waitLock.acquire()
            elif waitLock.locked():
                waitLock.release()

    def _releaseWaiters(self):
        """Release and forget all registered wait locks (intLock held)."""
        for lock in self.waitLocks:
            try:
                lock.release()
            except thread.error:
                # a wait lock shared between several multi locks may have
                # been released by another one already
                pass
        self.waitLocks = []

    def unlock(self):
        """Unlock permanently and wake up all waiters."""
        with self.intLock:
            self._locked = 0
            self._releaseWaiters()

    def unlockLock(self):
        """Wake up the current waiters but stay locked for later ones."""
        with self.intLock:
            self._releaseWaiters()

    def wait(self):
        """Block until the lock fires, polling check() every 0.1 s."""
        waitLock = thread.allocate_lock()
        self.addWaitLock(waitLock)
        # TODO to be changed later
        while waitLock.locked():
            check()
            sleep(0.1)
        waitLock.acquire()

    def locked(self):
        """Return a true value while the multi lock is locked."""
        return self._locked


class Async:
    "one shot activity"
    # maps the thread ident of every running activity to the activity
    activityTable = {}

    def __init__(self, func, args, timeout=None, name=None):
        """
        func:    function to be executed
        args:    tuple of the arguments to be passed to the function
        timeout: time, after which the function is automatically stopped
        name:    name of the activity (for debug)"""
        self.func = func

        if timeout:
            self.endTime = time() + timeout
        else:
            self.endTime = None

        if name:
            self.name = name
        else:
            self.name = "%s%s" % (func.__name__, args)

        self.runLock = _MultiLock()
        self.returnVal = None
        self.state = AS_RUNNING

        dprint("starting activity %s" % self.name)
        # run() registers the activity in activityTable itself; doing it
        # here would race with a function that finishes immediately
        self.ident = thread.start_new_thread(self.run, (args,))

    def __repr__(self):
        return self.name

    def run(self, args):
        """Thread body: execute the function and record how it ended.

        The final state is AS_ENDED, AS_STOPPED, AS_TIMEOUT or AS_FAILED.
        The activity is always removed from activityTable and its waiters
        are always released, whatever the function raises.
        """
        ident = thread.get_ident()
        self.ident = ident
        Async.activityTable[ident] = self
        try:
            self.returnVal = self.func(*args)
            dprint("activity %s ended" % self.name)
            self.state = AS_ENDED
        except ActivityError as e:
            if e.errno == errStopped:
                dprint("activity %s stopped" % self.name)
                self.state = AS_STOPPED
            elif e.errno == errTimeout:
                dprint("activity %s timed out" % self.name)
                self.state = AS_TIMEOUT
            else:
                dprint("activity %s failed: %s" % (self.name, e.msg))
                self.state = AS_FAILED
        except Exception:
            # top-level boundary of the activity thread: report the error
            # and mark the activity as failed instead of leaving waiters
            # blocked forever
            dprint("activity %s failed with an unexpected exception"
                   % self.name)
            traceback.print_exc()
            self.state = AS_FAILED
        finally:
            Async.activityTable.pop(ident, None)
            self.runLock.unlock()

    def getState(self):
        "returns the current execution state of the activity"
        return self.state

    def getReturnVal(self):
        """
        returns the return value of the function, after this has successfully
        ended"""
        return self.returnVal

    def stop(self):
        "advise the activity to stop itself"
        dprint("stopping activity %s ..." % self.name)
        self.state = AS_STOPPING
        ### stop also all Asyncs on which we wait

    def wait(self):
        """
        wait, until the activity has ended
        returns the returnvalue of the executed function
        raises ActivityError, if the activity was stopped or timed out"""
        dprint("waiting for activity %s ..." % self.name)
        self.runLock.wait()
        if self.state == AS_STOPPED:
            raise ActivityError(errStopped)
        elif self.state == AS_TIMEOUT:
            raise ActivityError(errTimeout)
        return self.returnVal

    def checkTimeout(self):
        """Raise ActivityError(errTimeout) once the timeout has expired."""
        if self.endTime:
            if time() > self.endTime:
                raise ActivityError(errTimeout)


class AsyncCyclic(Async):
    "cyclic activity"

    def __init__(self, func, args, timeout=None, cycleTime=0, name=None):
        """
        func:      function to be executed cyclically
        args:      tuple of the arguments to be passed to the function
        timeout:   time, after which the function is automatically stopped
        cycleTime: cycle time, in which the function is executed
        name:      name of the activity (for debug)"""
        if not name:
            name = "cyclic %s%s" % (func.__name__, args)
        self.cyclicFunc = func
        self.cycleTime = cycleTime
        self.cycleLock = _MultiLock()
        self.cycleLock.lock()
        self.lastReturnVal = None
        Async.__init__(self, self.runCyclic, (args,), timeout, name)

    def runCyclic(self, args):
        """Call the cyclic function until check() raises (stop/timeout)."""
        while 1:
            self.lastReturnVal = self.cyclicFunc(*args)
            self.cycleLock.unlockLock()
            check()
            if self.cycleTime:
                # csleep keeps checking, so a stop request is honoured
                # during a long cycle pause as well
                csleep(self.cycleTime)

    def waitCycle(self):
        """
        wait, until the current cycle has ended
        returns the return value of the function"""
        self.cycleLock.wait()
        return self.lastReturnVal

    def waitCond(self, cond=lambda x: x):
        """
        wait, until the return value of the function satisfies a condition
        cond: boolean function to be applied on the return value of the
              cyclic function
        returns the last return value (which satisfies the condition)"""
        while not cond(self.waitCycle()):
            pass
        return self.lastReturnVal

    def getLastReturnVal(self):
        "returns the return value of the last successful cycle"
        return self.lastReturnVal


class AsyncProc(Async):
    """Activity that runs an external command and collects its output.

    wait() returns the tuple (stdout data, stderr data) of the command.
    """

    # myKill(self) terminates the child process; the implementation is
    # chosen once, when the class is defined
    if sys.platform == 'win32':  # if os is broken
        if os.system("taskkill -? > nul") == 0:
            def myKill(self):
                "kill the process and its children with taskkill"
                if self.pid is not None:
                    os.system("taskkill /F /T /PID " + str(self.pid) + " >nul")
        else:
            # Install a kill function
            print('Please install "taskkill"')

            def myKill(self):
                "no kill tool available: do nothing"
                return None
    else:  # Linux
        def myKill(self):
            "kill the process with SIGKILL"
            if self.pid is None:
                # process has not been started yet
                return
            try:
                os.kill(self.pid, signal.SIGKILL)
            except OSError:
                # process has already gone
                pass

    def __init__(self, command, procName=None, input=None, timeout=None, name=None):
        """
        command:  command line; it is split on whitespace
        procName: name of the process (stored, not used here)
        input:    data to be written to the stdin of the process
                  (bytes under Python 3)
        timeout:  time, after which the process is killed
        name:     name of the activity (for debug)"""
        self.procName = procName
        # must exist before the activity thread starts, see myKill()
        self.pid = None
        if name is None:
            name = command
        Async.__init__(self, self.execute, (command, input), timeout, name)
        if timeout:
            thread.start_new_thread(self.killThread, (timeout,))

    def execute(self, command, input):
        """Run command, feed it input and return (stdout, stderr).

        Raises ActivityError, if the process ended because the activity
        was stopped or timed out.
        """
        dprint("execute %r %r" % (command, input))
        pp = Popen(command.split(), stdin=PIPE, stdout=PIPE, stderr=PIPE)
        self.pid = pp.pid
        dprint("Popen successful")

        # communicate() writes the input, closes stdin and reads both
        # pipes concurrently, so a full stderr pipe cannot block the child
        out, err = pp.communicate(input)

        if self.state == AS_STOPPING:
            raise ActivityError(errStopped)
        elif self.state == AS_TIMEOUT:
            raise ActivityError(errTimeout)
        return out, err

    def stop(self):
        "kill process"
        Async.stop(self)
        self.myKill()

    def killThread(self, timeout):
        """Watchdog thread: kill the process when timeout has expired.

        Ends without killing as soon as the activity leaves AS_RUNNING.
        """
        endTime = time() + timeout
        while time() < endTime:
            sleep(0.5)
            if self.state != AS_RUNNING:
                break
        else:
            self.state = AS_TIMEOUT
            self.myKill()