program_queue_mysql.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 #
00003 # A program management and queueing system with a ROS service interface
00004 #
00005 # Author: Austin Hendrix
00006 
00007 # Design notes:
00008 #  At the moment, the db backend is MySQL (it was originally sqlite). In the
00009 #   process of making things concise I think I've abstracted this to the
00010 #   point that it could be easily changed.
00011 #  Token IDs are random 63-bit numbers.
00012 #  * When there are 4.31E8 tokens in existence, the probability of a collision
00013 #    is 0.01
00014 #  * When there are 1 million tokens in existence, the probability of a
00015 #    collision is 5.4E-8
00016 #  * See the Birthday problem for more in-depth coverage of the statistics
00017 #    that makes this possible
00018 #  * Token generation is checked and will retry until a new unique token is 
00019 #    created
00020 #  
00021 #
00022 # TODO: 
00023 #  * more validation
00024 #  * add username to ProgramInfo
00025 
00026 
00027 import roslib; roslib.load_manifest('program_queue')
00028 import rospkg
00029 import rospy
00030 
00031 from program_queue.srv import *
00032 from program_queue.msg import *
00033 from std_srvs.srv import Empty
00034 from std_msgs.msg import Header
00035 
00036 #import sqlite3
00037 import MySQLdb as mysql
00038 import bcrypt
00039 import random
00040 import subprocess
00041 import base64
00042 import threading
00043 
class Queue:
   """Program store and run queue backed by MySQL, exposed as ROS services.

   A single MySQL connection (``self.dbconn``) is shared by all service
   handlers; ``self.dblock`` is held around every use of it.  Callers
   authenticate with the numeric token returned by the ``login`` and
   ``create_user`` services.  Token 0 is never issued; responses use it to
   signal failure.
   """

   def __init__(self):
      """Connect to the database, create missing tables, advertise services.

      Connection settings are read from the private ROS parameters
      ``~dbhost``, ``~dbuser``, ``~dbpass`` and ``~dbname``.
      """
      import warnings

      self.dbhost = rospy.get_param('~dbhost', 'demobase3')
      self.dbuser = rospy.get_param('~dbuser', 'willow')
      self.dbpass = rospy.get_param('~dbpass', 'willow')
      # The database name has its own parameter.  It used to be read from
      # '~dbpass', so configuring a password silently changed the database.
      self.dbname = rospy.get_param('~dbname', 'pr2_programs')

      self.dblock = threading.Lock()

      db = mysql.connect(self.dbhost, self.dbuser, self.dbpass, self.dbname)
      self.dbconn = db

      schema = (
         'create table if not exists users(id integer primary key auto_increment, name varchar(256) unique not null, password_hash varchar(60) not null, admin int not null)',
         # TODO: add an issue/expiration date to tokens
         'create table if not exists tokens(id bigint primary key, user_id integer references users(id))',
         'create table if not exists programs(id integer primary key auto_increment, user_id integer references users(id), name text not null, type integer, code text not null)',
         'create table if not exists output(id integer primary key auto_increment, program_id integer references programs(id), time double not null, output text not null)',
         'create table if not exists queue(id integer primary key auto_increment, program_id integer unique references programs(id))',
      )

      with self.dblock:
         cur = db.cursor()
         try:
            # Ignore MySQL warnings only while the "if not exists" statements
            # run.  catch_warnings() restores the previous filters on exit
            # instead of wiping every filter in the process.
            with warnings.catch_warnings():
               warnings.filterwarnings('ignore', category = mysql.Warning)
               for statement in schema:
                  cur.execute(statement)

            # create an admin user if one doesn't exist
            admin_hash = bcrypt.hashpw('admin', bcrypt.gensalt())
            cur.execute("insert ignore into users (name, password_hash, admin) values (%s, %s, %s)", ('admin', admin_hash, 1))

            db.commit()
         finally:
            cur.close()

      # set up services
      services = (
         ('clear_queue',     ClearQueue,     self.handle_clear_queue),
         ('create_program',  CreateProgram,  self.handle_create_program),
         ('create_user',     CreateUser,     self.handle_create_user),
         ('dequeue_program', DequeueProgram, self.handle_dequeue_program),
         ('get_my_programs', GetMyPrograms,  self.handle_get_my_programs),
         ('get_output',      GetOutput,      self.handle_get_output),
         ('get_program',     GetProgram,     self.handle_get_program),
         ('get_programs',    GetPrograms,    self.handle_get_programs),
         ('get_queue',       GetQueue,       self.handle_get_queue),
         ('login',           Login,          self.handle_login),
         ('logout',          Logout,         self.handle_logout),
         ('queue_program',   QueueProgram,   self.handle_queue_program),
         ('run_program',     RunProgram,     self.handle_run_program),
         ('update_program',  UpdateProgram,  self.handle_update_program),
         ('start_queue',     Empty,          self.handle_start_queue),
         ('stop_queue',      Empty,          self.handle_stop_queue),
      )
      for name, srv_type, handler in services:
         rospy.Service(name, srv_type, handler)

      #rospy.wait_for_service('run_slider_program')

      rospy.loginfo("Queue ready")

   def db(self):
      """Return the shared database connection opened in ``__init__``."""
      return self.dbconn

   class User:
      """Plain record for one row of the ``users`` table."""

      def __init__(self, id, name, pwhash, admin):
         self.id = id          # users.id
         self.name = name      # users.name
         self.pwhash = pwhash  # users.password_hash
         self.admin = admin    # users.admin; non-zero means administrator

   @staticmethod
   def _fetch_all(db, query, args=None):
      """Run ``query`` on a fresh cursor, return all rows, close the cursor."""
      cur = db.cursor()
      try:
         cur.execute(query, args)
         return cur.fetchall()
      finally:
         cur.close()

   @staticmethod
   def _program_infos(rows):
      """Convert (id, name, type, owner name) rows into ProgramInfo messages."""
      return [ProgramInfo(r[0], r[1].encode('ascii'), r[2], r[3].encode('ascii'))
              for r in rows]

   def get_user(self, db, token):
      """Return the ``Queue.User`` that owns ``token``, or None if unknown."""
      rows = self._fetch_all(db, 'select users.id, users.name, users.password_hash, users.admin from users join tokens on users.id = tokens.user_id where tokens.id = %s', (token,))
      if not rows:
         return None
      uid, name, pwhash, admin = rows[0][:4]
      return Queue.User(uid, name, pwhash, admin)

   def get_program_info(self, db, program_id):
      """Look up a program without its code.

      Returns ``(owner_user_id, ProgramInfo)``, or ``(None, None)`` when no
      program has that id.
      """
      rows = self._fetch_all(db, 'select programs.id, programs.user_id, programs.name, programs.type, users.name from programs join users on programs.user_id = users.id where programs.id = %s', (program_id,))
      if not rows:
         return (None, None)
      row = rows[0]
      info = ProgramInfo(row[0], row[2].encode('ascii'), row[3], row[4].encode('ascii'))
      return (row[1], info)

   def get_program(self, db, program_id):
      """Look up a program including its code.

      Returns ``(owner_user_id, Program)``, or ``(None, None)`` when no
      program has that id.
      """
      rows = self._fetch_all(db, 'select programs.id, programs.user_id, programs.name, programs.type, programs.code, users.name from programs join users on programs.user_id = users.id where programs.id = %s', (program_id,))
      if not rows:
         return (None, None)
      row = rows[0]
      info = ProgramInfo(row[0], row[2].encode('ascii'), row[3], row[5].encode('ascii'))
      return (row[1], Program(info, row[4].encode('ascii')))

   def token(self, cur, uid):
      """Insert a new, unique, non-zero 63-bit token for user ``uid``.

      The row is written through ``cur`` but not committed; the caller owns
      the transaction.  Returns the token value.
      """
      # Tokens are the only credential clients present after login, so draw
      # them from the OS entropy source rather than the default PRNG.
      rng = random.SystemRandom()
      while True:
         t = rng.getrandbits(63)
         # ensure that our token is never 0 (0 means "no token" in responses)
         if t == 0:
            continue
         cur.execute('insert ignore into tokens values (%s, %s)', (t, uid))
         if cur.rowcount >= 1:
            return t
         # "insert ignore" affected no row: the id was already taken, retry
         rospy.logwarn("Tried to insert duplicate token %d"%t)

   def handle_clear_queue(self, req):
      """Service handler: empty the queue; only admin tokens are honoured."""
      with self.dblock:
         db = self.db()
         user = self.get_user(db, req.token)
         if user and user.admin:
            cur = db.cursor()
            try:
               cur.execute('delete from queue')
               db.commit()
            finally:
               cur.close()
         elif user:
            rospy.loginfo("ClearQueue called by non-admin user %s" % user.name)
         return ClearQueueResponse()

   def handle_create_program(self, req):
      """Service handler: create an empty program owned by the caller.

      Returns the new program id, or id 0 when the token is unknown.
      """
      with self.dblock:
         db = self.db()
         user = self.get_user(db, req.token)
         if not user:
            # TODO: handle this failure case better
            rospy.logwarn("No user for token %d"%req.token)
            # Reply with an explicit response (id 0) instead of a bare False.
            return CreateProgramResponse(0)

         rospy.loginfo("Creating new program for user %s"%user.name)
         cur = db.cursor()
         try:
            cur.execute('insert into programs (user_id, name, code) values (%s, %s, %s)', (user.id, 'myprogram', ''))
            program_id = cur.lastrowid
            db.commit()
         finally:
            cur.close()
         return CreateProgramResponse(program_id)

   def handle_create_user(self, req):
      """Service handler: create a non-admin user and log them in.

      Returns a fresh token, or token 0 if the name is already taken.
      """
      # Hash before taking the lock so other handlers are not held up by it.
      pwhash = bcrypt.hashpw(req.password, bcrypt.gensalt())
      with self.dblock:
         db = self.db()
         cur = db.cursor()
         try:
            cur.execute('insert ignore into users (name, password_hash, admin) values (%s, %s, %s)', (req.name, pwhash, 0,))
            if cur.rowcount > 0:
               userid = cur.lastrowid
               token = self.token(cur, userid)
            else:
               token = 0
            # Commit after the token insert so the user row and its token are
            # persisted together.
            db.commit()
         finally:
            cur.close()
         if token == 0:
            rospy.loginfo("User %s already exists"%req.name)
         return CreateUserResponse(token)

   def handle_dequeue_program(self, req):
      """Service handler: remove a program from the queue (owner or admin)."""
      # TODO: test
      with self.dblock:
         db = self.db()
         user = self.get_user(db, req.token)
         if user:
            (owner, program) = self.get_program_info(db, req.id)
            if user.id == owner or user.admin:
               cur = db.cursor()
               try:
                  cur.execute('delete from queue where program_id = %s',(req.id,))
                  db.commit()
               finally:
                  cur.close()
            else:
               rospy.loginfo("User %s is not allowed to dequeue %d"%(user.name, req.id))
         return DequeueProgramResponse()

   def handle_get_my_programs(self, req):
      """Service handler: list the caller's programs (empty if bad token)."""
      resp = GetMyProgramsResponse()
      with self.dblock:
         db = self.db()
         user = self.get_user(db, req.token)
         if not user:
            # if we don't have a user, return an empty list
            return resp
         rows = self._fetch_all(db, 'select programs.id, programs.name, programs.type, users.name from programs join users on programs.user_id = users.id where users.id = %s', (user.id,))
         resp.programs.extend(self._program_infos(rows))
         return resp

   def handle_get_output(self, req):
      """Service handler: return a program's recorded output, newest first.

      Only the program's owner gets output; anyone else gets an empty list.
      """
      resp = GetOutputResponse()
      with self.dblock:
         db = self.db()
         user = self.get_user(db, req.token)
         if user:
            (owner, program) = self.get_program_info(db, req.program_id)
            if user.id != owner:
               rospy.loginfo("User %s is not allowed to get output from program %d"%(user.name, req.program_id))
            else:
               # TODO: enforce output limit
               rows = self._fetch_all(db, 'select time, output from output where program_id = %s order by time desc',
                     (req.program_id,))
               for r in rows:
                  resp.output.append(Output(Header(0, rospy.Time(r[0]), ''), r[1].encode('ascii')))

         return resp

   def handle_get_program(self, req):
      """Service handler: return a program (with code) by id; no token needed."""
      with self.dblock:
         db = self.db()
         (owner, program) = self.get_program(db, req.id)
         # NOTE(review): program is None when the id does not exist; confirm
         #  how GetProgramResponse(None) behaves for callers.
         return GetProgramResponse(program)

   def handle_get_programs(self, req):
      """Service handler: list every stored program; no token needed."""
      with self.dblock:
         db = self.db()
         rows = self._fetch_all(db, 'select programs.id, programs.name, programs.type, users.name from programs join users on programs.user_id = users.id')
         resp = GetProgramsResponse()
         resp.programs.extend(self._program_infos(rows))
         return resp

   def handle_get_queue(self, req):
      """Service handler: list queued programs in queue (insertion) order."""
      with self.dblock:
         db = self.db()
         rows = self._fetch_all(db, 'select programs.id, programs.name, programs.type, users.name from programs join queue on programs.id = queue.program_id join users on programs.user_id = users.id order by queue.id')
         resp = GetQueueResponse()
         resp.programs.extend(self._program_infos(rows))
         return resp

   def handle_login(self, req):
      """Service handler: check name/password and issue a token.

      Returns ``LoginResponse(token, is_admin)``; ``(0, False)`` on failure.
      """
      with self.dblock:
         db = self.db()
         cur = db.cursor()
         try:
            cur.execute('select id, password_hash, admin from users where name = %s', (req.name,))
            row = cur.fetchone()
            if row is None:
               rospy.loginfo("No user named %s"%req.name)
               return LoginResponse(0, False)

            # Re-hashing with the stored hash as salt reproduces the stored
            # hash only for the right password.
            if bcrypt.hashpw(req.password, row[1]) != row[1]:
               rospy.loginfo("Password failed for %s"%req.name)
               return LoginResponse(0, False)

            token = self.token(cur, row[0])
            # Persist the token like every other write path does.
            db.commit()
            rospy.loginfo("Logged in %s"%req.name)
            return LoginResponse(token, row[2] != 0)
         finally:
            cur.close()

   def handle_logout(self, req):
      """Service handler: invalidate the caller's token."""
      with self.dblock:
         db = self.db()
         cur = db.cursor()
         try:
            cur.execute('delete from tokens where id = %s', (req.token,))
            db.commit()
         finally:
            cur.close()
         return LogoutResponse()

   def handle_queue_program(self, req):
      """Service handler: append a program to the queue (owner or admin).

      Returns the program's queue position, or 0 when nothing was queued.
      """
      with self.dblock:
         db = self.db()
         user = self.get_user(db, req.token)
         (owner, program) = self.get_program_info(db, req.program_id)
         p = 0
         if user and program:
            if user.admin or user.id == owner:
               cur = db.cursor()
               try:
                  cur.execute('insert ignore into queue (program_id) values (%s)',
                        (req.program_id,))
                  db.commit()
                  # FIXME: only works when appending to queue. find an
                  #  efficient way to do this
                  cur.execute('select count(*) from queue')
                  row = cur.fetchone()
                  p = row[0] - 1
               finally:
                  cur.close()
            else:
               rospy.loginfo("User %s does not have permission to put program %d into the queue"%(user.name, req.program_id))
         else:
            # Log the values from this request; the old line referenced an
            # undefined name and a field this request type is not read with.
            rospy.loginfo("Bad token %d or program id %d"%(req.token, req.program_id))

         return QueueProgramResponse(p)

   def _execute_program(self, program_id, ptype, code):
      """Run one program according to its type and return its output text."""
      if ptype == ProgramInfo.PYTHON:
         rospy.loginfo("Run python program %d"%(program_id))
         # SECURITY: executes stored code in a python subprocess with this
         #  node's privileges; reachable only through an admin token.
         py = subprocess.Popen(['python'], stdin=subprocess.PIPE,
               stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
         return py.communicate(code)[0]
      if ptype == ProgramInfo.PUPPET:
         output = "Puppet program execution is not supported"
         rospy.logerr(output)
         return output
      if ptype == ProgramInfo.SLIDER:
         rospy.loginfo("Run slider program %d"%(program_id))
         rospy.ServiceProxy('run_slider_program', CallProgram)(code)
         return "Slider program run"
      # NOTE(review): 4 is used as the "web slider" type; no named constant
      #  for it is visible in this file.
      if ptype == 4:
         rospy.loginfo("Run web slider program %d"%(program_id))
         rospy.ServiceProxy('/museum/run_web_slider_program', CallProgram)(code)
         return "Web slider program run"
      # %s formatting: the type is an integer (or NULL), not a string
      output = "Error: Unknown program type %s" % ptype
      rospy.logerr(output)
      return output

   def handle_run_program(self, req):
      """Service handler: run a program now, record its output, dequeue it.

      Only admin tokens may run programs.  The database lock is not held
      while the program itself executes.
      """
      # FIXME: stub
      db = self.db()
      row = None
      with self.dblock:
         # The token lookup shares the connection, so it needs the lock too.
         user = self.get_user(db, req.token)
         allowed = bool(user and user.admin)
         if allowed:
            cur = db.cursor()
            try:
               cur.execute('select type, code from programs where id = %s', (req.id,))
               row = cur.fetchone()
            finally:
               cur.close()

      if not allowed:
         if user:
            rospy.loginfo("%s is not allowed to run programs"%user.name)
         else:
            rospy.loginfo("Bad token %d; not allowed to run programs"%req.token)
         return RunProgramResponse()

      if not row:
         rospy.logerr("Bad program: %d" % req.id)
         return RunProgramResponse()

      output = self._execute_program(req.id, row[0], row[1])

      with self.dblock:
         cur = db.cursor()
         try:
            cur.execute('insert into output (program_id, time, output) values'+
               '(%s, %s, %s)', (req.id, rospy.Time.now().to_sec(), output,))
            cur.execute('delete from queue where program_id = %s', (req.id,))
            db.commit()
         finally:
            cur.close()

      return RunProgramResponse()

   def handle_start_queue(self, req):
      """Service handler: placeholder for starting queue execution."""
      # Only the Empty service type is imported at file level; bring the
      # response class into scope here.
      from std_srvs.srv import EmptyResponse
      # TODO: require an admin token
      return EmptyResponse()

   def handle_stop_queue(self, req):
      """Service handler: placeholder for stopping queue execution."""
      # Only the Empty service type is imported at file level; bring the
      # response class into scope here.
      from std_srvs.srv import EmptyResponse
      # TODO: require an admin token
      return EmptyResponse()

   def handle_update_program(self, req):
      """Service handler: overwrite a program's name, type and code.

      Only the program's owner may update it.
      """
      with self.dblock:
         db = self.db()
         user = self.get_user(db, req.token)
         (owner, program) = self.get_program_info(db, req.program.info.id)
         if user and program:
            # Food for thought: allow admins to edit any program?
            if owner == user.id:
               cur = db.cursor()
               try:
                  cur.execute('update programs set name=%s, type=%s, code=%s where id=%s',
                        (req.program.info.name, req.program.info.type,
                           req.program.code, req.program.info.id))
                  db.commit()
               finally:
                  cur.close()
            else:
               rospy.loginfo("User %s does not own program %d"%(user.name,
                  req.program.info.id))
         else:
            rospy.loginfo("Bad token %d or program id %d"%(req.token, req.program.info.id))
         return UpdateProgramResponse()
00430 
00431 
if __name__ == '__main__':
   # Script entry point: bring up the node, then build the Queue, whose
   # constructor connects to the database and advertises the services.
   rospy.init_node('program_queue')
   server = Queue()

   # Hand control to rospy; the service handlers do the rest of the work.
   rospy.spin()


program_queue
Author(s): Austin Hendrix
autogenerated on Wed Aug 26 2015 15:37:32