00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027 import roslib; roslib.load_manifest('program_queue')
00028 import rospkg
00029 import rospy
00030
00031 from program_queue.srv import *
00032 from program_queue.msg import *
00033 from std_srvs.srv import Empty
00034 from std_msgs.msg import Header
00035
00036
00037 import MySQLdb as mysql
00038 import bcrypt
00039 import random
00040 import subprocess
00041 import base64
00042 import threading
00043
class Queue:
    """ROS service front-end for a MySQL-backed queue of user programs.

    A single database connection (``self.dbconn``) is shared by all service
    handlers; every use of it is serialized through ``self.dblock``.
    """

    def __init__(self):
        """Connect to the database, create the schema and advertise services.

        Connection settings come from the private ROS parameters ``~dbhost``,
        ``~dbuser``, ``~dbpass`` and ``~dbname``.
        """
        self.dbhost = rospy.get_param('~dbhost', 'demobase3')
        self.dbuser = rospy.get_param('~dbuser', 'willow')
        self.dbpass = rospy.get_param('~dbpass', 'willow')
        # The database name has its own parameter.  It used to be read from
        # '~dbpass', so overriding the password silently changed the database.
        self.dbname = rospy.get_param('~dbname', 'pr2_programs')

        self.dblock = threading.Lock()

        db = mysql.connect(self.dbhost, self.dbuser, self.dbpass, self.dbname)
        self.dbconn = db

        with self.dblock:
            cur = db.cursor()
            try:
                # Local import, as in the original code; only needed here.
                import warnings

                # "create table if not exists" makes MySQLdb emit a warning
                # for every table that already exists.  Silence those only
                # for the duration of the schema setup; catch_warnings()
                # restores the previous filters instead of wiping them all.
                with warnings.catch_warnings():
                    warnings.filterwarnings('ignore', category=mysql.Warning)
                    cur.execute('create table if not exists users(id integer primary key auto_increment, name varchar(256) unique not null, password_hash varchar(60) not null, admin int not null)')
                    cur.execute('create table if not exists tokens(id bigint primary key, user_id integer references users(id))')
                    cur.execute('create table if not exists programs(id integer primary key auto_increment, user_id integer references users(id), name text not null, type integer, code text not null)')
                    cur.execute('create table if not exists output(id integer primary key auto_increment, program_id integer references programs(id), time double not null, output text not null)')
                    cur.execute('create table if not exists queue(id integer primary key auto_increment, program_id integer unique references programs(id))')

                # Make sure an 'admin' account exists.  "insert ignore" leaves
                # an existing admin row (and its password) untouched.
                # NOTE(review): the default admin password is hard-coded.
                admin_hash = bcrypt.hashpw('admin', bcrypt.gensalt())
                cur.execute("insert ignore into users (name, password_hash, admin) values (%s, %s, %s)", ('admin', admin_hash, 1))

                db.commit()
            finally:
                cur.close()

        rospy.Service('clear_queue', ClearQueue, self.handle_clear_queue)
        rospy.Service('create_program', CreateProgram,
                      self.handle_create_program)
        rospy.Service('create_user', CreateUser, self.handle_create_user)
        rospy.Service('dequeue_program', DequeueProgram,
                      self.handle_dequeue_program)
        rospy.Service('get_my_programs', GetMyPrograms,
                      self.handle_get_my_programs)
        rospy.Service('get_output', GetOutput, self.handle_get_output)
        rospy.Service('get_program', GetProgram, self.handle_get_program)
        rospy.Service('get_programs', GetPrograms, self.handle_get_programs)
        rospy.Service('get_queue', GetQueue, self.handle_get_queue)
        rospy.Service('login', Login, self.handle_login)
        rospy.Service('logout', Logout, self.handle_logout)
        rospy.Service('queue_program', QueueProgram,
                      self.handle_queue_program)
        rospy.Service('run_program', RunProgram, self.handle_run_program)
        rospy.Service('update_program', UpdateProgram,
                      self.handle_update_program)

        rospy.Service('start_queue', Empty, self.handle_start_queue)
        rospy.Service('stop_queue', Empty, self.handle_stop_queue)

        rospy.loginfo("Queue ready")

    def db(self):
        """Return the shared database connection."""
        return self.dbconn

    class User:
        """Plain record holding one row of the ``users`` table."""

        def __init__(self, id, name, pwhash, admin):
            self.id = id
            self.name = name
            self.pwhash = pwhash
            self.admin = admin

    def get_user(self, db, token):
        """Look up the user that owns a login token.

        Returns a ``Queue.User`` or ``None`` when the token is unknown.
        The caller is expected to hold ``self.dblock``.
        """
        cur = db.cursor()
        try:
            cur.execute('select users.id, users.name, users.password_hash, users.admin from users join tokens on users.id = tokens.user_id where tokens.id = %s', (token,))
            rows = cur.fetchall()
        finally:
            cur.close()

        if not rows:
            return None
        row = rows[0]
        return Queue.User(row[0], row[1], row[2], row[3])

    def get_program_info(self, db, program_id):
        """Fetch the metadata of a program.

        Returns ``(owner_user_id, ProgramInfo)``, or ``(None, None)`` when no
        such program exists.  The caller is expected to hold ``self.dblock``.
        """
        cur = db.cursor()
        try:
            cur.execute('select programs.id, programs.user_id, programs.name, programs.type, users.name from programs join users on programs.user_id = users.id where programs.id = %s', (program_id,))
            row = cur.fetchone()
        finally:
            cur.close()

        if not row:
            return (None, None)
        return (row[1], ProgramInfo(row[0], row[2].encode('ascii'), row[3], row[4].encode('ascii')))

    def get_program(self, db, program_id):
        """Fetch a program including its code.

        Returns ``(owner_user_id, Program)``, or ``(None, None)`` when no such
        program exists.  The caller is expected to hold ``self.dblock``.
        """
        cur = db.cursor()
        try:
            cur.execute('select programs.id, programs.user_id, programs.name, programs.type, programs.code, users.name from programs join users on programs.user_id = users.id where programs.id = %s', (program_id,))
            row = cur.fetchone()
        finally:
            cur.close()

        if not row:
            return (None, None)
        info = ProgramInfo(row[0], row[2].encode('ascii'), row[3], row[5].encode('ascii'))
        return (row[1], Program(info, row[4].encode('ascii')))

    def _program_infos(self, rows):
        """Convert (id, name, type, owner name) rows into ProgramInfo messages."""
        return [ProgramInfo(r[0], r[1].encode('ascii'), r[2], r[3].encode('ascii'))
                for r in rows]

    def token(self, cur, uid):
        """Create and store a fresh, non-zero 63-bit login token for ``uid``.

        The token is inserted through ``cur``; committing is left to the
        caller.  Zero is never handed out because the login and create_user
        handlers return 0 to signal failure.
        """
        # Tokens act as credentials, so draw them from the OS entropy source
        # instead of the predictable default Mersenne Twister generator.
        rng = random.SystemRandom()
        while True:
            t = rng.getrandbits(63)
            if t == 0:
                continue
            # "insert ignore" reports zero affected rows on a key collision.
            cur.execute('insert ignore into tokens values (%s, %s)', (t, uid))
            if cur.rowcount >= 1:
                return t
            rospy.logwarn("Tried to insert duplicate token %d"%t)

    def handle_clear_queue(self, req):
        """Service handler: empty the queue (admin users only)."""
        with self.dblock:
            db = self.db()
            user = self.get_user(db, req.token)
            if user and user.admin:
                cur = db.cursor()
                try:
                    cur.execute('delete from queue')
                    db.commit()
                finally:
                    cur.close()
            elif user:
                rospy.loginfo("ClearQueue called by non-admin user %s" % user.name)
        return ClearQueueResponse()

    def handle_create_program(self, req):
        """Service handler: create an empty program owned by the caller."""
        with self.dblock:
            db = self.db()
            user = self.get_user(db, req.token)
            if not user:
                rospy.logwarn("No user for token %d"%req.token)
                # NOTE(review): returning a non-message value is kept from the
                # original code; confirm how rospy reports it to the client.
                return False

            rospy.loginfo("Creating new program for user %s"%user.name)
            cur = db.cursor()
            try:
                cur.execute('insert into programs (user_id, name, code) values (%s, %s, %s)', (user.id, 'myprogram', ''))
                db.commit()
                program_id = cur.lastrowid
            finally:
                cur.close()
        return CreateProgramResponse(program_id)

    def handle_create_user(self, req):
        """Service handler: register a new non-admin user and log them in.

        Responds with a fresh token, or 0 when the name is already taken.
        """
        # Hash outside the lock; it does not touch the database.
        pwhash = bcrypt.hashpw(req.password, bcrypt.gensalt())
        with self.dblock:
            db = self.db()
            cur = db.cursor()
            try:
                cur.execute('insert ignore into users (name, password_hash, admin) values (%s, %s, %s)', (req.name, pwhash, 0,))
                db.commit()
                if cur.rowcount > 0:
                    userid = cur.lastrowid
                    token = self.token(cur, userid)
                    # Persist the token row as well; it used to be left in an
                    # open transaction until some later handler committed.
                    db.commit()
                    return CreateUserResponse(token)
                rospy.loginfo("User %s already exists"%req.name)
                return CreateUserResponse(0)
            finally:
                cur.close()

    def handle_dequeue_program(self, req):
        """Service handler: remove a program from the queue (owner or admin)."""
        with self.dblock:
            db = self.db()
            user = self.get_user(db, req.token)
            if user:
                (owner, program) = self.get_program_info(db, req.id)
                if user.id != owner and not user.admin:
                    rospy.loginfo("User %s is not allowed to dequeue %d"%(user.name, req.id))
                else:
                    cur = db.cursor()
                    try:
                        cur.execute('delete from queue where program_id = %s',(req.id,))
                        db.commit()
                    finally:
                        cur.close()
        return DequeueProgramResponse()

    def handle_get_my_programs(self, req):
        """Service handler: list the programs owned by the calling user."""
        resp = GetMyProgramsResponse()
        with self.dblock:
            db = self.db()
            user = self.get_user(db, req.token)
            if not user:
                # Unknown token: answer with an empty list.
                return resp
            cur = db.cursor()
            try:
                cur.execute('select programs.id, programs.name, programs.type, users.name from programs join users on programs.user_id = users.id where users.id = %s', (user.id,))
                resp.programs.extend(self._program_infos(cur.fetchall()))
            finally:
                cur.close()
        return resp

    def handle_get_output(self, req):
        """Service handler: return a program's recorded output, newest first.

        Only the owner of the program receives output; everyone else gets an
        empty response.
        """
        resp = GetOutputResponse()
        with self.dblock:
            db = self.db()
            user = self.get_user(db, req.token)
            if not user:
                return resp
            (owner, program) = self.get_program_info(db, req.program_id)
            if user.id != owner:
                rospy.loginfo("User %s is not allowed to get output from program %d"%(user.name, req.program_id))
                return resp
            cur = db.cursor()
            try:
                cur.execute('select time, output from output where program_id = %s order by time desc',
                            (req.program_id,))
                for r in cur.fetchall():
                    resp.output.append(Output(Header(0, rospy.Time(r[0]), ''), r[1].encode('ascii')))
            finally:
                cur.close()
        return resp

    def handle_get_program(self, req):
        """Service handler: return a program (metadata and code) by id."""
        with self.dblock:
            db = self.db()
            (owner, program) = self.get_program(db, req.id)
        return GetProgramResponse(program)

    def handle_get_programs(self, req):
        """Service handler: list the metadata of every stored program."""
        resp = GetProgramsResponse()
        with self.dblock:
            db = self.db()
            cur = db.cursor()
            try:
                cur.execute('select programs.id, programs.name, programs.type, users.name from programs join users on programs.user_id = users.id')
                resp.programs.extend(self._program_infos(cur.fetchall()))
            finally:
                cur.close()
        return resp

    def handle_get_queue(self, req):
        """Service handler: list the queued programs in queue order."""
        resp = GetQueueResponse()
        with self.dblock:
            db = self.db()
            cur = db.cursor()
            try:
                cur.execute('select programs.id, programs.name, programs.type, users.name from programs join queue on programs.id = queue.program_id join users on programs.user_id = users.id order by queue.id')
                resp.programs.extend(self._program_infos(cur.fetchall()))
            finally:
                cur.close()
        return resp

    def handle_login(self, req):
        """Service handler: check a password and hand out a login token.

        Responds with ``(token, is_admin)``; the token is 0 when the user is
        unknown or the password does not match.
        """
        with self.dblock:
            db = self.db()
            cur = db.cursor()
            try:
                cur.execute('select id, password_hash, admin from users where name = %s', (req.name,))
                row = cur.fetchone()
                if row is None:
                    rospy.loginfo("No user named %s"%req.name)
                    return LoginResponse(0, False)

                # Re-hashing with the stored hash as salt reproduces the
                # stored hash exactly when the password is correct.
                if bcrypt.hashpw(req.password, row[1]) != row[1]:
                    rospy.loginfo("Password failed for %s"%req.name)
                    return LoginResponse(0, False)

                token = self.token(cur, row[0])
                # Persist the new token; it used to stay uncommitted.
                db.commit()
                rospy.loginfo("Logged in %s"%req.name)
                return LoginResponse(token, row[2] != 0)
            finally:
                cur.close()

    def handle_logout(self, req):
        """Service handler: invalidate a login token."""
        with self.dblock:
            db = self.db()
            cur = db.cursor()
            try:
                cur.execute('delete from tokens where id = %s', (req.token,))
                db.commit()
            finally:
                cur.close()
        return LogoutResponse()

    def handle_queue_program(self, req):
        """Service handler: append a program to the queue (owner or admin).

        Responds with the queue length minus one after the insert, or 0 when
        nothing was queued.
        """
        p = 0
        with self.dblock:
            db = self.db()
            user = self.get_user(db, req.token)
            (owner, program) = self.get_program_info(db, req.program_id)
            if not (user and program):
                # This message used to reference an undefined name and a
                # field the request does not have, so it raised instead.
                rospy.loginfo("Bad token %d or program id %d"%(req.token, req.program_id))
            elif user.admin or user.id == owner:
                cur = db.cursor()
                try:
                    # program_id is unique, so re-queueing is a no-op.
                    cur.execute('insert ignore into queue (program_id) values (%s)',
                                (req.program_id,))
                    db.commit()

                    cur.execute('select count(*) from queue')
                    row = cur.fetchone()
                    p = row[0] - 1
                finally:
                    cur.close()
            else:
                rospy.loginfo("User %s does not have permission to put program %d into the queue"%(user.name, req.program_id))

        return QueueProgramResponse(p)

    def _execute_program(self, program_id, program_type, code):
        """Run one program according to its type and return its output text."""
        if program_type == ProgramInfo.PYTHON:
            rospy.loginfo("Run python program %d"%(program_id))
            # The stored code is fed to a fresh interpreter on stdin; stderr
            # is merged into the captured output.
            py = subprocess.Popen(['python'], stdin=subprocess.PIPE,
                                  stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
            return py.communicate(code)[0]
        if program_type == ProgramInfo.PUPPET:
            output = "Puppet program execution is not supported"
            rospy.logerr(output)
            return output
        if program_type == ProgramInfo.SLIDER:
            rospy.loginfo("Run slider program %d"%(program_id))
            rospy.ServiceProxy('run_slider_program', CallProgram)(code)
            return "Slider program run"
        if program_type == 4:
            # Type 4 is handled as a web slider program; no named constant
            # for it is referenced in this file.
            rospy.loginfo("Run web slider program %d"%(program_id))
            rospy.ServiceProxy('/museum/run_web_slider_program', CallProgram)(code)
            return "Web slider program run"

        # Format with %s: the type is an integer (or NULL), and concatenating
        # it to a string raised TypeError.
        output = "Error: Unknown program type %s" % (program_type,)
        rospy.logerr(output)
        return output

    def handle_run_program(self, req):
        """Service handler: run a program now and record its output (admin only).

        The database lock is released while the program runs so that other
        services stay responsive; it is re-acquired to store the output and
        drop the program from the queue.
        """
        db = self.db()
        row = None
        # Authenticate under the lock like every other handler; the shared
        # connection must not be used concurrently.
        with self.dblock:
            user = self.get_user(db, req.token)
            if user and user.admin:
                cur = db.cursor()
                try:
                    cur.execute('select type, code from programs where id = %s', (req.id,))
                    row = cur.fetchone()
                finally:
                    cur.close()

        if not user:
            # Previously this path crashed on user.name of None.
            rospy.loginfo("No user for token %d; not allowed to run programs"%req.token)
            return RunProgramResponse()
        if not user.admin:
            rospy.loginfo("%s is not allowed to run programs"%user.name)
            return RunProgramResponse()
        if not row:
            rospy.logerr("Bad program: %s" % (req.id,))
            return RunProgramResponse()

        output = self._execute_program(req.id, row[0], row[1])

        with self.dblock:
            cur = db.cursor()
            try:
                cur.execute('insert into output (program_id, time, output) values(%s, %s, %s)',
                            (req.id, rospy.Time.now().to_sec(), output,))
                cur.execute('delete from queue where program_id = %s', (req.id,))
                db.commit()
            finally:
                cur.close()

        return RunProgramResponse()

    def handle_start_queue(self, req):
        """Service handler: acknowledge a start request (currently a no-op)."""
        # Only Empty is imported at file level; bring in the response type
        # here so this handler does not fail with a NameError.
        from std_srvs.srv import EmptyResponse
        return EmptyResponse()

    def handle_stop_queue(self, req):
        """Service handler: acknowledge a stop request (currently a no-op)."""
        # See handle_start_queue for why the import is local.
        from std_srvs.srv import EmptyResponse
        return EmptyResponse()

    def handle_update_program(self, req):
        """Service handler: overwrite a program's name, type and code (owner only)."""
        with self.dblock:
            db = self.db()
            user = self.get_user(db, req.token)
            (owner, program) = self.get_program_info(db, req.program.info.id)
            if not (user and program):
                rospy.loginfo("Bad token %d or program id %d"%(req.token, req.program.info.id))
            elif owner == user.id:
                cur = db.cursor()
                try:
                    cur.execute('update programs set name=%s, type=%s, code=%s where id=%s',
                                (req.program.info.name, req.program.info.type,
                                 req.program.code, req.program.info.id))
                    db.commit()
                finally:
                    cur.close()
            else:
                rospy.loginfo("User %s does not own program %d"%(user.name,
                                                                  req.program.info.id))
        return UpdateProgramResponse()
00430
00431
if __name__ == '__main__':
    # The node must be initialised before Queue() reads its private parameters.
    rospy.init_node('program_queue')

    # Constructing the Queue connects to the database and advertises services.
    queue = Queue()

    # Block here, serving requests, until the node is shut down.
    rospy.spin()