More adjustment to this SQLite related issue which was released in

0.7.9, to intercept legacy SQLite quoting characters when reflecting
foreign keys.  In addition to intercepting double quotes, other
quoting characters such as brackets, backticks, and single quotes
are now also intercepted. [ticket:2568]
This commit is contained in:
Mike Bayer 2012-12-14 10:32:56 -05:00
parent d64dba17bb
commit 3f72142df0
3 changed files with 75 additions and 31 deletions

View File

@ -27,6 +27,16 @@
to the MSSQL dialect's "schema rendering"
logic's failure to take .key into account.
.. change::
:tags: sqlite, bug
:tickets: 2568
More adjustment to this SQLite related issue which was released in
0.7.9, to intercept legacy SQLite quoting characters when reflecting
foreign keys. In addition to intercepting double quotes, other
quoting characters such as brackets, backticks, and single quotes
are now also intercepted.
.. change::
:tags: sql, bug
:tickets: 2631

View File

@ -550,6 +550,8 @@ class SQLiteDialect(default.DefaultDialect):
supports_cast = True
supports_default_values = True
_broken_fk_pragma_quotes = False
def __init__(self, isolation_level=None, native_datetime=False, **kwargs):
default.DefaultDialect.__init__(self, **kwargs)
self.isolation_level = isolation_level
@ -566,6 +568,12 @@ class SQLiteDialect(default.DefaultDialect):
self.supports_cast = \
self.dbapi.sqlite_version_info >= (3, 2, 3)
# see http://www.sqlalchemy.org/trac/ticket/2568
# as well as http://www.sqlite.org/src/info/600482d161
self._broken_fk_pragma_quotes = \
self.dbapi.sqlite_version_info < (3, 6, 14)
_isolation_lookup = {
'READ UNCOMMITTED':1,
'SERIALIZABLE':0
@ -772,7 +780,8 @@ class SQLiteDialect(default.DefaultDialect):
else:
pragma = "PRAGMA "
qtable = quote(table_name)
c = _pragma_cursor(connection.execute("%sforeign_key_list(%s)" % (pragma, qtable)))
statement = "%sforeign_key_list(%s)" % (pragma, qtable)
c = _pragma_cursor(connection.execute(statement))
fkeys = []
fks = {}
while True:
@ -780,37 +789,38 @@ class SQLiteDialect(default.DefaultDialect):
if row is None:
break
(numerical_id, rtbl, lcol, rcol) = (row[0], row[2], row[3], row[4])
# sqlite won't return rcol if the table
# was created with REFERENCES <tablename>, no col
if rcol is None:
rcol = lcol
# see http://www.sqlalchemy.org/trac/ticket/2568
# as well as http://www.sqlite.org/src/info/600482d161
if self.dbapi.sqlite_version_info < (3, 6, 14):
rtbl = re.sub(r'^\"|\"$', '', rtbl)
try:
fk = fks[numerical_id]
except KeyError:
fk = {
'name': None,
'constrained_columns' : [],
'referred_schema' : None,
'referred_table' : rtbl,
'referred_columns' : []
}
fkeys.append(fk)
fks[numerical_id] = fk
# look up the table based on the given table's engine, not 'self',
# since it could be a ProxyEngine
if lcol not in fk['constrained_columns']:
fk['constrained_columns'].append(lcol)
if rcol not in fk['referred_columns']:
fk['referred_columns'].append(rcol)
self._parse_fk(fks, fkeys, numerical_id, rtbl, lcol, rcol)
return fkeys
def _parse_fk(self, fks, fkeys, numerical_id, rtbl, lcol, rcol):
# sqlite won't return rcol if the table
# was created with REFERENCES <tablename>, no col
if rcol is None:
rcol = lcol
if self._broken_fk_pragma_quotes:
rtbl = re.sub(r'^[\"\[`\']|[\"\]`\']$', '', rtbl)
try:
fk = fks[numerical_id]
except KeyError:
fk = {
'name': None,
'constrained_columns': [],
'referred_schema': None,
'referred_table': rtbl,
'referred_columns': []
}
fkeys.append(fk)
fks[numerical_id] = fk
if lcol not in fk['constrained_columns']:
fk['constrained_columns'].append(lcol)
if rcol not in fk['referred_columns']:
fk['referred_columns'].append(rcol)
return fk
@reflection.cache
def get_indexes(self, connection, table_name, schema=None, **kw):
quote = self.identifier_preparer.quote_identifier

View File

@ -314,7 +314,7 @@ class DialectTest(fixtures.TestBase, AssertsExecutionResults):
meta.drop_all()
@testing.provide_metadata
def test_quoted_identifiers_one(self):
def test_quoted_identifiers_functional_one(self):
"""Tests autoload of tables created with quoted column names."""
metadata = self.metadata
@ -340,7 +340,7 @@ class DialectTest(fixtures.TestBase, AssertsExecutionResults):
== table2.c.id)
@testing.provide_metadata
def test_quoted_identifiers_two(self):
def test_quoted_identifiers_functional_two(self):
""""test the edgiest of edge cases, quoted table/col names
that start and end with quotes.
@ -375,6 +375,30 @@ class DialectTest(fixtures.TestBase, AssertsExecutionResults):
#assert j.onclause.compare(table1.c['"id"']
# == table2.c['"aid"'])
def test_legacy_quoted_identifiers_unit(self):
dialect = sqlite.dialect()
dialect._broken_fk_pragma_quotes = True
for row in [
(0, 'target', 'tid', 'id'),
(0, '"target"', 'tid', 'id'),
(0, '[target]', 'tid', 'id'),
(0, "'target'", 'tid', 'id'),
(0, '`target`', 'tid', 'id'),
]:
fks = {}
fkeys = []
dialect._parse_fk(fks, fkeys, *row)
eq_(fkeys, [{
'referred_table': 'target',
'referred_columns': ['id'],
'referred_schema': None,
'name': None,
'constrained_columns': ['tid']
}])
def test_attached_as_schema(self):
cx = testing.db.connect()
try: