odb_sqlite3.py
Go to the documentation of this file.
00001 #! /usr/bin/env python
00002 
00003 """
00004 usage: %(progname)s [args]
00005 """
00006 
00007 
00008 import os, sys, string, time, getopt
00009 from log import *
00010 
00011 import types
00012 
00013 import odb
00014 #import sqliterep as sqlite
00015 import sqlite3
00016 sqlite = sqlite3
00017 
00018 import _sqlite3
# Publish the sqlite3 driver's exception classes as attributes of the odb
# module, under the same names the driver uses.
for _exc_name in ("OperationalError", "DatabaseError", "DataError",
                  "IntegrityError", "NotSupportedError"):
  setattr(odb, _exc_name, getattr(_sqlite3, _exc_name))
del _exc_name
00024 
class Cursor(odb.Cursor):
  """sqlite3 flavour of the odb cursor.

  The methods below operate on ``self.cursor``, the driver-level cursor
  (presumably stored by ``odb.Cursor.__init__`` -- confirm against odb).
  """

  def insert_id(self, tablename, colname):
    """Return ``lastrowid`` of the underlying cursor.

    ``tablename`` and ``colname`` are accepted but not used here.
    """
    return self.cursor.lastrowid

  def execute(self, sql):
    """Execute ``sql`` on the underlying cursor and return its result.

    Raises:
      The same ``_sqlite3.DatabaseError`` subclass the driver raised, with
      the failing SQL text appended to the message.
    """
    try:
      return self.cursor.execute(sql)
    except _sqlite3.DatabaseError as reason:
      # Re-raise using the exception's own class: raising a plain
      # DatabaseError here would hide IntegrityError / OperationalError
      # from callers that catch those specific types.
      raise reason.__class__(str(reason) + "(%s)" % sql)

  def begin(self):
    """Do nothing; kept so callers can invoke ``begin()`` on the cursor."""
    pass
00040 
00041 
class Connection(odb.Connection):
  """odb connection implemented on top of the sqlite3 driver."""

  def __init__(self, *args, **kwargs):
    """Open the database.

    All positional and keyword arguments are forwarded unchanged to
    ``sqlite.connect``. Any failure is logged via ``warn`` and re-raised.
    """
    odb.Connection.__init__(self)

    try:
      self._conn = sqlite.connect(*args, **kwargs)
    except Exception:
      warn("unable to open db", args)
      raise

    self.SQLError = sqlite.Error

  def getConnType(self):
    """Return the backend identifier string ``"sqlite"``."""
    return "sqlite"

  def cursor(self):
    """Return a new ``Cursor`` wrapping a fresh driver cursor."""
    return Cursor(self._conn.cursor())

  # NOTE(review): the standard-library sqlite3 module does not document
  # module-level encode()/decode() functions, so these two methods look like
  # they will raise AttributeError when called -- confirm the intended
  # behaviour before relying on them.
  def encode(self, str):
    """Return ``sqlite3.encode(str)``."""
    return sqlite3.encode(str)

  def decode(self, str):
    """Return ``sqlite3.decode(str)``."""
    return sqlite3.decode(str)

  def escape(self, s):
    """Escape a column value for embedding in a single-quoted SQL literal.

    Returns:
      ``None`` for ``None``; numbers unchanged; strings with every ``'``
      doubled; for any other type, ``str(s)`` after logging a warning.
    """
    if s is None:
      return None
    if isinstance(s, types.StringType):
      return s.replace("'", "''")
    if isinstance(s, (types.IntType, types.LongType, types.FloatType)):
      return s
    if isinstance(s, types.UnicodeType):
      # Quotes must be doubled for unicode text exactly as for byte strings.
      # NOTE(review): str() still raises UnicodeEncodeError on non-ASCII
      # text, as it did before -- decide whether to return unicode instead.
      return str(s.replace("'", "''"))
    # Stringify before slicing: not every value supports s[:100].
    warn("unknown column data type: <%s> value=%s" % (type(s), str(s)[:100]))
    return str(s)

  def listTables(self, cursor):
    """Return the names of all tables recorded in ``sqlite_master``."""
    cursor.execute("select name from sqlite_master where type='table'")
    return [row[0] for row in cursor.fetchall()]

  def supportsTriggers(self):
    """Return True."""
    return True

  def listTriggers(self, cursor):
    """Return the names of all triggers recorded in ``sqlite_master``."""
    cursor.execute("select name from sqlite_master where type='trigger'")
    return [row[0] for row in cursor.fetchall()]

  def listIndices(self, tableName, cursor):
    """Return the names of all indices except ``sqlite_autoindex_*`` ones.

    ``tableName`` is accepted but not used: the query is not restricted to
    one table.
    """
    cursor.execute("select name from sqlite_master where type='index'")
    return [row[0] for row in cursor.fetchall()
            if "sqlite_autoindex_" not in row[0]]

  def listFieldsDict(self, table_name, cursor):
    """Return ``{column name: row}`` built from ``pragma table_info``.

    The column name is taken from index 1 of each pragma result row; the
    whole row is stored as the value.
    """
    cursor.execute("pragma table_info(%s)" % table_name)
    return dict((row[1], row) for row in cursor.fetchall())

  def _tableCreateStatement(self, table_name, cursor):
    """Return the ``sql`` column of ``sqlite_master`` for ``table_name``."""
    sql = "select sql from sqlite_master where type='table' and name='%s'" % table_name
    debug(sql)
    cursor.execute(sql)
    row = cursor.fetchone()
    return row[0]

  def alterTableToMatch(self, table, cursor):
    """Rebuild ``table`` in the database so it matches the app definition.

    Does nothing when ``table.checkTable`` reports no mismatched columns.
    Otherwise the columns present both in the database and in the app
    column list are copied to a scratch table, the original table is dropped
    and recreated from ``table._createTableSQL()``, the saved data is copied
    back, and the scratch table is dropped.
    """
    tableName = table.getTableName()
    debug("alterTableToMatch", tableName)
    # The process id keeps the scratch table name distinct per process.
    tmpTableName = tableName + "_" + str(os.getpid())

    invalidAppCols, invalidDBCols = table.checkTable(warnflag=0)
    if not invalidAppCols and not invalidDBCols:
      return

    oldcols = self.listFieldsDict(tableName, cursor)

    # Only columns that exist on both sides can be carried over.
    tmpcols = [colname
               for colname, coltype, options in table.getAppColumnList()
               if colname in oldcols]
    tmpcolnames = ",".join(tmpcols)

    count = table.fetchRowCount()
    warn("count for %s=" % table.getTableName(), count)

    statements = [
      "create table %s (%s)" % (tmpTableName, tmpcolnames),
      "insert into %s select %s from %s" % (tmpTableName, tmpcolnames, tableName),
      "drop table %s" % tableName,
      table._createTableSQL(),
      "insert into %s(%s) select %s from %s" % (tableName, tmpcolnames, tmpcolnames, tmpTableName),
      "drop table %s" % tmpTableName,
    ]

    self.begin()
    for statement in statements:
      debug(statement)
      cursor.execute(statement)
    self.commit()

  def auto_increment(self, coltype):
    """Return ``(coltype, "")``: the type unchanged plus an empty string."""
    return coltype, ""

  def create_fullTextSearchTable(self, tableName, column_list):
    """Return the ``CREATE virtual TABLE ... using FTS3`` statement.

    ``column_list`` holds ``(colname, coltype, options)`` triples; columns
    named ``rowid`` or ``docid`` are left out of the definition.
    """
    defs = ", ".join(colname
                     for colname, coltype, options in column_list
                     if colname not in ("rowid", "docid"))
    return "CREATE virtual TABLE %s using FTS3(%s)" % (tableName, defs)
00197 
def test():
  """Placeholder self-test hook run by ``main`` for ``--test``; does nothing."""
  return None
00200 
def usage(progname):
  """Print the module docstring with ``%(progname)s`` filled in."""
  text = __doc__ % {"progname": progname}
  print(text)
00203 
def main(argv, stdout, environ):
  """Command-line entry point.

  Args:
    argv: full argument vector; ``argv[0]`` is the program name.
    stdout, environ: accepted but not used here.

  Recognised long options: ``--help`` (print usage), ``--debug`` (call
  ``debugfull()``), ``--test`` (run ``test()``). With no options and no
  arguments at all the usage text is printed.

  Raises:
    getopt.GetoptError: for an unrecognised option.
  """
  progname = argv[0]
  optlist, args = getopt.getopt(argv[1:], "", ["help", "test", "debug"])

  # Show usage only when nothing at all was supplied. Bailing out whenever
  # there were no positional arguments made --help/--test/--debug
  # unreachable unless a dummy argument was added.
  if not args and not optlist:
    usage(progname)
    return

  testflag = 0
  for (field, val) in optlist:
    if field == "--help":
      usage(progname)
      return
    elif field == "--debug":
      debugfull()
    elif field == "--test":
      testflag = 1

  if testflag:
    test()
    return
00224 
00225 
# Run main() only when this file is executed as a script, not when imported.
if __name__ == "__main__":
  main(sys.argv, sys.stdout, os.environ)


pyclearsilver
Author(s): Scott Hassan/hassan@willowgarage.com
autogenerated on Sat Dec 28 2013 17:47:30