test_odb.py
Go to the documentation of this file.
00001 #! /usr/bin/env python
00002 
00003 """
00004 usage: %(progname)s [args]
00005 """
00006 
00007 import os, sys, string, time, getopt
00008 from log import *
00009 
00010 import odb
00011 import unittest
00012 
00013 ## -----------------------------------------------------------------------
00014 ##                            T  E  S  T S
00015 ## -----------------------------------------------------------------------
00016 
00017         
00018 def TEST_MYSQL(output=warn):
00019     output("------ TESTING MySQLdb ---------")
00020     import odb_mysql
00021     conn = odb_mysql.Connection(host = 'localhost',user='root', passwd = '', db='test')
00022     db = odb.Database(conn)
00023     return db
00024 
00025 def TEST_SQLITE(output=warn):
00026     output("------ TESTING sqlite2 ----------")
00027     import odb_sqlite
00028     conn = odb_sqlite.Connection("/tmp/test2.db", autocommit=1)
00029     db = odb.Database(conn)
00030     return db
00031 
00032 def TEST_SQLITE3(output=warn):
00033     output("------ TESTING sqlite3 ----------")
00034     import odb_sqlite3
00035     conn = odb_sqlite3.Connection("/tmp/test3.db", autocommit=1)
00036     db = odb.Database(conn)
00037     return db
00038 
00039 def TEST_POSTGRES(output=warn):
00040     output("------ TESTING postgres ----------")
00041     import odb_postgres
00042     conn = odb_postgres.Connection(database="test")
00043     db = odb.Database(conn)
00044     return db
00045     
00046 def setupDB(dbType="sqlite3"):
00047     if dbType == "sqlite3":
00048       db = TEST_SQLITE3()
00049     elif dbType == "sqlite2":
00050       db = TEST_SQLITE()
00051     elif dbType == "mysql":
00052       db = TEST_MYSQL()
00053     elif dbType == "postgres":
00054       db = TEST_POSTGRES()
00055 
00056     db.enabledCompression()
00057 
00058     db.addTable("agents", "agents", AgentsTable)
00059     try: db.agents.dropTable()
00060     except: pass
00061 
00062     db.addTable("roles", "roles", TestTable)
00063     try: db.test.dropTable()
00064     except: pass
00065 
00066     db.createTables()
00067     db.createIndices()
00068     
00069     return db
00070 
00071 class AgentsTable(odb.Table):
00072     def _defineRows(self):
00073         self.d_addColumn("agent_id",odb.kInteger,None,primarykey = 1,autoincrement = 1)
00074         self.d_addColumn("login",odb.kVarString,200,notnull=1)
00075         self.d_addColumn("ext_email",odb.kVarString,200,notnull=1, indexed=1)
00076         self.d_addColumn("hashed_pw",odb.kVarString,20,notnull=1)
00077         self.d_addColumn("name",odb.kBigString,compress_ok=1)
00078         self.d_addColumn("auth_level",odb.kInteger,None)
00079         self.d_addColumn("ticket_count",odb.kIncInteger,None)
00080         self.d_addColumn("data",odb.kBlob,None, compress_ok=1)
00081         self.d_addValueColumn()
00082         self.d_addVColumn("columnA",odb.kInteger,None)
00083         self.d_addVColumn("columnB",odb.kInteger,None)
00084 
00085         self.d_hasMany("roles")
00086 
00087 class TestTable(odb.Table):
00088     def _defineRows(self):
00089         self.d_addColumn("oid",odb.kInteger,None,primarykey = 1,autoincrement = 1)
00090         self.d_addColumn("agent_id",odb.kInteger)
00091         self.d_addColumn("a",odb.kInteger)
00092         self.d_addColumn("b",odb.kInteger)
00093 
00094         self.d_belongsTo("agents", foreign_key="agent_id")
00095 
00096 
00097 
00098 class Test_Database(unittest.TestCase):
00099   def __init__(self, methodName='runTest', db=None):
00100     unittest.TestCase.__init__(self, methodName=methodName)
00101     self.db = db
00102     if self.db is None: self.db = gDB
00103 
00104     self.TEST_INSERT_COUNT = 5
00105 
00106   def setUp(self):
00107     self.db.agents.deleteAllRows()
00108 
00109   def test_fetchRow(self):
00110 
00111     # ---------------------------------------------------------------
00112     # make sure we can catch a missing row
00113 
00114     try:
00115       a_row = self.db.agents.fetchRow( ("agent_id", 1000) )
00116       self.fail("fetchRow eNoMatchingRows")
00117     except odb.eNoMatchingRows:
00118       pass
00119 
00120   def test_rowCreation(self):
00121     # --------------------------------------------------------------
00122     # create new rows and insert them
00123 
00124 
00125     for n in range(self.TEST_INSERT_COUNT):
00126       new_id = n + 1
00127 
00128       newrow = self.db.agents.newRow()
00129       newrow.name = "name #%d" % new_id
00130       newrow.login = "name%d" % new_id
00131       newrow.ext_email = "%d@name" % new_id
00132       newrow.hashed_pw = "hashedpw"
00133       newrow.save()
00134       msg = "new insert id (%d) does not match expected value (%d)" % (newrow.agent_id,new_id)
00135       self.assertEqual(newrow.agent_id,new_id, msg=msg)
00136 
00137 
00138   def test_fetchRow_One(self):
00139     # --------------------------------------------------------------
00140     # fetch one row
00141     row = self.db.agents.newRow()
00142     row.name = "name #1"
00143     row.login = "name #2"
00144     row.ext_email = "name #2"
00145     row.hashed_pw = "lsjdf"
00146     row.save()
00147     a_row = self.db.agents.fetchRow( ("agent_id", 1) )
00148 
00149     self.assertEqual(a_row.name, "name #1")
00150 
00151     return a_row
00152 
00153   def test_save(self):
00154     # ---------------------------------------------------------------
00155     # don't change and save it
00156     # (i.e. the "dummy cursor" string should never be called!)
00157     #
00158     a_row = self.test_fetchRow_One()
00159     try:
00160         a_row.save(cursor = "dummy cursor")
00161     except AttributeError, reason:
00162         self.fail("row tried to access cursor on save() when no changes were made!")
00163 
00164   def test_save(self):
00165     # ---------------------------------------------------------------
00166     # change, save, load, test
00167     
00168     a_row = self.test_fetchRow_One()
00169 
00170     a_row.auth_level = 10
00171     a_row.save()
00172     b_row = self.db.agents.fetchRow( ("agent_id", 1) )
00173     self.assertEqual(b_row.auth_level, 10,  "save and load failed")
00174 
00175   def misc(self):
00176     # ---------------------------------------------------------------
00177     # replace
00178 
00179     if 0:
00180       repl_row = self.db.agents.newRow(replace=1)
00181       repl_row.agent_id = a_row.agent_id
00182       repl_row.login = a_row.login + "-" + a_row.login
00183       repl_row.ext_email = "foo"
00184       repl_row.hashed_pw = "hashed_pw"
00185       repl_row.save()
00186 
00187       b_row = self.db.agents.fetchRow( ("agent_id", a_row.agent_id) )
00188       if b_row.login != repl_row.login:
00189           raise "replace failed"
00190 
00191     # --------------------------------------------------------------
00192     # access unknown attribute
00193   def test_unknownAttribute(self):
00194     a_row = self.test_fetchRow_One()
00195     try:
00196         a = a_row.UNKNOWN_ATTRIBUTE
00197         self.fail("unknown attribute")
00198     except AttributeError, reason:
00199         pass
00200     except odb.eNoSuchColumn, reason:
00201         pass
00202 
00203   def test_unknownAttribute2(self):
00204     a_row = self.test_fetchRow_One()
00205     try:
00206         a_row.UNKNOWN_ATTRIBUTE = 1
00207         self.fail("unknown attribute")
00208     except AttributeError, reason:
00209         pass
00210     except odb.eNoSuchColumn, reason:
00211         pass
00212 
00213     # --------------------------------------------------------------
00214     # access unknown dict item
00215 
00216   def test_unknownAttribute3(self):
00217     a_row = self.test_fetchRow_One()
00218     try:
00219         a = a_row["UNKNOWN_ATTRIBUTE"]
00220         self.fail("unknown attribute")
00221     except KeyError, reason:
00222         pass
00223     except odb.eNoSuchColumn, reason:
00224         pass
00225 
00226   def test_unknownAttribute3(self):
00227     a_row = self.test_fetchRow_One()
00228     try:
00229         a_row["UNKNOWN_ATTRIBUTE"] = 1
00230         self.fail("unknown attribute")
00231     except KeyError, reason:
00232         pass
00233     except odb.eNoSuchColumn, reason:
00234         pass
00235 
00236   def misc2(self):
00237     # --------------------------------------------------------------
00238     # use wrong data for column type
00239 
00240     if 0:
00241       try:
00242           a_row.agent_id = "this is a string"
00243           raise "test error"
00244       except eInvalidData, reason:
00245           pass
00246 
00247       output("PASSED! invalid data for column type")
00248 
00249     # --------------------------------------------------------------
00250     # fetch 1 rows
00251 
00252   def test_fetchRows(self):
00253     arow = self.test_fetchRow_One()
00254     rows = self.db.agents.fetchRows( ('agent_id', 1) )
00255     self.assertEqual(len(rows), 1)
00256 
00257     # --------------------------------------------------------------
00258     # fetch All rows
00259     
00260   def test_fetchRows(self):
00261     arow = self.test_rowCreation()
00262 
00263     rows = self.db.agents.fetchAllRows()
00264     self.assertEqual(len(rows), self.TEST_INSERT_COUNT)
00265 
00266 
00267   def test_deleteObject(self):
00268     # --------------------------------------------------------------
00269     # delete row object
00270 
00271     arow = self.test_fetchRow_One()
00272 
00273     row = self.db.agents.fetchRow( ('agent_id', 1) )
00274     row.delete()
00275     try:
00276         row = self.db.agents.fetchRow( ('agent_id', 1) )
00277         self.fail()
00278     except odb.eNoMatchingRows:
00279         pass
00280 
00281     # --------------------------------------------------------------
00282     # table deleteRow() call
00283   def test_deleteRow(self):
00284     self.test_rowCreation()
00285 
00286     row = self.db.agents.fetchRow( ('agent_id',2) )
00287     self.db.agents.deleteRow( ('agent_id', 2) )
00288     try:
00289         row = self.db.agents.fetchRow( ('agent_id',2) )
00290         self.fail()
00291     except odb.eNoMatchingRows:
00292         pass
00293 
00294     # --------------------------------------------------------------
00295   def test_datasize(self):
00296     self.test_rowCreation()
00297     row = self.db.agents.fetchRow( ('agent_id',3) )
00298     if row.databaseSizeForColumn('name') != len(row.name):
00299       self.fail()
00300 
00301   # --------------------------------------------------------------
00302   # table fetchRowUsingPrimaryKey
00303   def test_primarykey(self):
00304     self.test_rowCreation()
00305 
00306     row = self.db.agents.fetchRowUsingPrimaryKey(3)
00307     if row.agent_id != 3:
00308       self.fail()
00309       
00310   def test_lookup(self):
00311     self.test_rowCreation()
00312     row = self.db.agents.lookup(agent_id=3)
00313     if row.agent_id != 3:
00314       self.fail()
00315   
00316   def test_lookupCreate(self):
00317     row = self.db.agents.lookupCreate(agent_id=2)
00318     if row.isClean():
00319       self.fail()
00320     row.login = 1
00321     row.ext_email = "hassan@dotfunk.com"
00322     row.hashed_pw = "sdfj"
00323     row.save()
00324 
00325   def misc3(self):
00326     if 0:
00327       # --------------------------------------------------------------
00328       # test inc fields
00329       row = self.db.agents.newRow()
00330       new_id = 1092
00331       row.name = "name #%d" % new_id
00332       row.login = "name%d" % new_id
00333       row.ext_email = "%d@name" % new_id
00334       row.inc('ticket_count')
00335       row.hashed_pw = "hashed_pw"
00336       row.save()
00337       new_id = row.agent_id
00338 
00339       trow = self.db.agents.fetchRow( ('agent_id',new_id) )
00340       if trow.ticket_count != 1:
00341           raise "ticket_count didn't inc!", repr(trow.ticket_count)
00342 
00343       row.inc('ticket_count', count=2)
00344       row.save()
00345       trow = self.db.agents.fetchRow( ('agent_id',new_id) )
00346       if trow.ticket_count != 3:
00347           raise "ticket_count wrong, expected 3, got %d" % trow.ticket_count
00348 
00349       trow.inc('ticket_count')
00350       trow.save()
00351       if trow.ticket_count != 4:
00352           raise "ticket_count wrong, expected 4, got %d" % trow.ticket_count
00353 
00354 
00355   def test_virtualColumns(self):
00356     self.test_rowCreation()
00357 
00358     try:
00359       b_row = self.db.agents.newRow(replace=1)
00360     except odb.eNoMatchingRows:
00361       self.fail()
00362 
00363     b_row.login = "scott"
00364     b_row.ext_email = "scott@"
00365     b_row.hashed_pw = "lsdjf"
00366     b_row.columnA = "hello1"
00367     b_row.columnB = "hello2"
00368     b_row.save()
00369 
00370     c_row = self.db.agents.fetchRow( ("agent_id", b_row.agent_id) )
00371 
00372     self.assertEqual(c_row.columnA, "hello1")
00373     self.assertEqual(c_row.columnB, "hello2")
00374 
00375   def test_compression(self):
00376     self.test_rowCreation()
00377 
00378     try:
00379       b_row = self.db.agents.fetchRow( ("agent_id", 5) )
00380     except odb.eNoMatchingRows:
00381       self.fail()
00382 
00383     d = "hello\0hello" * 100
00384     b_row.data = d
00385     b_row.save()
00386 
00387     b_row = self.db.agents.fetchRow( ("agent_id", 5) )
00388     self.assertEqual(b_row.data, d)
00389 
00390   def test_relations(self):
00391     self.test_rowCreation()
00392 
00393     row = self.db.roles.newRow()
00394     row.agent_id = 2
00395     row.a = "1"
00396     row.b = "2"
00397     row.save()
00398 
00399     row2 = self.db.roles.lookup(oid=row.oid)
00400 
00401 
00402     
00403 
00404 
00405 
00406 
00407 def test():
00408   pass
00409 
00410 def usage(progname):
00411   print __doc__ % vars()
00412 
00413 def main(argv, stdout, environ):
00414   progname = argv[0]
00415   optlist, args = getopt.getopt(argv[1:], "", ["help", "test", "debug"])
00416 
00417   testflag = 0
00418 
00419   for (field, val) in optlist:
00420     if field == "--help":
00421       usage(progname)
00422       return
00423     elif field == "--debug":
00424       debugfull()
00425     elif field == "--test":
00426       testflag = 1
00427 
00428   if testflag:
00429     test()
00430     return
00431 
00432   global gDB
00433   gDB = setupDB("sqlite3")
00434   unittest.main()
00435   return
00436 
00437   for arg in args:
00438     if arg == "sqlite2":
00439       TEST_SQLITE()
00440     elif arg == "sqlite3":
00441       TEST_SQLITE3()
00442     elif arg == "mysql":
00443       TEST_MYSQL()
00444     elif arg == "postgres":
00445       TEST_POSTGRES()
00446 
00447 if __name__ == "__main__":
00448   main(sys.argv, sys.stdout, os.environ)


pyclearsilver
Author(s): Scott Hassan/hassan@willowgarage.com
autogenerated on Wed Apr 23 2014 10:35:42