odb_sqlite3.py
Go to the documentation of this file.
1 #! /usr/bin/env python
2 
3 """
4 usage: %(progname)s [args]
5 """
6 
7 
8 import os, sys, string, time, getopt
9 from log import *
10 
11 import types
12 
13 import odb
14 #import sqliterep as sqlite
15 import sqlite3
16 sqlite = sqlite3
17 
18 import _sqlite3
19 odb.OperationalError = _sqlite3.OperationalError
20 odb.DatabaseError = _sqlite3.DatabaseError
21 odb.DataError = _sqlite3.DataError
22 odb.IntegrityError = _sqlite3.IntegrityError
23 odb.NotSupportedError = _sqlite3.NotSupportedError
24 
26  def insert_id(self, tablename, colname):
27  return self.cursor.lastrowid
28 
29  def execute(self, sql):
30  try:
31  return self.cursor.execute(sql)
32  except _sqlite3.DatabaseError, reason:
33  reason = str(reason) + "(%s)" % sql
34  raise _sqlite3.DatabaseError, reason
35 
36 
37 
38  def begin(self):
39  pass
40 
41 
42 class Connection(odb.Connection):
43  def __init__(self, *args, **kwargs):
44  odb.Connection.__init__(self)
45 
46  try:
47  self._conn = apply(sqlite.connect, args, kwargs)
48  except:
49  warn("unable to open db", args)
50  raise
51 
52  self.SQLError = sqlite.Error
53 
54  def getConnType(self): return "sqlite"
55 
56  def cursor(self):
57  return Cursor(self._conn.cursor())
58 
59  def encode(self, str):
60  return sqlite3.encode(str)
61  def decode(self, str):
62  return sqlite3.decode(str)
63 
64  def escape(self,s):
65  if s is None:
66  return None
67  elif type(s) == types.StringType:
68  return string.replace(s,"'","''")
69  elif type(s) in (types.IntType, types.FloatType):
70  return s
71  elif type(s) == types.UnicodeType:
72  return str(s)
73  else:
74  warn("unknown column data type: <%s> value=%s" % (type(s), s[:100]))
75  return str(s)
76 
77 
78  def listTables(self, cursor):
79  cursor.execute("select name from sqlite_master where type='table'")
80  rows = cursor.fetchall()
81  tables = []
82  for row in rows: tables.append(row[0])
83  return tables
84 
85  def supportsTriggers(self): return True
86 
87  def listTriggers(self, cursor):
88  cursor.execute("select name from sqlite_master where type='trigger'")
89  rows = cursor.fetchall()
90  tables = []
91  for row in rows: tables.append(row[0])
92  return tables
93 
94  def listIndices(self, tableName, cursor):
95  cursor.execute("select name from sqlite_master where type='index'")
96  rows = cursor.fetchall()
97  tables = []
98  for row in rows:
99  if row[0].find("sqlite_autoindex_") != -1:
100  continue
101  tables.append(row[0])
102  return tables
103 
104  def listFieldsDict(self, table_name, cursor):
105  sql = "pragma table_info(%s)" % table_name
106  cursor.execute(sql)
107  rows = cursor.fetchall()
108 
109  columns = {}
110  for row in rows:
111  colname = row[1]
112  columns[colname] = row
113  return columns
114 
115  def _tableCreateStatement(self, table_name, cursor):
116  sql = "select sql from sqlite_master where type='table' and name='%s'" % table_name
117  print sql
118  cursor.execute(sql)
119  row = cursor.fetchone()
120  sqlstatement = row[0]
121  return sqlstatement
122 
123 
124  def alterTableToMatch(self, table, cursor):
125  tableName = table.getTableName()
126  debug("alterTableToMatch", tableName)
127  tmpTableName = tableName + "_" + str(os.getpid())
128 
129  invalidAppCols, invalidDBCols = table.checkTable(warnflag=0)
130 # warn(invalidAppCols, invalidDBCols)
131 
132 
133 ## if invalidAppCols or invalidDBCols:
134 ## return
135 
136  if not invalidAppCols and not invalidDBCols:
137  return
138 
139 
140  oldcols = self.listFieldsDict(tableName, cursor)
141 # tmpcols = oldcols.keys()
142 
143  tmpcols = []
144  newcols = table.getAppColumnList()
145  for colname, coltype, options in newcols:
146  if oldcols.has_key(colname): tmpcols.append(colname)
147 
148  tmpcolnames = string.join(tmpcols, ",")
149 
150  count = table.fetchRowCount()
151  warn("count for %s=" % table.getTableName(), count)
152 
153  statements = []
154 
155  #sql = "begin transaction"
156  #statements.append(sql)
157 
158 # sql = "create temporary table %s (%s)" % (tmpTableName, tmpcolnames)
159  sql = "create table %s (%s)" % (tmpTableName, tmpcolnames)
160  statements.append(sql)
161 
162  sql = "insert into %s select %s from %s" % (tmpTableName, tmpcolnames, tableName)
163  statements.append(sql)
164 
165  sql = "drop table %s" % tableName
166  statements.append(sql)
167 
168  sql = table._createTableSQL()
169  statements.append(sql)
170 
171  sql = "insert into %s(%s) select %s from %s" % (tableName, tmpcolnames, tmpcolnames, tmpTableName)
172  statements.append(sql)
173 
174  sql = "drop table %s" % tmpTableName
175  statements.append(sql)
176 
177  #sql = "commit"
178  #statements.append(sql)
179 
180  self.begin()
181  for statement in statements:
182  print statement
183  cursor.execute(statement)
184  self.commit()
185 
186  def auto_increment(self, coltype):
187  return coltype, ""
188 
189  def create_fullTextSearchTable(self, tableName, column_list):
190  defs = []
191  for colname, coltype, options in column_list:
192  if colname in ("rowid", "docid"): continue
193  defs.append(colname)
194  defs = string.join(defs, ", ")
195 
196  return "CREATE virtual TABLE %s using FTS3(%s)" % (tableName, defs)
197 
198 def test():
199  pass
200 
201 def usage(progname):
202  print __doc__ % vars()
203 
204 def main(argv, stdout, environ):
205  progname = argv[0]
206  optlist, args = getopt.getopt(argv[1:], "", ["help", "test", "debug"])
207 
208  testflag = 0
209  if len(args) == 0:
210  usage(progname)
211  return
212  for (field, val) in optlist:
213  if field == "--help":
214  usage(progname)
215  return
216  elif field == "--debug":
217  debugfull()
218  elif field == "--test":
219  testflag = 1
220 
221  if testflag:
222  test()
223  return
224 
225 
226 if __name__ == "__main__":
227  main(sys.argv, sys.stdout, os.environ)
def auto_increment(self, coltype)
Definition: odb_sqlite3.py:186
def insert_id(self, tablename, colname)
Definition: odb_sqlite3.py:26
def __init__(self, args, kwargs)
Definition: odb_sqlite3.py:43
def main(argv, stdout, environ)
Definition: odb_sqlite3.py:204
def listIndices(self, tableName, cursor)
Definition: odb_sqlite3.py:94
def listFieldsDict(self, table_name, cursor)
Definition: odb_sqlite3.py:104
def alterTableToMatch(self, table, cursor)
Definition: odb_sqlite3.py:124
def debugfull()
Definition: log.py:120
def create_fullTextSearchTable(self, tableName, column_list)
Definition: odb_sqlite3.py:189
def warn(args)
Definition: log.py:100
def _tableCreateStatement(self, table_name, cursor)
Definition: odb_sqlite3.py:115
def debug(args)
Definition: log.py:116


pyclearsilver
Author(s): Scott Noob Hassan
autogenerated on Mon Jun 10 2019 15:51:13