program_queue.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 #
00003 # A program management and queueing system with a ROS service interface
00004 #
00005 # Author: Austin Hendrix
00006 
00007 # Design notes:
00008 #  At the moment, the db backend is sqlite. In the process of making things
00009 #   concise I think I've abstacted this to the point that it could be easily
00010 #   changed
00011 #  Token IDs are random 63-bit numbers.
00012 #  * When there are 4.31E8 tokens in existence, the probability of a collision
00013 #    is 0.01
00014 #  * When there are 1 million tokens in existence, the probability of a
00015 #    collision is 5.4E-8
00016 #  * See the Birthday problem for more in-depth coverage of the statistics
00017 #    that makes this possible
00018 #  * Token generation is checked and will retry until a new unique token is 
00019 #    created
00020 #  
00021 #
00022 # TODO: 
00023 #  * more validation
00024 #  * add username to ProgramInfo
00025 
00026 
00027 import roslib; roslib.load_manifest('program_queue')
00028 import rospkg
00029 import rospy
00030 
00031 from program_queue.srv import *
00032 from program_queue.msg import *
00033 from std_srvs.srv import Empty
00034 from std_msgs.msg import Header
00035 
00036 import sqlite3
00037 import bcrypt
00038 import random
00039 import subprocess
00040 import base64
00041 
00042 class Queue:
00043    def __init__(self):
00044       # set up database
00045 
00046       # get the database path from the parameter server; or default to the
00047       #  user's ~/.ros/ directory
00048       self.dbpath = rospy.get_param('~dbpath', rospkg.get_ros_home() + '/program_queue.db')
00049 
00050       db = sqlite3.connect(self.dbpath)
00051 
00052       # create tables if they don't exist. 
00053       db.execute('create table if not exists users(id integer primary key asc autoincrement, name text unique not null, password_hash text not null, admin int not null)')
00054       db.execute('create table if not exists tokens(id integer primary key, user_id integer references users(id))') # TODO: add an issue/expiration date to tokens
00055       db.execute('create table if not exists programs(id integer primary key asc autoincrement, user_id integer references users(id), name text default "myprogram" not null, type integer, code text default "" not null)')
00056       db.execute('create table if not exists output(id integer primary key asc autoincrement, program_id integer references programs(id) not null, time double not null, output text not null)')
00057       db.execute('create table if not exists queue(id integer primary key asc autoincrement, program_id integer unique references programs(id))')
00058 
00059       # create an admin user if one doesn't exist
00060       admin_hash = bcrypt.hashpw('admin', bcrypt.gensalt())
00061       db.execute("insert or ignore into users (name, password_hash, admin) values (?, ?, ?)", ('admin', admin_hash, 1,))
00062 
00063       db.commit()
00064       db.close()
00065 
00066       # set up services
00067       rospy.Service('clear_queue',     ClearQueue,     self.handle_clear_queue)
00068       rospy.Service('create_program',  CreateProgram,  
00069             self.handle_create_program)
00070       rospy.Service('create_user',     CreateUser,     self.handle_create_user)
00071       rospy.Service('dequeue_program', DequeueProgram, 
00072             self.handle_dequeue_program)
00073       rospy.Service('get_my_programs', GetMyPrograms,  
00074             self.handle_get_my_programs)
00075       rospy.Service('get_output',      GetOutput,      self.handle_get_output)
00076       rospy.Service('get_program',     GetProgram,     self.handle_get_program)
00077       rospy.Service('get_programs',    GetPrograms,    self.handle_get_programs)
00078       rospy.Service('get_queue',       GetQueue,       self.handle_get_queue)
00079       rospy.Service('login',           Login,          self.handle_login)
00080       rospy.Service('logout',          Logout,         self.handle_logout)
00081       rospy.Service('queue_program',   QueueProgram,   
00082             self.handle_queue_program)
00083       rospy.Service('run_program',     RunProgram,     self.handle_run_program)
00084       rospy.Service('update_program',  UpdateProgram, 
00085             self.handle_update_program)
00086 
00087       rospy.Service('start_queue',     Empty,          self.handle_start_queue)
00088       rospy.Service('stop_queue',     Empty,           self.handle_stop_queue)
00089 
00090       rospy.wait_for_service('run_slider_program')
00091 
00092       rospy.loginfo("Queue ready")
00093 
00094    def db(self):
00095       return sqlite3.connect(self.dbpath)
00096 
00097    class User:
00098       def __init__(self, id, name, pwhash, admin):
00099          self.id = id
00100          self.name = name
00101          self.pwhash = pwhash
00102          self.admin = admin
00103 
00104    def get_user(self, db, token):
00105       # take a token and return a User object
00106       cur = db.cursor()
00107       cur.execute('select users.id, users.name, users.password_hash, users.admin from users join tokens on users.id = tokens.user_id where tokens.id = ?', (token,))
00108       row = cur.fetchone()
00109       if row:
00110          return Queue.User(row[0], row[1], row[2], row[3])
00111       else:
00112          # no tokens; return none
00113          return None
00114 
00115    def get_program_info(self, db, program_id):
00116       # take a program_id and return a Program object
00117       cur = db.cursor()
00118       cur.execute('select programs.id, programs.user_id, programs.name, programs.type, users.name from programs join users on programs.user_id = users.id where programs.id = ?', (program_id,))
00119       row = cur.fetchone()
00120       if row:
00121          return (row[1], ProgramInfo(row[0], row[2].encode('ascii'), row[3], row[4].encode('ascii')))
00122       else:
00123          return (None, None)
00124 
00125    def get_program(self, db, program_id):
00126       # take a program_id and return a Program object
00127       cur = db.cursor()
00128       cur.execute('select programs.id, programs.user_id, programs.name, programs.type, programs.code, users.name from programs join users on programs.user_id = users.id where programs.id = ?', (program_id,))
00129       row = cur.fetchone()
00130       if row:
00131          return (row[1], Program(ProgramInfo(row[0], row[2].encode('ascii'), row[3], row[5].encode('ascii')), row[4].encode('ascii')))
00132       else:
00133          return (None, None)
00134 
00135    def token(self, cur, uid):
00136       rows = 0
00137       t = 0
00138       while rows < 1:
00139          # generate a random 63-bit token
00140          t = random.getrandbits(63)
00141          # ensure that our token is never 0
00142          while t == 0:
00143             t = random.getrandbits(63)
00144          cur.execute('insert or ignore into tokens values (?, ?)', (t, uid))
00145          rows = cur.rowcount
00146          if rows < 1:
00147             rospy.logwarn("Tried to insert duplicate token %d"%t)
00148       return t
00149 
00150    def handle_clear_queue(self, req):
00151       db = self.db()
00152       user = self.get_user(db, req.token)
00153       if user and user.admin:
00154          db.execute('delete from queue')
00155          db.commit()
00156       elif user:
00157          rospy.loginfo("ClearQueue called by non-admin user %s" % user.name)
00158       db.close()
00159       return ClearQueueResponse()
00160 
00161    def handle_create_program(self, req):
00162       db = self.db()
00163       user = self.get_user(db, req.token)
00164       if user:
00165          rospy.loginfo("Creating new program for user %s"%user.name)
00166       else:
00167          # TODO: handle this failure case better
00168          rospy.logwarn("No user for token %d"%req.token)
00169          return False
00170 
00171       c = db.cursor()
00172       c.execute('insert into programs (user_id) values (?)', (user.id,))
00173       program_id = c.lastrowid
00174       c.close()
00175       db.commit()
00176       db.close()
00177       return CreateProgramResponse(program_id)
00178 
00179    def handle_create_user(self, req):
00180       pwhash = bcrypt.hashpw(req.password, bcrypt.gensalt())
00181       db = self.db()
00182       cur = db.cursor()
00183       cur.execute('insert or ignore into users (name, password_hash, admin) values (?, ?, ?)', (req.name, pwhash, 0,))
00184       if cur.rowcount > 0:
00185          userid = cur.lastrowid
00186          token = self.token(cur, userid)
00187          db.commit()
00188          db.close()
00189          return CreateUserResponse(token)
00190       else:
00191          rospy.loginfo("User %s already exists"%req.name)
00192          return CreateUserResponse(0)
00193 
00194    def handle_dequeue_program(self, req):
00195       # TODO: test
00196       db = self.db()
00197       user = self.get_user(db, req.token)
00198       if user:
00199          (owner, program) = self.get_program_info(db, req.id)
00200          if user.id != owner and not user.admin:
00201             rospy.loginfo("User %s is not allowed to dequeue %d"%(user.name, req.id))
00202          else:
00203             db.execute('delete from queue where program_id = ?',(req.id,))
00204             db.commit()
00205       db.close()
00206       return DequeueProgramResponse()
00207 
00208    def handle_get_my_programs(self, req):
00209       resp = GetMyProgramsResponse()
00210       db = self.db()
00211       user = self.get_user(db, req.token)
00212       if not user:
00213          # if we don't have a user, return an empty list
00214          return resp
00215       cur = db.cursor()
00216       cur.execute('select programs.id, programs.name, programs.type, users.name from programs join users on programs.user_id = users.id where users.id = ?', (user.id,))
00217       for r in cur.fetchall():
00218          resp.programs.append(ProgramInfo(r[0], r[1].encode('ascii'), r[2], r[3].encode('ascii')))
00219 
00220       cur.close()
00221       db.close()
00222       return resp
00223 
00224    def handle_get_output(self, req):
00225       resp = GetOutputResponse()
00226       db = self.db()
00227       user = self.get_user(db, req.token)
00228       if user:
00229          (owner, program) = self.get_program_info(db, req.program_id)
00230          if user.id != owner:
00231             rospy.loginfo("User %s is not allowed to get output from program %d"%(user.name, req.program_id))
00232          else:
00233             cur = db.cursor()
00234             # TODO: enforce output limit
00235             cur.execute('select time, output from output where program_id = ? order by time desc',
00236                   (req.program_id,))
00237             for r in cur.fetchall():
00238                resp.output.append(Output(Header(0, rospy.Time(r[0]), ''), r[1].encode('ascii')))
00239             cur.close()
00240 
00241       db.close()
00242 
00243       return resp
00244 
00245    def handle_get_program(self, req):
00246       db = self.db()
00247       (owner, program) = self.get_program(db, req.id)
00248       db.close()
00249       return GetProgramResponse(program)
00250 
00251    def handle_get_programs(self, req):
00252       db = self.db()
00253       cur = db.cursor()
00254       cur.execute('select programs.id, programs.name, programs.type, users.name from programs join users on programs.user_id = users.id')
00255       resp = GetProgramsResponse()
00256       for r in cur.fetchall():
00257          resp.programs.append(ProgramInfo(r[0], r[1].encode('ascii'), r[2], r[3].encode('ascii')))
00258       return resp
00259 
00260    def handle_get_queue(self, req):
00261       db = self.db()
00262       cur = db.cursor()
00263       cur.execute('select programs.id, programs.name, programs.type, users.name from programs join queue on programs.id = queue.program_id join users on programs.user_id = users.id order by queue.id')
00264       resp = GetQueueResponse()
00265       for r in cur.fetchall():
00266          resp.programs.append(ProgramInfo(r[0], r[1].encode('ascii'), r[2], r[3].encode('ascii')))
00267 
00268       return resp
00269 
00270    def handle_login(self, req):
00271       db = self.db()
00272       cur = db.cursor()
00273       cur.execute('select id, password_hash, admin from users where name = ?', (req.name,))
00274       row = cur.fetchone()
00275       if row == None:
00276          rospy.loginfo("No user named %s"%req.name)
00277          return LoginResponse(0, False)
00278       else:
00279          if bcrypt.hashpw(req.password, row[1]) == row[1]:
00280             token = self.token(cur, row[0])
00281             cur.close()
00282             db.commit()
00283             db.close()
00284             rospy.loginfo("Logged in %s"%req.name)
00285             return LoginResponse(token, row[2] != 0)
00286          else:
00287             rospy.loginfo("Password failed for %s"%req.name)
00288             return LoginResponse(0, False)
00289 
00290       return LoginResponse(0, False)
00291 
00292    def handle_logout(self, req):
00293       db = self.db()
00294       db.execute('delete from tokens where id = ?', (req.token,))
00295       db.commit()
00296       db.close()
00297       return LogoutResponse()
00298 
00299    def handle_queue_program(self, req):
00300       db = self.db()
00301       user = self.get_user(db, req.token)
00302       (owner, program) = self.get_program_info(db, req.program_id)
00303       p = 0
00304       if user and program:
00305          if user.admin or user.id == owner:
00306             db.execute('insert or ignore into queue (program_id) values (?)',
00307                   (req.program_id,))
00308             cur = db.cursor()
00309             # FIXME: only works when appending to queue. find an efficient way
00310             #  to do this
00311             cur.execute('select count(*) from queue')
00312             row = cur.fetchone()
00313             p = row[0] - 1
00314             cur.close()
00315             db.commit()
00316          else:
00317             rospy.loginfo("User %s does not have permission to put program %d into the queue"%(user.name, req.program_id))
00318       else:
00319          rospy.loginfo("Bad token %d or program id %d"%(token, req.program.info.id))
00320 
00321       db.close()
00322       return QueueProgramResponse(p)
00323 
00324    def handle_run_program(self, req):
00325       # FIXME: stub
00326       db = self.db()
00327       user = self.get_user(db, req.token)
00328       if user and user.admin:
00329          cur = db.cursor()
00330          cur.execute('select type, code from programs where id = ?', (req.id,))
00331          row = cur.fetchone()
00332          if row:
00333             if row[0] == ProgramInfo.PYTHON:
00334                rospy.loginfo("Run python program %d"%(req.id))
00335                py = subprocess.Popen(['python'], stdin=subprocess.PIPE, 
00336                      stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
00337                output = py.communicate(row[1])[0]
00338             elif row[0] == ProgramInfo.PUPPET:
00339                output = "Puppet program execution is not supported"
00340                rospy.logerr(output)
00341             elif row[0] == ProgramInfo.SLIDER:
00342                rospy.loginfo("Run slider program %d"%(req.id))
00343                rospy.ServiceProxy('run_slider_program', CallProgram)(row[1])
00344                output = "Slider program run"
00345             elif row[0] == 4:
00346                rospy.loginfo("Run web slider program %d"%(req.id))
00347                rospy.ServiceProxy('/museum/run_web_slider_program', CallProgram)(row[1])
00348                output = "Web slider program run"
00349             else:
00350                output = "Error: Unknown program type " + row[0]
00351                rospy.logerr(output)
00352 
00353             db.execute('insert into output (program_id, time, output) values'+
00354                   '(?, ?, ?)', (req.id, rospy.Time.now().to_sec(), output,))
00355             db.execute('delete from queue where program_id = ?', (req.id,))
00356             db.commit()
00357          else:
00358             rospy.logerror("Bad program: " + req.id)
00359       else:
00360          rospy.loginfo("%s is not allowed to run programs"%user.name)
00361 
00362       db.close()
00363 
00364       return RunProgramResponse()
00365 
00366    def handle_start_queue(self, req):
00367       # TODO: require an admin token
00368       return EmptyResponse()
00369 
00370    def handle_stop_queue(self, req):
00371       # TODO: require an admin token
00372       return EmptyResponse()
00373 
00374    def handle_update_program(self, req):
00375       db = self.db()
00376       user = self.get_user(db, req.token)
00377       (owner, program) = self.get_program_info(db, req.program.info.id)
00378       if user and program:
00379          # Food for thought: allow admins to edit any program?
00380          if owner == user.id:
00381             db.execute('update programs set name=?, type=?, code=? where id=?',
00382                   (req.program.info.name, req.program.info.type, 
00383                      req.program.code, req.program.info.id))
00384             db.commit()
00385          else:
00386             rospy.loginfo("User %s does not own program %d"%(user.name, 
00387                req.program.info.id))
00388       else:
00389          rospy.loginfo("Bad token %d or program id %d"%(req.token, req.program.info.id))
00390       db.close()
00391       return UpdateProgramResponse()
00392 
00393 
00394 if __name__ == '__main__':
00395    rospy.init_node('program_queue')
00396 
00397    queue = Queue()
00398 
00399    rospy.spin()


program_queue
Author(s): Austin Hendrix
autogenerated on Wed Aug 26 2015 15:37:32