From 3f72142df053a9e0e447c967455871d6744ed703 Mon Sep 17 00:00:00 2001 From: Mike Bayer Date: Fri, 14 Dec 2012 10:32:56 -0500 Subject: [PATCH] More adjustment to this SQLite related issue which was released in 0.7.9, to intercept legacy SQLite quoting characters when reflecting foreign keys. In addition to intercepting double quotes, other quoting characters such as brackets, backticks, and single quotes are now also intercepted. [ticket:2568] --- doc/build/changelog/changelog_07.rst | 10 ++++ lib/sqlalchemy/dialects/sqlite/base.py | 68 +++++++++++++++----------- test/dialect/test_sqlite.py | 28 ++++++++++- 3 files changed, 75 insertions(+), 31 deletions(-) diff --git a/doc/build/changelog/changelog_07.rst b/doc/build/changelog/changelog_07.rst index a4e6b442a..b62e86df6 100644 --- a/doc/build/changelog/changelog_07.rst +++ b/doc/build/changelog/changelog_07.rst @@ -27,6 +27,16 @@ to the MSSQL dialect's "schema rendering" logic's failure to take .key into account. + .. change:: + :tags: sqlite, bug + :tickets: 2568 + + More adjustment to this SQLite related issue which was released in + 0.7.9, to intercept legacy SQLite quoting characters when reflecting + foreign keys. In addition to intercepting double quotes, other + quoting characters such as brackets, backticks, and single quotes + are now also intercepted. + .. change:: :tags: sql, bug :tickets: 2631 diff --git a/lib/sqlalchemy/dialects/sqlite/base.py b/lib/sqlalchemy/dialects/sqlite/base.py index 6d114bcef..b45b4099c 100644 --- a/lib/sqlalchemy/dialects/sqlite/base.py +++ b/lib/sqlalchemy/dialects/sqlite/base.py @@ -550,6 +550,8 @@ class SQLiteDialect(default.DefaultDialect): supports_cast = True supports_default_values = True + _broken_fk_pragma_quotes = False + def __init__(self, isolation_level=None, native_datetime=False, **kwargs): default.DefaultDialect.__init__(self, **kwargs) self.isolation_level = isolation_level @@ -566,6 +568,12 @@ class SQLiteDialect(default.DefaultDialect): self.supports_cast = \ self.dbapi.sqlite_version_info >= (3, 2, 3) + # see http://www.sqlalchemy.org/trac/ticket/2568 + # as well as http://www.sqlite.org/src/info/600482d161 + self._broken_fk_pragma_quotes = \ + self.dbapi.sqlite_version_info < (3, 6, 14) + + _isolation_lookup = { 'READ UNCOMMITTED':1, 'SERIALIZABLE':0 @@ -772,7 +780,8 @@ class SQLiteDialect(default.DefaultDialect): else: pragma = "PRAGMA " qtable = quote(table_name) - c = _pragma_cursor(connection.execute("%sforeign_key_list(%s)" % (pragma, qtable))) + statement = "%sforeign_key_list(%s)" % (pragma, qtable) + c = _pragma_cursor(connection.execute(statement)) fkeys = [] fks = {} while True: @@ -780,37 +789,38 @@ class SQLiteDialect(default.DefaultDialect): if row is None: break (numerical_id, rtbl, lcol, rcol) = (row[0], row[2], row[3], row[4]) - # sqlite won't return rcol if the table - # was created with REFERENCES , no col - if rcol is None: - rcol = lcol - # see http://www.sqlalchemy.org/trac/ticket/2568 - # as well as http://www.sqlite.org/src/info/600482d161 - if self.dbapi.sqlite_version_info < (3, 6, 14): - rtbl = re.sub(r'^\"|\"$', '', rtbl) - - try: - fk = fks[numerical_id] - except KeyError: - fk = { - 'name': None, - 'constrained_columns' : [], - 'referred_schema' : None, - 'referred_table' : rtbl, - 'referred_columns' : [] - } - fkeys.append(fk) - fks[numerical_id] = fk - - # look up the table based on the given table's engine, not 'self', - # since it could be a ProxyEngine - if lcol not in fk['constrained_columns']: - fk['constrained_columns'].append(lcol) - if rcol not in fk['referred_columns']: - fk['referred_columns'].append(rcol) + self._parse_fk(fks, fkeys, numerical_id, rtbl, lcol, rcol) return fkeys + def _parse_fk(self, fks, fkeys, numerical_id, rtbl, lcol, rcol): + # sqlite won't return rcol if the table + # was created with REFERENCES , no col + if rcol is None: + rcol = lcol + + if self._broken_fk_pragma_quotes: + rtbl = re.sub(r'^[\"\[`\']|[\"\]`\']$', '', rtbl) + + try: + fk = fks[numerical_id] + except KeyError: + fk = { + 'name': None, + 'constrained_columns': [], + 'referred_schema': None, + 'referred_table': rtbl, + 'referred_columns': [] + } + fkeys.append(fk) + fks[numerical_id] = fk + + if lcol not in fk['constrained_columns']: + fk['constrained_columns'].append(lcol) + if rcol not in fk['referred_columns']: + fk['referred_columns'].append(rcol) + return fk + @reflection.cache def get_indexes(self, connection, table_name, schema=None, **kw): quote = self.identifier_preparer.quote_identifier diff --git a/test/dialect/test_sqlite.py b/test/dialect/test_sqlite.py index b7b61640e..d3aa61102 100644 --- a/test/dialect/test_sqlite.py +++ b/test/dialect/test_sqlite.py @@ -314,7 +314,7 @@ class DialectTest(fixtures.TestBase, AssertsExecutionResults): meta.drop_all() @testing.provide_metadata - def test_quoted_identifiers_one(self): + def test_quoted_identifiers_functional_one(self): """Tests autoload of tables created with quoted column names.""" metadata = self.metadata @@ -340,7 +340,7 @@ class DialectTest(fixtures.TestBase, AssertsExecutionResults): == table2.c.id) @testing.provide_metadata - def test_quoted_identifiers_two(self): + def test_quoted_identifiers_functional_two(self): """"test the edgiest of edge cases, quoted table/col names that start and end with quotes. @@ -375,6 +375,30 @@ class DialectTest(fixtures.TestBase, AssertsExecutionResults): #assert j.onclause.compare(table1.c['"id"'] # == table2.c['"aid"']) + def test_legacy_quoted_identifiers_unit(self): + dialect = sqlite.dialect() + dialect._broken_fk_pragma_quotes = True + + + for row in [ + (0, 'target', 'tid', 'id'), + (0, '"target"', 'tid', 'id'), + (0, '[target]', 'tid', 'id'), + (0, "'target'", 'tid', 'id'), + (0, '`target`', 'tid', 'id'), + ]: + fks = {} + fkeys = [] + dialect._parse_fk(fks, fkeys, *row) + eq_(fkeys, [{ + 'referred_table': 'target', + 'referred_columns': ['id'], + 'referred_schema': None, + 'name': None, + 'constrained_columns': ['tid'] + }]) + + def test_attached_as_schema(self): cx = testing.db.connect() try: