diff --git a/doc/build/changelog/changelog_07.rst b/doc/build/changelog/changelog_07.rst index a4e6b442a..b62e86df6 100644 --- a/doc/build/changelog/changelog_07.rst +++ b/doc/build/changelog/changelog_07.rst @@ -27,6 +27,16 @@ to the MSSQL dialect's "schema rendering" logic's failure to take .key into account. + .. change:: + :tags: sqlite, bug + :tickets: 2568 + + More adjustment to this SQLite related issue which was released in + 0.7.9, to intercept legacy SQLite quoting characters when reflecting + foreign keys. In addition to intercepting double quotes, other + quoting characters such as brackets, backticks, and single quotes + are now also intercepted. + .. change:: :tags: sql, bug :tickets: 2631 diff --git a/lib/sqlalchemy/dialects/sqlite/base.py b/lib/sqlalchemy/dialects/sqlite/base.py index 6d114bcef..b45b4099c 100644 --- a/lib/sqlalchemy/dialects/sqlite/base.py +++ b/lib/sqlalchemy/dialects/sqlite/base.py @@ -550,6 +550,8 @@ class SQLiteDialect(default.DefaultDialect): supports_cast = True supports_default_values = True + _broken_fk_pragma_quotes = False + def __init__(self, isolation_level=None, native_datetime=False, **kwargs): default.DefaultDialect.__init__(self, **kwargs) self.isolation_level = isolation_level @@ -566,6 +568,12 @@ class SQLiteDialect(default.DefaultDialect): self.supports_cast = \ self.dbapi.sqlite_version_info >= (3, 2, 3) + # see http://www.sqlalchemy.org/trac/ticket/2568 + # as well as http://www.sqlite.org/src/info/600482d161 + self._broken_fk_pragma_quotes = \ + self.dbapi.sqlite_version_info < (3, 6, 14) + + _isolation_lookup = { 'READ UNCOMMITTED':1, 'SERIALIZABLE':0 @@ -772,7 +780,8 @@ class SQLiteDialect(default.DefaultDialect): else: pragma = "PRAGMA " qtable = quote(table_name) - c = _pragma_cursor(connection.execute("%sforeign_key_list(%s)" % (pragma, qtable))) + statement = "%sforeign_key_list(%s)" % (pragma, qtable) + c = _pragma_cursor(connection.execute(statement)) fkeys = [] fks = {} while True: @@ -780,37 +789,38 @@ class SQLiteDialect(default.DefaultDialect): if row is None: break (numerical_id, rtbl, lcol, rcol) = (row[0], row[2], row[3], row[4]) - # sqlite won't return rcol if the table - # was created with REFERENCES , no col - if rcol is None: - rcol = lcol - # see http://www.sqlalchemy.org/trac/ticket/2568 - # as well as http://www.sqlite.org/src/info/600482d161 - if self.dbapi.sqlite_version_info < (3, 6, 14): - rtbl = re.sub(r'^\"|\"$', '', rtbl) - - try: - fk = fks[numerical_id] - except KeyError: - fk = { - 'name': None, - 'constrained_columns' : [], - 'referred_schema' : None, - 'referred_table' : rtbl, - 'referred_columns' : [] - } - fkeys.append(fk) - fks[numerical_id] = fk - - # look up the table based on the given table's engine, not 'self', - # since it could be a ProxyEngine - if lcol not in fk['constrained_columns']: - fk['constrained_columns'].append(lcol) - if rcol not in fk['referred_columns']: - fk['referred_columns'].append(rcol) + self._parse_fk(fks, fkeys, numerical_id, rtbl, lcol, rcol) return fkeys + def _parse_fk(self, fks, fkeys, numerical_id, rtbl, lcol, rcol): + # sqlite won't return rcol if the table + # was created with REFERENCES , no col + if rcol is None: + rcol = lcol + + if self._broken_fk_pragma_quotes: + rtbl = re.sub(r'^[\"\[`\']|[\"\]`\']$', '', rtbl) + + try: + fk = fks[numerical_id] + except KeyError: + fk = { + 'name': None, + 'constrained_columns': [], + 'referred_schema': None, + 'referred_table': rtbl, + 'referred_columns': [] + } + fkeys.append(fk) + fks[numerical_id] = fk + + if lcol not in fk['constrained_columns']: + fk['constrained_columns'].append(lcol) + if rcol not in fk['referred_columns']: + fk['referred_columns'].append(rcol) + return fk + @reflection.cache def get_indexes(self, connection, table_name, schema=None, **kw): quote = self.identifier_preparer.quote_identifier diff --git a/test/dialect/test_sqlite.py b/test/dialect/test_sqlite.py index b7b61640e..d3aa61102 100644 --- a/test/dialect/test_sqlite.py +++ b/test/dialect/test_sqlite.py @@ -314,7 +314,7 @@ class DialectTest(fixtures.TestBase, AssertsExecutionResults): meta.drop_all() @testing.provide_metadata - def test_quoted_identifiers_one(self): + def test_quoted_identifiers_functional_one(self): """Tests autoload of tables created with quoted column names.""" metadata = self.metadata @@ -340,7 +340,7 @@ class DialectTest(fixtures.TestBase, AssertsExecutionResults): == table2.c.id) @testing.provide_metadata - def test_quoted_identifiers_two(self): + def test_quoted_identifiers_functional_two(self): """"test the edgiest of edge cases, quoted table/col names that start and end with quotes. @@ -375,6 +375,30 @@ class DialectTest(fixtures.TestBase, AssertsExecutionResults): #assert j.onclause.compare(table1.c['"id"'] # == table2.c['"aid"']) + def test_legacy_quoted_identifiers_unit(self): + dialect = sqlite.dialect() + dialect._broken_fk_pragma_quotes = True + + + for row in [ + (0, 'target', 'tid', 'id'), + (0, '"target"', 'tid', 'id'), + (0, '[target]', 'tid', 'id'), + (0, "'target'", 'tid', 'id'), + (0, '`target`', 'tid', 'id'), + ]: + fks = {} + fkeys = [] + dialect._parse_fk(fks, fkeys, *row) + eq_(fkeys, [{ + 'referred_table': 'target', + 'referred_columns': ['id'], + 'referred_schema': None, + 'name': None, + 'constrained_columns': ['tid'] + }]) + + def test_attached_as_schema(self): cx = testing.db.connect() try: