00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
# Kept first: load_manifest is expected to set up the import path for the
# ROS packages imported below (presumably required on rosbuild-era installs).
import roslib; roslib.load_manifest('program_queue')

import base64
import contextlib
import random
import sqlite3
import subprocess

import bcrypt
import rospkg
import rospy

from program_queue.srv import *
from program_queue.msg import *
from std_srvs.srv import Empty
from std_srvs.srv import EmptyResponse
from std_msgs.msg import Header
00041
class Queue:
    """ROS service front-end for a SQLite-backed queue of user programs.

    The database holds users, login tokens, programs, program output and the
    run queue.  Every service handler opens its own connection through
    :meth:`db` and closes it before returning (presumably because rospy runs
    handlers on separate threads and sqlite3 connections are not shared
    across threads by default -- confirm before pooling connections).

    A token value of 0 is never issued; handlers return it to signal a
    failed login or user creation.
    """

    def __init__(self):
        """Create the schema if missing, seed the admin user, advertise services.

        Reads the private ``~dbpath`` parameter (default: ``program_queue.db``
        under the ROS home directory) and blocks until the
        ``run_slider_program`` service is available.
        """
        self.dbpath = rospy.get_param('~dbpath', rospkg.get_ros_home() + '/program_queue.db')

        # closing() guarantees the setup connection is released even when one
        # of the schema statements fails.
        with contextlib.closing(sqlite3.connect(self.dbpath)) as db:
            db.execute('create table if not exists users(id integer primary key asc autoincrement, name text unique not null, password_hash text not null, admin int not null)')
            db.execute('create table if not exists tokens(id integer primary key, user_id integer references users(id))')
            db.execute('create table if not exists programs(id integer primary key asc autoincrement, user_id integer references users(id), name text default "myprogram" not null, type integer, code text default "" not null)')
            db.execute('create table if not exists output(id integer primary key asc autoincrement, program_id integer references programs(id) not null, time double not null, output text not null)')
            db.execute('create table if not exists queue(id integer primary key asc autoincrement, program_id integer unique references programs(id))')

            # Seed a default administrator (name 'admin', password 'admin').
            # "insert or ignore" leaves an existing admin row, and therefore
            # a changed password, untouched.
            admin_hash = bcrypt.hashpw('admin', bcrypt.gensalt())
            db.execute("insert or ignore into users (name, password_hash, admin) values (?, ?, ?)", ('admin', admin_hash, 1,))

            db.commit()

        # Advertised in the same order as before.
        services = (
            ('clear_queue', ClearQueue, self.handle_clear_queue),
            ('create_program', CreateProgram, self.handle_create_program),
            ('create_user', CreateUser, self.handle_create_user),
            ('dequeue_program', DequeueProgram, self.handle_dequeue_program),
            ('get_my_programs', GetMyPrograms, self.handle_get_my_programs),
            ('get_output', GetOutput, self.handle_get_output),
            ('get_program', GetProgram, self.handle_get_program),
            ('get_programs', GetPrograms, self.handle_get_programs),
            ('get_queue', GetQueue, self.handle_get_queue),
            ('login', Login, self.handle_login),
            ('logout', Logout, self.handle_logout),
            ('queue_program', QueueProgram, self.handle_queue_program),
            ('run_program', RunProgram, self.handle_run_program),
            ('update_program', UpdateProgram, self.handle_update_program),
            ('start_queue', Empty, self.handle_start_queue),
            ('stop_queue', Empty, self.handle_stop_queue),
        )
        for name, srv_type, handler in services:
            rospy.Service(name, srv_type, handler)

        # Slider programs are forwarded to this service by handle_run_program.
        rospy.wait_for_service('run_slider_program')

        rospy.loginfo("Queue ready")

    def db(self):
        """Open and return a new connection to the queue database.

        The caller is responsible for closing the connection.
        """
        return sqlite3.connect(self.dbpath)

    class User:
        """Plain record for one row of the ``users`` table."""

        def __init__(self, id, name, pwhash, admin):
            self.id = id          # users.id
            self.name = name      # users.name
            self.pwhash = pwhash  # users.password_hash
            self.admin = admin    # users.admin (non-zero means administrator)

    def get_user(self, db, token):
        """Return the :class:`Queue.User` owning ``token``, or None.

        ``db`` is an open connection; it is not closed here.
        """
        cur = db.cursor()
        cur.execute('select users.id, users.name, users.password_hash, users.admin from users join tokens on users.id = tokens.user_id where tokens.id = ?', (token,))
        row = cur.fetchone()
        if row:
            return Queue.User(row[0], row[1], row[2], row[3])
        return None

    def get_program_info(self, db, program_id):
        """Look up a program without its code.

        Returns ``(owner_user_id, ProgramInfo)``, or ``(None, None)`` when no
        program has the given id.
        """
        cur = db.cursor()
        cur.execute('select programs.id, programs.user_id, programs.name, programs.type, users.name from programs join users on programs.user_id = users.id where programs.id = ?', (program_id,))
        row = cur.fetchone()
        if row:
            return (row[1], ProgramInfo(row[0], row[2].encode('ascii'), row[3], row[4].encode('ascii')))
        return (None, None)

    def get_program(self, db, program_id):
        """Look up a program including its code.

        Returns ``(owner_user_id, Program)``, or ``(None, None)`` when no
        program has the given id.
        """
        cur = db.cursor()
        cur.execute('select programs.id, programs.user_id, programs.name, programs.type, programs.code, users.name from programs join users on programs.user_id = users.id where programs.id = ?', (program_id,))
        row = cur.fetchone()
        if row:
            return (row[1], Program(ProgramInfo(row[0], row[2].encode('ascii'), row[3], row[5].encode('ascii')), row[4].encode('ascii')))
        return (None, None)

    def _program_infos(self, rows):
        """Convert ``(id, name, type, owner name)`` rows to ProgramInfo messages."""
        return [ProgramInfo(r[0], r[1].encode('ascii'), r[2], r[3].encode('ascii'))
                for r in rows]

    def token(self, cur, uid):
        """Insert and return a fresh, non-zero 63-bit login token for ``uid``.

        The row is written through ``cur``; committing is left to the caller.
        """
        # Tokens authenticate callers, so draw them from the OS entropy
        # source instead of the default (predictable) generator.
        rng = random.SystemRandom()
        while True:
            t = rng.getrandbits(63)
            if t == 0:
                # 0 is what handlers return for "no token"; never issue it.
                continue
            cur.execute('insert or ignore into tokens values (?, ?)', (t, uid))
            if cur.rowcount >= 1:
                return t
            # The insert was ignored: the token id is already taken, retry.
            rospy.logwarn("Tried to insert duplicate token %d"%t)

    def handle_clear_queue(self, req):
        """Empty the queue; only honoured for administrators."""
        with contextlib.closing(self.db()) as db:
            user = self.get_user(db, req.token)
            if user and user.admin:
                db.execute('delete from queue')
                db.commit()
            elif user:
                rospy.loginfo("ClearQueue called by non-admin user %s" % user.name)
        return ClearQueueResponse()

    def handle_create_program(self, req):
        """Create an empty program owned by the caller and return its id.

        Returns False (unchanged from the original) when the token is
        unknown.
        """
        with contextlib.closing(self.db()) as db:
            user = self.get_user(db, req.token)
            if not user:
                rospy.logwarn("No user for token %d"%req.token)
                # NOTE(review): other handlers return a response object on
                # failure; False is kept so callers see no behaviour change.
                return False
            rospy.loginfo("Creating new program for user %s"%user.name)

            c = db.cursor()
            c.execute('insert into programs (user_id) values (?)', (user.id,))
            program_id = c.lastrowid
            c.close()
            db.commit()
        return CreateProgramResponse(program_id)

    def handle_create_user(self, req):
        """Create a non-admin user and log them in.

        Returns the new login token, or token 0 when the name is taken.
        """
        pwhash = bcrypt.hashpw(req.password, bcrypt.gensalt())
        with contextlib.closing(self.db()) as db:
            cur = db.cursor()
            cur.execute('insert or ignore into users (name, password_hash, admin) values (?, ?, ?)', (req.name, pwhash, 0,))
            if cur.rowcount < 1:
                # The unique constraint on users.name made the insert a no-op.
                rospy.loginfo("User %s already exists"%req.name)
                return CreateUserResponse(0)
            userid = cur.lastrowid
            token = self.token(cur, userid)
            db.commit()
        return CreateUserResponse(token)

    def handle_dequeue_program(self, req):
        """Remove a program from the queue if the caller owns it or is admin."""
        with contextlib.closing(self.db()) as db:
            user = self.get_user(db, req.token)
            if user:
                (owner, program) = self.get_program_info(db, req.id)
                if user.id != owner and not user.admin:
                    rospy.loginfo("User %s is not allowed to dequeue %d"%(user.name, req.id))
                else:
                    db.execute('delete from queue where program_id = ?',(req.id,))
                    db.commit()
        return DequeueProgramResponse()

    def handle_get_my_programs(self, req):
        """List the programs owned by the caller (empty for an unknown token)."""
        resp = GetMyProgramsResponse()
        with contextlib.closing(self.db()) as db:
            user = self.get_user(db, req.token)
            if not user:
                return resp
            cur = db.cursor()
            cur.execute('select programs.id, programs.name, programs.type, users.name from programs join users on programs.user_id = users.id where users.id = ?', (user.id,))
            resp.programs.extend(self._program_infos(cur.fetchall()))
            cur.close()
        return resp

    def handle_get_output(self, req):
        """Return the recorded output of a program, newest first.

        Only the program's owner receives output; everyone else gets an
        empty response.
        """
        resp = GetOutputResponse()
        with contextlib.closing(self.db()) as db:
            user = self.get_user(db, req.token)
            if user:
                (owner, program) = self.get_program_info(db, req.program_id)
                if user.id != owner:
                    rospy.loginfo("User %s is not allowed to get output from program %d"%(user.name, req.program_id))
                else:
                    cur = db.cursor()
                    cur.execute('select time, output from output where program_id = ? order by time desc',
                                (req.program_id,))
                    for r in cur.fetchall():
                        resp.output.append(Output(Header(0, rospy.Time(r[0]), ''), r[1].encode('ascii')))
                    cur.close()
        return resp

    def handle_get_program(self, req):
        """Return the program with the given id; no token is checked."""
        with contextlib.closing(self.db()) as db:
            (owner, program) = self.get_program(db, req.id)
        # NOTE(review): program is None for an unknown id -- confirm that
        # GetProgramResponse(None) is what callers expect.
        return GetProgramResponse(program)

    def handle_get_programs(self, req):
        """List every program of every user; no token is checked."""
        resp = GetProgramsResponse()
        with contextlib.closing(self.db()) as db:
            cur = db.cursor()
            cur.execute('select programs.id, programs.name, programs.type, users.name from programs join users on programs.user_id = users.id')
            resp.programs.extend(self._program_infos(cur.fetchall()))
        return resp

    def handle_get_queue(self, req):
        """List the queued programs in queue order; no token is checked."""
        resp = GetQueueResponse()
        with contextlib.closing(self.db()) as db:
            cur = db.cursor()
            cur.execute('select programs.id, programs.name, programs.type, users.name from programs join queue on programs.id = queue.program_id join users on programs.user_id = users.id order by queue.id')
            resp.programs.extend(self._program_infos(cur.fetchall()))
        return resp

    def handle_login(self, req):
        """Check name and password and issue a login token.

        Returns ``LoginResponse(token, is_admin)`` on success and
        ``LoginResponse(0, False)`` for an unknown user or wrong password.
        """
        with contextlib.closing(self.db()) as db:
            cur = db.cursor()
            cur.execute('select id, password_hash, admin from users where name = ?', (req.name,))
            row = cur.fetchone()
            if row is None:
                rospy.loginfo("No user named %s"%req.name)
                return LoginResponse(0, False)

            user_id, stored_hash, admin = row
            # Re-hashing with the stored hash as salt reproduces the stored
            # hash exactly when the password matches.
            if bcrypt.hashpw(req.password, stored_hash) != stored_hash:
                rospy.loginfo("Password failed for %s"%req.name)
                return LoginResponse(0, False)

            token = self.token(cur, user_id)
            cur.close()
            db.commit()
        rospy.loginfo("Logged in %s"%req.name)
        return LoginResponse(token, admin != 0)

    def handle_logout(self, req):
        """Invalidate the caller's token (a no-op for unknown tokens)."""
        with contextlib.closing(self.db()) as db:
            db.execute('delete from tokens where id = ?', (req.token,))
            db.commit()
        return LogoutResponse()

    def handle_queue_program(self, req):
        """Append a program to the queue if the caller owns it or is admin.

        Responds with the number of queued programs minus one after the
        insert, or 0 when nothing was queued.
        """
        p = 0
        with contextlib.closing(self.db()) as db:
            user = self.get_user(db, req.token)
            (owner, program) = self.get_program_info(db, req.program_id)
            if not (user and program):
                rospy.loginfo("Bad token %d or program id %d"%(req.token, req.program_id))
            elif user.admin or user.id == owner:
                # queue.program_id is unique, so re-queueing is a no-op.
                db.execute('insert or ignore into queue (program_id) values (?)',
                           (req.program_id,))
                cur = db.cursor()
                # NOTE(review): this is the index of the last queue entry; for
                # a program that was already queued it is presumably not that
                # program's own position -- confirm the intended meaning.
                cur.execute('select count(*) from queue')
                row = cur.fetchone()
                p = row[0] - 1
                cur.close()
                db.commit()
            else:
                rospy.loginfo("User %s does not have permission to put program %d into the queue"%(user.name, req.program_id))
        return QueueProgramResponse(p)

    def _execute_program(self, program_id, program_type, code):
        """Run ``code`` according to ``program_type`` and return its output text."""
        if program_type == ProgramInfo.PYTHON:
            rospy.loginfo("Run python program %d"%(program_id))
            # SECURITY: the stored code is executed by a Python interpreter
            # with this node's privileges; handle_run_program restricts this
            # to administrators.
            py = subprocess.Popen(['python'], stdin=subprocess.PIPE,
                                  stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
            return py.communicate(code)[0]
        if program_type == ProgramInfo.PUPPET:
            output = "Puppet program execution is not supported"
            rospy.logerr(output)
            return output
        if program_type == ProgramInfo.SLIDER:
            rospy.loginfo("Run slider program %d"%(program_id))
            rospy.ServiceProxy('run_slider_program', CallProgram)(code)
            return "Slider program run"
        # The literal 4 is compared directly; presumably the web-slider type
        # has no ProgramInfo constant -- TODO confirm.
        if program_type == 4:
            rospy.loginfo("Run web slider program %d"%(program_id))
            rospy.ServiceProxy('/museum/run_web_slider_program', CallProgram)(code)
            return "Web slider program run"
        # %s because the type is an integer, or None for a program whose type
        # column was never set.
        output = "Error: Unknown program type %s" % (program_type,)
        rospy.logerr(output)
        return output

    def handle_run_program(self, req):
        """Run a program, record its output and remove it from the queue.

        Only administrators may run programs.
        """
        with contextlib.closing(self.db()) as db:
            user = self.get_user(db, req.token)
            if not user:
                rospy.loginfo("No user for token %d"%req.token)
            elif not user.admin:
                rospy.loginfo("%s is not allowed to run programs"%user.name)
            else:
                cur = db.cursor()
                cur.execute('select type, code from programs where id = ?', (req.id,))
                row = cur.fetchone()
                if row:
                    output = self._execute_program(req.id, row[0], row[1])
                    db.execute('insert into output (program_id, time, output) values'+
                               '(?, ?, ?)', (req.id, rospy.Time.now().to_sec(), output,))
                    db.execute('delete from queue where program_id = ?', (req.id,))
                    db.commit()
                else:
                    rospy.logerr("Bad program: %d" % req.id)
        return RunProgramResponse()

    def handle_start_queue(self, req):
        """Placeholder: does nothing and returns an empty response."""
        return EmptyResponse()

    def handle_stop_queue(self, req):
        """Placeholder: does nothing and returns an empty response."""
        return EmptyResponse()

    def handle_update_program(self, req):
        """Overwrite name, type and code of a program owned by the caller."""
        with contextlib.closing(self.db()) as db:
            user = self.get_user(db, req.token)
            (owner, program) = self.get_program_info(db, req.program.info.id)
            if not (user and program):
                rospy.loginfo("Bad token %d or program id %d"%(req.token, req.program.info.id))
            elif owner == user.id:
                db.execute('update programs set name=?, type=?, code=? where id=?',
                           (req.program.info.name, req.program.info.type,
                            req.program.code, req.program.info.id))
                db.commit()
            else:
                rospy.loginfo("User %s does not own program %d"%(user.name,
                                                                req.program.info.id))
        return UpdateProgramResponse()
00392
00393
if __name__ == '__main__':
    # The node is initialised first; Queue() then reads the private ~dbpath
    # parameter and advertises its services.
    rospy.init_node('program_queue')
    queue = Queue()

    # Hand control to rospy until the node is shut down.
    rospy.spin()