#-*- coding: ISO-8859-1 -*- # sqlite3/_sqlite3.py: the jythonsqlite package. # # Copyright (C) 2007 Michael Dubner # # This file is part of jythonsqlite. # # This software is provided 'as-is', without any express or implied # warranty. In no event will the authors be held liable for any damages # arising from the use of this software. # # Permission is granted to anyone to use this software for any purpose, # including commercial applications, and to alter it and redistribute it # freely, subject to the following restrictions: # # 1. The origin of this software must not be misrepresented; you must not # claim that you wrote the original software. If you use this software # in a product, an acknowledgment in the product documentation would be # appreciated but is not required. # 2. Altered source versions must be plainly marked as such, and must not be # misrepresented as being the original software. # 3. This notice may not be removed or altered from any source distribution. import re as _re import thread as _thread from weakref import ref as _ref #from SQLite import Vm as Statement #it is broken for some reason from SQLite import Database as _Database from SQLite import Trace as _Trace from SQLite import Callback as _Callback from SQLite import Authorizer as _Authorizer from SQLite import Function as _Function from SQLite import Exception as _Exception from SQLite.Constants import * version = "1.0" sqlite_version = _Database.version() DEFAULT_TIMEOUT = 5.0 PARSE_DECLTYPES = 1 PARSE_COLNAMES = 2 if SQLITE_DETACH == 25: SQLITE_ALTER_TABLE = 26 SQLITE_REINDEX = 27 SQLITE_ANALYZE = 28 STATEMENT_INVALID = 0 STATEMENT_INSERT = 1 STATEMENT_DELETE = 2 STATEMENT_UPDATE = 3 STATEMENT_REPLACE = 4 STATEMENT_SELECT = 5 STATEMENT_OTHER = 6 class Error(StandardError): pass class Warning(StandardError): pass class InterfaceError(Error): pass class DatabaseError(Error): pass class InternalError(DatabaseError): pass class OperationalError(DatabaseError): pass class ProgrammingError(DatabaseError): pass class IntegrityError(DatabaseError): pass class DataError(DatabaseError): pass class NotSupportedError(DatabaseError): pass class PrepareProtocol(object): pass _enable_callback_tracebacks = 0 converters = {} adapters = {} class Connection(object): """SQLite database connection object.""" class _TraceDebug(_Trace): def trace(self, stmt): print '+', stmt _trace_debug = _TraceDebug() class _UserFunction(_Function): def __init__(self, connection, func): self.func = func self.connection = connection self.agg = {} def function(self, fc, args): args = [_cast_value(val, None, self.connection.text_factory) for val in args] try: res = self.func(*args) except Exception, err: #fc.set_error("user-defined function raised exception") raise OperationalError, "user-defined function raised exception" res = _prepare_args([res])[0] if res is not None: fc.set_result(res) def step(self, fc, args): agg = self.agg.get(id(fc)) if agg is None: try: agg = self.func() except Exception, err: #fc.set_error("user-defined function raised exception") raise OperationalError, "user-defined function raised exception" self.agg[id(fc)] = agg args = [_cast_value(val, None, self.connection.text_factory) for val in args] try: agg.step(*args) except Exception, err: #fc.set_error("user-defined function raised exception") raise OperationalError, "user-defined function raised exception" def last_step(self, fc): agg = self.agg[id(fc)] try: res = agg.finalize(*args) except Exception, err: #fc.set_error("user-defined function raised exception") raise OperationalError, "user-defined function raised exception" res = _prepare_args([res])[0] if res is not None: fc.set_result(res) class _UserAuthorizer(_Authorizer): def __init__(self, connection, func): self.func = func self.connection = connection def authorize(self, action, arg1, arg2, dbname, source): try: return self.func(action, arg1, arg2, dbname, source) #if self.func(action, arg1, arg2, dbname, source) == SQLITE_DENY: # raise DatabaseError, "prohibited" except Exception, err: raise OperationalError, "user-defined function raised exception" def __init__(self, database=":memory:", timeout=DEFAULT_TIMEOUT, detect_types=0, isolation_level="", check_same_thread=1, cached_statements=100): self.row_factory = None self.text_factory = unicode self._isolation_level = None self._begin_statement = None self._statement_cache = None self._statements = None self._timeout = timeout self._detect_types = detect_types try: self.db = _Database() self.db.open(database, 0666) self.db.set_encoding('utf-8') self.db.busy_timeout(int(self._timeout*1000)) self.db.exec('pragma show_datatypes', None) #self.db.trace(self._trace_debug) except _Exception, err: raise OperationalError, err err = self.db.last_error() if err: raise DatabaseError, "self.db.last_error() = "+str(err) self.isolation_level = isolation_level self._statement_cache = Cache(self, cached_statements) self._cursors = [] self._inTransaction = 0 self._user_functions = {} self._thread_ident = _thread.get_ident() self._check_same_thread = check_same_thread self._function_pinboard = {} self._collations = {} self._total_changes = 0 #for strict compatibility self.Warning = Warning self.Error = Error self.InterfaceError = InterfaceError self.DatabaseError = DatabaseError self.DataError = DataError self.OperationalError = OperationalError self.IntegrityError = IntegrityError self.InternalError = InternalError self.ProgrammingError = ProgrammingError self.NotSupportedError = NotSupportedError def _flush_statement_cache(self): self._statement_cache = Cache(self) def _check_connection(self): if self.db is None: raise ProgrammingError, "Cannot operate on a closed database." def _check_thread(self): self._check_connection() if self._check_same_thread and _thread.get_ident() != self._thread_ident: raise ProgrammingError, \ "SQLite objects created in a thread can only be used in that same thread."+ \ "The object was created in thread id %d and this is thread id %d" % \ (self._thread_ident, _thread.get_ident()) def _begin(self): if self._begin_statement: self.db.exec(self._begin_statement, None) self._inTransaction = 1 def __del__(self): db = self.db if db is None: return self._statement_cache = None self.db = None db.close() def __call__(self, sql): self._check_thread() return self.db.compile(sql) def cursor(self, factory=None, row_factory=None): """Return a cursor for the connection.""" self._check_thread() if factory is None: factory = Cursor if row_factory is None: row_factory = self.row_factory cursor = factory(self, row_factory=row_factory) self._cursors.append(_ref(cursor)) return cursor def _reset_all_statements(self): self._cursors = [c for c in self._cursors if c() is not None] for c in self._cursors: c = c() if c is not None and hasattr(c, '_reset'): c._reset() def close(self): """Closes the connection.""" db = self.db if db is None: return self._check_thread() self._reset_all_statements() self._statement_cache = None self.db = None return db.close() def commit(self): """Commit the current transaction.""" self._check_thread() if self._inTransaction: self._reset_all_statements() self.db.exec("COMMIT", None) self._inTransaction = 0 def rollback(self): """Roll back the current transaction.""" self._check_thread() if self._inTransaction: self._reset_all_statements() self.db.exec("ROLLBACK", None) self._inTransaction = 0 def create_function(self, name, nargs, func): """Creates a new function. Non-standard.""" if func is not None and not callable(func): raise TypeError, "parameter must be callable" self._check_thread() key = name,nargs if func is None: if self._user_functions.has_key(key): self._user_functions[key].connection = None self._user_functions[key].func = None del self._user_functions[key] else: if self._user_functions.has_key(key): self._user_functions[key].connection = None self._user_functions[key].func = None func = self._user_functions[key] = self._UserFunction(self, func) try: return self.db.create_function(name, nargs, func) except _Exception, err: dberr = self.db.last_error() raise OperationalError, \ "SQLite.Exception: create_function: name=%s, nargs=%d, func=%s, errcode=%d, errtext=%s, errdetails=%s" % \ (name, nargs, func, dberr, self.db.error_string(dberr), err.getMessage()) def create_aggregate(self, name, nargs, func): """Creates a new aggregate. Non-standard.""" if func is not None and not callable(func): raise TypeError, "parameter must be callable" self._check_thread() key = name,nargs if func is None: if self._user_functions.has_key(key): self._user_functions[key].connection = None self._user_functions[key].func = None del self._user_functions[key] else: if self._user_functions.has_key(key): self._user_functions[key].connection = None self._user_functions[key].func = None func = self._user_functions[key] = self._UserFunction(self, func) try: return self.db.create_aggregate(name, nargs, func) except _Exception, err: dberr = self.db.last_error() raise OperationalError, \ "SQLite.Exception: create_aggregate: name=%s, nargs=%d, func=%s, errcode=%d, errtext=%s, errdetails=%s" % \ (name, nargs, func, dberr, self.db.error_string(dberr), err.getMessage()) def create_collation(self, name, func): """Creates a collation function. Non-standard.""" if func is not None and not callable(func): raise TypeError, "parameter must be callable" self._check_thread() raise NotImplementedError, "Connection.create_collation" return self.db.create_collation(name, func) def set_authorizer(self, func): """Sets authorizer callback. Non-standard.""" self._check_thread() key = None if func is None: if self._user_functions.has_key(key): self._user_functions[key].connection = None self._user_functions[key].func = None del self._user_functions[key] else: if self._user_functions.has_key(key): self._user_functions[key].connection = None self._user_functions[key].func = None func = self._user_functions[key] = self._UserAuthorizer(self, func) try: return self.db.set_authorizer(func) except _Exception, err: dberr = self.db.last_error() raise OperationalError, \ "SQLite.Exception: set_authorizer: name=%s, nargs=%d, func=%s, errcode=%d, errtext=%s, errdetails=%s" % \ (name, nargs, func, dberr, self.db.error_string(dberr), err.getMessage()) def execute(self, sql, args=None): """Executes a SQL statement. Non-standard.""" return self.cursor().execute(sql, args) def executemany(self, sql, args=None): """Repeatedly executes a SQL statement. Non-standard.""" return self.cursor().executemany(sql, args) def executescript(self, sql, args=None): """Executes a multiple SQL statements at once. Non-standard.""" return self.cursor().executescript(sql, args) def interrupt(self): """Abort any pending database operation. Non-standard.""" self._check_connection() return self.db.interrupt() def _get_total_changes(self): self._check_connection() return self._total_changes def _set_isolation_level(self, new_isolation_level): self._isolation_level = new_isolation_level if new_isolation_level is not None: self._begin_statement = "BEGIN "+new_isolation_level else: self.commit() self._begin_statement = None def _get_isolation_level(self): return self._isolation_level def _get_dbversion(self): self._check_connection() return self.db.dbversion() total_changes = property(_get_total_changes) isolation_level = property(_get_isolation_level, _set_isolation_level) dbversion = property(_get_dbversion) class Cursor: """SQLite database cursor class.""" class Callback(_Callback): def __init__(self, cursor): self.cursor = cursor def columns(self, coldata): if self.cursor._next_row is not None: raise InternalError, "Previous data not cleaned" self.cursor._last_columns = tuple(coldata) self.cursor._last_types = None def types(self, types): if self.cursor._last_columns is None: raise InternalError, "columns not called" if self.cursor._next_row is not None: raise InternalError, "Previous data not cleaned" self.cursor._last_types = tuple(types) def newrow(self, rowdata): if self.cursor._last_columns is None: raise InternalError, "columns not called" if self.cursor._next_row is not None: raise InternalError, "Previous data not cleaned" if self.cursor.description is None: self.cursor._build_description() self.cursor._build_row_cast_map() if len(self.cursor.description) != len(rowdata): raise InternalError, "Invalid row length" row = [] for i in xrange(len(rowdata)): val = rowdata[i] coltype = None if self.cursor._last_types is not None: coltype = self.cursor._last_types[i] if coltype: coltype = coltype.strip().lower() if val is not None: if (self.cursor._row_cast_map is not None and self.cursor._row_cast_map[i] is not None): val = self.cursor._row_cast_map[i](val) else: val = _cast_value(val, coltype, self.cursor.connection.text_factory) row.append(val) row = tuple(row) self.cursor._next_row = row return 0 def __init__(self, connection, row_factory=None): self._statement = None self.connection = None if not isinstance(connection, Connection): raise TypeError, "Invalid connection instance passed to Cursor constructor" self.connection = connection self._row_cast_map = None self._last_columns = None self._last_types = None self._last_sql = None self._next_row = None self._on_eos = 1 # End Of Set (=last row prefetched) self.description = None self.lastrowid = None self.arraysize = 1 self.rowcount = -1 self.row_factory = row_factory self._cb = self.Callback(self) self.connection._check_thread() def __del__(self): if self._statement is not None: self._statement.stop() self._statement = None self._on_eos = 1 self.connection = None def _reset(self): if self._statement is not None: self._statement.stop() self._statement = None self._on_eos = 1 def _build_description(self): self.description = [] if self._last_columns is None or self._last_types is None: raise InternalError, "Different number of types and names of columns %s !~ %s (%s)" % \ (repr(self._last_columns), repr(self._last_types), self._last_sql) if len(self._last_columns) != len(self._last_types): raise InternalError, "Different number of types and names of columns %s !~ %s (%s)" % \ (repr(self._last_columns), repr(self._last_types), self._last_sql) for i in xrange(len(self._last_columns)): colname = self._last_columns[i] if colname is not None: colname = colname.split('[',1)[0].strip() coltype = self._last_types[i] self.description.append((colname,coltype,None,None,None,None,None)) def _build_row_cast_map(self): if not self.connection._detect_types: return if self._last_columns is None or self._last_types is None: raise InternalError, "Different number of types and names of columns %s !~ %s (%s)" % \ (repr(self._last_columns), repr(self._last_types), self._last_sql) if len(self._last_columns) != len(self._last_types): raise InternalError, "Different number of types and names of columns %s !~ %s (%s)" % \ (repr(self._last_columns), repr(self._last_types), self._last_sql) self._row_cast_map = [] for i in xrange(len(self._last_columns)): converter = None if self.connection._detect_types & PARSE_COLNAMES: colname = self._last_columns[i] if '[' in colname and colname[-1:] == ']': key = colname.split('[',1)[1][:-1] converter = _get_converter(key) if converter is None and self.connection._detect_types & PARSE_DECLTYPES: coltype = self._last_types[i] if coltype is not None: converter = _get_converter(coltype) self._row_cast_map.append(converter) def _check_cursor(self): if self.connection is None: raise ProgrammingError, "Cannot operate on a closed cursor" self.connection._check_thread() def execute(self, sql, args=None): """Executes a SQL statement.""" if type(sql) not in (type(''),type(u'')): raise ValueError, "sql must be str or unicode not "+str(type(sql)) self._check_cursor() if args: args = _prepare_args(args) sql = _substitute_args(sql, args) if _count_statements(sql)>1: raise Warning, "You can only execute one statement at a time." #initialize attributes self._last_columns = None self._last_types = None self.description = None self._next_row = None self._reset() self.rowcount = -1 #process transaction sqltype = _detect_statement_type(sql) if sqltype in (STATEMENT_UPDATE,STATEMENT_DELETE,STATEMENT_INSERT,STATEMENT_REPLACE): if not self.connection._inTransaction: self.connection._begin() elif sqltype == STATEMENT_OTHER: if self.connection._inTransaction: self.connection.commit() #compile SQL statement import java.lang try: self._last_sql = sql self._statement = self.connection._statement_cache.get(sql) dberr = self.connection.db.last_error() if dberr: raise DatabaseError, db.error_string(dberr) self._on_eos = not self._statement.step(self._cb) self.last_rowid = self.connection.db.last_insert_rowid() changes = self.connection.db.changes() except _Exception, err: db = self.connection.db dberr = db.last_error() raise OperationalError, \ "SQLite.Exception: execute.sql=%s, args=%s, errcode=%d, errtext=%s, errdetails=%s" % \ (sql, args, dberr, db.error_string(dberr), err.getMessage()) #except java.lang.ArrayIndexOutOfBoundsException, err: # raise ProgrammingError, err #except TypeError, err: # print 'Cursor.execute: TypeError:', repr(sql), repr(args) # raise if changes: self.connection._total_changes += changes self.rowcount = changes else: self.rowcount = 0 if self._next_row is not None: self.rowcount += 1 return self def executemany(self, sql, largs): """Repeatedly executes a SQL statement.""" if type(sql) not in (type(''),type(u'')): raise ValueError, "sql must be str or unicode not "+str(type(sql)) #initialize attributes self._check_cursor() self._last_columns = None self._last_types = None self.description = None self._next_row = None self._reset() self.rowcount = 0 #process transaction sqltype = _detect_statement_type(sql) if sqltype in (STATEMENT_UPDATE,STATEMENT_DELETE,STATEMENT_INSERT,STATEMENT_REPLACE): if not self.connection._inTransaction: self.connection._begin() elif sqltype == STATEMENT_OTHER: if self.connection._inTransaction: self.connection.commit() elif sqltype == STATEMENT_SELECT: raise ProgrammingError, "You cannot execute SELECT statements in executemany()." #loop over args for args in largs: args = _prepare_args(args) self._last_sql = _substitute_args(sql, args) try: self.connection.db.exec(self._last_sql, self._cb) changes = self.connection.db.changes() self.last_rowid = self.connection.db.last_insert_rowid() except _Exception, err: db = self.connection.db dberr = db.last_error() raise OperationalError, \ "SQLite.Exception: executemany.sql=%s, args=%s, errcode=%d, errtext=%s, errdetails=%s" % \ (sql, args, dberr, db.error_string(dberr), err.getMessage()) self.connection._total_changes += changes self.rowcount += changes return self def executescript(self, sql, args=None): """Executes a multiple SQL statements at once. Non-standard.""" if type(sql) not in (type(''),type(u'')): raise ValueError, "sql must be str or unicode not "+str(type(sql)) self._check_cursor() if args: args = _prepare_args(args) sql = _substitute_args(sql, args) #commit first as done on C version self.connection.commit() if not complete_statement(sql): raise ProgrammingError, "you did not provide a complete SQL statement" self.rowcount = -1 self._on_eos = 1 self._next_row = None self._last_columns = None self._last_types = None self.description = None self._last_sql = sql try: self.connection.db.exec(sql, None) changes = self.connection.db.changes() self.last_rowid = self.connection.db.last_insert_rowid() except _Exception, err: db = self.connection.db dberr = db.last_error() raise OperationalError, \ "SQLite.Exception: executescript.sql=%s, args=%s, errcode=%d, errtext=%s, errdetails=%s" % \ (sql, args, dberr, db.error_string(dberr), err.getMessage()) self.connection._total_changes += changes self.rowcount = changes return self def fetchone(self): """Fetches several rows from the resultset.""" self._check_cursor() if not self._next_row: return None next_row = self._next_row self._next_row = None if self.row_factory is not None: next_row = self.row_factory(self, next_row) if not self._on_eos: self._on_eos = not self._statement.step(self._cb) if self._next_row is not None: assert not self._on_eos self.rowcount += 1 else: assert self._on_eos while self._statement.compile(): self._last_columns = None self._last_types = None self.description = None self._on_eos = not self._statement.step(self._cb) if self._next_row is not None: assert not self._on_eos self.rowcount += 1 break else: assert self._on_eos return next_row def fetchmany(self, maxrows=None): """Fetches all rows from the resultset.""" if maxrows is None: maxrows = self.arraysize res = [] for i in xrange(maxrows): row = self.fetchone() if row is None: return res res.append(row) return res def fetchall(self): """Fetches one row from the resultset.""" res = [] row = self.fetchone() while row is not None: res.append(row) row = self.fetchone() return res def close(self): """Closes the cursor.""" if self.connection is None: return self._check_cursor() self._reset() self.connection = None def __iter__(self): return self def next(self): row = self.fetchone() if row is None: raise StopIteration return row def setinputsizes(self, sizes): """Required by DB-API. Does nothing in pysqlite.""" pass def setoutputsize(self, size, column=None): """Required by DB-API. Does nothing in pysqlite.""" pass class Cache: def __init__(self, factory, size=None): self.factory = factory def get(self, *args): return self.factory(*args) def display(self): print "No SQL caching due to pre-binding required" #No SQL caching due to pre-binding required #class Cache: # def __init__(self, factory, size=None): # self.factory = factory # if size is None: size = 10 # if type(size) != type(0): # raise TypeError, "type(size) must be int, not "+str(type(size)) # if size < 5: size = 5 # self.size = size # self._cache = {} # # def get(self, key): # if self._cache.has_key(key): # node = self._cache[key] # node[0] += 1 # else: # if len(self._cache) >= size: # items = self._cache.items() # if items: # items.sort(lambda a,b:cmp(a[1][0],b[1][0])) # del self._cache[items[0][0]] # node = [0, self.factory(key)] # self._cache[key] = node # return node[1] # # def display(self): # items = self._cache.items() # if items: # items.sort(lambda a,b:cmp(a[1][0],b[1][0])) # keys = zip(*items)[0] # keys.append(None) # prevkey = None # nextkey = keys[0] # for i in xrange(1,len(keys)): # key = nextkey # nextkey = keys[i] # print '%s <- %s ->%s' % (prevkey, key, nextkey) # prevkey = key class Row: def __init__(self, cursor, rowdata): self.description = cursor.description self.rowdata = rowdata def __len__(self): return len(self.rowdata) def __getitem__(self, index): if type(index) in (type(0),type(0l)): return self.rowdata[index] for i in xrange(len(self.description)): if index.lower() == self.description[i][0].lower(): return self.rowdata[i] raise KeyError, index def __repr__(self): return 'Row('+(', '.join([n[0]+'='+repr(v) for n,v in zip(self.description,self.rowdata)]))+')' OptimizedUnicode = unicode from jarray import array as _jarray class buffer: """Java array of characters - jarray.array(x, 'c')""" def __init__(self, x): self.val = _jarray(x, 'c') for attr in ['__cmp__', '__delattr__', '__delitem__', '__delslice__', '__eq__', '__finditem__', '__ge__', '__getitem__', '__getslice__', '__gt__', '__hash__', '__iter__', '__le__', '__len__', '__lt__', '__ne__', '__nonzero__', '__repr__', '__setattr__', '__setitem__', '__setslice__', '__str__', '__tojava__', 'append', 'array', 'byteswap', 'count', 'extend', 'fromfile', 'fromlist', 'fromstring', 'getItemsize', 'index', 'insert', 'itemsize', 'pop', 'remove', 'reverse', 'tofile', 'tolist', 'tostring']: setattr(self, attr, getattr(self.val, attr)) def __add__(self, b): if isinstance(b, buffer): return buffer(self.val+b.val) return buffer(self.val+b) def __mul__(self, b): if isinstance(b, buffer): return Binary(self.val*b.val) return buffer(self.val*b) def __rmul__(self, b): if isinstance(b, buffer): return buffer(b.val*self.val) return buffer(b*self.val) def zeros(self, l): return buffer(self.val.zeros(l)) def _detect_statement_type(sql): if not sql: return STATEMENT_INVALID sqlcmd = _sql_strip(sql)[:7].strip().lower() if not sqlcmd: return STATEMENT_INVALID elif sqlcmd == "select": return STATEMENT_SELECT elif sqlcmd == "insert": return STATEMENT_INSERT elif sqlcmd == "update": return STATEMENT_UPDATE elif sqlcmd == "delete": return STATEMENT_DELETE elif sqlcmd == "replace": return STATEMENT_REPLACE return STATEMENT_OTHER def _sql_strip(sql): sql = sql.strip() sql = filter(None, sql.split('\n')) in_comment = 0 while sql: if in_comment: i = sql[0].find('*/') if i>=0: sql[0] = sql[0][i+2:] in_comment = 0 else: del sql[0] continue check = sql[0].strip() if check.startswith('--') or check.startswith('//'): del sql[0] elif check.startswith('/*'): del sql[0] in_comment = 1 else: break return '\n'.join(sql) def _count_statements(sql): start = 0 end = sql.find(';',start) count = 0 while end >= 0: if complete_statement(sql[start:end+1]) and _sql_strip(sql[start:end]): count += 1 start = end+1 end = sql.find(';',start) if _sql_strip(sql[start:]): count += 1 return count def _prepare_args(args): if type(args) == type({}): args = args.copy() l = args.items() else: args = list(args[:]) l = enumerate(args) for k,val in l: try: args[k] = val = adapt(val) except ProgrammingError: pass return args _bind = None def _get_bindings(sql): global _bind if _bind is None: _bind = _re.compile('(\?|:(?:[a-z_][a-z0-9_]*))', _re.I) b = _bind.split(sql) nob = [] for i in xrange(1,len(b),2): if not complete_statement(''.join(b[:i-1])+';'): nob.append(i) nob.reverse() for i in nob: b[i-1] = ''.join(b[i-1:i+2]) del b[i:i+2] assert len(b)%2 == 1, "Internal parsing error "+repr(b) return b def _quote(arg): if arg is None: return 'NULL' if type(arg) in (int,long,float): return str(arg) if type(arg) in (str,unicode): #if arg.isunicode(): arg = arg.encode('utf-8') return "'%s'" % (arg.replace("'","''"),) raise InterfaceError, "Unsupported/non-adaptable type "+str(type(arg)) def _substitute_args(sql, args): #if sql.isunicode(): sql = sql.encode('utf-8') b = _get_bindings(sql) if len(b) == 1: return sql keys = b[1::2] if not args: raise ProgrammingError, "Incorrect number of bindings supplied. The current statement uses %d, and there are %d supplied." % (len(keys),0) if keys[0] == '?': if filter(lambda s:s!='?', keys): raise ProgrammingError, "Incorrect statament. Both named and unnamed bindings used." if type(args) == type({}): raise ProgrammingError, "Incorrect bindings supplied. For unnamed bindings list must be passed." if len(keys) != len(args): raise ProgrammingError, "Incorrect number of bindings supplied. The current statement uses %d, and there are %d supplied." % (len(keys),len(args)) try: return sql.replace('%','%%').replace('?', '%s') % tuple(map(_quote, args)) except TypeError, err: raise ProgrammingError, str(err) else: if filter(lambda s:s=='?', keys): raise ProgrammingError, "Incorrect statament. Both named and unnamed bindings used." if type(args) != type({}): raise ProgrammingError, "Incorrect bindings supplied. For named bindings dictionary must be passed." unused = {} for k in args.keys(): unused[k] = 1 for i in xrange(len(b)): if i%2 == 1: k = b[i][1:] if not args.has_key(k): raise ProgrammingError, "You did not supply a value for binding :%s." % (k,) b[i] = _quote(args[k]) if unused.has_key(k): del unused[k] if unused: raise ProgrammingError, "There are unused named bindings: "+repr(unused.keys()) return ''.join(b) def _cast_value(val, coltype=None, text_factory=None): if coltype is None: if type(val) == type(''): try: val = int(val) except ValueError: pass if type(val) == type(''): try: val = long(val) except ValueError: pass if type(val) == type(''): try: val = float(val) except ValueError: pass if type(val) == type('') and text_factory is not None: val = val.decode('utf-8') #if text_factory is unicode: # val = val.decode('utf-8') #el if text_factory not in (str,unicode): val = text_factory(val) #print '_cast_value: result:', repr(val) return val if type(val) == type('') and coltype.find('int') >= 0: try: val = int(val) except ValueError: try: val = long(val) except ValueError: pass elif type(val) == type('') and ( coltype.find('number') >= 0 or coltype.find('float') >= 0 or coltype.find('decimal') >= 0 or coltype.find('double') >= 0 or coltype.find('real') >= 0 or coltype.find('numeric') >= 0): val = float(val) elif type(val) == type('') and coltype == 'binary': val = Binary(val) elif type(val) == type('') and text_factory is not None: val = val.decode('utf-8') #if text_factory is unicode or text_factory is None: # val = val.decode('utf-8') #el if text_factory not in (str,unicode): val = text_factory(val) return val def _get_converter(key): return converters.get(key.upper()) def connect(*args, **kwargs): """Creates a connection.""" factory = Connection if kwargs.has_key('factory'): if kwargs['factory'] is not None: factory = kwargs['factory'] del kwargs['factory'] return factory(*args, **kwargs) def complete_statement(statement): """Checks if a string contains a complete SQL statement. Non-standard.""" return _Database.complete(statement) def enable_shared_cache(do_enable): """Enable or disable shared cache mode for the calling thread. Experimental/Non-standard.""" raise NotImplementedError, "enable_shared_cache" def register_adapter(type_, adapter): """Registers an adapter with pysqlite's adapter registry. Non-standard.""" adapters[type_,PrepareProtocol] = adapter def register_converter(name, converter): """Registers a converter with pysqlite. Non-standard.""" converters[name.upper()] = converter def adapt(obj, proto=PrepareProtocol, alt=None): """adapt(obj, protocol, alternate) -> adapt obj to given protocol. Non-standard.""" type_ = type(obj) adapter = adapters.get((type_,proto)) if adapter is not None: return adapter(obj) if hasattr(proto, '__adapt__'): adapted = proto.__adapt__(obj) if adapted is not None: return adapted if hasattr(obj, '__conform__'): adapted = obj.__conform__(proto) if adapted is not None: return adapted raise ProgrammingError, "can't adapt" def enable_callback_tracebacks(do_enable): """Enable or disable callback functions throwing errors to stderr.""" _enable_callback_tracebacks = do_enable