# testing/suite/test_reflection.py
# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors
# <see AUTHORS file>
#
# This module is part of SQLAlchemy and is released under
# the MIT License: https://www.opensource.org/licenses/mit-license.php
# mypy: ignore-errors
import contextlib
import operator
import re
import sqlalchemy as sa
from .. import config
from .. import engines
from .. import eq_
from .. import eq_regex
from .. import expect_raises
from .. import expect_raises_message
from .. import expect_warnings
from .. import fixtures
from .. import is_
from ..provision import get_temp_table_name
from ..provision import temp_table_keyword_args
from ..schema import Column
from ..schema import Table
from ... import Boolean
from ... import DateTime
from ... import event
from ... import ForeignKey
from ... import func
from ... import Identity
from ... import inspect
from ... import Integer
from ... import MetaData
from ... import String
from ... import testing
from ... import types as sql_types
from ...engine import Inspector
from ...engine import ObjectKind
from ...engine import ObjectScope
from ...exc import NoSuchTableError
from ...exc import UnreflectableTableError
from ...schema import DDL
from ...schema import Index
from ...sql.elements import quoted_name
from ...sql.schema import BLANK_SCHEMA
from ...testing import ComparesIndexes
from ...testing import ComparesTables
from ...testing import is_false
from ...testing import is_none
from ...testing import is_true
from ...testing import mock
metadata, users = None, None
class OneConnectionTablesTest(fixtures.TablesTest):
@classmethod
def setup_bind(cls):
# TODO: when temp tables are subject to server reset,
# this will also have to disable that server reset from
# happening
if config.requirements.independent_connections.enabled:
from sqlalchemy import pool
return engines.testing_engine(
options=dict(poolclass=pool.StaticPool, scope="class"),
)
else:
return config.db
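# StaticPool keeps a single DBAPI connection for the life of the engine, so
# temp tables created during setup stay visible to every test in the class.
# A minimal sketch of that pool behavior (the in-memory SQLite URL is only
# an illustration, not what the suite uses):
#
#     from sqlalchemy import create_engine, pool
#     eng = create_engine("sqlite://", poolclass=pool.StaticPool)
#     with eng.connect() as c1:
#         raw = c1.connection.dbapi_connection
#     with eng.connect() as c2:
#         # same underlying DBAPI connection on every checkout
#         assert c2.connection.dbapi_connection is raw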
class HasTableTest(OneConnectionTablesTest):
__sparse_driver_backend__ = True
run_deletes = None
@classmethod
def define_tables(cls, metadata):
Table(
"test_table",
metadata,
Column("id", Integer, primary_key=True),
Column("data", String(50)),
)
if testing.requires.schemas.enabled:
Table(
"test_table_s",
metadata,
Column("id", Integer, primary_key=True),
Column("data", String(50)),
schema=config.test_schema,
)
        if testing.requires.view_reflection.enabled:
cls.define_views(metadata)
if testing.requires.has_temp_table.enabled:
cls.define_temp_tables(metadata)
@classmethod
def define_views(cls, metadata):
query = "CREATE VIEW vv AS SELECT id, data FROM test_table"
event.listen(metadata, "after_create", DDL(query))
event.listen(metadata, "before_drop", DDL("DROP VIEW vv"))
if testing.requires.schemas.enabled:
query = (
"CREATE VIEW %s.vv AS SELECT id, data FROM %s.test_table_s"
% (
config.test_schema,
config.test_schema,
)
)
event.listen(metadata, "after_create", DDL(query))
event.listen(
metadata,
"before_drop",
DDL("DROP VIEW %s.vv" % (config.test_schema)),
)
@classmethod
def temp_table_name(cls):
return get_temp_table_name(
config, config.db, f"user_tmp_{config.ident}"
)
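    # get_temp_table_name is a per-dialect provision hook: most backends
    # return the base name unchanged, while some decorate it (SQL Server,
    # for example, prefixes temp table names with "#" so the server itself
    # treats them as temporary).  As an assumption about the backend, the
    # resulting name looks like:
    #
    #     user_tmp_<config.ident>      # typical backends
    #     ##user_tmp_<config.ident>    # e.g. a SQL Server global temp table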
@classmethod
def define_temp_tables(cls, metadata):
kw = temp_table_keyword_args(config, config.db)
table_name = cls.temp_table_name()
user_tmp = Table(
table_name,
metadata,
Column("id", sa.INT, primary_key=True),
Column("name", sa.VARCHAR(50)),
**kw,
)
if (
testing.requires.view_reflection.enabled
and testing.requires.temporary_views.enabled
):
event.listen(
user_tmp,
"after_create",
DDL(
"create temporary view user_tmp_v as "
"select * from user_tmp_%s" % config.ident
),
)
event.listen(user_tmp, "before_drop", DDL("drop view user_tmp_v"))
def test_has_table(self):
with config.db.begin() as conn:
is_true(config.db.dialect.has_table(conn, "test_table"))
is_false(config.db.dialect.has_table(conn, "test_table_s"))
is_false(config.db.dialect.has_table(conn, "nonexistent_table"))
def test_has_table_cache(self, metadata):
insp = inspect(config.db)
is_true(insp.has_table("test_table"))
nt = Table("new_table", metadata, Column("col", Integer))
is_false(insp.has_table("new_table"))
nt.create(config.db)
try:
is_false(insp.has_table("new_table"))
insp.clear_cache()
is_true(insp.has_table("new_table"))
finally:
nt.drop(config.db)
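    # Inspector results are memoized per instance, which is what the test
    # above exercises; a sketch of the pattern (eng is any Engine):
    #
    #     insp = inspect(eng)
    #     insp.has_table("t")   # queries the database, caches the result
    #     insp.has_table("t")   # answered from the cache
    #     insp.clear_cache()    # the next call queries the database again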
@testing.requires.schemas
def test_has_table_schema(self):
with config.db.begin() as conn:
is_false(
config.db.dialect.has_table(
conn, "test_table", schema=config.test_schema
)
)
is_true(
config.db.dialect.has_table(
conn, "test_table_s", schema=config.test_schema
)
)
is_false(
config.db.dialect.has_table(
conn, "nonexistent_table", schema=config.test_schema
)
)
@testing.requires.schemas
def test_has_table_nonexistent_schema(self):
with config.db.begin() as conn:
is_false(
config.db.dialect.has_table(
conn, "test_table", schema="nonexistent_schema"
)
)
@testing.requires.views
def test_has_table_view(self, connection):
insp = inspect(connection)
is_true(insp.has_table("vv"))
@testing.requires.has_temp_table
def test_has_table_temp_table(self, connection):
insp = inspect(connection)
temp_table_name = self.temp_table_name()
is_true(insp.has_table(temp_table_name))
@testing.requires.has_temp_table
@testing.requires.view_reflection
@testing.requires.temporary_views
def test_has_table_temp_view(self, connection):
insp = inspect(connection)
is_true(insp.has_table("user_tmp_v"))
@testing.requires.views
@testing.requires.schemas
def test_has_table_view_schema(self, connection):
insp = inspect(connection)
is_true(insp.has_table("vv", config.test_schema))
class HasIndexTest(fixtures.TablesTest):
__sparse_driver_backend__ = True
__requires__ = ("index_reflection",)
@classmethod
def define_tables(cls, metadata):
tt = Table(
"test_table",
metadata,
Column("id", Integer, primary_key=True),
Column("data", String(50)),
Column("data2", String(50)),
)
Index("my_idx", tt.c.data)
if testing.requires.schemas.enabled:
tt = Table(
"test_table",
metadata,
Column("id", Integer, primary_key=True),
Column("data", String(50)),
schema=config.test_schema,
)
Index("my_idx_s", tt.c.data)
kind = testing.combinations("dialect", "inspector", argnames="kind")
def _has_index(self, kind, conn):
if kind == "dialect":
return lambda *a, **k: config.db.dialect.has_index(conn, *a, **k)
else:
return inspect(conn).has_index
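    # Both kinds exercise the same check through different entry points; a
    # sketch of the two call styles (conn is an open Connection):
    #
    #     config.db.dialect.has_index(conn, "test_table", "my_idx")
    #     inspect(conn).has_index("test_table", "my_idx")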
@kind
def test_has_index(self, kind, connection, metadata):
meth = self._has_index(kind, connection)
assert meth("test_table", "my_idx")
assert not meth("test_table", "my_idx_s")
assert not meth("nonexistent_table", "my_idx")
assert not meth("test_table", "nonexistent_idx")
assert not meth("test_table", "my_idx_2")
assert not meth("test_table_2", "my_idx_3")
idx = Index("my_idx_2", self.tables.test_table.c.data2)
tbl = Table(
"test_table_2",
metadata,
Column("foo", Integer),
Index("my_idx_3", "foo"),
)
idx.create(connection)
tbl.create(connection)
try:
if kind == "inspector":
assert not meth("test_table", "my_idx_2")
assert not meth("test_table_2", "my_idx_3")
meth.__self__.clear_cache()
assert meth("test_table", "my_idx_2") is True
assert meth("test_table_2", "my_idx_3") is True
finally:
tbl.drop(connection)
idx.drop(connection)
@testing.requires.schemas
@kind
def test_has_index_schema(self, kind, connection):
meth = self._has_index(kind, connection)
assert meth("test_table", "my_idx_s", schema=config.test_schema)
assert not meth("test_table", "my_idx", schema=config.test_schema)
assert not meth(
"nonexistent_table", "my_idx_s", schema=config.test_schema
)
assert not meth(
"test_table", "nonexistent_idx_s", schema=config.test_schema
)
class BizarroCharacterTest(fixtures.TestBase):
__sparse_driver_backend__ = True
def column_names():
return testing.combinations(
("plainname",),
("(3)",),
("col%p",),
("[brack]",),
argnames="columnname",
)
def table_names():
return testing.combinations(
("plain",),
("(2)",),
("per % cent",),
("[brackets]",),
argnames="tablename",
)
@testing.variation("use_composite", [True, False])
@column_names()
@table_names()
@testing.requires.foreign_key_constraint_reflection
def test_fk_ref(
self, connection, metadata, use_composite, tablename, columnname
):
"""tests for #10275"""
tt = Table(
tablename,
metadata,
Column(columnname, Integer, key="id", primary_key=True),
test_needs_fk=True,
)
if use_composite:
tt.append_column(Column("id2", Integer, primary_key=True))
if use_composite:
Table(
"other",
metadata,
Column("id", Integer, primary_key=True),
Column("ref", Integer),
Column("ref2", Integer),
sa.ForeignKeyConstraint(["ref", "ref2"], [tt.c.id, tt.c.id2]),
test_needs_fk=True,
)
else:
Table(
"other",
metadata,
Column("id", Integer, primary_key=True),
Column("ref", ForeignKey(tt.c.id)),
test_needs_fk=True,
)
metadata.create_all(connection)
m2 = MetaData()
o2 = Table("other", m2, autoload_with=connection)
t1 = m2.tables[tablename]
assert o2.c.ref.references(t1.c[0])
if use_composite:
assert o2.c.ref2.references(t1.c[1])
@column_names()
@table_names()
@testing.requires.identity_columns
def test_reflect_identity(
self, tablename, columnname, connection, metadata
):
Table(
tablename,
metadata,
Column(columnname, Integer, Identity(), primary_key=True),
)
metadata.create_all(connection)
insp = inspect(connection)
eq_(insp.get_columns(tablename)[0]["identity"]["start"], 1)
@column_names()
@table_names()
@testing.requires.comment_reflection
def test_reflect_comments(
self, tablename, columnname, connection, metadata
):
Table(
tablename,
metadata,
Column("id", Integer, primary_key=True),
Column(columnname, Integer, comment="some comment"),
)
metadata.create_all(connection)
insp = inspect(connection)
eq_(insp.get_columns(tablename)[1]["comment"], "some comment")
class TempTableElementsTest(fixtures.TestBase):
__sparse_driver_backend__ = True
__requires__ = ("temp_table_reflection",)
@testing.fixture
def tablename(self):
return get_temp_table_name(
config, config.db, f"ident_tmp_{config.ident}"
)
@testing.requires.identity_columns
def test_reflect_identity(self, tablename, connection, metadata):
Table(
tablename,
metadata,
Column("id", Integer, Identity(), primary_key=True),
)
metadata.create_all(connection)
insp = inspect(connection)
eq_(insp.get_columns(tablename)[0]["identity"]["start"], 1)
@testing.requires.temp_table_comment_reflection
def test_reflect_comments(self, tablename, connection, metadata):
Table(
tablename,
metadata,
Column("id", Integer, primary_key=True),
Column("foobar", Integer, comment="some comment"),
)
metadata.create_all(connection)
insp = inspect(connection)
eq_(insp.get_columns(tablename)[1]["comment"], "some comment")
class QuotedNameArgumentTest(fixtures.TablesTest):
run_create_tables = "once"
__sparse_driver_backend__ = True
@classmethod
def define_tables(cls, metadata):
Table(
"quote ' one",
metadata,
Column("id", Integer),
Column("name", String(50)),
Column("data", String(50)),
Column("related_id", Integer),
sa.PrimaryKeyConstraint("id", name="pk quote ' one"),
sa.Index("ix quote ' one", "name"),
sa.UniqueConstraint(
"data",
name="uq quote' one",
),
sa.ForeignKeyConstraint(
["id"], ["related.id"], name="fk quote ' one"
),
sa.CheckConstraint("name != 'foo'", name="ck quote ' one"),
comment=r"""quote ' one comment""",
test_needs_fk=True,
)
if testing.requires.symbol_names_w_double_quote.enabled:
Table(
'quote " two',
metadata,
Column("id", Integer),
Column("name", String(50)),
Column("data", String(50)),
Column("related_id", Integer),
sa.PrimaryKeyConstraint("id", name='pk quote " two'),
sa.Index('ix quote " two', "name"),
sa.UniqueConstraint(
"data",
name='uq quote" two',
),
sa.ForeignKeyConstraint(
["id"], ["related.id"], name='fk quote " two'
),
sa.CheckConstraint("name != 'foo'", name='ck quote " two '),
comment=r"""quote " two comment""",
test_needs_fk=True,
)
Table(
"related",
metadata,
Column("id", Integer, primary_key=True),
Column("related", Integer),
test_needs_fk=True,
)
if testing.requires.view_column_reflection.enabled:
if testing.requires.symbol_names_w_double_quote.enabled:
names = [
"quote ' one",
'quote " two',
]
else:
names = [
"quote ' one",
]
for name in names:
query = "CREATE VIEW %s AS SELECT * FROM %s" % (
config.db.dialect.identifier_preparer.quote(
"view %s" % name
),
config.db.dialect.identifier_preparer.quote(name),
)
event.listen(metadata, "after_create", DDL(query))
event.listen(
metadata,
"before_drop",
DDL(
"DROP VIEW %s"
% config.db.dialect.identifier_preparer.quote(
"view %s" % name
)
),
)
def quote_fixtures(fn):
return testing.combinations(
("quote ' one",),
('quote " two', testing.requires.symbol_names_w_double_quote),
)(fn)
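    # quote_fixtures parametrizes each test below with a name containing a
    # single quote, plus a double-quoted variant where
    # symbol_names_w_double_quote is enabled; e.g. a decorated test runs as:
    #
    #     test_get_columns("quote ' one")
    #     test_get_columns('quote " two')   # only with double-quote support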
@quote_fixtures
def test_get_table_options(self, name):
insp = inspect(config.db)
if testing.requires.reflect_table_options.enabled:
res = insp.get_table_options(name)
is_true(isinstance(res, dict))
else:
with expect_raises(NotImplementedError):
insp.get_table_options(name)
@quote_fixtures
@testing.requires.view_column_reflection
def test_get_view_definition(self, name):
insp = inspect(config.db)
assert insp.get_view_definition("view %s" % name)
@quote_fixtures
def test_get_columns(self, name):
insp = inspect(config.db)
assert insp.get_columns(name)
@quote_fixtures
def test_get_pk_constraint(self, name):
insp = inspect(config.db)
assert insp.get_pk_constraint(name)
@quote_fixtures
@testing.requires.foreign_key_constraint_reflection
def test_get_foreign_keys(self, name):
insp = inspect(config.db)
assert insp.get_foreign_keys(name)
@quote_fixtures
@testing.requires.index_reflection
def test_get_indexes(self, name):
insp = inspect(config.db)
assert insp.get_indexes(name)
@quote_fixtures
@testing.requires.unique_constraint_reflection
def test_get_unique_constraints(self, name):
insp = inspect(config.db)
assert insp.get_unique_constraints(name)
@quote_fixtures
@testing.requires.comment_reflection
def test_get_table_comment(self, name):
insp = inspect(config.db)
assert insp.get_table_comment(name)
@quote_fixtures
@testing.requires.check_constraint_reflection
def test_get_check_constraints(self, name):
insp = inspect(config.db)
assert insp.get_check_constraints(name)
def _multi_combination(fn):
schema = testing.combinations(
None,
(
lambda: config.test_schema,
testing.requires.schemas,
),
argnames="schema",
)
scope = testing.combinations(
ObjectScope.DEFAULT,
ObjectScope.TEMPORARY,
ObjectScope.ANY,
argnames="scope",
)
kind = testing.combinations(
ObjectKind.TABLE,
ObjectKind.VIEW,
ObjectKind.MATERIALIZED_VIEW,
ObjectKind.ANY,
ObjectKind.ANY_VIEW,
ObjectKind.TABLE | ObjectKind.VIEW,
ObjectKind.TABLE | ObjectKind.MATERIALIZED_VIEW,
argnames="kind",
)
filter_names = testing.combinations(True, False, argnames="use_filter")
return schema(scope(kind(filter_names(fn))))
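# _multi_combination stacks four parameter sets, so a decorated test runs
# once per (schema, scope, kind, use_filter) combination; with schemas
# enabled that is 2 * 3 * 7 * 2 = 84 invocations per backend.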
class ComponentReflectionTest(ComparesTables, OneConnectionTablesTest):
run_inserts = run_deletes = None
__sparse_driver_backend__ = True
@classmethod
def define_tables(cls, metadata):
cls.define_reflected_tables(metadata, None)
if testing.requires.schemas.enabled:
cls.define_reflected_tables(metadata, testing.config.test_schema)
@classmethod
def define_reflected_tables(cls, metadata, schema):
if schema:
schema_prefix = schema + "."
else:
schema_prefix = ""
if testing.requires.self_referential_foreign_keys.enabled:
parent_id_args = (
ForeignKey(
"%susers.user_id" % schema_prefix, name="user_id_fk"
),
)
else:
parent_id_args = ()
users = Table(
"users",
metadata,
Column("user_id", sa.INT, primary_key=True),
Column("test1", sa.CHAR(5), nullable=False),
Column("test2", sa.Float(), nullable=False),
Column("parent_user_id", sa.Integer, *parent_id_args),
sa.CheckConstraint(
"test2 > 0",
name="zz_test2_gt_zero",
comment="users check constraint",
),
sa.CheckConstraint("test2 <= 1000"),
schema=schema,
test_needs_fk=True,
)
Table(
"dingalings",
metadata,
Column("dingaling_id", sa.Integer, primary_key=True),
Column(
"address_id",
sa.Integer,
ForeignKey(
"%semail_addresses.address_id" % schema_prefix,
name="zz_email_add_id_fg",
comment="di fk comment",
),
),
Column(
"id_user",
sa.Integer,
ForeignKey("%susers.user_id" % schema_prefix),
),
Column("data", sa.String(30), unique=True),
sa.CheckConstraint(
"address_id > 0 AND address_id < 1000",
name="address_id_gt_zero",
),
sa.UniqueConstraint(
"address_id",
"dingaling_id",
name="zz_dingalings_multiple",
comment="di unique comment",
),
schema=schema,
test_needs_fk=True,
)
Table(
"email_addresses",
metadata,
Column("address_id", sa.Integer),
Column("remote_user_id", sa.Integer, ForeignKey(users.c.user_id)),
Column("email_address", sa.String(20), index=True),
sa.PrimaryKeyConstraint(
"address_id", name="email_ad_pk", comment="ea pk comment"
),
schema=schema,
test_needs_fk=True,
)
Table(
"comment_test",
metadata,
Column("id", sa.Integer, primary_key=True, comment="id comment"),
Column("data", sa.String(20), comment="data % comment"),
Column(
"d2",
sa.String(20),
comment=r"""Comment types type speedily ' " \ '' Fun!""",
),
Column("d3", sa.String(42), comment="Comment\nwith\rescapes"),
schema=schema,
comment=r"""the test % ' " \ table comment""",
)
Table(
"no_constraints",
metadata,
Column("data", sa.String(20)),
schema=schema,
comment="no\nconstraints\rhas\fescaped\vcomment",
)
if testing.requires.cross_schema_fk_reflection.enabled:
if schema is None:
Table(
"local_table",
metadata,
Column("id", sa.Integer, primary_key=True),
Column("data", sa.String(20)),
Column(
"remote_id",
ForeignKey(
"%s.remote_table_2.id" % testing.config.test_schema
),
),
test_needs_fk=True,
schema=config.db.dialect.default_schema_name,
)
else:
Table(
"remote_table",
metadata,
Column("id", sa.Integer, primary_key=True),
Column(
"local_id",
ForeignKey(
"%s.local_table.id"
% config.db.dialect.default_schema_name
),
),
Column("data", sa.String(20)),
schema=schema,
test_needs_fk=True,
)
Table(
"remote_table_2",
metadata,
Column("id", sa.Integer, primary_key=True),
Column("data", sa.String(20)),
schema=schema,
test_needs_fk=True,
)
if testing.requires.index_reflection.enabled:
Index("users_t_idx", users.c.test1, users.c.test2, unique=True)
Index(
"users_all_idx", users.c.user_id, users.c.test2, users.c.test1
)
if not schema:
            # test_needs_fk is used at the moment to force MySQL InnoDB
noncol_idx_test_nopk = Table(
"noncol_idx_test_nopk",
metadata,
Column("q", sa.String(5)),
test_needs_fk=True,
)
noncol_idx_test_pk = Table(
"noncol_idx_test_pk",
metadata,
Column("id", sa.Integer, primary_key=True),
Column("q", sa.String(5)),
test_needs_fk=True,
)
if (
testing.requires.indexes_with_ascdesc.enabled
and testing.requires.reflect_indexes_with_ascdesc.enabled
):
Index("noncol_idx_nopk", noncol_idx_test_nopk.c.q.desc())
Index("noncol_idx_pk", noncol_idx_test_pk.c.q.desc())
if testing.requires.view_column_reflection.enabled:
cls.define_views(metadata, schema)
if not schema and testing.requires.temp_table_reflection.enabled:
cls.define_temp_tables(metadata)
@classmethod
def temp_table_name(cls):
return get_temp_table_name(
config, config.db, f"user_tmp_{config.ident}"
)
@classmethod
def define_temp_tables(cls, metadata):
kw = temp_table_keyword_args(config, config.db)
table_name = cls.temp_table_name()
user_tmp = Table(
table_name,
metadata,
Column("id", sa.INT, primary_key=True),
Column("name", sa.VARCHAR(50)),
Column("foo", sa.INT),
# disambiguate temp table unique constraint names. this is
# pretty arbitrary for a generic dialect however we are doing
# it to suit SQL Server which will produce name conflicts for
# unique constraints created against temp tables in different
# databases.
# https://www.arbinada.com/en/node/1645
sa.UniqueConstraint("name", name=f"user_tmp_uq_{config.ident}"),
sa.Index("user_tmp_ix", "foo"),
**kw,
)
if (
testing.requires.view_reflection.enabled
and testing.requires.temporary_views.enabled
):
event.listen(
user_tmp,
"after_create",
DDL(
"create temporary view user_tmp_v as "
"select * from user_tmp_%s" % config.ident
),
)
event.listen(user_tmp, "before_drop", DDL("drop view user_tmp_v"))
@classmethod
def define_views(cls, metadata, schema):
if testing.requires.materialized_views.enabled:
materialized = {"dingalings"}
else:
materialized = set()
for table_name in ("users", "email_addresses", "dingalings"):
fullname = table_name
if schema:
fullname = f"{schema}.{table_name}"
view_name = fullname + "_v"
prefix = "MATERIALIZED " if table_name in materialized else ""
query = (
f"CREATE {prefix}VIEW {view_name} AS SELECT * FROM {fullname}"
)
event.listen(metadata, "after_create", DDL(query))
if table_name in materialized:
index_name = "mat_index"
if schema and testing.against("oracle"):
index_name = f"{schema}.{index_name}"
idx = f"CREATE INDEX {index_name} ON {view_name}(data)"
event.listen(metadata, "after_create", DDL(idx))
event.listen(
metadata, "before_drop", DDL(f"DROP {prefix}VIEW {view_name}")
)
def _resolve_kind(self, kind, tables, views, materialized):
res = {}
if ObjectKind.TABLE in kind:
res.update(tables)
if ObjectKind.VIEW in kind:
res.update(views)
if ObjectKind.MATERIALIZED_VIEW in kind:
res.update(materialized)
return res
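    # ObjectKind is a flag enum, so the ``in`` checks above also match
    # composite values; e.g.:
    #
    #     ObjectKind.TABLE in ObjectKind.ANY         # True
    #     ObjectKind.TABLE in ObjectKind.ANY_VIEW    # False
    #     ObjectKind.VIEW in ObjectKind.ANY_VIEW     # True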
def _resolve_views(self, views, materialized):
if not testing.requires.view_column_reflection.enabled:
materialized.clear()
views.clear()
elif not testing.requires.materialized_views.enabled:
views.update(materialized)
materialized.clear()
def _resolve_names(self, schema, scope, filter_names, values):
scope_filter = lambda _: True # noqa: E731
if scope is ObjectScope.DEFAULT:
scope_filter = lambda k: "tmp" not in k[1] # noqa: E731
if scope is ObjectScope.TEMPORARY:
scope_filter = lambda k: "tmp" in k[1] # noqa: E731
removed = {
None: {"remote_table", "remote_table_2"},
testing.config.test_schema: {
"local_table",
"noncol_idx_test_nopk",
"noncol_idx_test_pk",
"user_tmp_v",
self.temp_table_name(),
},
}
if not testing.requires.cross_schema_fk_reflection.enabled:
removed[None].add("local_table")
removed[testing.config.test_schema].update(
["remote_table", "remote_table_2"]
)
if not testing.requires.index_reflection.enabled:
removed[None].update(
["noncol_idx_test_nopk", "noncol_idx_test_pk"]
)
if (
not testing.requires.temp_table_reflection.enabled
or not testing.requires.temp_table_names.enabled
):
removed[None].update(["user_tmp_v", self.temp_table_name()])
if not testing.requires.temporary_views.enabled:
removed[None].update(["user_tmp_v"])
res = {
k: v
for k, v in values.items()
if scope_filter(k)
and k[1] not in removed[schema]
and (not filter_names or k[1] in filter_names)
}
return res
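    # The expected-result dicts are keyed by (schema, object_name) tuples,
    # so ``k[1]`` above is the object name; the "tmp" substring check is how
    # temporary tables and views are distinguished from persistent ones.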
def exp_options(
self,
schema=None,
scope=ObjectScope.ANY,
kind=ObjectKind.ANY,
filter_names=None,
):
materialized = {(schema, "dingalings_v"): mock.ANY}
views = {
(schema, "email_addresses_v"): mock.ANY,
(schema, "users_v"): mock.ANY,
(schema, "user_tmp_v"): mock.ANY,
}
self._resolve_views(views, materialized)
tables = {
(schema, "users"): mock.ANY,
(schema, "dingalings"): mock.ANY,
(schema, "email_addresses"): mock.ANY,
(schema, "comment_test"): mock.ANY,
(schema, "no_constraints"): mock.ANY,
(schema, "local_table"): mock.ANY,
(schema, "remote_table"): mock.ANY,
(schema, "remote_table_2"): mock.ANY,
(schema, "noncol_idx_test_nopk"): mock.ANY,
(schema, "noncol_idx_test_pk"): mock.ANY,
(schema, self.temp_table_name()): mock.ANY,
}
res = self._resolve_kind(kind, tables, views, materialized)
res = self._resolve_names(schema, scope, filter_names, res)
return res
def exp_comments(
self,
schema=None,
scope=ObjectScope.ANY,
kind=ObjectKind.ANY,
filter_names=None,
):
empty = {"text": None}
materialized = {(schema, "dingalings_v"): empty}
views = {
(schema, "email_addresses_v"): empty,
(schema, "users_v"): empty,
(schema, "user_tmp_v"): empty,
}
self._resolve_views(views, materialized)
tables = {
(schema, "users"): empty,
(schema, "dingalings"): empty,
(schema, "email_addresses"): empty,
(schema, "comment_test"): {
"text": r"""the test % ' " \ table comment"""
},
(schema, "no_constraints"): {
"text": "no\nconstraints\rhas\fescaped\vcomment"
},
(schema, "local_table"): empty,
(schema, "remote_table"): empty,
(schema, "remote_table_2"): empty,
(schema, "noncol_idx_test_nopk"): empty,
(schema, "noncol_idx_test_pk"): empty,
(schema, self.temp_table_name()): empty,
}
res = self._resolve_kind(kind, tables, views, materialized)
res = self._resolve_names(schema, scope, filter_names, res)
return res
def exp_columns(
self,
schema=None,
scope=ObjectScope.ANY,
kind=ObjectKind.ANY,
filter_names=None,
):
def col(
name, auto=False, default=mock.ANY, comment=None, nullable=True
):
res = {
"name": name,
"autoincrement": auto,
"type": mock.ANY,
"default": default,
"comment": comment,
"nullable": nullable,
}
if auto == "omit":
res.pop("autoincrement")
return res
def pk(name, **kw):
kw = {"auto": True, "default": mock.ANY, "nullable": False, **kw}
return col(name, **kw)
materialized = {
(schema, "dingalings_v"): [
col("dingaling_id", auto="omit", nullable=mock.ANY),
col("address_id"),
col("id_user"),
col("data"),
]
}
views = {
(schema, "email_addresses_v"): [
col("address_id", auto="omit", nullable=mock.ANY),
col("remote_user_id"),
col("email_address"),
],
(schema, "users_v"): [
col("user_id", auto="omit", nullable=mock.ANY),
col("test1", nullable=mock.ANY),
col("test2", nullable=mock.ANY),
col("parent_user_id"),
],
(schema, "user_tmp_v"): [
col("id", auto="omit", nullable=mock.ANY),
col("name"),
col("foo"),
],
}
self._resolve_views(views, materialized)
tables = {
(schema, "users"): [
pk("user_id"),
col("test1", nullable=False),
col("test2", nullable=False),
col("parent_user_id"),
],
(schema, "dingalings"): [
pk("dingaling_id"),
col("address_id"),
col("id_user"),
col("data"),
],
(schema, "email_addresses"): [
pk("address_id"),
col("remote_user_id"),
col("email_address"),
],
(schema, "comment_test"): [
pk("id", comment="id comment"),
col("data", comment="data % comment"),
col(
"d2",
comment=r"""Comment types type speedily ' " \ '' Fun!""",
),
col("d3", comment="Comment\nwith\rescapes"),
],
(schema, "no_constraints"): [col("data")],
(schema, "local_table"): [pk("id"), col("data"), col("remote_id")],
(schema, "remote_table"): [pk("id"), col("local_id"), col("data")],
(schema, "remote_table_2"): [pk("id"), col("data")],
(schema, "noncol_idx_test_nopk"): [col("q")],
(schema, "noncol_idx_test_pk"): [pk("id"), col("q")],
(schema, self.temp_table_name()): [
pk("id"),
col("name"),
col("foo"),
],
}
res = self._resolve_kind(kind, tables, views, materialized)
res = self._resolve_names(schema, scope, filter_names, res)
return res
@property
def _required_column_keys(self):
return {"name", "type", "nullable", "default"}
def exp_pks(
self,
schema=None,
scope=ObjectScope.ANY,
kind=ObjectKind.ANY,
filter_names=None,
):
def pk(*cols, name=mock.ANY, comment=None):
return {
"constrained_columns": list(cols),
"name": name,
"comment": comment,
}
empty = pk(name=None)
if testing.requires.materialized_views_reflect_pk.enabled:
materialized = {(schema, "dingalings_v"): pk("dingaling_id")}
else:
materialized = {(schema, "dingalings_v"): empty}
views = {
(schema, "email_addresses_v"): empty,
(schema, "users_v"): empty,
(schema, "user_tmp_v"): empty,
}
self._resolve_views(views, materialized)
tables = {
(schema, "users"): pk("user_id"),
(schema, "dingalings"): pk("dingaling_id"),
(schema, "email_addresses"): pk(
"address_id", name="email_ad_pk", comment="ea pk comment"
),
(schema, "comment_test"): pk("id"),
(schema, "no_constraints"): empty,
(schema, "local_table"): pk("id"),
(schema, "remote_table"): pk("id"),
(schema, "remote_table_2"): pk("id"),
(schema, "noncol_idx_test_nopk"): empty,
(schema, "noncol_idx_test_pk"): pk("id"),
(schema, self.temp_table_name()): pk("id"),
}
if not testing.requires.reflects_pk_names.enabled:
for val in tables.values():
if val["name"] is not None:
val["name"] = mock.ANY
res = self._resolve_kind(kind, tables, views, materialized)
res = self._resolve_names(schema, scope, filter_names, res)
return res
@property
def _required_pk_keys(self):
return {"name", "constrained_columns"}
def exp_fks(
self,
schema=None,
scope=ObjectScope.ANY,
kind=ObjectKind.ANY,
filter_names=None,
):
class tt:
def __eq__(self, other):
return (
other is None
or config.db.dialect.default_schema_name == other
)
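        # tt() compares equal to either None or the default schema name,
        # since dialects differ in whether a same-schema foreign key reports
        # referred_schema explicitly or as None.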
def fk(
cols,
ref_col,
ref_table,
ref_schema=schema,
name=mock.ANY,
comment=None,
):
return {
"constrained_columns": cols,
"referred_columns": ref_col,
"name": name,
"options": mock.ANY,
"referred_schema": (
ref_schema if ref_schema is not None else tt()
),
"referred_table": ref_table,
"comment": comment,
}
materialized = {(schema, "dingalings_v"): []}
views = {
(schema, "email_addresses_v"): [],
(schema, "users_v"): [],
(schema, "user_tmp_v"): [],
}
self._resolve_views(views, materialized)
tables = {
(schema, "users"): [
fk(["parent_user_id"], ["user_id"], "users", name="user_id_fk")
],
(schema, "dingalings"): [
fk(["id_user"], ["user_id"], "users"),
fk(
["address_id"],
["address_id"],
"email_addresses",
name="zz_email_add_id_fg",
comment="di fk comment",
),
],
(schema, "email_addresses"): [
fk(["remote_user_id"], ["user_id"], "users")
],
(schema, "comment_test"): [],
(schema, "no_constraints"): [],
(schema, "local_table"): [
fk(
["remote_id"],
["id"],
"remote_table_2",
ref_schema=config.test_schema,
)
],
(schema, "remote_table"): [
fk(["local_id"], ["id"], "local_table", ref_schema=None)
],
(schema, "remote_table_2"): [],
(schema, "noncol_idx_test_nopk"): [],
(schema, "noncol_idx_test_pk"): [],
(schema, self.temp_table_name()): [],
}
if not testing.requires.self_referential_foreign_keys.enabled:
tables[(schema, "users")].clear()
if not testing.requires.named_constraints.enabled:
for vals in tables.values():
for val in vals:
if val["name"] is not mock.ANY:
val["name"] = mock.ANY
res = self._resolve_kind(kind, tables, views, materialized)
res = self._resolve_names(schema, scope, filter_names, res)
return res
@property
def _required_fk_keys(self):
return {
"name",
"constrained_columns",
"referred_schema",
"referred_table",
"referred_columns",
}
def exp_indexes(
self,
schema=None,
scope=ObjectScope.ANY,
kind=ObjectKind.ANY,
filter_names=None,
):
def idx(
*cols,
name,
unique=False,
column_sorting=None,
duplicates=False,
fk=False,
):
fk_req = testing.requires.foreign_keys_reflect_as_index
dup_req = testing.requires.unique_constraints_reflect_as_index
sorting_expression = (
testing.requires.reflect_indexes_with_ascdesc_as_expression
)
if (fk and not fk_req.enabled) or (
duplicates and not dup_req.enabled
):
return ()
res = {
"unique": unique,
"column_names": list(cols),
"name": name,
"dialect_options": mock.ANY,
"include_columns": [],
}
if column_sorting:
res["column_sorting"] = column_sorting
if sorting_expression.enabled:
res["expressions"] = orig = res["column_names"]
res["column_names"] = [
None if c in column_sorting else c for c in orig
]
if duplicates:
res["duplicates_constraint"] = name
return [res]
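        # idx() returns an empty tuple when the backend does not reflect
        # that kind of index, which is why each call site below splats the
        # result with ``*idx(...)`` rather than appending a dict directly.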
materialized = {(schema, "dingalings_v"): []}
views = {
(schema, "email_addresses_v"): [],
(schema, "users_v"): [],
(schema, "user_tmp_v"): [],
}
self._resolve_views(views, materialized)
if materialized:
materialized[(schema, "dingalings_v")].extend(
idx("data", name="mat_index")
)
tables = {
(schema, "users"): [
*idx("parent_user_id", name="user_id_fk", fk=True),
*idx("user_id", "test2", "test1", name="users_all_idx"),
*idx("test1", "test2", name="users_t_idx", unique=True),
],
(schema, "dingalings"): [
*idx("data", name=mock.ANY, unique=True, duplicates=True),
*idx("id_user", name=mock.ANY, fk=True),
*idx(
"address_id",
"dingaling_id",
name="zz_dingalings_multiple",
unique=True,
duplicates=True,
),
],
(schema, "email_addresses"): [
*idx("email_address", name=mock.ANY),
*idx("remote_user_id", name=mock.ANY, fk=True),
],
(schema, "comment_test"): [],
(schema, "no_constraints"): [],
(schema, "local_table"): [
*idx("remote_id", name=mock.ANY, fk=True)
],
(schema, "remote_table"): [
*idx("local_id", name=mock.ANY, fk=True)
],
(schema, "remote_table_2"): [],
(schema, "noncol_idx_test_nopk"): [
*idx(
"q",
name="noncol_idx_nopk",
column_sorting={"q": ("desc",)},
)
],
(schema, "noncol_idx_test_pk"): [
*idx(
"q", name="noncol_idx_pk", column_sorting={"q": ("desc",)}
)
],
(schema, self.temp_table_name()): [
*idx("foo", name="user_tmp_ix"),
*idx(
"name",
name=f"user_tmp_uq_{config.ident}",
duplicates=True,
unique=True,
),
],
}
if (
not testing.requires.indexes_with_ascdesc.enabled
or not testing.requires.reflect_indexes_with_ascdesc.enabled
):
tables[(schema, "noncol_idx_test_nopk")].clear()
tables[(schema, "noncol_idx_test_pk")].clear()
res = self._resolve_kind(kind, tables, views, materialized)
res = self._resolve_names(schema, scope, filter_names, res)
return res
@property
def _required_index_keys(self):
return {"name", "column_names", "unique"}
def exp_ucs(
self,
schema=None,
scope=ObjectScope.ANY,
kind=ObjectKind.ANY,
filter_names=None,
all_=False,
):
def uc(
*cols, name, duplicates_index=None, is_index=False, comment=None
):
req = testing.requires.unique_index_reflect_as_unique_constraints
if is_index and not req.enabled:
return ()
res = {
"column_names": list(cols),
"name": name,
"comment": comment,
}
if duplicates_index:
res["duplicates_index"] = duplicates_index
return [res]
materialized = {(schema, "dingalings_v"): []}
views = {
(schema, "email_addresses_v"): [],
(schema, "users_v"): [],
(schema, "user_tmp_v"): [],
}
self._resolve_views(views, materialized)
tables = {
(schema, "users"): [
*uc(
"test1",
"test2",
name="users_t_idx",
duplicates_index="users_t_idx",
is_index=True,
)
],
(schema, "dingalings"): [
*uc("data", name=mock.ANY, duplicates_index=mock.ANY),
*uc(
"address_id",
"dingaling_id",
name="zz_dingalings_multiple",
duplicates_index="zz_dingalings_multiple",
comment="di unique comment",
),
],
(schema, "email_addresses"): [],
(schema, "comment_test"): [],
(schema, "no_constraints"): [],
(schema, "local_table"): [],
(schema, "remote_table"): [],
(schema, "remote_table_2"): [],
(schema, "noncol_idx_test_nopk"): [],
(schema, "noncol_idx_test_pk"): [],
(schema, self.temp_table_name()): [
*uc("name", name=f"user_tmp_uq_{config.ident}")
],
}
if all_:
return {**materialized, **views, **tables}
else:
res = self._resolve_kind(kind, tables, views, materialized)
res = self._resolve_names(schema, scope, filter_names, res)
return res
@property
def _required_unique_cst_keys(self):
return {"name", "column_names"}
def exp_ccs(
self,
schema=None,
scope=ObjectScope.ANY,
kind=ObjectKind.ANY,
filter_names=None,
):
class tt(str):
def __eq__(self, other):
res = (
other.lower()
.replace("(", "")
.replace(")", "")
.replace("`", "")
)
return self in res
def cc(text, name, comment=None):
return {"sqltext": tt(text), "name": name, "comment": comment}
        # tt lowercases the reflected text and strips parens and backticks
        # before doing a substring match, so e.g. a reflected
        # "test2 > (0)::double precision" compares equal to tt("test2 > 0")
materialized = {(schema, "dingalings_v"): []}
views = {
(schema, "email_addresses_v"): [],
(schema, "users_v"): [],
(schema, "user_tmp_v"): [],
}
self._resolve_views(views, materialized)
tables = {
(schema, "users"): [
cc("test2 <= 1000", mock.ANY),
cc(
"test2 > 0",
"zz_test2_gt_zero",
comment="users check constraint",
),
],
(schema, "dingalings"): [
cc(
"address_id > 0 and address_id < 1000",
name="address_id_gt_zero",
),
],
(schema, "email_addresses"): [],
(schema, "comment_test"): [],
(schema, "no_constraints"): [],
(schema, "local_table"): [],
(schema, "remote_table"): [],
(schema, "remote_table_2"): [],
(schema, "noncol_idx_test_nopk"): [],
(schema, "noncol_idx_test_pk"): [],
(schema, self.temp_table_name()): [],
}
res = self._resolve_kind(kind, tables, views, materialized)
res = self._resolve_names(schema, scope, filter_names, res)
return res
@property
def _required_cc_keys(self):
return {"name", "sqltext"}
@testing.requires.schema_reflection
def test_get_schema_names(self, connection):
insp = inspect(connection)
is_true(testing.config.test_schema in insp.get_schema_names())
@testing.requires.schema_reflection
def test_has_schema(self, connection):
insp = inspect(connection)
is_true(insp.has_schema(testing.config.test_schema))
is_false(insp.has_schema("sa_fake_schema_foo"))
@testing.requires.schema_reflection
def test_get_schema_names_w_translate_map(self, connection):
"""test #7300"""
connection = connection.execution_options(
schema_translate_map={
"foo": "bar",
BLANK_SCHEMA: testing.config.test_schema,
}
)
insp = inspect(connection)
is_true(testing.config.test_schema in insp.get_schema_names())
@testing.requires.schema_reflection
def test_has_schema_w_translate_map(self, connection):
connection = connection.execution_options(
schema_translate_map={
"foo": "bar",
BLANK_SCHEMA: testing.config.test_schema,
}
)
insp = inspect(connection)
is_true(insp.has_schema(testing.config.test_schema))
is_false(insp.has_schema("sa_fake_schema_foo"))
@testing.requires.schema_reflection
@testing.requires.schema_create_delete
def test_schema_cache(self, connection):
insp = inspect(connection)
is_false("foo_bar" in insp.get_schema_names())
is_false(insp.has_schema("foo_bar"))
connection.execute(DDL("CREATE SCHEMA foo_bar"))
try:
is_false("foo_bar" in insp.get_schema_names())
is_false(insp.has_schema("foo_bar"))
insp.clear_cache()
is_true("foo_bar" in insp.get_schema_names())
is_true(insp.has_schema("foo_bar"))
finally:
connection.execute(DDL("DROP SCHEMA foo_bar"))
@testing.requires.schema_reflection
def test_dialect_initialize(self):
engine = engines.testing_engine()
inspect(engine)
assert hasattr(engine.dialect, "default_schema_name")
@testing.requires.schema_reflection
def test_get_default_schema_name(self, connection):
insp = inspect(connection)
eq_(insp.default_schema_name, connection.dialect.default_schema_name)
@testing.combinations(
None,
("foreign_key", testing.requires.foreign_key_constraint_reflection),
argnames="order_by",
)
@testing.combinations(
(True, testing.requires.schemas), False, argnames="use_schema"
)
def test_get_table_names(self, connection, order_by, use_schema):
if use_schema:
schema = config.test_schema
else:
schema = None
_ignore_tables = {
"comment_test",
"noncol_idx_test_pk",
"noncol_idx_test_nopk",
"local_table",
"remote_table",
"remote_table_2",
"no_constraints",
}
insp = inspect(connection)
if order_by:
tables = [
rec[0]
for rec in insp.get_sorted_table_and_fkc_names(schema)
if rec[0]
]
else:
tables = insp.get_table_names(schema)
table_names = [t for t in tables if t not in _ignore_tables]
if order_by == "foreign_key":
answer = ["users", "email_addresses", "dingalings"]
eq_(table_names, answer)
else:
answer = ["dingalings", "email_addresses", "users"]
eq_(sorted(table_names), answer)
@testing.combinations(
(True, testing.requires.schemas), False, argnames="use_schema"
)
def test_get_view_names(self, connection, use_schema):
insp = inspect(connection)
if use_schema:
schema = config.test_schema
else:
schema = None
table_names = insp.get_view_names(schema)
if testing.requires.materialized_views.enabled:
eq_(sorted(table_names), ["email_addresses_v", "users_v"])
eq_(insp.get_materialized_view_names(schema), ["dingalings_v"])
else:
answer = ["dingalings_v", "email_addresses_v", "users_v"]
eq_(sorted(table_names), answer)
@testing.requires.temp_table_names
def test_get_temp_table_names(self, connection):
insp = inspect(connection)
temp_table_names = insp.get_temp_table_names()
eq_(sorted(temp_table_names), [f"user_tmp_{config.ident}"])
@testing.requires.view_reflection
@testing.requires.temporary_views
def test_get_temp_view_names(self, connection):
insp = inspect(connection)
temp_table_names = insp.get_temp_view_names()
eq_(sorted(temp_table_names), ["user_tmp_v"])
@testing.requires.comment_reflection
def test_get_comments(self, connection):
self._test_get_comments(connection)
@testing.requires.comment_reflection
@testing.requires.schemas
def test_get_comments_with_schema(self, connection):
self._test_get_comments(connection, testing.config.test_schema)
def _test_get_comments(self, connection, schema=None):
insp = inspect(connection)
exp = self.exp_comments(schema=schema)
eq_(
insp.get_table_comment("comment_test", schema=schema),
exp[(schema, "comment_test")],
)
eq_(
insp.get_table_comment("users", schema=schema),
exp[(schema, "users")],
)
eq_(
insp.get_table_comment("comment_test", schema=schema),
exp[(schema, "comment_test")],
)
no_cst = self.tables.no_constraints.name
eq_(
insp.get_table_comment(no_cst, schema=schema),
exp[(schema, no_cst)],
)
@testing.combinations(
(False, False),
(False, True, testing.requires.schemas),
(True, False, testing.requires.view_reflection),
(
True,
True,
testing.requires.schemas + testing.requires.view_reflection,
),
argnames="use_views,use_schema",
)
def test_get_columns(self, connection, use_views, use_schema):
if use_schema:
schema = config.test_schema
else:
schema = None
users, addresses = (self.tables.users, self.tables.email_addresses)
if use_views:
table_names = ["users_v", "email_addresses_v", "dingalings_v"]
else:
table_names = ["users", "email_addresses"]
insp = inspect(connection)
for table_name, table in zip(table_names, (users, addresses)):
schema_name = schema
cols = insp.get_columns(table_name, schema=schema_name)
is_true(len(cols) > 0, len(cols))
# should be in order
for i, col in enumerate(table.columns):
eq_(col.name, cols[i]["name"])
ctype = cols[i]["type"].__class__
ctype_def = col.type
if isinstance(ctype_def, sa.types.TypeEngine):
ctype_def = ctype_def.__class__
# Oracle returns Date for DateTime.
if testing.against("oracle") and ctype_def in (
sql_types.Date,
sql_types.DateTime,
):
ctype_def = sql_types.Date
# assert that the desired type and return type share
# a base within one of the generic types.
is_true(
len(
set(ctype.__mro__)
.intersection(ctype_def.__mro__)
.intersection(
[
sql_types.Integer,
sql_types.Numeric,
sql_types.DateTime,
sql_types.Date,
sql_types.Time,
sql_types.String,
sql_types._Binary,
]
)
)
> 0,
"%s(%s), %s(%s)"
% (col.name, col.type, cols[i]["name"], ctype),
)
if not col.primary_key:
assert cols[i]["default"] is None
# The case of a table with no column
# is tested below in TableNoColumnsTest
@testing.requires.temp_table_reflection
def test_reflect_table_temp_table(self, connection):
table_name = self.temp_table_name()
user_tmp = self.tables[table_name]
reflected_user_tmp = Table(
table_name, MetaData(), autoload_with=connection
)
self.assert_tables_equal(
user_tmp, reflected_user_tmp, strict_constraints=False
)
@testing.requires.temp_table_reflection
def test_get_temp_table_columns(self, connection):
table_name = self.temp_table_name()
user_tmp = self.tables[table_name]
insp = inspect(connection)
cols = insp.get_columns(table_name)
is_true(len(cols) > 0, len(cols))
for i, col in enumerate(user_tmp.columns):
eq_(col.name, cols[i]["name"])
@testing.requires.temp_table_reflection
@testing.requires.view_column_reflection
@testing.requires.temporary_views
def test_get_temp_view_columns(self, connection):
insp = inspect(connection)
cols = insp.get_columns("user_tmp_v")
eq_([col["name"] for col in cols], ["id", "name", "foo"])
@testing.combinations(
(False,), (True, testing.requires.schemas), argnames="use_schema"
)
@testing.requires.primary_key_constraint_reflection
def test_get_pk_constraint(self, connection, use_schema):
if use_schema:
schema = testing.config.test_schema
else:
schema = None
users, addresses = self.tables.users, self.tables.email_addresses
insp = inspect(connection)
exp = self.exp_pks(schema=schema)
users_cons = insp.get_pk_constraint(users.name, schema=schema)
self._check_list(
[users_cons], [exp[(schema, users.name)]], self._required_pk_keys
)
addr_cons = insp.get_pk_constraint(addresses.name, schema=schema)
exp_cols = exp[(schema, addresses.name)]["constrained_columns"]
eq_(addr_cons["constrained_columns"], exp_cols)
with testing.requires.reflects_pk_names.fail_if():
eq_(addr_cons["name"], "email_ad_pk")
no_cst = self.tables.no_constraints.name
self._check_list(
[insp.get_pk_constraint(no_cst, schema=schema)],
[exp[(schema, no_cst)]],
self._required_pk_keys,
)
@testing.combinations(
"PK_test_table",
"pk_test_table",
"mixedCasePK",
"pk.with.dots",
argnames="pk_name",
)
@testing.requires.primary_key_constraint_reflection
@testing.requires.reflects_pk_names
def test_get_pk_constraint_quoted_name(
self, connection, metadata, pk_name
):
"""Test that primary key constraint names with various casing are
properly reflected."""
Table(
"test_table",
metadata,
Column("id", Integer),
Column("data", String(50)),
sa.PrimaryKeyConstraint("id", name=pk_name),
)
metadata.create_all(connection)
insp = inspect(connection)
pk_cons = insp.get_pk_constraint("test_table")
eq_(pk_cons["name"], pk_name)
eq_(pk_cons["constrained_columns"], ["id"])
@testing.combinations(
(False,), (True, testing.requires.schemas), argnames="use_schema"
)
@testing.requires.foreign_key_constraint_reflection
def test_get_foreign_keys(self, connection, use_schema):
if use_schema:
schema = config.test_schema
else:
schema = None
users, addresses = (self.tables.users, self.tables.email_addresses)
insp = inspect(connection)
expected_schema = schema
# users
if testing.requires.self_referential_foreign_keys.enabled:
users_fkeys = insp.get_foreign_keys(users.name, schema=schema)
fkey1 = users_fkeys[0]
with testing.requires.named_constraints.fail_if():
eq_(fkey1["name"], "user_id_fk")
eq_(fkey1["referred_schema"], expected_schema)
eq_(fkey1["referred_table"], users.name)
eq_(fkey1["referred_columns"], ["user_id"])
eq_(fkey1["constrained_columns"], ["parent_user_id"])
# addresses
addr_fkeys = insp.get_foreign_keys(addresses.name, schema=schema)
fkey1 = addr_fkeys[0]
with testing.requires.implicitly_named_constraints.fail_if():
is_true(fkey1["name"] is not None)
eq_(fkey1["referred_schema"], expected_schema)
eq_(fkey1["referred_table"], users.name)
eq_(fkey1["referred_columns"], ["user_id"])
eq_(fkey1["constrained_columns"], ["remote_user_id"])
no_cst = self.tables.no_constraints.name
eq_(insp.get_foreign_keys(no_cst, schema=schema), [])
@testing.combinations(
"FK_users_id",
"fk_users_id",
"mixedCaseName",
"fk.with.dots",
argnames="fk_name",
)
@testing.requires.foreign_key_constraint_reflection
def test_get_foreign_keys_quoted_name(self, connection, metadata, fk_name):
"""Test that foreign key constraint names with various casing are
properly reflected."""
Table(
"users_ref",
metadata,
Column("user_id", Integer, primary_key=True),
test_needs_fk=True,
)
Table(
"user_orders",
metadata,
Column("order_id", Integer, primary_key=True),
Column("user_id", Integer),
sa.ForeignKeyConstraint(
["user_id"],
["users_ref.user_id"],
name=fk_name,
),
test_needs_fk=True,
)
metadata.create_all(connection)
insp = inspect(connection)
fkeys = insp.get_foreign_keys("user_orders")
eq_(len(fkeys), 1)
fkey = fkeys[0]
with testing.requires.named_constraints.fail_if():
eq_(fkey["name"], fk_name)
eq_(fkey["referred_table"], "users_ref")
eq_(fkey["referred_columns"], ["user_id"])
eq_(fkey["constrained_columns"], ["user_id"])
@testing.requires.cross_schema_fk_reflection
@testing.requires.schemas
def test_get_inter_schema_foreign_keys(self, connection):
local_table, remote_table, remote_table_2 = self.tables(
"%s.local_table" % connection.dialect.default_schema_name,
"%s.remote_table" % testing.config.test_schema,
"%s.remote_table_2" % testing.config.test_schema,
)
insp = inspect(connection)
local_fkeys = insp.get_foreign_keys(local_table.name)
eq_(len(local_fkeys), 1)
fkey1 = local_fkeys[0]
eq_(fkey1["referred_schema"], testing.config.test_schema)
eq_(fkey1["referred_table"], remote_table_2.name)
eq_(fkey1["referred_columns"], ["id"])
eq_(fkey1["constrained_columns"], ["remote_id"])
remote_fkeys = insp.get_foreign_keys(
remote_table.name, schema=testing.config.test_schema
)
eq_(len(remote_fkeys), 1)
fkey2 = remote_fkeys[0]
is_true(
fkey2["referred_schema"]
in (
None,
connection.dialect.default_schema_name,
)
)
eq_(fkey2["referred_table"], local_table.name)
eq_(fkey2["referred_columns"], ["id"])
eq_(fkey2["constrained_columns"], ["local_id"])
@testing.combinations(
(False,), (True, testing.requires.schemas), argnames="use_schema"
)
@testing.requires.index_reflection
def test_get_indexes(self, connection, use_schema):
if use_schema:
schema = config.test_schema
else:
schema = None
# The database may decide to create indexes for foreign keys, etc.
# so there may be more indexes than expected.
insp = inspect(connection)
indexes = insp.get_indexes("users", schema=schema)
exp = self.exp_indexes(schema=schema)
self._check_list(
indexes, exp[(schema, "users")], self._required_index_keys
)
no_cst = self.tables.no_constraints.name
self._check_list(
insp.get_indexes(no_cst, schema=schema),
exp[(schema, no_cst)],
self._required_index_keys,
)
@testing.combinations(
("noncol_idx_test_nopk", "noncol_idx_nopk"),
("noncol_idx_test_pk", "noncol_idx_pk"),
argnames="tname,ixname",
)
@testing.requires.index_reflection
@testing.requires.indexes_with_ascdesc
@testing.requires.reflect_indexes_with_ascdesc
def test_get_noncol_index(self, connection, tname, ixname):
insp = inspect(connection)
indexes = insp.get_indexes(tname)
        # reflecting an index that has "x DESC" in it as the column.
        # The DB may or may not give us "x", but make sure we get the index
        # back, that it has a name, and that it's connected to the table.
expected_indexes = self.exp_indexes()[(None, tname)]
self._check_list(indexes, expected_indexes, self._required_index_keys)
t = Table(tname, MetaData(), autoload_with=connection)
eq_(len(t.indexes), 1)
is_(list(t.indexes)[0].table, t)
eq_(list(t.indexes)[0].name, ixname)
@testing.combinations(
"IX_test_data",
"ix_test_data",
"mixedCaseIndex",
"ix.with.dots",
argnames="idx_name",
)
@testing.requires.index_reflection
def test_get_indexes_quoted_name(self, connection, metadata, idx_name):
"""Test that index names with various casing are properly reflected."""
t = Table(
"test_table",
metadata,
Column("id", Integer, primary_key=True),
Column("data", String(50)),
)
Index(idx_name, t.c.data)
metadata.create_all(connection)
insp = inspect(connection)
indexes = insp.get_indexes("test_table")
index_names = [idx["name"] for idx in indexes]
assert idx_name in index_names, f"Expected {idx_name} in {index_names}"
# Find the specific index
matching_idx = [idx for idx in indexes if idx["name"] == idx_name]
eq_(len(matching_idx), 1)
eq_(matching_idx[0]["column_names"], ["data"])
@testing.requires.temp_table_reflection
@testing.requires.unique_constraint_reflection
def test_get_temp_table_unique_constraints(self, connection):
insp = inspect(connection)
name = self.temp_table_name()
reflected = insp.get_unique_constraints(name)
exp = self.exp_ucs(all_=True)[(None, name)]
self._check_list(reflected, exp, self._required_index_keys)
@testing.requires.temp_table_reflect_indexes
def test_get_temp_table_indexes(self, connection):
insp = inspect(connection)
table_name = self.temp_table_name()
indexes = insp.get_indexes(table_name)
for ind in indexes:
ind.pop("dialect_options", None)
expected = [
{"unique": False, "column_names": ["foo"], "name": "user_tmp_ix"}
]
if testing.requires.index_reflects_included_columns.enabled:
expected[0]["include_columns"] = []
eq_(
[idx for idx in indexes if idx["name"] == "user_tmp_ix"],
expected,
)
@testing.combinations(
(True, testing.requires.schemas), (False,), argnames="use_schema"
)
@testing.requires.unique_constraint_reflection
def test_get_unique_constraints(self, metadata, connection, use_schema):
# SQLite dialect needs to parse the names of the constraints
# separately from what it gets from PRAGMA index_list(), and
        # then matches them up. So the same set of column_names in two
# constraints will confuse it. Perhaps we should no longer
# bother with index_list() here since we have the whole
# CREATE TABLE?
if use_schema:
schema = config.test_schema
else:
schema = None
uniques = sorted(
[
{"name": "unique_a", "column_names": ["a"]},
{"name": "unique_a_b_c", "column_names": ["a", "b", "c"]},
{"name": "unique_c_a_b", "column_names": ["c", "a", "b"]},
{"name": "unique_asc_key", "column_names": ["asc", "key"]},
{"name": "i.have.dots", "column_names": ["b"]},
{"name": "i have spaces", "column_names": ["c"]},
],
key=operator.itemgetter("name"),
)
table = Table(
"testtbl",
metadata,
Column("a", sa.String(20)),
Column("b", sa.String(30)),
Column("c", sa.Integer),
# reserved identifiers
Column("asc", sa.String(30)),
Column("key", sa.String(30)),
schema=schema,
)
for uc in uniques:
table.append_constraint(
sa.UniqueConstraint(*uc["column_names"], name=uc["name"])
)
table.create(connection)
insp = inspect(connection)
reflected = sorted(
insp.get_unique_constraints("testtbl", schema=schema),
key=operator.itemgetter("name"),
)
names_that_duplicate_index = set()
eq_(len(uniques), len(reflected))
for orig, refl in zip(uniques, reflected):
# Different dialects handle duplicate index and constraints
# differently, so ignore this flag
dupe = refl.pop("duplicates_index", None)
if dupe:
names_that_duplicate_index.add(dupe)
eq_(refl.pop("comment", None), None)
# ignore dialect_options
refl.pop("dialect_options", None)
eq_(orig, refl)
reflected_metadata = MetaData()
reflected = Table(
"testtbl",
reflected_metadata,
autoload_with=connection,
schema=schema,
)
# test "deduplicates for index" logic. MySQL and Oracle
# "unique constraints" are actually unique indexes (with possible
# exception of a unique that is a dupe of another one in the case
# of Oracle). make sure # they aren't duplicated.
idx_names = {idx.name for idx in reflected.indexes}
uq_names = {
uq.name
for uq in reflected.constraints
if isinstance(uq, sa.UniqueConstraint)
}.difference(["unique_c_a_b"])
assert not idx_names.intersection(uq_names)
if names_that_duplicate_index:
eq_(names_that_duplicate_index, idx_names)
eq_(uq_names, set())
no_cst = self.tables.no_constraints.name
eq_(insp.get_unique_constraints(no_cst, schema=schema), [])
@testing.combinations(
"UQ_email",
"uq_email",
"mixedCaseUQ",
"uq.with.dots",
argnames="uq_name",
)
@testing.requires.unique_constraint_reflection
def test_get_unique_constraints_quoted_name(
self, connection, metadata, uq_name
):
"""Test that unique constraint names with various casing are
properly reflected."""
Table(
"test_table",
metadata,
Column("id", Integer, primary_key=True),
Column("email", String(50)),
sa.UniqueConstraint("email", name=uq_name),
)
metadata.create_all(connection)
insp = inspect(connection)
uq_cons = insp.get_unique_constraints("test_table")
eq_(len(uq_cons), 1)
eq_(uq_cons[0]["name"], uq_name)
eq_(uq_cons[0]["column_names"], ["email"])
@testing.requires.view_reflection
@testing.combinations(
(False,), (True, testing.requires.schemas), argnames="use_schema"
)
def test_get_view_definition(self, connection, use_schema):
if use_schema:
schema = config.test_schema
else:
schema = None
insp = inspect(connection)
for view in ["users_v", "email_addresses_v", "dingalings_v"]:
v = insp.get_view_definition(view, schema=schema)
is_true(bool(v))
@testing.requires.view_reflection
def test_get_view_definition_does_not_exist(self, connection):
insp = inspect(connection)
with expect_raises(NoSuchTableError):
insp.get_view_definition("view_does_not_exist")
with expect_raises(NoSuchTableError):
insp.get_view_definition("users") # a table
@testing.requires.table_reflection
def test_autoincrement_col(self, connection):
"""test that 'autoincrement' is reflected according to sqla's policy.
        Don't mark this test as unsupported for any backend!
(technically it fails with MySQL InnoDB since "id" comes before "id2")
A backend is better off not returning "autoincrement" at all,
instead of potentially returning "False" for an auto-incrementing
primary key column.
"""
insp = inspect(connection)
for tname, cname in [
("users", "user_id"),
("email_addresses", "address_id"),
("dingalings", "dingaling_id"),
]:
cols = insp.get_columns(tname)
id_ = {c["name"]: c for c in cols}[cname]
assert id_.get("autoincrement", True)
@testing.combinations(
(True, testing.requires.schemas), (False,), argnames="use_schema"
)
def test_get_table_options(self, use_schema):
insp = inspect(config.db)
schema = config.test_schema if use_schema else None
if testing.requires.reflect_table_options.enabled:
res = insp.get_table_options("users", schema=schema)
is_true(isinstance(res, dict))
            # NOTE: can't really create a table with no options
res = insp.get_table_options("no_constraints", schema=schema)
is_true(isinstance(res, dict))
else:
with expect_raises(NotImplementedError):
insp.get_table_options("users", schema=schema)
@testing.combinations((True, testing.requires.schemas), False)
def test_multi_get_table_options(self, use_schema):
insp = inspect(config.db)
if testing.requires.reflect_table_options.enabled:
schema = config.test_schema if use_schema else None
res = insp.get_multi_table_options(schema=schema)
exp = {
(schema, table): insp.get_table_options(table, schema=schema)
for table in insp.get_table_names(schema=schema)
}
eq_(res, exp)
else:
with expect_raises(NotImplementedError):
insp.get_multi_table_options()
@testing.fixture
def get_multi_exp(self, connection):
def provide_fixture(
schema, scope, kind, use_filter, single_reflect_fn, exp_method
):
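            # build the kwargs for a get_multi_* call along with the
            # expected result for those kwargs; returns (inspector,
            # list of kwarg dicts to try, expected result shared by all)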
insp = inspect(connection)
# call the reflection function at least once to avoid
# "Unexpected success" errors if the result is actually empty
# and NotImplementedError is not raised
single_reflect_fn(insp, "email_addresses")
kw = {"scope": scope, "kind": kind}
if schema:
schema = schema()
filter_names = []
if ObjectKind.TABLE in kind:
filter_names.extend(
["comment_test", "users", "does-not-exist"]
)
if ObjectKind.VIEW in kind:
filter_names.extend(["email_addresses_v", "does-not-exist"])
if ObjectKind.MATERIALIZED_VIEW in kind:
filter_names.extend(["dingalings_v", "does-not-exist"])
if schema:
kw["schema"] = schema
if use_filter:
kw["filter_names"] = filter_names
exp = exp_method(
schema=schema,
scope=scope,
kind=kind,
filter_names=kw.get("filter_names"),
)
kws = [kw]
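            # when scope/kind are the defaults, also exercise the call
            # with those keywords omitted entirely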
if scope == ObjectScope.DEFAULT:
nkw = kw.copy()
nkw.pop("scope")
kws.append(nkw)
if kind == ObjectKind.TABLE:
nkw = kw.copy()
nkw.pop("kind")
kws.append(nkw)
return inspect(connection), kws, exp
return provide_fixture
@testing.requires.reflect_table_options
@_multi_combination
def test_multi_get_table_options_tables(
self, get_multi_exp, schema, scope, kind, use_filter
):
insp, kws, exp = get_multi_exp(
schema,
scope,
kind,
use_filter,
Inspector.get_table_options,
self.exp_options,
)
for kw in kws:
insp.clear_cache()
result = insp.get_multi_table_options(**kw)
eq_(result, exp)
@testing.requires.comment_reflection
@_multi_combination
def test_get_multi_table_comment(
self, get_multi_exp, schema, scope, kind, use_filter
):
insp, kws, exp = get_multi_exp(
schema,
scope,
kind,
use_filter,
Inspector.get_table_comment,
self.exp_comments,
)
for kw in kws:
insp.clear_cache()
eq_(insp.get_multi_table_comment(**kw), exp)
def _check_expressions(self, result, exp, err_msg):
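        # compare reflected expression lists/dicts while ignoring quoting,
        # whitespace and case, which vary per dialect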
def _clean(text: str):
return re.sub(r"['\" ]", "", text).lower()
if isinstance(exp, dict):
eq_({_clean(e): v for e, v in result.items()}, exp, err_msg)
else:
eq_([_clean(e) for e in result], exp, err_msg)
def _check_list(self, result, exp, req_keys=None, msg=None):
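        # when req_keys is given, only those keys are required to match;
        # other keys are compared only when present on both sides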
if req_keys is None:
eq_(result, exp, msg)
else:
eq_(len(result), len(exp), msg)
for r, e in zip(result, exp):
for k in set(r) | set(e):
if k in req_keys or (k in r and k in e):
err_msg = f"{msg} - {k} - {r}"
if k in ("expressions", "column_sorting"):
self._check_expressions(r[k], e[k], err_msg)
else:
eq_(r[k], e[k], err_msg)
def _check_table_dict(self, result, exp, req_keys=None, make_lists=False):
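        # result and exp are dicts keyed by (schema, table); make_lists
        # wraps scalar entries (e.g. pk constraints) so _check_list can
        # compare them uniformly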
eq_(set(result.keys()), set(exp.keys()))
for k in result:
r, e = result[k], exp[k]
if make_lists:
r, e = [r], [e]
self._check_list(r, e, req_keys, k)
@_multi_combination
def test_get_multi_columns(
self, get_multi_exp, schema, scope, kind, use_filter
):
insp, kws, exp = get_multi_exp(
schema,
scope,
kind,
use_filter,
Inspector.get_columns,
self.exp_columns,
)
for kw in kws:
insp.clear_cache()
result = insp.get_multi_columns(**kw)
self._check_table_dict(result, exp, self._required_column_keys)
@testing.requires.primary_key_constraint_reflection
@_multi_combination
def test_get_multi_pk_constraint(
self, get_multi_exp, schema, scope, kind, use_filter
):
insp, kws, exp = get_multi_exp(
schema,
scope,
kind,
use_filter,
Inspector.get_pk_constraint,
self.exp_pks,
)
for kw in kws:
insp.clear_cache()
result = insp.get_multi_pk_constraint(**kw)
self._check_table_dict(
result, exp, self._required_pk_keys, make_lists=True
)
def _adjust_sort(self, result, expected, key):
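        # on backends that don't report implicitly-generated constraint
        # names, unnamed constraints can come back in any order; sort
        # both sides by a deterministic key so the comparison is stable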
if not testing.requires.implicitly_named_constraints.enabled:
for obj in [result, expected]:
for val in obj.values():
if len(val) > 1 and any(
v.get("name") in (None, mock.ANY) for v in val
):
val.sort(key=key)
@testing.requires.foreign_key_constraint_reflection
@_multi_combination
def test_get_multi_foreign_keys(
self, get_multi_exp, schema, scope, kind, use_filter
):
insp, kws, exp = get_multi_exp(
schema,
scope,
kind,
use_filter,
Inspector.get_foreign_keys,
self.exp_fks,
)
for kw in kws:
insp.clear_cache()
result = insp.get_multi_foreign_keys(**kw)
self._adjust_sort(
result, exp, lambda d: tuple(d["constrained_columns"])
)
self._check_table_dict(result, exp, self._required_fk_keys)
@testing.requires.index_reflection
@_multi_combination
def test_get_multi_indexes(
self, get_multi_exp, schema, scope, kind, use_filter
):
insp, kws, exp = get_multi_exp(
schema,
scope,
kind,
use_filter,
Inspector.get_indexes,
self.exp_indexes,
)
for kw in kws:
insp.clear_cache()
result = insp.get_multi_indexes(**kw)
self._check_table_dict(result, exp, self._required_index_keys)
@testing.requires.unique_constraint_reflection
@_multi_combination
def test_get_multi_unique_constraints(
self, get_multi_exp, schema, scope, kind, use_filter
):
insp, kws, exp = get_multi_exp(
schema,
scope,
kind,
use_filter,
Inspector.get_unique_constraints,
self.exp_ucs,
)
for kw in kws:
insp.clear_cache()
result = insp.get_multi_unique_constraints(**kw)
self._adjust_sort(result, exp, lambda d: tuple(d["column_names"]))
self._check_table_dict(result, exp, self._required_unique_cst_keys)
@testing.requires.check_constraint_reflection
@_multi_combination
def test_get_multi_check_constraints(
self, get_multi_exp, schema, scope, kind, use_filter
):
insp, kws, exp = get_multi_exp(
schema,
scope,
kind,
use_filter,
Inspector.get_check_constraints,
self.exp_ccs,
)
for kw in kws:
insp.clear_cache()
result = insp.get_multi_check_constraints(**kw)
self._adjust_sort(result, exp, lambda d: tuple(d["sqltext"]))
self._check_table_dict(result, exp, self._required_cc_keys)
@testing.combinations(
("get_table_options", testing.requires.reflect_table_options),
"get_columns",
(
"get_pk_constraint",
testing.requires.primary_key_constraint_reflection,
),
(
"get_foreign_keys",
testing.requires.foreign_key_constraint_reflection,
),
("get_indexes", testing.requires.index_reflection),
(
"get_unique_constraints",
testing.requires.unique_constraint_reflection,
),
(
"get_check_constraints",
testing.requires.check_constraint_reflection,
),
("get_table_comment", testing.requires.comment_reflection),
argnames="method",
)
def test_not_existing_table(self, method, connection):
insp = inspect(connection)
meth = getattr(insp, method)
with expect_raises(NoSuchTableError):
meth("table_does_not_exists")
def test_unreflectable(self, connection):
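        # simulate a backend reporting a table that exists but cannot be
        # reflected; reflect_table() should propagate the error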
mc = Inspector.get_multi_columns
def patched(*a, **k):
ur = k.setdefault("unreflectable", {})
ur[(None, "some_table")] = UnreflectableTableError("err")
return mc(*a, **k)
with mock.patch.object(Inspector, "get_multi_columns", patched):
with expect_raises_message(UnreflectableTableError, "err"):
inspect(connection).reflect_table(
Table("some_table", MetaData()), None
)
@testing.combinations(True, False, argnames="use_schema")
@testing.combinations(
(True, testing.requires.views), False, argnames="views"
)
def test_metadata(self, connection, use_schema, views):
m = MetaData()
schema = config.test_schema if use_schema else None
m.reflect(connection, schema=schema, views=views, resolve_fks=False)
insp = inspect(connection)
tables = insp.get_table_names(schema)
if views:
tables += insp.get_view_names(schema)
try:
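            # not every dialect implements materialized view reflection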
tables += insp.get_materialized_view_names(schema)
except NotImplementedError:
pass
if schema:
tables = [f"{schema}.{t}" for t in tables]
eq_(sorted(m.tables), sorted(tables))
@testing.requires.comment_reflection
def test_comments_unicode(self, connection, metadata):
Table(
"unicode_comments",
metadata,
Column("unicode", Integer, comment="é試蛇ẟΩ"),
Column("emoji", Integer, comment="☁️✨"),
comment="試蛇ẟΩ✨",
)
metadata.create_all(connection)
insp = inspect(connection)
tc = insp.get_table_comment("unicode_comments")
eq_(tc, {"text": "試蛇ẟΩ✨"})
cols = insp.get_columns("unicode_comments")
value = {c["name"]: c["comment"] for c in cols}
exp = {"unicode": "é試蛇ẟΩ", "emoji": "☁️✨"}
eq_(value, exp)
@testing.requires.comment_reflection_full_unicode
def test_comments_unicode_full(self, connection, metadata):
Table(
"unicode_comments",
metadata,
Column("emoji", Integer, comment="🐍🧙🝝🧙‍♂️🧙‍♀️"),
comment="🎩🁰🝑🤷‍♀️🤷‍♂️",
)
metadata.create_all(connection)
insp = inspect(connection)
tc = insp.get_table_comment("unicode_comments")
eq_(tc, {"text": "🎩🁰🝑🤷‍♀️🤷‍♂️"})
c = insp.get_columns("unicode_comments")[0]
eq_({c["name"]: c["comment"]}, {"emoji": "🐍🧙🝝🧙‍♂️🧙‍♀️"})
@testing.requires.column_collation_reflection
@testing.requires.order_by_collation
def test_column_collation_reflection(self, connection, metadata):
collation = testing.requires.get_order_by_collation(config)
Table(
"t",
metadata,
Column("collated", sa.String(collation=collation)),
Column("not_collated", sa.String()),
)
metadata.create_all(connection)
m2 = MetaData()
t2 = Table("t", m2, autoload_with=connection)
eq_(t2.c.collated.type.collation, collation)
is_none(t2.c.not_collated.type.collation)
insp = inspect(connection)
collated, not_collated = insp.get_columns("t")
eq_(collated["type"].collation, collation)
is_none(not_collated["type"].collation)


class TableNoColumnsTest(fixtures.TestBase):
__requires__ = ("reflect_tables_no_columns",)
__sparse_driver_backend__ = True
@testing.fixture
def table_no_columns(self, connection, metadata):
Table("empty", metadata)
metadata.create_all(connection)
@testing.fixture
def view_no_columns(self, connection, metadata):
Table("empty", metadata)
event.listen(
metadata,
"after_create",
DDL("CREATE VIEW empty_v AS SELECT * FROM empty"),
)
        # with transactional DDL the transaction is rolled back before this
        # drop statement is invoked, so the view may already be gone;
        # hence the IF EXISTS
event.listen(
metadata, "before_drop", DDL("DROP VIEW IF EXISTS empty_v")
)
metadata.create_all(connection)
def test_reflect_table_no_columns(self, connection, table_no_columns):
t2 = Table("empty", MetaData(), autoload_with=connection)
eq_(list(t2.c), [])
def test_get_columns_table_no_columns(self, connection, table_no_columns):
insp = inspect(connection)
eq_(insp.get_columns("empty"), [])
multi = insp.get_multi_columns()
eq_(multi, {(None, "empty"): []})
def test_reflect_incl_table_no_columns(self, connection, table_no_columns):
m = MetaData()
m.reflect(connection)
assert set(m.tables).intersection(["empty"])
@testing.requires.views
def test_reflect_view_no_columns(self, connection, view_no_columns):
t2 = Table("empty_v", MetaData(), autoload_with=connection)
eq_(list(t2.c), [])
@testing.requires.views
def test_get_columns_view_no_columns(self, connection, view_no_columns):
insp = inspect(connection)
eq_(insp.get_columns("empty_v"), [])
multi = insp.get_multi_columns(kind=ObjectKind.VIEW)
eq_(multi, {(None, "empty_v"): []})


class ComponentReflectionTestExtra(ComparesIndexes, fixtures.TestBase):
__sparse_driver_backend__ = True
@testing.fixture(params=[True, False])
def use_schema_fixture(self, request):
if request.param:
return config.test_schema
else:
return None
@testing.fixture()
def inspect_for_table(self, metadata, connection, use_schema_fixture):
@contextlib.contextmanager
def go(tablename):
yield use_schema_fixture, inspect(connection)
metadata.create_all(connection)
return go
def ck_eq(self, reflected, expected):
        # trying to minimize effect of quoting, parentheses, etc.
# may need to add more to this as new dialects get CHECK
# constraint reflection support
def normalize(sqltext):
return " ".join(
re.findall(r"and|\d|=|a|b|c|or|<|>", sqltext.lower(), re.I)
)
reflected = sorted(
[
{"name": item["name"], "sqltext": normalize(item["sqltext"])}
for item in reflected
],
key=lambda item: (item["sqltext"]),
)
expected = sorted(
expected,
key=lambda item: (item["sqltext"]),
)
eq_(reflected, expected)
@testing.requires.check_constraint_reflection
def test_check_constraint_no_constraint(self, metadata, inspect_for_table):
with inspect_for_table("no_constraints") as (schema, inspector):
Table(
"no_constraints",
metadata,
Column("data", sa.String(20)),
schema=schema,
)
self.ck_eq(
inspector.get_check_constraints("no_constraints", schema=schema),
[],
)
@testing.requires.inline_check_constraint_reflection
@testing.combinations(
"my_inline", "MyInline", None, argnames="constraint_name"
)
def test_check_constraint_inline(
self, metadata, inspect_for_table, constraint_name
):
with inspect_for_table("sa_cc") as (schema, inspector):
Table(
"sa_cc",
metadata,
Column("id", Integer(), primary_key=True),
Column(
"a",
Integer(),
sa.CheckConstraint(
"a > 1 AND a < 5", name=constraint_name
),
),
Column("data", String(50)),
schema=schema,
)
reflected = inspector.get_check_constraints("sa_cc", schema=schema)
self.ck_eq(
reflected,
[
{
"name": constraint_name or mock.ANY,
"sqltext": "a > 1 and a < 5",
},
],
)
@testing.requires.check_constraint_reflection
@testing.combinations(
"my_ck_const", "MyCkConst", None, argnames="constraint_name"
)
def test_check_constraint_standalone(
self, metadata, inspect_for_table, constraint_name
):
with inspect_for_table("sa_cc") as (schema, inspector):
Table(
"sa_cc",
metadata,
Column("a", Integer()),
sa.CheckConstraint(
"a = 1 OR (a > 2 AND a < 5)", name=constraint_name
),
schema=schema,
)
reflected = inspector.get_check_constraints("sa_cc", schema=schema)
self.ck_eq(
reflected,
[
{
"name": constraint_name or mock.ANY,
"sqltext": "a = 1 or a > 2 and a < 5",
},
],
)
@testing.requires.inline_check_constraint_reflection
def test_check_constraint_mixed(self, metadata, inspect_for_table):
with inspect_for_table("sa_cc") as (schema, inspector):
Table(
"sa_cc",
metadata,
Column("id", Integer(), primary_key=True),
Column("a", Integer(), sa.CheckConstraint("a > 1 AND a < 5")),
Column(
"b",
Integer(),
sa.CheckConstraint("b > 1 AND b < 5", name="my_inline"),
),
Column("c", Integer()),
Column("data", String(50)),
sa.UniqueConstraint("data", name="some_uq"),
sa.CheckConstraint("c > 1 AND c < 5", name="cc1"),
sa.UniqueConstraint("c", name="some_c_uq"),
schema=schema,
)
reflected = inspector.get_check_constraints("sa_cc", schema=schema)
self.ck_eq(
reflected,
[
{"name": "cc1", "sqltext": "c > 1 and c < 5"},
{"name": "my_inline", "sqltext": "b > 1 and b < 5"},
{"name": mock.ANY, "sqltext": "a > 1 and a < 5"},
],
)
@testing.requires.indexes_check_column_order
def test_index_column_order(self, metadata, inspect_for_table):
"""test for #12894"""
with inspect_for_table("sa_multi_index") as (schema, inspector):
test_table = Table(
"sa_multi_index",
metadata,
Column("Column1", Integer, primary_key=True),
Column("Column2", Integer),
Column("Column3", Integer),
)
Index(
"Index_Example",
test_table.c.Column3,
test_table.c.Column1,
test_table.c.Column2,
)
indexes = inspector.get_indexes("sa_multi_index")
eq_(indexes[0]["column_names"], ["Column3", "Column1", "Column2"])
@testing.requires.indexes_with_expressions
def test_reflect_expression_based_indexes(self, metadata, connection):
t = Table(
"t",
metadata,
Column("x", String(30)),
Column("y", String(30)),
Column("z", String(30)),
)
Index("t_idx", func.lower(t.c.x), t.c.z, func.lower(t.c.y))
long_str = "long string " * 100
Index("t_idx_long", func.coalesce(t.c.x, long_str))
Index("t_idx_2", t.c.x)
metadata.create_all(connection)
insp = inspect(connection)
expected = [
{
"name": "t_idx_2",
"column_names": ["x"],
"unique": False,
"dialect_options": {},
}
]
def completeIndex(entry):
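            # backends that reflect INCLUDE columns always report the
            # extra keys, so fill them into the expected entry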
if testing.requires.index_reflects_included_columns.enabled:
entry["include_columns"] = []
entry["dialect_options"] = {
f"{connection.engine.name}_include": []
}
else:
entry.setdefault("dialect_options", {})
completeIndex(expected[0])
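        # str subclasses with a loose __eq__ so the expected expression
        # text matches however the dialect happens to render it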
class lower_index_str(str):
def __eq__(self, other):
ol = other.lower()
                # match if "lower" and either "x" or "y" appear in the
                # rendered expression
return "lower" in ol and ("x" in ol or "y" in ol)
class coalesce_index_str(str):
def __eq__(self, other):
                # match if "coalesce" and the long string appear in the
                # rendered expression
return "coalesce" in other.lower() and long_str in other
if testing.requires.reflect_indexes_with_expressions.enabled:
expr_index = {
"name": "t_idx",
"column_names": [None, "z", None],
"expressions": [
lower_index_str("lower(x)"),
"z",
lower_index_str("lower(y)"),
],
"unique": False,
}
completeIndex(expr_index)
expected.insert(0, expr_index)
expr_index_long = {
"name": "t_idx_long",
"column_names": [None],
"expressions": [
coalesce_index_str(f"coalesce(x, '{long_str}')")
],
"unique": False,
}
completeIndex(expr_index_long)
expected.append(expr_index_long)
eq_(insp.get_indexes("t"), expected)
m2 = MetaData()
t2 = Table("t", m2, autoload_with=connection)
else:
with expect_warnings(
"Skipped unsupported reflection of expression-based "
"index t_idx"
):
eq_(insp.get_indexes("t"), expected)
m2 = MetaData()
t2 = Table("t", m2, autoload_with=connection)
self.compare_table_index_with_expected(
t2, expected, connection.engine.name
)
@testing.requires.index_reflects_included_columns
def test_reflect_covering_index(self, metadata, connection):
t = Table(
"t",
metadata,
Column("x", String(30)),
Column("y", String(30)),
)
idx = Index("t_idx", t.c.x)
idx.dialect_options[connection.engine.name]["include"] = ["y"]
metadata.create_all(connection)
insp = inspect(connection)
get_indexes = insp.get_indexes("t")
eq_(
get_indexes,
[
{
"name": "t_idx",
"column_names": ["x"],
"include_columns": ["y"],
"unique": False,
"dialect_options": mock.ANY,
}
],
)
eq_(
get_indexes[0]["dialect_options"][
"%s_include" % connection.engine.name
],
["y"],
)
t2 = Table("t", MetaData(), autoload_with=connection)
eq_(
list(t2.indexes)[0].dialect_options[connection.engine.name][
"include"
],
["y"],
)
def _type_round_trip(self, connection, metadata, *types):
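        # create a table with one column per given type and return the
        # reflected column types in order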
t = Table(
"t",
metadata,
*[Column("t%d" % i, type_) for i, type_ in enumerate(types)],
)
t.create(connection)
return [c["type"] for c in inspect(connection).get_columns("t")]
@testing.requires.table_reflection
def test_numeric_reflection(self, connection, metadata):
for typ in self._type_round_trip(
connection, metadata, sql_types.Numeric(18, 5)
):
assert isinstance(typ, sql_types.Numeric)
eq_(typ.precision, 18)
eq_(typ.scale, 5)
@testing.requires.table_reflection
@testing.combinations(
sql_types.String,
sql_types.VARCHAR,
sql_types.CHAR,
(sql_types.NVARCHAR, testing.requires.nvarchar_types),
(sql_types.NCHAR, testing.requires.nvarchar_types),
argnames="type_",
)
def test_string_length_reflection(self, connection, metadata, type_):
typ = self._type_round_trip(connection, metadata, type_(52))[0]
if issubclass(type_, sql_types.VARCHAR):
assert isinstance(typ, sql_types.VARCHAR)
elif issubclass(type_, sql_types.CHAR):
assert isinstance(typ, sql_types.CHAR)
else:
assert isinstance(typ, sql_types.String)
eq_(typ.length, 52)
assert isinstance(typ.length, int)
@testing.requires.table_reflection
def test_nullable_reflection(self, connection, metadata):
t = Table(
"t",
metadata,
Column("a", Integer, nullable=True),
Column("b", Integer, nullable=False),
)
t.create(connection)
eq_(
{
col["name"]: col["nullable"]
for col in inspect(connection).get_columns("t")
},
{"a": True, "b": False},
)
@testing.combinations(
(
None,
"CASCADE",
None,
testing.requires.foreign_key_constraint_option_reflection_ondelete,
),
(
None,
None,
"SET NULL",
testing.requires.foreign_key_constraint_option_reflection_onupdate,
),
(
{},
None,
"NO ACTION",
testing.requires.foreign_key_constraint_option_reflection_onupdate,
),
(
{},
"NO ACTION",
None,
testing.requires.fk_constraint_option_reflection_ondelete_noaction,
),
(
None,
None,
"RESTRICT",
testing.requires.fk_constraint_option_reflection_onupdate_restrict,
),
(
None,
"RESTRICT",
None,
testing.requires.fk_constraint_option_reflection_ondelete_restrict,
),
argnames="expected,ondelete,onupdate",
)
def test_get_foreign_key_options(
self, connection, metadata, expected, ondelete, onupdate
):
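        # expected=None means the reflected options should echo the
        # options passed; expected={} means the clause is emitted but the
        # backend reflects the server default (e.g. NO ACTION) as no
        # options at all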
options = {}
if ondelete:
options["ondelete"] = ondelete
if onupdate:
options["onupdate"] = onupdate
if expected is None:
expected = options
Table(
"x",
metadata,
Column("id", Integer, primary_key=True),
test_needs_fk=True,
)
Table(
"table",
metadata,
Column("id", Integer, primary_key=True),
Column("x_id", Integer, ForeignKey("x.id", name="xid")),
Column("test", String(10)),
test_needs_fk=True,
)
Table(
"user",
metadata,
Column("id", Integer, primary_key=True),
Column("name", String(50), nullable=False),
Column("tid", Integer),
sa.ForeignKeyConstraint(
["tid"], ["table.id"], name="myfk", **options
),
test_needs_fk=True,
)
metadata.create_all(connection)
insp = inspect(connection)
        # test that 'options' is always present for a backend
        # that can reflect these, since Alembic looks for this key
opts = insp.get_foreign_keys("table")[0]["options"]
eq_({k: opts[k] for k in opts if opts[k]}, {})
opts = insp.get_foreign_keys("user")[0]["options"]
eq_(opts, expected)
@testing.combinations(
(Integer, sa.text("10"), r"'?10'?"),
(Integer, "10", r"'?10'?"),
(Boolean, sa.true(), r"1|true"),
(
Integer,
sa.text("3 + 5"),
r"3\+5",
testing.requires.expression_server_defaults,
),
(
Integer,
sa.text("(3 * 5)"),
r"3\*5",
testing.requires.expression_server_defaults,
),
(DateTime, func.now(), r"current_timestamp|now|getdate"),
(
Integer,
sa.literal_column("3") + sa.literal_column("5"),
r"3\+5",
testing.requires.expression_server_defaults,
),
argnames="datatype, default, expected_reg",
)
@testing.requires.server_defaults
def test_server_defaults(
self, metadata, connection, datatype, default, expected_reg
):
t = Table(
"t",
metadata,
Column("id", Integer, primary_key=True),
Column("thecol", datatype, server_default=default),
)
t.create(connection)
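        # strip parens, spaces and quotes so one regex can match the
        # reflected default however the backend renders it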
reflected = inspect(connection).get_columns("t")[1]["default"]
reflected_sanitized = re.sub(r"[\(\) \']", "", reflected)
eq_regex(reflected_sanitized, expected_reg, flags=re.IGNORECASE)


class NormalizedNameTest(fixtures.TablesTest):
__requires__ = ("denormalized_names",)
__sparse_driver_backend__ = True
@classmethod
def define_tables(cls, metadata):
Table(
quoted_name("t1", quote=True),
metadata,
Column("id", Integer, primary_key=True),
)
Table(
quoted_name("t2", quote=True),
metadata,
Column("id", Integer, primary_key=True),
Column("t1id", ForeignKey("t1.id")),
)
def test_reflect_lowercase_forced_tables(self):
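        # on a denormalized-name backend, a name quoted to force
        # lowercase should reflect and still resolve its FK target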
m2 = MetaData()
t2_ref = Table(
quoted_name("t2", quote=True), m2, autoload_with=config.db
)
t1_ref = m2.tables["t1"]
assert t2_ref.c.t1id.references(t1_ref.c.id)
m3 = MetaData()
m3.reflect(
config.db, only=lambda name, m: name.lower() in ("t1", "t2")
)
assert m3.tables["t2"].c.t1id.references(m3.tables["t1"].c.id)
def test_get_table_names(self):
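        # quoted_name with quote=True overrides .upper()/.lower() to
        # return the name unchanged, so these assertions pass only if
        # the reflected names preserved the quoting flag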
tablenames = [
t
for t in inspect(config.db).get_table_names()
if t.lower() in ("t1", "t2")
]
eq_(tablenames[0].upper(), tablenames[0].lower())
eq_(tablenames[1].upper(), tablenames[1].lower())


class ComputedReflectionTest(fixtures.ComputedReflectionFixtureTest):
def test_computed_col_default_not_set(self):
insp = inspect(config.db)
cols = insp.get_columns("computed_default_table")
col_data = {c["name"]: c for c in cols}
is_true("42" in col_data["with_default"]["default"])
is_(col_data["normal"]["default"], None)
is_(col_data["computed_col"]["default"], None)
def test_get_column_returns_computed(self):
insp = inspect(config.db)
cols = insp.get_columns("computed_default_table")
data = {c["name"]: c for c in cols}
for key in ("id", "normal", "with_default"):
is_true("computed" not in data[key])
compData = data["computed_col"]
is_true("computed" in compData)
is_true("sqltext" in compData["computed"])
eq_(self.normalize(compData["computed"]["sqltext"]), "normal+42")
eq_(
"persisted" in compData["computed"],
testing.requires.computed_columns_reflect_persisted.enabled,
)
if testing.requires.computed_columns_reflect_persisted.enabled:
eq_(
compData["computed"]["persisted"],
testing.requires.computed_columns_default_persisted.enabled,
)
def check_column(self, data, column, sqltext, persisted):
is_true("computed" in data[column])
compData = data[column]["computed"]
eq_(self.normalize(compData["sqltext"]), sqltext)
if testing.requires.computed_columns_reflect_persisted.enabled:
is_true("persisted" in compData)
is_(compData["persisted"], persisted)
def test_get_column_returns_persisted(self):
insp = inspect(config.db)
cols = insp.get_columns("computed_column_table")
data = {c["name"]: c for c in cols}
self.check_column(
data,
"computed_no_flag",
"normal+42",
testing.requires.computed_columns_default_persisted.enabled,
)
if testing.requires.computed_columns_virtual.enabled:
self.check_column(
data,
"computed_virtual",
"normal+2",
False,
)
if testing.requires.computed_columns_stored.enabled:
self.check_column(
data,
"computed_stored",
"normal-42",
True,
)
@testing.requires.schemas
def test_get_column_returns_persisted_with_schema(self):
insp = inspect(config.db)
cols = insp.get_columns(
"computed_column_table", schema=config.test_schema
)
data = {c["name"]: c for c in cols}
self.check_column(
data,
"computed_no_flag",
"normal/42",
testing.requires.computed_columns_default_persisted.enabled,
)
if testing.requires.computed_columns_virtual.enabled:
self.check_column(
data,
"computed_virtual",
"normal/2",
False,
)
if testing.requires.computed_columns_stored.enabled:
self.check_column(
data,
"computed_stored",
"normal*42",
True,
)


class IdentityReflectionTest(fixtures.TablesTest):
run_inserts = run_deletes = None
__sparse_driver_backend__ = True
__requires__ = ("identity_columns", "table_reflection")
@classmethod
def define_tables(cls, metadata):
Table(
"t1",
metadata,
Column("normal", Integer),
Column("id1", Integer, Identity()),
)
Table(
"t2",
metadata,
Column(
"id2",
Integer,
Identity(
always=True,
start=2,
increment=3,
minvalue=-2,
maxvalue=42,
cycle=True,
cache=4,
),
),
)
if testing.requires.schemas.enabled:
Table(
"t1",
metadata,
Column("normal", Integer),
Column("id1", Integer, Identity(always=True, start=20)),
schema=config.test_schema,
)
def check(self, value, exp, approx):
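        # approx=True compares minvalue/maxvalue/cache with inequalities,
        # since backends fill in different defaults for unspecified
        # identity options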
if testing.requires.identity_columns_standard.enabled:
common_keys = (
"always",
"start",
"increment",
"minvalue",
"maxvalue",
"cycle",
"cache",
)
for k in list(value):
if k not in common_keys:
value.pop(k)
if approx:
eq_(len(value), len(exp))
for k in value:
if k == "minvalue":
is_true(value[k] <= exp[k])
elif k in {"maxvalue", "cache"}:
is_true(value[k] >= exp[k])
else:
eq_(value[k], exp[k], k)
else:
eq_(value, exp)
else:
eq_(value["start"], exp["start"])
eq_(value["increment"], exp["increment"])
def test_reflect_identity(self):
insp = inspect(config.db)
cols = insp.get_columns("t1") + insp.get_columns("t2")
for col in cols:
if col["name"] == "normal":
is_false("identity" in col)
elif col["name"] == "id1":
if "autoincrement" in col:
is_true(col["autoincrement"])
eq_(col["default"], None)
is_true("identity" in col)
self.check(
col["identity"],
dict(
always=False,
start=1,
increment=1,
minvalue=1,
maxvalue=2147483647,
cycle=False,
cache=1,
),
approx=True,
)
elif col["name"] == "id2":
if "autoincrement" in col:
is_true(col["autoincrement"])
eq_(col["default"], None)
is_true("identity" in col)
self.check(
col["identity"],
dict(
always=True,
start=2,
increment=3,
minvalue=-2,
maxvalue=42,
cycle=True,
cache=4,
),
approx=False,
)
@testing.requires.schemas
def test_reflect_identity_schema(self):
insp = inspect(config.db)
cols = insp.get_columns("t1", schema=config.test_schema)
for col in cols:
if col["name"] == "normal":
is_false("identity" in col)
elif col["name"] == "id1":
if "autoincrement" in col:
is_true(col["autoincrement"])
eq_(col["default"], None)
is_true("identity" in col)
self.check(
col["identity"],
dict(
always=True,
start=20,
increment=1,
minvalue=1,
maxvalue=2147483647,
cycle=False,
cache=1,
),
approx=True,
)


class CompositeKeyReflectionTest(fixtures.TablesTest):
__sparse_driver_backend__ = True
@classmethod
def define_tables(cls, metadata):
tb1 = Table(
"tb1",
metadata,
Column("id", Integer),
Column("attr", Integer),
Column("name", sql_types.VARCHAR(20)),
sa.PrimaryKeyConstraint("name", "id", "attr", name="pk_tb1"),
schema=None,
test_needs_fk=True,
)
Table(
"tb2",
metadata,
Column("id", Integer, primary_key=True),
Column("pid", Integer),
Column("pattr", Integer),
Column("pname", sql_types.VARCHAR(20)),
sa.ForeignKeyConstraint(
["pname", "pid", "pattr"],
[tb1.c.name, tb1.c.id, tb1.c.attr],
name="fk_tb1_name_id_attr",
),
schema=None,
test_needs_fk=True,
)
@testing.requires.primary_key_constraint_reflection
def test_pk_column_order(self, connection):
# test for issue #5661
insp = inspect(connection)
primary_key = insp.get_pk_constraint(self.tables.tb1.name)
eq_(primary_key.get("constrained_columns"), ["name", "id", "attr"])
@testing.requires.foreign_key_constraint_reflection
def test_fk_column_order(self, connection):
# test for issue #5661
insp = inspect(connection)
foreign_keys = insp.get_foreign_keys(self.tables.tb2.name)
eq_(len(foreign_keys), 1)
fkey1 = foreign_keys[0]
eq_(fkey1.get("referred_columns"), ["name", "id", "attr"])
eq_(fkey1.get("constrained_columns"), ["pname", "pid", "pattr"])


__all__ = (
"ComponentReflectionTest",
"ComponentReflectionTestExtra",
"TableNoColumnsTest",
"QuotedNameArgumentTest",
"BizarroCharacterTest",
"HasTableTest",
"HasIndexTest",
"NormalizedNameTest",
"ComputedReflectionTest",
"IdentityReflectionTest",
"CompositeKeyReflectionTest",
"TempTableElementsTest",
)