$search
00001 #! /usr/bin/env python 00002 00003 """ 00004 usage: %(progname)s [args] 00005 """ 00006 00007 import os, sys, string, time, getopt 00008 from log import * 00009 00010 import odb 00011 import unittest 00012 00013 ## ----------------------------------------------------------------------- 00014 ## T E S T S 00015 ## ----------------------------------------------------------------------- 00016 00017 00018 def TEST_MYSQL(output=warn): 00019 output("------ TESTING MySQLdb ---------") 00020 import odb_mysql 00021 conn = odb_mysql.Connection(host = 'localhost',user='root', passwd = '', db='test') 00022 db = odb.Database(conn) 00023 return db 00024 00025 def TEST_SQLITE(output=warn): 00026 output("------ TESTING sqlite2 ----------") 00027 import odb_sqlite 00028 conn = odb_sqlite.Connection("/tmp/test2.db", autocommit=1) 00029 db = odb.Database(conn) 00030 return db 00031 00032 def TEST_SQLITE3(output=warn): 00033 output("------ TESTING sqlite3 ----------") 00034 import odb_sqlite3 00035 conn = odb_sqlite3.Connection("/tmp/test3.db", autocommit=1) 00036 db = odb.Database(conn) 00037 return db 00038 00039 def TEST_POSTGRES(output=warn): 00040 output("------ TESTING postgres ----------") 00041 import odb_postgres 00042 conn = odb_postgres.Connection(database="test") 00043 db = odb.Database(conn) 00044 return db 00045 00046 def setupDB(dbType="sqlite3"): 00047 if dbType == "sqlite3": 00048 db = TEST_SQLITE3() 00049 elif dbType == "sqlite2": 00050 db = TEST_SQLITE() 00051 elif dbType == "mysql": 00052 db = TEST_MYSQL() 00053 elif dbType == "postgres": 00054 db = TEST_POSTGRES() 00055 00056 db.enabledCompression() 00057 00058 db.addTable("agents", "agents", AgentsTable) 00059 try: db.agents.dropTable() 00060 except: pass 00061 00062 db.addTable("roles", "roles", TestTable) 00063 try: db.test.dropTable() 00064 except: pass 00065 00066 db.createTables() 00067 db.createIndices() 00068 00069 return db 00070 00071 class AgentsTable(odb.Table): 00072 def _defineRows(self): 00073 self.d_addColumn("agent_id",odb.kInteger,None,primarykey = 1,autoincrement = 1) 00074 self.d_addColumn("login",odb.kVarString,200,notnull=1) 00075 self.d_addColumn("ext_email",odb.kVarString,200,notnull=1, indexed=1) 00076 self.d_addColumn("hashed_pw",odb.kVarString,20,notnull=1) 00077 self.d_addColumn("name",odb.kBigString,compress_ok=1) 00078 self.d_addColumn("auth_level",odb.kInteger,None) 00079 self.d_addColumn("ticket_count",odb.kIncInteger,None) 00080 self.d_addColumn("data",odb.kBlob,None, compress_ok=1) 00081 self.d_addValueColumn() 00082 self.d_addVColumn("columnA",odb.kInteger,None) 00083 self.d_addVColumn("columnB",odb.kInteger,None) 00084 00085 self.d_hasMany("roles") 00086 00087 class TestTable(odb.Table): 00088 def _defineRows(self): 00089 self.d_addColumn("oid",odb.kInteger,None,primarykey = 1,autoincrement = 1) 00090 self.d_addColumn("agent_id",odb.kInteger) 00091 self.d_addColumn("a",odb.kInteger) 00092 self.d_addColumn("b",odb.kInteger) 00093 00094 self.d_belongsTo("agents", foreign_key="agent_id") 00095 00096 00097 00098 class Test_Database(unittest.TestCase): 00099 def __init__(self, methodName='runTest', db=None): 00100 unittest.TestCase.__init__(self, methodName=methodName) 00101 self.db = db 00102 if self.db is None: self.db = gDB 00103 00104 self.TEST_INSERT_COUNT = 5 00105 00106 def setUp(self): 00107 self.db.agents.deleteAllRows() 00108 00109 def test_fetchRow(self): 00110 00111 # --------------------------------------------------------------- 00112 # make sure we can catch a missing row 00113 00114 try: 00115 a_row = self.db.agents.fetchRow( ("agent_id", 1000) ) 00116 self.fail("fetchRow eNoMatchingRows") 00117 except odb.eNoMatchingRows: 00118 pass 00119 00120 def test_rowCreation(self): 00121 # -------------------------------------------------------------- 00122 # create new rows and insert them 00123 00124 00125 for n in range(self.TEST_INSERT_COUNT): 00126 new_id = n + 1 00127 00128 newrow = self.db.agents.newRow() 00129 newrow.name = "name #%d" % new_id 00130 newrow.login = "name%d" % new_id 00131 newrow.ext_email = "%d@name" % new_id 00132 newrow.hashed_pw = "hashedpw" 00133 newrow.save() 00134 msg = "new insert id (%d) does not match expected value (%d)" % (newrow.agent_id,new_id) 00135 self.assertEqual(newrow.agent_id,new_id, msg=msg) 00136 00137 00138 def test_fetchRow_One(self): 00139 # -------------------------------------------------------------- 00140 # fetch one row 00141 row = self.db.agents.newRow() 00142 row.name = "name #1" 00143 row.login = "name #2" 00144 row.ext_email = "name #2" 00145 row.hashed_pw = "lsjdf" 00146 row.save() 00147 a_row = self.db.agents.fetchRow( ("agent_id", 1) ) 00148 00149 self.assertEqual(a_row.name, "name #1") 00150 00151 return a_row 00152 00153 def test_save(self): 00154 # --------------------------------------------------------------- 00155 # don't change and save it 00156 # (i.e. the "dummy cursor" string should never be called!) 00157 # 00158 a_row = self.test_fetchRow_One() 00159 try: 00160 a_row.save(cursor = "dummy cursor") 00161 except AttributeError, reason: 00162 self.fail("row tried to access cursor on save() when no changes were made!") 00163 00164 def test_save(self): 00165 # --------------------------------------------------------------- 00166 # change, save, load, test 00167 00168 a_row = self.test_fetchRow_One() 00169 00170 a_row.auth_level = 10 00171 a_row.save() 00172 b_row = self.db.agents.fetchRow( ("agent_id", 1) ) 00173 self.assertEqual(b_row.auth_level, 10, "save and load failed") 00174 00175 def misc(self): 00176 # --------------------------------------------------------------- 00177 # replace 00178 00179 if 0: 00180 repl_row = self.db.agents.newRow(replace=1) 00181 repl_row.agent_id = a_row.agent_id 00182 repl_row.login = a_row.login + "-" + a_row.login 00183 repl_row.ext_email = "foo" 00184 repl_row.hashed_pw = "hashed_pw" 00185 repl_row.save() 00186 00187 b_row = self.db.agents.fetchRow( ("agent_id", a_row.agent_id) ) 00188 if b_row.login != repl_row.login: 00189 raise "replace failed" 00190 00191 # -------------------------------------------------------------- 00192 # access unknown attribute 00193 def test_unknownAttribute(self): 00194 a_row = self.test_fetchRow_One() 00195 try: 00196 a = a_row.UNKNOWN_ATTRIBUTE 00197 self.fail("unknown attribute") 00198 except AttributeError, reason: 00199 pass 00200 except odb.eNoSuchColumn, reason: 00201 pass 00202 00203 def test_unknownAttribute2(self): 00204 a_row = self.test_fetchRow_One() 00205 try: 00206 a_row.UNKNOWN_ATTRIBUTE = 1 00207 self.fail("unknown attribute") 00208 except AttributeError, reason: 00209 pass 00210 except odb.eNoSuchColumn, reason: 00211 pass 00212 00213 # -------------------------------------------------------------- 00214 # access unknown dict item 00215 00216 def test_unknownAttribute3(self): 00217 a_row = self.test_fetchRow_One() 00218 try: 00219 a = a_row["UNKNOWN_ATTRIBUTE"] 00220 self.fail("unknown attribute") 00221 except KeyError, reason: 00222 pass 00223 except odb.eNoSuchColumn, reason: 00224 pass 00225 00226 def test_unknownAttribute3(self): 00227 a_row = self.test_fetchRow_One() 00228 try: 00229 a_row["UNKNOWN_ATTRIBUTE"] = 1 00230 self.fail("unknown attribute") 00231 except KeyError, reason: 00232 pass 00233 except odb.eNoSuchColumn, reason: 00234 pass 00235 00236 def misc2(self): 00237 # -------------------------------------------------------------- 00238 # use wrong data for column type 00239 00240 if 0: 00241 try: 00242 a_row.agent_id = "this is a string" 00243 raise "test error" 00244 except eInvalidData, reason: 00245 pass 00246 00247 output("PASSED! invalid data for column type") 00248 00249 # -------------------------------------------------------------- 00250 # fetch 1 rows 00251 00252 def test_fetchRows(self): 00253 arow = self.test_fetchRow_One() 00254 rows = self.db.agents.fetchRows( ('agent_id', 1) ) 00255 self.assertEqual(len(rows), 1) 00256 00257 # -------------------------------------------------------------- 00258 # fetch All rows 00259 00260 def test_fetchRows(self): 00261 arow = self.test_rowCreation() 00262 00263 rows = self.db.agents.fetchAllRows() 00264 self.assertEqual(len(rows), self.TEST_INSERT_COUNT) 00265 00266 00267 def test_deleteObject(self): 00268 # -------------------------------------------------------------- 00269 # delete row object 00270 00271 arow = self.test_fetchRow_One() 00272 00273 row = self.db.agents.fetchRow( ('agent_id', 1) ) 00274 row.delete() 00275 try: 00276 row = self.db.agents.fetchRow( ('agent_id', 1) ) 00277 self.fail() 00278 except odb.eNoMatchingRows: 00279 pass 00280 00281 # -------------------------------------------------------------- 00282 # table deleteRow() call 00283 def test_deleteRow(self): 00284 self.test_rowCreation() 00285 00286 row = self.db.agents.fetchRow( ('agent_id',2) ) 00287 self.db.agents.deleteRow( ('agent_id', 2) ) 00288 try: 00289 row = self.db.agents.fetchRow( ('agent_id',2) ) 00290 self.fail() 00291 except odb.eNoMatchingRows: 00292 pass 00293 00294 # -------------------------------------------------------------- 00295 def test_datasize(self): 00296 self.test_rowCreation() 00297 row = self.db.agents.fetchRow( ('agent_id',3) ) 00298 if row.databaseSizeForColumn('name') != len(row.name): 00299 self.fail() 00300 00301 # -------------------------------------------------------------- 00302 # table fetchRowUsingPrimaryKey 00303 def test_primarykey(self): 00304 self.test_rowCreation() 00305 00306 row = self.db.agents.fetchRowUsingPrimaryKey(3) 00307 if row.agent_id != 3: 00308 self.fail() 00309 00310 def test_lookup(self): 00311 self.test_rowCreation() 00312 row = self.db.agents.lookup(agent_id=3) 00313 if row.agent_id != 3: 00314 self.fail() 00315 00316 def test_lookupCreate(self): 00317 row = self.db.agents.lookupCreate(agent_id=2) 00318 if row.isClean(): 00319 self.fail() 00320 row.login = 1 00321 row.ext_email = "hassan@dotfunk.com" 00322 row.hashed_pw = "sdfj" 00323 row.save() 00324 00325 def misc3(self): 00326 if 0: 00327 # -------------------------------------------------------------- 00328 # test inc fields 00329 row = self.db.agents.newRow() 00330 new_id = 1092 00331 row.name = "name #%d" % new_id 00332 row.login = "name%d" % new_id 00333 row.ext_email = "%d@name" % new_id 00334 row.inc('ticket_count') 00335 row.hashed_pw = "hashed_pw" 00336 row.save() 00337 new_id = row.agent_id 00338 00339 trow = self.db.agents.fetchRow( ('agent_id',new_id) ) 00340 if trow.ticket_count != 1: 00341 raise "ticket_count didn't inc!", repr(trow.ticket_count) 00342 00343 row.inc('ticket_count', count=2) 00344 row.save() 00345 trow = self.db.agents.fetchRow( ('agent_id',new_id) ) 00346 if trow.ticket_count != 3: 00347 raise "ticket_count wrong, expected 3, got %d" % trow.ticket_count 00348 00349 trow.inc('ticket_count') 00350 trow.save() 00351 if trow.ticket_count != 4: 00352 raise "ticket_count wrong, expected 4, got %d" % trow.ticket_count 00353 00354 00355 def test_virtualColumns(self): 00356 self.test_rowCreation() 00357 00358 try: 00359 b_row = self.db.agents.newRow(replace=1) 00360 except odb.eNoMatchingRows: 00361 self.fail() 00362 00363 b_row.login = "scott" 00364 b_row.ext_email = "scott@" 00365 b_row.hashed_pw = "lsdjf" 00366 b_row.columnA = "hello1" 00367 b_row.columnB = "hello2" 00368 b_row.save() 00369 00370 c_row = self.db.agents.fetchRow( ("agent_id", b_row.agent_id) ) 00371 00372 self.assertEqual(c_row.columnA, "hello1") 00373 self.assertEqual(c_row.columnB, "hello2") 00374 00375 def test_compression(self): 00376 self.test_rowCreation() 00377 00378 try: 00379 b_row = self.db.agents.fetchRow( ("agent_id", 5) ) 00380 except odb.eNoMatchingRows: 00381 self.fail() 00382 00383 d = "hello\0hello" * 100 00384 b_row.data = d 00385 b_row.save() 00386 00387 b_row = self.db.agents.fetchRow( ("agent_id", 5) ) 00388 self.assertEqual(b_row.data, d) 00389 00390 def test_relations(self): 00391 self.test_rowCreation() 00392 00393 row = self.db.roles.newRow() 00394 row.agent_id = 2 00395 row.a = "1" 00396 row.b = "2" 00397 row.save() 00398 00399 row2 = self.db.roles.lookup(oid=row.oid) 00400 00401 00402 00403 00404 00405 00406 00407 def test(): 00408 pass 00409 00410 def usage(progname): 00411 print __doc__ % vars() 00412 00413 def main(argv, stdout, environ): 00414 progname = argv[0] 00415 optlist, args = getopt.getopt(argv[1:], "", ["help", "test", "debug"]) 00416 00417 testflag = 0 00418 00419 for (field, val) in optlist: 00420 if field == "--help": 00421 usage(progname) 00422 return 00423 elif field == "--debug": 00424 debugfull() 00425 elif field == "--test": 00426 testflag = 1 00427 00428 if testflag: 00429 test() 00430 return 00431 00432 global gDB 00433 gDB = setupDB("sqlite3") 00434 unittest.main() 00435 return 00436 00437 for arg in args: 00438 if arg == "sqlite2": 00439 TEST_SQLITE() 00440 elif arg == "sqlite3": 00441 TEST_SQLITE3() 00442 elif arg == "mysql": 00443 TEST_MYSQL() 00444 elif arg == "postgres": 00445 TEST_POSTGRES() 00446 00447 if __name__ == "__main__": 00448 main(sys.argv, sys.stdout, os.environ)