ai_v/venv/Lib/site-packages/sqlalchemy/testing/suite/test_select.py
24024 af7c11d7f9 feat(api): 实现图像生成及后台同步功能
- 新增图像生成接口,支持试用、积分和自定义API Key模式
- 实现生成图片结果异步上传至MinIO存储,带重试机制
- 优化积分预扣除和异常退还逻辑,保障用户积分准确
- 添加获取生成历史记录接口,支持时间范围和分页
- 提供本地字典配置接口,支持模型、比例、提示模板和尺寸
- 实现图片批量上传接口,支持S3兼容对象存储

feat(admin): 增加管理员角色管理与权限分配接口

- 实现角色列表查询、角色创建、更新及删除功能
- 增加权限列表查询接口
- 实现用户角色分配接口,便于统一管理用户权限
- 增加系统字典增删查改接口,支持分类过滤和排序
- 权限控制全面覆盖管理接口,保证安全访问

feat(auth): 完善用户登录注册及权限相关接口与页面

- 实现手机号验证码发送及校验功能,保障注册安全
- 支持手机号注册、登录及退出接口,集成日志记录
- 增加修改密码功能,验证原密码后更新
- 提供动态导航菜单接口,基于权限展示不同菜单
- 实现管理界面路由及日志、角色、字典管理页面访问权限控制
- 添加系统日志查询接口,支持关键词和等级筛选

feat(app): 初始化Flask应用并配置蓝图与数据库

- 创建应用程序工厂,加载配置,初始化数据库和Redis客户端
- 注册认证、API及管理员蓝图,整合路由
- 根路由渲染主页模板
- 应用上下文中自动创建数据库表,保证运行环境准备完毕

feat(database): 提供数据库创建与迁移支持脚本

- 新增数据库创建脚本,支持自动检测是否已存在
- 添加数据库表初始化脚本,支持创建和删除所有表
- 实现RBAC权限初始化,包含基础权限和角色创建
- 新增字段手动修复脚本,添加用户API Key和积分字段
- 强制迁移脚本支持清理连接和修复表结构,初始化默认数据及角色分配

feat(config): 新增系统配置参数

- 配置数据库、Redis、Session和MinIO相关参数
- 添加AI接口地址及试用Key配置
- 集成阿里云短信服务配置及开发模式相关参数

feat(extensions): 初始化数据库、Redis和MinIO客户端

- 创建全局SQLAlchemy数据库实例和Redis客户端
- 配置基于boto3的MinIO兼容S3客户端

chore(logs): 添加示例系统日志文件

- 记录用户请求、验证码发送成功与失败的日志信息
2026-01-12 00:53:31 +08:00

2011 lines
61 KiB
Python

# testing/suite/test_select.py
# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors
# <see AUTHORS file>
#
# This module is part of SQLAlchemy and is released under
# the MIT License: https://www.opensource.org/licenses/mit-license.php
# mypy: ignore-errors
import collections.abc as collections_abc
import itertools
from .. import AssertsCompiledSQL
from .. import AssertsExecutionResults
from .. import config
from .. import fixtures
from ..assertions import assert_raises
from ..assertions import eq_
from ..assertions import in_
from ..assertsql import CursorSQL
from ..schema import Column
from ..schema import Table
from ... import bindparam
from ... import case
from ... import column
from ... import Computed
from ... import exists
from ... import false
from ... import ForeignKey
from ... import func
from ... import Identity
from ... import Integer
from ... import literal
from ... import literal_column
from ... import null
from ... import select
from ... import String
from ... import table
from ... import testing
from ... import text
from ... import true
from ... import tuple_
from ... import TupleType
from ... import union
from ... import values
from ...exc import DatabaseError
from ...exc import ProgrammingError
class CollateTest(fixtures.TablesTest):
    """Check that ORDER BY accepts an explicitly collated string column."""

    __sparse_driver_backend__ = True

    @classmethod
    def define_tables(cls, metadata):
        """Create ``some_table`` with an integer key and a string column."""
        table_columns = [
            Column("id", Integer, primary_key=True),
            Column("data", String(100)),
        ]
        Table("some_table", metadata, *table_columns)

    @classmethod
    def insert_data(cls, connection):
        """Populate ``some_table`` with two rows."""
        seed_rows = [
            {"id": 1, "data": "collate data1"},
            {"id": 2, "data": "collate data2"},
        ]
        connection.execute(cls.tables.some_table.insert(), seed_rows)

    def _assert_result(self, select, result):
        """Execute ``select`` on a new connection and compare all rows."""
        with config.db.connect() as conn:
            fetched = conn.execute(select).fetchall()
            eq_(fetched, result)

    @testing.requires.order_by_collation
    def test_collate_order_by(self):
        """Order ascending by ``data`` using the configured collation."""
        some_table = self.tables.some_table
        collation_name = testing.requires.get_order_by_collation(
            testing.config
        )
        ordering = some_table.c.data.collate(collation_name).asc()
        stmt = select(some_table).order_by(ordering)
        self._assert_result(
            stmt, [(1, "collate data1"), (2, "collate data2")]
        )
class OrderByLabelTest(fixtures.TablesTest):
    """Verify ORDER BY / GROUP BY against labeled expressions.

    Each statement orders (or groups) by the same label object that it
    selects, which essentially exercises the dialect's
    "supports_simple_order_by_label" setting.

    """

    __sparse_driver_backend__ = True

    @classmethod
    def define_tables(cls, metadata):
        """Create ``some_table``: int key, int ``x``/``y``, str ``q``/``p``."""
        integer_cols = [Column(name, Integer) for name in ("x", "y")]
        string_cols = [Column(name, String(50)) for name in ("q", "p")]
        Table(
            "some_table",
            metadata,
            Column("id", Integer, primary_key=True),
            *integer_cols,
            *string_cols,
        )

    @classmethod
    def insert_data(cls, connection):
        """Insert three rows; ``x``/``y`` ascend while ``p`` descends."""
        seed_rows = [
            {"id": 1, "x": 1, "y": 2, "q": "q1", "p": "p3"},
            {"id": 2, "x": 2, "y": 3, "q": "q2", "p": "p2"},
            {"id": 3, "x": 3, "y": 4, "q": "q3", "p": "p1"},
        ]
        connection.execute(cls.tables.some_table.insert(), seed_rows)

    def _assert_result(self, select, result):
        """Execute ``select`` on a new connection and compare all rows."""
        with config.db.connect() as conn:
            fetched = conn.execute(select).fetchall()
            eq_(fetched, result)

    def test_plain(self):
        """Order by a label that wraps a plain column."""
        some_table = self.tables.some_table
        x_label = some_table.c.x.label("lx")
        stmt = select(x_label).order_by(x_label)
        self._assert_result(stmt, [(1,), (2,), (3,)])

    def test_composed_int(self):
        """Order by a label that wraps an integer sum."""
        some_table = self.tables.some_table
        sum_label = (some_table.c.x + some_table.c.y).label("lx")
        stmt = select(sum_label).order_by(sum_label)
        self._assert_result(stmt, [(3,), (5,), (7,)])

    def test_composed_multiple(self):
        """Order by two composed labels, the second one descending."""
        some_table = self.tables.some_table
        sum_label = (some_table.c.x + some_table.c.y).label("lx")
        concat_label = (func.lower(some_table.c.q) + some_table.c.p).label(
            "ly"
        )
        stmt = select(sum_label, concat_label).order_by(
            sum_label, concat_label.desc()
        )
        self._assert_result(stmt, [(3, "q1p3"), (5, "q2p2"), (7, "q3p1")])

    def test_plain_desc(self):
        """Order descending by a label that wraps a plain column."""
        some_table = self.tables.some_table
        x_label = some_table.c.x.label("lx")
        stmt = select(x_label).order_by(x_label.desc())
        self._assert_result(stmt, [(3,), (2,), (1,)])

    def test_composed_int_desc(self):
        """Order descending by a label that wraps an integer sum."""
        some_table = self.tables.some_table
        sum_label = (some_table.c.x + some_table.c.y).label("lx")
        stmt = select(sum_label).order_by(sum_label.desc())
        self._assert_result(stmt, [(7,), (5,), (3,)])

    @testing.requires.group_by_complex_expression
    def test_group_by_composed(self):
        """Group and order by the same composed, labeled expression."""
        some_table = self.tables.some_table
        sum_label = (some_table.c.x + some_table.c.y).label("lx")
        counted = select(func.count(some_table.c.id), sum_label)
        stmt = counted.group_by(sum_label).order_by(sum_label)
        self._assert_result(stmt, [(1, 3), (1, 5), (1, 7)])
class ValuesExpressionTest(fixtures.TestBase):
    """Round trip a ``values()`` construct through a SELECT."""

    __requires__ = ("table_value_constructor",)
    __sparse_driver_backend__ = True

    def test_tuples(self, connection):
        """Rows given to ``values().data()`` come back unchanged."""
        row_data = ((1, "name1"), (2, "name2"), (3, "name3"))
        value_columns = (column("id", Integer), column("name", String))
        # independent list copies for the input and the expectation
        value_expr = values(*value_columns, name="my_values").data(
            list(row_data)
        )
        fetched = connection.execute(select(value_expr)).all()
        eq_(fetched, list(row_data))
class FetchLimitOffsetTest(fixtures.TablesTest):
    """Round-trip tests for LIMIT / OFFSET / FETCH on SELECT statements.

    All tests run against ``some_table``.  Its five rows have unique
    ``id`` values, while the rows with ids 4 and 5 share ``x == 4``; the
    ``with_ties`` tests rely on that tie.

    """

    __backend__ = True

    @classmethod
    def define_tables(cls, metadata):
        """Create ``some_table`` with an integer key and two int columns."""
        Table(
            "some_table",
            metadata,
            Column("id", Integer, primary_key=True),
            Column("x", Integer),
            Column("y", Integer),
        )

    @classmethod
    def insert_data(cls, connection):
        """Insert five rows; ids 4 and 5 share the same ``x`` value."""
        connection.execute(
            cls.tables.some_table.insert(),
            [
                {"id": 1, "x": 1, "y": 2},
                {"id": 2, "x": 2, "y": 3},
                {"id": 3, "x": 3, "y": 4},
                {"id": 4, "x": 4, "y": 5},
                {"id": 5, "x": 4, "y": 6},
            ],
        )

    def _assert_result(
        self, connection, select, result, params=(), set_=False
    ):
        """Execute ``select`` on ``connection`` and compare the rows.

        :param connection: connection used to execute the statement.
        :param select: statement to execute.
        :param result: expected rows.
        :param params: parameters passed along to ``execute()``.
        :param set_: when true, compare row count and row set only, so the
         order of the returned rows is ignored.

        """
        if set_:
            query_res = connection.execute(select, params).fetchall()
            eq_(len(query_res), len(result))
            eq_(set(query_res), set(result))
        else:
            eq_(connection.execute(select, params).fetchall(), result)

    def _assert_result_str(self, select, result, params=()):
        """Run the SQL string ``select`` via ``exec_driver_sql`` and compare.

        A new connection from ``config.db`` is used for the execution.

        """
        with config.db.connect() as conn:
            eq_(conn.exec_driver_sql(select, params).fetchall(), result)

    def test_simple_limit(self, connection):
        """LIMIT with integer values."""
        table = self.tables.some_table
        stmt = select(table).order_by(table.c.id)
        self._assert_result(
            connection,
            stmt.limit(2),
            [(1, 1, 2), (2, 2, 3)],
        )
        self._assert_result(
            connection,
            stmt.limit(3),
            [(1, 1, 2), (2, 2, 3), (3, 3, 4)],
        )

    def test_limit_render_multiple_times(self, connection):
        """The same LIMITed scalar subquery appears twice in one statement."""
        table = self.tables.some_table
        stmt = (
            select(table.c.id).order_by(table.c.id).limit(1).scalar_subquery()
        )

        u = union(select(stmt), select(stmt)).subquery().select()

        self._assert_result(
            connection,
            u,
            [
                (1,),
            ],
        )

    @testing.requires.fetch_first
    def test_simple_fetch(self, connection):
        """FETCH with integer values."""
        table = self.tables.some_table
        self._assert_result(
            connection,
            select(table).order_by(table.c.id).fetch(2),
            [(1, 1, 2), (2, 2, 3)],
        )
        self._assert_result(
            connection,
            select(table).order_by(table.c.id).fetch(3),
            [(1, 1, 2), (2, 2, 3), (3, 3, 4)],
        )

    @testing.requires.offset
    def test_simple_offset(self, connection):
        """OFFSET with integer values and no LIMIT."""
        table = self.tables.some_table
        self._assert_result(
            connection,
            select(table).order_by(table.c.id).offset(2),
            [(3, 3, 4), (4, 4, 5), (5, 4, 6)],
        )
        self._assert_result(
            connection,
            select(table).order_by(table.c.id).offset(3),
            [(4, 4, 5), (5, 4, 6)],
        )

    @testing.combinations(
        ([(2, 0), (2, 1), (3, 2)]),
        ([(2, 1), (2, 0), (3, 2)]),
        ([(3, 1), (2, 1), (3, 1)]),
        argnames="cases",
    )
    @testing.requires.offset
    def test_simple_limit_offset(self, connection, cases):
        """Run several (limit, offset) pairs back to back.

        Each entry of ``cases`` is a ``(limit, offset)`` pair; the expected
        rows are the matching slice of the full, id-ordered data set.

        """
        table = self.tables.some_table
        # the statements of one case sequence share a new, empty cache dict
        connection = connection.execution_options(compiled_cache={})

        assert_data = [(1, 1, 2), (2, 2, 3), (3, 3, 4), (4, 4, 5), (5, 4, 6)]

        for limit, offset in cases:
            expected = assert_data[offset : offset + limit]
            self._assert_result(
                connection,
                select(table).order_by(table.c.id).limit(limit).offset(offset),
                expected,
            )

    @testing.requires.fetch_first
    def test_simple_fetch_offset(self, connection):
        """FETCH combined with OFFSET, integer values."""
        table = self.tables.some_table
        self._assert_result(
            connection,
            select(table).order_by(table.c.id).fetch(2).offset(1),
            [(2, 2, 3), (3, 3, 4)],
        )

        self._assert_result(
            connection,
            select(table).order_by(table.c.id).fetch(3).offset(2),
            [(3, 3, 4), (4, 4, 5), (5, 4, 6)],
        )

    @testing.requires.fetch_no_order_by
    def test_fetch_offset_no_order(self, connection):
        """FETCH without ORDER BY; rows are compared as a set."""
        table = self.tables.some_table
        self._assert_result(
            connection,
            select(table).fetch(10),
            [(1, 1, 2), (2, 2, 3), (3, 3, 4), (4, 4, 5), (5, 4, 6)],
            set_=True,
        )

    @testing.requires.offset
    def test_simple_offset_zero(self, connection):
        """OFFSET of zero returns every row; OFFSET 1 follows it."""
        table = self.tables.some_table
        self._assert_result(
            connection,
            select(table).order_by(table.c.id).offset(0),
            [(1, 1, 2), (2, 2, 3), (3, 3, 4), (4, 4, 5), (5, 4, 6)],
        )

        self._assert_result(
            connection,
            select(table).order_by(table.c.id).offset(1),
            [(2, 2, 3), (3, 3, 4), (4, 4, 5), (5, 4, 6)],
        )

    @testing.requires.offset
    def test_limit_offset_nobinds(self):
        """test that 'literal binds' mode works - no bound params."""

        table = self.tables.some_table
        stmt = select(table).order_by(table.c.id).limit(2).offset(1)
        sql = stmt.compile(
            dialect=config.db.dialect, compile_kwargs={"literal_binds": True}
        )
        sql = str(sql)

        self._assert_result_str(sql, [(2, 2, 3), (3, 3, 4)])

    @testing.requires.fetch_first
    def test_fetch_offset_nobinds(self):
        """test that 'literal binds' mode works - no bound params."""

        table = self.tables.some_table
        stmt = select(table).order_by(table.c.id).fetch(2).offset(1)
        sql = stmt.compile(
            dialect=config.db.dialect, compile_kwargs={"literal_binds": True}
        )
        sql = str(sql)

        self._assert_result_str(sql, [(2, 2, 3), (3, 3, 4)])

    @testing.requires.bound_limit_offset
    def test_bound_limit(self, connection):
        """LIMIT given as a named bound parameter."""
        table = self.tables.some_table
        self._assert_result(
            connection,
            select(table).order_by(table.c.id).limit(bindparam("l")),
            [(1, 1, 2), (2, 2, 3)],
            params={"l": 2},
        )

        self._assert_result(
            connection,
            select(table).order_by(table.c.id).limit(bindparam("l")),
            [(1, 1, 2), (2, 2, 3), (3, 3, 4)],
            params={"l": 3},
        )

    @testing.requires.bound_limit_offset
    def test_bound_offset(self, connection):
        """OFFSET given as a named bound parameter."""
        table = self.tables.some_table
        self._assert_result(
            connection,
            select(table).order_by(table.c.id).offset(bindparam("o")),
            [(3, 3, 4), (4, 4, 5), (5, 4, 6)],
            params={"o": 2},
        )

        self._assert_result(
            connection,
            select(table).order_by(table.c.id).offset(bindparam("o")),
            [(2, 2, 3), (3, 3, 4), (4, 4, 5), (5, 4, 6)],
            params={"o": 1},
        )

    @testing.requires.bound_limit_offset
    def test_bound_limit_offset(self, connection):
        """LIMIT and OFFSET both given as named bound parameters."""
        table = self.tables.some_table
        self._assert_result(
            connection,
            select(table)
            .order_by(table.c.id)
            .limit(bindparam("l"))
            .offset(bindparam("o")),
            [(2, 2, 3), (3, 3, 4)],
            params={"l": 2, "o": 1},
        )

        self._assert_result(
            connection,
            select(table)
            .order_by(table.c.id)
            .limit(bindparam("l"))
            .offset(bindparam("o")),
            [(3, 3, 4), (4, 4, 5), (5, 4, 6)],
            params={"l": 3, "o": 2},
        )

    @testing.requires.fetch_first
    def test_bound_fetch_offset(self, connection):
        """FETCH and OFFSET both given as named bound parameters."""
        table = self.tables.some_table
        self._assert_result(
            connection,
            select(table)
            .order_by(table.c.id)
            .fetch(bindparam("f"))
            .offset(bindparam("o")),
            [(2, 2, 3), (3, 3, 4)],
            params={"f": 2, "o": 1},
        )

        self._assert_result(
            connection,
            select(table)
            .order_by(table.c.id)
            .fetch(bindparam("f"))
            .offset(bindparam("o")),
            [(3, 3, 4), (4, 4, 5), (5, 4, 6)],
            params={"f": 3, "o": 2},
        )

    @testing.requires.sql_expression_limit_offset
    def test_expr_offset(self, connection):
        """OFFSET given as a SQL expression (``1 + 2``)."""
        table = self.tables.some_table
        self._assert_result(
            connection,
            select(table)
            .order_by(table.c.id)
            .offset(literal_column("1") + literal_column("2")),
            [(4, 4, 5), (5, 4, 6)],
        )

    @testing.requires.sql_expression_limit_offset
    def test_expr_limit(self, connection):
        """LIMIT given as a SQL expression (``1 + 2``)."""
        table = self.tables.some_table
        self._assert_result(
            connection,
            select(table)
            .order_by(table.c.id)
            .limit(literal_column("1") + literal_column("2")),
            [(1, 1, 2), (2, 2, 3), (3, 3, 4)],
        )

    @testing.requires.sql_expression_limit_offset
    def test_expr_limit_offset(self, connection):
        """LIMIT and OFFSET both given as SQL expressions (``1 + 1``)."""
        table = self.tables.some_table
        self._assert_result(
            connection,
            select(table)
            .order_by(table.c.id)
            .limit(literal_column("1") + literal_column("1"))
            .offset(literal_column("1") + literal_column("1")),
            [(3, 3, 4), (4, 4, 5)],
        )

    @testing.requires.fetch_first
    @testing.requires.fetch_expression
    def test_expr_fetch_offset(self, connection):
        """FETCH and OFFSET both given as SQL expressions (``1 + 1``)."""
        table = self.tables.some_table
        self._assert_result(
            connection,
            select(table)
            .order_by(table.c.id)
            .fetch(literal_column("1") + literal_column("1"))
            .offset(literal_column("1") + literal_column("1")),
            [(3, 3, 4), (4, 4, 5)],
        )

    @testing.requires.sql_expression_limit_offset
    def test_simple_limit_expr_offset(self, connection):
        """Integer LIMIT combined with an expression OFFSET."""
        table = self.tables.some_table
        self._assert_result(
            connection,
            select(table)
            .order_by(table.c.id)
            .limit(2)
            .offset(literal_column("1") + literal_column("1")),
            [(3, 3, 4), (4, 4, 5)],
        )

        self._assert_result(
            connection,
            select(table)
            .order_by(table.c.id)
            .limit(3)
            .offset(literal_column("1") + literal_column("1")),
            [(3, 3, 4), (4, 4, 5), (5, 4, 6)],
        )

    @testing.requires.sql_expression_limit_offset
    def test_expr_limit_simple_offset(self, connection):
        """Expression LIMIT combined with an integer OFFSET."""
        table = self.tables.some_table
        self._assert_result(
            connection,
            select(table)
            .order_by(table.c.id)
            .limit(literal_column("1") + literal_column("1"))
            .offset(2),
            [(3, 3, 4), (4, 4, 5)],
        )

        self._assert_result(
            connection,
            select(table)
            .order_by(table.c.id)
            .limit(literal_column("1") + literal_column("1"))
            .offset(1),
            [(2, 2, 3), (3, 3, 4)],
        )

    @testing.requires.fetch_ties
    def test_simple_fetch_ties(self, connection):
        """FETCH ... WITH TIES; tied rows are compared as a set."""
        table = self.tables.some_table
        self._assert_result(
            connection,
            select(table).order_by(table.c.x.desc()).fetch(1, with_ties=True),
            [(4, 4, 5), (5, 4, 6)],
            set_=True,
        )

        self._assert_result(
            connection,
            select(table).order_by(table.c.x.desc()).fetch(3, with_ties=True),
            [(3, 3, 4), (4, 4, 5), (5, 4, 6)],
            set_=True,
        )

    @testing.requires.fetch_ties
    @testing.requires.fetch_offset_with_options
    def test_fetch_offset_ties(self, connection):
        """FETCH ... WITH TIES plus OFFSET where the last row has a tie."""
        table = self.tables.some_table
        fa = connection.execute(
            select(table)
            .order_by(table.c.x)
            .fetch(2, with_ties=True)
            .offset(2)
        ).fetchall()
        # only the first row has a unique ``x``; the rest are compared
        # without regard to order
        eq_(fa[0], (3, 3, 4))
        eq_(set(fa), {(3, 3, 4), (4, 4, 5), (5, 4, 6)})

    @testing.requires.fetch_ties
    @testing.requires.fetch_offset_with_options
    def test_fetch_offset_ties_exact_number(self, connection):
        """FETCH ... WITH TIES plus OFFSET where no extra tied row is added."""
        table = self.tables.some_table
        self._assert_result(
            connection,
            select(table)
            .order_by(table.c.x)
            .fetch(2, with_ties=True)
            .offset(1),
            [(2, 2, 3), (3, 3, 4)],
        )

        # both returned rows have x == 4 and the statement orders by x only,
        # so their relative order is not determined by the query; compare
        # them order-insensitively like the other tie tests in this class
        self._assert_result(
            connection,
            select(table)
            .order_by(table.c.x)
            .fetch(3, with_ties=True)
            .offset(3),
            [(4, 4, 5), (5, 4, 6)],
            set_=True,
        )

    @testing.requires.fetch_percent
    def test_simple_fetch_percent(self, connection):
        """FETCH ... PERCENT: 20 percent of five rows is one row."""
        table = self.tables.some_table
        self._assert_result(
            connection,
            select(table).order_by(table.c.id).fetch(20, percent=True),
            [(1, 1, 2)],
        )

    @testing.requires.fetch_percent
    @testing.requires.fetch_offset_with_options
    def test_fetch_offset_percent(self, connection):
        """FETCH ... PERCENT combined with OFFSET."""
        table = self.tables.some_table
        self._assert_result(
            connection,
            select(table)
            .order_by(table.c.id)
            .fetch(40, percent=True)
            .offset(1),
            [(2, 2, 3), (3, 3, 4)],
        )

    @testing.requires.fetch_ties
    @testing.requires.fetch_percent
    def test_simple_fetch_percent_ties(self, connection):
        """FETCH ... PERCENT WITH TIES; tied rows are compared as a set."""
        table = self.tables.some_table
        self._assert_result(
            connection,
            select(table)
            .order_by(table.c.x.desc())
            .fetch(20, percent=True, with_ties=True),
            [(4, 4, 5), (5, 4, 6)],
            set_=True,
        )

    @testing.requires.fetch_ties
    @testing.requires.fetch_percent
    @testing.requires.fetch_offset_with_options
    def test_fetch_offset_percent_ties(self, connection):
        """FETCH ... PERCENT WITH TIES combined with OFFSET."""
        table = self.tables.some_table
        fa = connection.execute(
            select(table)
            .order_by(table.c.x)
            .fetch(40, percent=True, with_ties=True)
            .offset(2)
        ).fetchall()
        # only the first row has a unique ``x``; the rest are compared
        # without regard to order
        eq_(fa[0], (3, 3, 4))
        eq_(set(fa), {(3, 3, 4), (4, 4, 5), (5, 4, 6)})
class SameNamedSchemaTableTest(fixtures.TablesTest):
    """tests for #7471

    Two tables named ``some_table`` are defined: one inside
    ``config.test_schema`` and one without a schema.

    """

    __sparse_driver_backend__ = True

    __requires__ = ("schemas",)

    @classmethod
    def define_tables(cls, metadata):
        """Define ``some_table`` twice: in the test schema and without one."""
        # schema-qualified table
        Table(
            "some_table",
            metadata,
            Column("id", Integer, primary_key=True),
            schema=config.test_schema,
        )

        # table without a schema; ``some_table_id`` carries no ForeignKey
        reference_col = Column(
            "some_table_id",
            Integer,
            # ForeignKey("%s.some_table.id" % config.test_schema),
            nullable=False,
        )
        Table(
            "some_table",
            metadata,
            Column("id", Integer, primary_key=True),
            reference_col,
        )

    @classmethod
    def insert_data(cls, connection):
        """Insert one row into each table; the plain row points at id 1."""
        plain, qualified = cls.tables(
            "some_table", "%s.some_table" % config.test_schema
        )
        connection.execute(qualified.insert(), {"id": 1})
        connection.execute(plain.insert(), {"id": 1, "some_table_id": 1})

    def test_simple_join_both_tables(self, connection):
        """Select the columns of both tables across a join."""
        plain, qualified = self.tables(
            "some_table", "%s.some_table" % config.test_schema
        )
        stmt = select(plain, qualified).join_from(
            plain,
            qualified,
            plain.c.some_table_id == qualified.c.id,
        )
        first_row = connection.execute(stmt).first()
        eq_(first_row, (1, 1, 1))

    def test_simple_join_whereclause_only(self, connection):
        """Select only the plain table's columns across the same join."""
        plain, qualified = self.tables(
            "some_table", "%s.some_table" % config.test_schema
        )
        joined = select(plain).join_from(
            plain,
            qualified,
            plain.c.some_table_id == qualified.c.id,
        )
        stmt = joined.where(plain.c.id == 1)
        first_row = connection.execute(stmt).first()
        eq_(first_row, (1, 1))

    def test_subquery(self, connection):
        """Join the plain table to a subquery built from both tables."""
        plain, qualified = self.tables(
            "some_table", "%s.some_table" % config.test_schema
        )

        inner = select(plain).join_from(
            plain,
            qualified,
            plain.c.some_table_id == qualified.c.id,
        )
        subq = inner.where(plain.c.id == 1).subquery()

        outer = select(plain, subq.c.id).join_from(
            plain,
            subq,
            plain.c.some_table_id == subq.c.id,
        )
        stmt = outer.where(plain.c.id == 1)
        first_row = connection.execute(stmt).first()
        eq_(first_row, (1, 1, 1))
class JoinTest(fixtures.TablesTest):
    """Round-trip tests for inner and outer joins between two tables.

    Table ``a`` holds ids 1 through 5.  Table ``b`` holds four rows whose
    ``a_id`` values refer to ``a`` ids 1, 1, 2 and 3, so ``a`` ids 4 and 5
    have no matching ``b`` row.

    """

    __sparse_driver_backend__ = True

    def _assert_result(self, select, result, params=()):
        """Execute ``select`` on a new connection and compare all rows."""
        with config.db.connect() as conn:
            eq_(conn.execute(select, params).fetchall(), result)

    @classmethod
    def define_tables(cls, metadata):
        """Create ``a`` and ``b``, where ``b.a_id`` is a FK to ``a.id``."""
        Table("a", metadata, Column("id", Integer, primary_key=True))
        Table(
            "b",
            metadata,
            Column("id", Integer, primary_key=True),
            Column("a_id", ForeignKey("a.id"), nullable=False),
        )

    @classmethod
    def insert_data(cls, connection):
        """Insert five ``a`` rows and four ``b`` rows."""
        connection.execute(
            cls.tables.a.insert(),
            [{"id": 1}, {"id": 2}, {"id": 3}, {"id": 4}, {"id": 5}],
        )

        connection.execute(
            cls.tables.b.insert(),
            [
                {"id": 1, "a_id": 1},
                {"id": 2, "a_id": 1},
                {"id": 4, "a_id": 2},
                {"id": 5, "a_id": 3},
            ],
        )

    def test_inner_join_fk(self):
        """Inner join with no explicit ON clause passed to ``join()``."""
        a, b = self.tables("a", "b")

        stmt = select(a, b).select_from(a.join(b)).order_by(a.c.id, b.c.id)

        self._assert_result(stmt, [(1, 1, 1), (1, 2, 1), (2, 4, 2), (3, 5, 3)])

    def test_inner_join_true(self):
        """Inner join ON true yields the full cartesian product."""
        a, b = self.tables("a", "b")

        stmt = (
            select(a, b)
            .select_from(a.join(b, true()))
            .order_by(a.c.id, b.c.id)
        )

        self._assert_result(
            stmt,
            [
                (a_id, b_id, b_a_id)
                for (a_id,), (b_id, b_a_id) in itertools.product(
                    [(1,), (2,), (3,), (4,), (5,)],
                    [(1, 1), (2, 1), (4, 2), (5, 3)],
                )
            ],
        )

    def test_inner_join_false(self):
        """Inner join ON false yields no rows."""
        a, b = self.tables("a", "b")

        stmt = (
            select(a, b)
            .select_from(a.join(b, false()))
            .order_by(a.c.id, b.c.id)
        )

        self._assert_result(stmt, [])

    def test_outer_join_false(self):
        """Outer join ON false keeps every ``a`` row, padded with NULLs."""
        a, b = self.tables("a", "b")

        stmt = (
            select(a, b)
            .select_from(a.outerjoin(b, false()))
            .order_by(a.c.id, b.c.id)
        )

        self._assert_result(
            stmt,
            [
                (1, None, None),
                (2, None, None),
                (3, None, None),
                (4, None, None),
                (5, None, None),
            ],
        )

    def test_outer_join_fk(self):
        """Outer join with no explicit ON clause passed to ``outerjoin()``.

        ``a`` ids 4 and 5 have no ``b`` row, so they come back padded with
        NULLs.  Each of them produces exactly one row, so ordering by
        ``a.id`` first keeps the result order independent of how NULL
        ``b.id`` values sort.

        """
        a, b = self.tables("a", "b")

        stmt = (
            select(a, b).select_from(a.outerjoin(b)).order_by(a.c.id, b.c.id)
        )

        self._assert_result(
            stmt,
            [
                (1, 1, 1),
                (1, 2, 1),
                (2, 4, 2),
                (3, 5, 3),
                (4, None, None),
                (5, None, None),
            ],
        )
class CompoundSelectTest(fixtures.TablesTest):
    """Round-trip tests for UNION with member selects of various shapes."""

    __sparse_driver_backend__ = True

    @classmethod
    def define_tables(cls, metadata):
        """Create ``some_table`` with an integer key and two int columns."""
        value_cols = [Column(name, Integer) for name in ("x", "y")]
        Table(
            "some_table",
            metadata,
            Column("id", Integer, primary_key=True),
            *value_cols,
        )

    @classmethod
    def insert_data(cls, connection):
        """Insert four rows with ids 1 through 4."""
        seed_rows = [
            {"id": 1, "x": 1, "y": 2},
            {"id": 2, "x": 2, "y": 3},
            {"id": 3, "x": 3, "y": 4},
            {"id": 4, "x": 4, "y": 5},
        ]
        connection.execute(cls.tables.some_table.insert(), seed_rows)

    def _assert_result(self, select, result, params=()):
        """Execute ``select`` on a new connection and compare all rows."""
        with config.db.connect() as conn:
            fetched = conn.execute(select, params).fetchall()
            eq_(fetched, result)

    def test_plain_union(self):
        """UNION of two simple selects, ordered by the union's ``id``."""
        some_table = self.tables.some_table
        first = select(some_table).where(some_table.c.id == 2)
        second = select(some_table).where(some_table.c.id == 3)

        compound = union(first, second)
        ordered = compound.order_by(compound.selected_columns.id)
        self._assert_result(ordered, [(2, 2, 3), (3, 3, 4)])

    def test_select_from_plain_union(self):
        """SELECT from an aliased UNION of two simple selects."""
        some_table = self.tables.some_table
        first = select(some_table).where(some_table.c.id == 2)
        second = select(some_table).where(some_table.c.id == 3)

        outer = union(first, second).alias().select()
        ordered = outer.order_by(outer.selected_columns.id)
        self._assert_result(ordered, [(2, 2, 3), (3, 3, 4)])

    @testing.requires.order_by_col_from_union
    @testing.requires.parens_in_union_contained_select_w_limit_offset
    def test_limit_offset_selectable_in_unions(self):
        """UNION whose member selects each carry LIMIT and ORDER BY."""
        some_table = self.tables.some_table
        first = (
            select(some_table)
            .where(some_table.c.id == 2)
            .limit(1)
            .order_by(some_table.c.id)
        )
        second = (
            select(some_table)
            .where(some_table.c.id == 3)
            .limit(1)
            .order_by(some_table.c.id)
        )

        compound = union(first, second).limit(2)
        ordered = compound.order_by(compound.selected_columns.id)
        self._assert_result(ordered, [(2, 2, 3), (3, 3, 4)])

    @testing.requires.parens_in_union_contained_select_wo_limit_offset
    def test_order_by_selectable_in_unions(self):
        """UNION whose member selects each carry ORDER BY but no LIMIT."""
        some_table = self.tables.some_table
        first = (
            select(some_table)
            .where(some_table.c.id == 2)
            .order_by(some_table.c.id)
        )
        second = (
            select(some_table)
            .where(some_table.c.id == 3)
            .order_by(some_table.c.id)
        )

        compound = union(first, second).limit(2)
        ordered = compound.order_by(compound.selected_columns.id)
        self._assert_result(ordered, [(2, 2, 3), (3, 3, 4)])

    def test_distinct_selectable_in_unions(self):
        """UNION whose member selects are each DISTINCT."""
        some_table = self.tables.some_table
        first = select(some_table).where(some_table.c.id == 2).distinct()
        second = select(some_table).where(some_table.c.id == 3).distinct()

        compound = union(first, second).limit(2)
        ordered = compound.order_by(compound.selected_columns.id)
        self._assert_result(ordered, [(2, 2, 3), (3, 3, 4)])

    @testing.requires.parens_in_union_contained_select_w_limit_offset
    def test_limit_offset_in_unions_from_alias(self):
        """SELECT with LIMIT from an aliased UNION of LIMITed selects."""
        some_table = self.tables.some_table
        first = (
            select(some_table)
            .where(some_table.c.id == 2)
            .limit(1)
            .order_by(some_table.c.id)
        )
        second = (
            select(some_table)
            .where(some_table.c.id == 3)
            .limit(1)
            .order_by(some_table.c.id)
        )

        # this necessarily has double parens
        aliased = union(first, second).alias()
        stmt = aliased.select().limit(2).order_by(aliased.c.id)
        self._assert_result(stmt, [(2, 2, 3), (3, 3, 4)])

    def test_limit_offset_aliased_selectable_in_unions(self):
        """UNION of selects that each wrap an aliased, LIMITed select."""
        some_table = self.tables.some_table

        limited_first = (
            select(some_table)
            .where(some_table.c.id == 2)
            .limit(1)
            .order_by(some_table.c.id)
        )
        first = limited_first.alias().select()

        limited_second = (
            select(some_table)
            .where(some_table.c.id == 3)
            .limit(1)
            .order_by(some_table.c.id)
        )
        second = limited_second.alias().select()

        compound = union(first, second).limit(2)
        ordered = compound.order_by(compound.selected_columns.id)
        self._assert_result(ordered, [(2, 2, 3), (3, 3, 4)])
class PostCompileParamsTest(
    AssertsExecutionResults, AssertsCompiledSQL, fixtures.TablesTest
):
    """Tests for bound parameters created with ``literal_execute=True``.

    The compile tests check the rendered SQL string; the execute tests
    check the SQL string and parameters asserted through ``CursorSQL``.

    """

    __backend__ = True

    __requires__ = ("standard_cursor_sql",)

    @classmethod
    def define_tables(cls, metadata):
        """Create ``some_table``: int key, int ``x``/``y``, string ``z``."""
        table_columns = [
            Column("id", Integer, primary_key=True),
            Column("x", Integer),
            Column("y", Integer),
            Column("z", String(50)),
        ]
        Table("some_table", metadata, *table_columns)

    @classmethod
    def insert_data(cls, connection):
        """Insert four rows with ids 1 through 4."""
        seed_rows = [
            {"id": 1, "x": 1, "y": 2, "z": "z1"},
            {"id": 2, "x": 2, "y": 3, "z": "z2"},
            {"id": 3, "x": 3, "y": 4, "z": "z3"},
            {"id": 4, "x": 4, "y": 5, "z": "z4"},
        ]
        connection.execute(cls.tables.some_table.insert(), seed_rows)

    def test_compile(self):
        """Default compilation renders a POSTCOMPILE placeholder."""
        some_table = self.tables.some_table
        criterion = some_table.c.x == bindparam("q", literal_execute=True)
        stmt = select(some_table.c.id).where(criterion)

        expected_sql = (
            "SELECT some_table.id FROM some_table "
            "WHERE some_table.x = __[POSTCOMPILE_q]"
        )
        self.assert_compile(stmt, expected_sql, {})

    def test_compile_literal_binds(self):
        """With ``literal_binds`` the parameter value is rendered inline."""
        some_table = self.tables.some_table
        criterion = some_table.c.x == bindparam(
            "q", 10, literal_execute=True
        )
        stmt = select(some_table.c.id).where(criterion)

        expected_sql = (
            "SELECT some_table.id FROM some_table WHERE some_table.x = 10"
        )
        self.assert_compile(stmt, expected_sql, {}, literal_binds=True)

    def test_execute(self):
        """Execution renders the scalar value into the SQL string."""
        some_table = self.tables.some_table
        criterion = some_table.c.x == bindparam("q", literal_execute=True)
        stmt = select(some_table.c.id).where(criterion)

        with self.sql_execution_asserter() as asserter:
            with config.db.connect() as conn:
                conn.execute(stmt, {"q": 10})

        expected_sql = (
            "SELECT some_table.id \nFROM some_table "
            "\nWHERE some_table.x = 10"
        )
        # no parameters are expected once the value is rendered inline
        expected_params = () if config.db.dialect.positional else {}
        asserter.assert_(CursorSQL(expected_sql, expected_params))

    def test_execute_expanding_plus_literal_execute(self):
        """Execution renders an expanding IN list into the SQL string."""
        some_table = self.tables.some_table
        in_param = bindparam("q", expanding=True, literal_execute=True)
        stmt = select(some_table.c.id).where(some_table.c.x.in_(in_param))

        with self.sql_execution_asserter() as asserter:
            with config.db.connect() as conn:
                conn.execute(stmt, {"q": [5, 6, 7]})

        expected_sql = (
            "SELECT some_table.id \nFROM some_table "
            "\nWHERE some_table.x IN (5, 6, 7)"
        )
        expected_params = () if config.db.dialect.positional else {}
        asserter.assert_(CursorSQL(expected_sql, expected_params))

    @testing.requires.tuple_in
    def test_execute_tuple_expanding_plus_literal_execute(self):
        """Execution renders an expanding tuple IN list of integers."""
        some_table = self.tables.some_table
        in_param = bindparam("q", expanding=True, literal_execute=True)
        stmt = select(some_table.c.id).where(
            tuple_(some_table.c.x, some_table.c.y).in_(in_param)
        )

        with self.sql_execution_asserter() as asserter:
            with config.db.connect() as conn:
                conn.execute(stmt, {"q": [(5, 10), (12, 18)]})

        # "VALUES " is expected only when the dialect sets tuple_in_values
        values_keyword = (
            "VALUES " if config.db.dialect.tuple_in_values else ""
        )
        expected_sql = (
            "SELECT some_table.id \nFROM some_table "
            "\nWHERE (some_table.x, some_table.y) "
            "IN (%s(5, 10), (12, 18))"
        ) % (values_keyword,)
        expected_params = () if config.db.dialect.positional else {}
        asserter.assert_(CursorSQL(expected_sql, expected_params))

    @testing.requires.tuple_in
    def test_execute_tuple_expanding_plus_literal_heterogeneous_execute(self):
        """Execution renders an expanding tuple IN list of mixed types."""
        some_table = self.tables.some_table
        in_param = bindparam("q", expanding=True, literal_execute=True)
        stmt = select(some_table.c.id).where(
            tuple_(some_table.c.x, some_table.c.z).in_(in_param)
        )

        with self.sql_execution_asserter() as asserter:
            with config.db.connect() as conn:
                conn.execute(stmt, {"q": [(5, "z1"), (12, "z3")]})

        # "VALUES " is expected only when the dialect sets tuple_in_values
        values_keyword = (
            "VALUES " if config.db.dialect.tuple_in_values else ""
        )
        expected_sql = (
            "SELECT some_table.id \nFROM some_table "
            "\nWHERE (some_table.x, some_table.z) "
            "IN (%s(5, 'z1'), (12, 'z3'))"
        ) % (values_keyword,)
        expected_params = () if config.db.dialect.positional else {}
        asserter.assert_(CursorSQL(expected_sql, expected_params))
class ExpandingBoundInTest(fixtures.TablesTest):
__backend__ = True
@classmethod
def define_tables(cls, metadata):
Table(
"some_table",
metadata,
Column("id", Integer, primary_key=True),
Column("x", Integer),
Column("y", Integer),
Column("z", String(50)),
)
@classmethod
def insert_data(cls, connection):
connection.execute(
cls.tables.some_table.insert(),
[
{"id": 1, "x": 1, "y": 2, "z": "z1"},
{"id": 2, "x": 2, "y": 3, "z": "z2"},
{"id": 3, "x": 3, "y": 4, "z": "z3"},
{"id": 4, "x": 4, "y": 5, "z": "z4"},
],
)
def _assert_result(self, select, result, params=()):
with config.db.connect() as conn:
eq_(conn.execute(select, params).fetchall(), result)
def test_multiple_empty_sets_bindparam(self):
# test that any anonymous aliasing used by the dialect
# is fine with duplicates
table = self.tables.some_table
stmt = (
select(table.c.id)
.where(table.c.x.in_(bindparam("q")))
.where(table.c.y.in_(bindparam("p")))
.order_by(table.c.id)
)
self._assert_result(stmt, [], params={"q": [], "p": []})
def test_multiple_empty_sets_direct(self):
# test that any anonymous aliasing used by the dialect
# is fine with duplicates
table = self.tables.some_table
stmt = (
select(table.c.id)
.where(table.c.x.in_([]))
.where(table.c.y.in_([]))
.order_by(table.c.id)
)
self._assert_result(stmt, [])
@testing.requires.tuple_in_w_empty
def test_empty_heterogeneous_tuples_bindparam(self):
table = self.tables.some_table
stmt = (
select(table.c.id)
.where(tuple_(table.c.x, table.c.z).in_(bindparam("q")))
.order_by(table.c.id)
)
self._assert_result(stmt, [], params={"q": []})
@testing.requires.tuple_in_w_empty
def test_empty_heterogeneous_tuples_direct(self):
table = self.tables.some_table
def go(val, expected):
stmt = (
select(table.c.id)
.where(tuple_(table.c.x, table.c.z).in_(val))
.order_by(table.c.id)
)
self._assert_result(stmt, expected)
go([], [])
go([(2, "z2"), (3, "z3"), (4, "z4")], [(2,), (3,), (4,)])
go([], [])
@testing.requires.tuple_in_w_empty
def test_empty_homogeneous_tuples_bindparam(self):
table = self.tables.some_table
stmt = (
select(table.c.id)
.where(tuple_(table.c.x, table.c.y).in_(bindparam("q")))
.order_by(table.c.id)
)
self._assert_result(stmt, [], params={"q": []})
@testing.requires.tuple_in_w_empty
def test_empty_homogeneous_tuples_direct(self):
table = self.tables.some_table
def go(val, expected):
stmt = (
select(table.c.id)
.where(tuple_(table.c.x, table.c.y).in_(val))
.order_by(table.c.id)
)
self._assert_result(stmt, expected)
go([], [])
go([(1, 2), (2, 3), (3, 4)], [(1,), (2,), (3,)])
go([], [])
def test_bound_in_scalar_bindparam(self):
table = self.tables.some_table
stmt = (
select(table.c.id)
.where(table.c.x.in_(bindparam("q")))
.order_by(table.c.id)
)
self._assert_result(stmt, [(2,), (3,), (4,)], params={"q": [2, 3, 4]})
def test_bound_in_scalar_direct(self):
table = self.tables.some_table
stmt = (
select(table.c.id)
.where(table.c.x.in_([2, 3, 4]))
.order_by(table.c.id)
)
self._assert_result(stmt, [(2,), (3,), (4,)])
def test_nonempty_in_plus_empty_notin(self):
table = self.tables.some_table
stmt = (
select(table.c.id)
.where(table.c.x.in_([2, 3]))
.where(table.c.id.not_in([]))
.order_by(table.c.id)
)
self._assert_result(stmt, [(2,), (3,)])
def test_empty_in_plus_notempty_notin(self):
table = self.tables.some_table
stmt = (
select(table.c.id)
.where(table.c.x.in_([]))
.where(table.c.id.not_in([2, 3]))
.order_by(table.c.id)
)
self._assert_result(stmt, [])
def test_typed_str_in(self):
"""test related to #7292.
as a type is given to the bound param, there is no ambiguity
to the type of element.
"""
stmt = text(
"select id FROM some_table WHERE z IN :q ORDER BY id"
).bindparams(bindparam("q", type_=String, expanding=True))
self._assert_result(
stmt,
[(2,), (3,), (4,)],
params={"q": ["z2", "z3", "z4"]},
)
def test_untyped_str_in(self):
"""test related to #7292.
for untyped expression, we look at the types of elements.
Test for Sequence to detect tuple in. but not strings or bytes!
as always....
"""
stmt = text(
"select id FROM some_table WHERE z IN :q ORDER BY id"
).bindparams(bindparam("q", expanding=True))
self._assert_result(
stmt,
[(2,), (3,), (4,)],
params={"q": ["z2", "z3", "z4"]},
)
@testing.requires.tuple_in
def test_bound_in_two_tuple_bindparam(self):
table = self.tables.some_table
stmt = (
select(table.c.id)
.where(tuple_(table.c.x, table.c.y).in_(bindparam("q")))
.order_by(table.c.id)
)
self._assert_result(
stmt, [(2,), (3,), (4,)], params={"q": [(2, 3), (3, 4), (4, 5)]}
)
@testing.requires.tuple_in
def test_bound_in_two_tuple_direct(self):
table = self.tables.some_table
stmt = (
select(table.c.id)
.where(tuple_(table.c.x, table.c.y).in_([(2, 3), (3, 4), (4, 5)]))
.order_by(table.c.id)
)
self._assert_result(stmt, [(2,), (3,), (4,)])
@testing.requires.tuple_in
def test_bound_in_heterogeneous_two_tuple_bindparam(self):
table = self.tables.some_table
stmt = (
select(table.c.id)
.where(tuple_(table.c.x, table.c.z).in_(bindparam("q")))
.order_by(table.c.id)
)
self._assert_result(
stmt,
[(2,), (3,), (4,)],
params={"q": [(2, "z2"), (3, "z3"), (4, "z4")]},
)
@testing.requires.tuple_in
def test_bound_in_heterogeneous_two_tuple_direct(self):
table = self.tables.some_table
stmt = (
select(table.c.id)
.where(
tuple_(table.c.x, table.c.z).in_(
[(2, "z2"), (3, "z3"), (4, "z4")]
)
)
.order_by(table.c.id)
)
self._assert_result(
stmt,
[(2,), (3,), (4,)],
)
@testing.requires.tuple_in
def test_bound_in_heterogeneous_two_tuple_text_bindparam(self):
# note this becomes ARRAY if we dont use expanding
# explicitly right now
stmt = text(
"select id FROM some_table WHERE (x, z) IN :q ORDER BY id"
).bindparams(bindparam("q", expanding=True))
self._assert_result(
stmt,
[(2,), (3,), (4,)],
params={"q": [(2, "z2"), (3, "z3"), (4, "z4")]},
)
@testing.requires.tuple_in
def test_bound_in_heterogeneous_two_tuple_typed_bindparam_non_tuple(self):
    """Tuple IN via a TupleType-typed bind fed with non-tuple Sequences.

    The parameter elements are instances of a local Sequence subclass
    rather than real tuples. Expects ids 2, 3 and 4.
    """

    class LikeATuple(collections_abc.Sequence):
        """A Sequence wrapping its positional arguments; not a tuple."""

        def __init__(self, *data):
            self._data = data

        def __len__(self):
            return len(self._data)

        def __getitem__(self, idx):
            return self._data[idx]

        def __iter__(self):
            return iter(self._data)

    typed_q = bindparam(
        "q", type_=TupleType(Integer(), String()), expanding=True
    )
    stmt = text(
        "select id FROM some_table WHERE (x, z) IN :q ORDER BY id"
    ).bindparams(typed_q)

    pairs = [
        LikeATuple(2, "z2"),
        LikeATuple(3, "z3"),
        LikeATuple(4, "z4"),
    ]
    self._assert_result(stmt, [(2,), (3,), (4,)], params={"q": pairs})
@testing.requires.tuple_in
def test_bound_in_heterogeneous_two_tuple_text_bindparam_non_tuple(self):
    """Tuple IN via an untyped expanding bind fed with non-tuple Sequences.

    The parameter elements are instances of a local Sequence subclass
    rather than real tuples. Expects ids 2, 3 and 4.
    """
    # NOTE: without an explicit expanding=True this currently
    # becomes ARRAY

    class LikeATuple(collections_abc.Sequence):
        """A Sequence wrapping its positional arguments; not a tuple."""

        def __init__(self, *data):
            self._data = data

        def __len__(self):
            return len(self._data)

        def __getitem__(self, idx):
            return self._data[idx]

        def __iter__(self):
            return iter(self._data)

    expanding_q = bindparam("q", expanding=True)
    stmt = text(
        "select id FROM some_table WHERE (x, z) IN :q ORDER BY id"
    ).bindparams(expanding_q)

    pairs = [
        LikeATuple(2, "z2"),
        LikeATuple(3, "z3"),
        LikeATuple(4, "z4"),
    ]
    self._assert_result(stmt, [(2,), (3,), (4,)], params={"q": pairs})
def test_empty_set_against_integer_bindparam(self):
    """``x IN`` an empty list bound via ``q`` is expected to match no rows."""
    tbl = self.tables.some_table
    x_in_q = tbl.c.x.in_(bindparam("q"))
    stmt = select(tbl.c.id).where(x_in_q).order_by(tbl.c.id)

    self._assert_result(stmt, [], params={"q": []})
def test_empty_set_against_integer_direct(self):
    """``x IN`` an inline empty list is expected to match no rows."""
    tbl = self.tables.some_table
    x_in_nothing = tbl.c.x.in_([])
    stmt = select(tbl.c.id).where(x_in_nothing).order_by(tbl.c.id)

    self._assert_result(stmt, [])
def test_empty_set_against_integer_negation_bindparam(self):
    """``x NOT IN`` an empty list bound via ``q`` expects ids 1 through 4."""
    tbl = self.tables.some_table
    x_not_in_q = tbl.c.x.not_in(bindparam("q"))
    stmt = select(tbl.c.id).where(x_not_in_q).order_by(tbl.c.id)

    every_id = [(1,), (2,), (3,), (4,)]
    self._assert_result(stmt, every_id, params={"q": []})
def test_empty_set_against_integer_negation_direct(self):
    """``x NOT IN`` an inline empty list expects ids 1 through 4."""
    tbl = self.tables.some_table
    x_not_in_nothing = tbl.c.x.not_in([])
    stmt = select(tbl.c.id).where(x_not_in_nothing).order_by(tbl.c.id)

    every_id = [(1,), (2,), (3,), (4,)]
    self._assert_result(stmt, every_id)
def test_empty_set_against_string_bindparam(self):
    """``z IN`` an empty list bound via ``q`` is expected to match no rows."""
    tbl = self.tables.some_table
    z_in_q = tbl.c.z.in_(bindparam("q"))
    stmt = select(tbl.c.id).where(z_in_q).order_by(tbl.c.id)

    self._assert_result(stmt, [], params={"q": []})
def test_empty_set_against_string_direct(self):
    """``z IN`` an inline empty list is expected to match no rows."""
    tbl = self.tables.some_table
    z_in_nothing = tbl.c.z.in_([])
    stmt = select(tbl.c.id).where(z_in_nothing).order_by(tbl.c.id)

    self._assert_result(stmt, [])
def test_empty_set_against_string_negation_bindparam(self):
    """``z NOT IN`` an empty list bound via ``q`` expects ids 1 through 4."""
    tbl = self.tables.some_table
    z_not_in_q = tbl.c.z.not_in(bindparam("q"))
    stmt = select(tbl.c.id).where(z_not_in_q).order_by(tbl.c.id)

    every_id = [(1,), (2,), (3,), (4,)]
    self._assert_result(stmt, every_id, params={"q": []})
def test_empty_set_against_string_negation_direct(self):
    """``z NOT IN`` an inline empty list expects ids 1 through 4."""
    tbl = self.tables.some_table
    z_not_in_nothing = tbl.c.z.not_in([])
    stmt = select(tbl.c.id).where(z_not_in_nothing).order_by(tbl.c.id)

    every_id = [(1,), (2,), (3,), (4,)]
    self._assert_result(stmt, every_id)
def test_null_in_empty_set_is_false_bindparam(self, connection):
    """``NULL IN`` an empty bound tuple must select the CASE else branch.

    The statement is ``CASE WHEN NULL IN (<bind foo = ()>) THEN true
    ELSE false``; the fetched scalar has to be one of ``False`` or ``0``.
    """
    null_in_empty = null().in_(bindparam("foo", value=()))
    stmt = select(case((null_in_empty, true()), else_=false()))

    row = connection.execute(stmt).fetchone()
    in_(row[0], (False, 0))
def test_null_in_empty_set_is_false_direct(self, connection):
    """``NULL IN`` an inline empty list must select the CASE else branch.

    The statement is ``CASE WHEN NULL IN (<empty>) THEN true ELSE
    false``; the fetched scalar has to be one of ``False`` or ``0``.
    """
    null_in_empty = null().in_([])
    stmt = select(case((null_in_empty, true()), else_=false()))

    row = connection.execute(stmt).fetchone()
    in_(row[0], (False, 0))
class LikeFunctionsTest(fixtures.TablesTest):
    """Round trips for startswith / endswith / contains and regexp operators.

    Each test filters ``some_table.data`` with one operator and compares
    the set of matching ids against the expected set.
    """

    __sparse_driver_backend__ = True

    run_inserts = "once"
    run_deletes = None

    @classmethod
    def define_tables(cls, metadata):
        """Create ``some_table`` with an integer pk and a ``data`` string."""
        Table(
            "some_table",
            metadata,
            Column("id", Integer, primary_key=True),
            Column("data", String(50)),
        )

    @classmethod
    def insert_data(cls, connection):
        """Insert eleven rows, ids 1 to 11; the last one has NULL data."""
        values = [
            "abcdefg",
            "ab/cdefg",
            "ab%cdefg",
            "ab_cdefg",
            "abcde/fg",
            "abcde%fg",
            "ab#cdefg",
            "ab9cdefg",
            "abcde#fg",
            "abcd9fg",
            None,
        ]
        rows = [
            {"id": pk, "data": value}
            for pk, value in enumerate(values, start=1)
        ]
        connection.execute(cls.tables.some_table.insert(), rows)

    def _test(self, expr, expected):
        """Assert that the ids of rows matching ``expr`` equal ``expected``.

        :param expr: criteria passed to ``where()``.
        :param expected: set of ids the query is expected to return.
        """
        id_col = self.tables.some_table.c.id

        with config.db.connect() as conn:
            result = conn.execute(select(id_col).where(expr))
            matched_ids = {row[0] for row in result}

        eq_(matched_ids, expected)

    def test_startswith_unescaped(self):
        """Unescaped ``%`` in the startswith() value acts as a wildcard."""
        data = self.tables.some_table.c.data
        expr = data.startswith("ab%c")
        self._test(expr, set(range(1, 11)))

    @testing.requires.like_escapes
    def test_startswith_autoescape(self):
        """autoescape=True makes ``%`` match only a literal percent sign."""
        data = self.tables.some_table.c.data
        expr = data.startswith("ab%c", autoescape=True)
        self._test(expr, {3})

    def test_startswith_sqlexpr(self):
        """startswith() accepts a SQL expression as its operand."""
        data = self.tables.some_table.c.data
        expr = data.startswith(literal_column("'ab%c'"))
        self._test(expr, set(range(1, 11)))

    @testing.requires.like_escapes
    def test_startswith_escape(self):
        """An explicit escape character can escape itself (``##``)."""
        data = self.tables.some_table.c.data
        expr = data.startswith("ab##c", escape="#")
        self._test(expr, {7})

    @testing.requires.like_escapes
    def test_startswith_autoescape_escape(self):
        """autoescape combined with a custom escape character."""
        data = self.tables.some_table.c.data
        for value, expected in (("ab%c", {3}), ("ab#c", {7})):
            expr = data.startswith(value, autoescape=True, escape="#")
            self._test(expr, expected)

    def test_endswith_unescaped(self):
        """Unescaped ``%`` in the endswith() value acts as a wildcard."""
        data = self.tables.some_table.c.data
        expr = data.endswith("e%fg")
        self._test(expr, set(range(1, 10)))

    def test_endswith_sqlexpr(self):
        """endswith() accepts a SQL expression as its operand."""
        data = self.tables.some_table.c.data
        expr = data.endswith(literal_column("'e%fg'"))
        self._test(expr, set(range(1, 10)))

    @testing.requires.like_escapes
    def test_endswith_autoescape(self):
        """autoescape=True makes ``%`` match only a literal percent sign."""
        data = self.tables.some_table.c.data
        expr = data.endswith("e%fg", autoescape=True)
        self._test(expr, {6})

    @testing.requires.like_escapes
    def test_endswith_escape(self):
        """An explicit escape character can escape itself (``##``)."""
        data = self.tables.some_table.c.data
        expr = data.endswith("e##fg", escape="#")
        self._test(expr, {9})

    @testing.requires.like_escapes
    def test_endswith_autoescape_escape(self):
        """autoescape combined with a custom escape character."""
        data = self.tables.some_table.c.data
        for value, expected in (("e%fg", {6}), ("e#fg", {9})):
            expr = data.endswith(value, autoescape=True, escape="#")
            self._test(expr, expected)

    def test_contains_unescaped(self):
        """Unescaped ``%`` in the contains() value acts as a wildcard."""
        data = self.tables.some_table.c.data
        expr = data.contains("b%cde")
        self._test(expr, set(range(1, 10)))

    @testing.requires.like_escapes
    def test_contains_autoescape(self):
        """autoescape=True makes ``%`` match only a literal percent sign."""
        data = self.tables.some_table.c.data
        expr = data.contains("b%cde", autoescape=True)
        self._test(expr, {3})

    @testing.requires.like_escapes
    def test_contains_escape(self):
        """An explicit escape character can escape itself (``##``)."""
        data = self.tables.some_table.c.data
        expr = data.contains("b##cde", escape="#")
        self._test(expr, {7})

    @testing.requires.like_escapes
    def test_contains_autoescape_escape(self):
        """autoescape combined with a custom escape character."""
        data = self.tables.some_table.c.data
        for value, expected in (("b%cd", {3}), ("b#cd", {7})):
            expr = data.contains(value, autoescape=True, escape="#")
            self._test(expr, expected)

    @testing.requires.regexp_match
    def test_not_regexp_match(self):
        """A negated regexp_match() returns the non-matching, non-NULL rows."""
        data = self.tables.some_table.c.data
        expr = ~data.regexp_match("a.cde")
        self._test(expr, {2, 3, 4, 7, 8, 10})

    @testing.requires.regexp_replace
    def test_regexp_replace(self):
        """regexp_replace() output can be filtered with contains()."""
        data = self.tables.some_table.c.data
        replaced = data.regexp_replace("a.cde", "FOO")
        self._test(replaced.contains("FOO"), {1, 5, 6, 9})

    @testing.requires.regexp_match
    @testing.combinations(
        ("a.cde", {1, 5, 6, 9}),
        ("abc", {1, 5, 6, 9, 10}),
        ("^abc", {1, 5, 6, 9, 10}),
        ("9cde", {8}),
        ("^a", set(range(1, 11))),
        ("(b|c)", set(range(1, 11))),
        ("^(b|c)", set()),
    )
    def test_regexp_match(self, text, expected):
        """regexp_match() with each pattern yields the paired id set."""
        data = self.tables.some_table.c.data
        expr = data.regexp_match(text)
        self._test(expr, expected)
class ComputedColumnTest(fixtures.TablesTest):
    """Selects against a table holding two ``Computed`` columns.

    ``square.area`` is computed as ``side * side`` and
    ``square.perimeter`` as ``4 * side``.
    """

    __sparse_driver_backend__ = True
    __requires__ = ("computed_columns",)

    @classmethod
    def define_tables(cls, metadata):
        """Create ``square`` with a plain ``side`` and two computed columns."""
        Table(
            "square",
            metadata,
            Column("id", Integer, primary_key=True),
            Column("side", Integer),
            Column("area", Integer, Computed("side * side")),
            Column("perimeter", Integer, Computed("4 * side")),
        )

    @classmethod
    def insert_data(cls, connection):
        """Insert two squares; only ``id`` and ``side`` are supplied."""
        rows = [{"id": 1, "side": 10}, {"id": 10, "side": 42}]
        connection.execute(cls.tables.square.insert(), rows)

    def test_select_all(self):
        """``SELECT *`` returns the computed values next to the stored ones."""
        square = self.tables.square

        with config.db.connect() as conn:
            stmt = select(text("*")).select_from(square).order_by(square.c.id)
            fetched = conn.execute(stmt).fetchall()
            eq_(fetched, [(1, 10, 100, 40), (10, 42, 1764, 168)])

    def test_select_columns(self):
        """Selecting only the computed columns returns their values."""
        square = self.tables.square

        with config.db.connect() as conn:
            stmt = (
                select(square.c.area, square.c.perimeter)
                .select_from(square)
                .order_by(square.c.id)
            )
            fetched = conn.execute(stmt).fetchall()
            eq_(fetched, [(100, 40), (1764, 168)])
class IdentityColumnTest(fixtures.TablesTest):
    """Round trips for tables whose primary key uses ``Identity``.

    ``tbl_a`` uses an ALWAYS identity starting at 42; ``tbl_b`` uses a
    descending identity (increment -5, start 0, range -1000..0).
    """

    __backend__ = True
    __requires__ = ("identity_columns",)

    run_inserts = "once"
    run_deletes = "once"

    @classmethod
    def define_tables(cls, metadata):
        """Create ``tbl_a`` and ``tbl_b``, each with an identity pk."""
        always_from_42 = Identity(
            always=True, start=42, nominvalue=True, nomaxvalue=True
        )
        Table(
            "tbl_a",
            metadata,
            Column("id", Integer, always_from_42, primary_key=True),
            Column("desc", String(100)),
        )

        descending = Identity(
            increment=-5, start=0, minvalue=-1000, maxvalue=0
        )
        Table(
            "tbl_b",
            metadata,
            Column("id", Integer, descending, primary_key=True),
            Column("desc", String(100)),
        )

    @classmethod
    def insert_data(cls, connection):
        """Insert two generated-id rows per table, then an explicit id 42
        into ``tbl_b``.
        """
        for tbl in (cls.tables.tbl_a, cls.tables.tbl_b):
            connection.execute(
                tbl.insert(),
                [{"desc": "a"}, {"desc": "b"}],
            )

        # tbl_b's identity is not declared always=True, so an explicit
        # id is supplied here
        connection.execute(
            cls.tables.tbl_b.insert(),
            [{"id": 42, "desc": "c"}],
        )

    def test_select_all(self, connection):
        """``SELECT *`` shows the generated ids for both tables."""
        tbl_a = self.tables.tbl_a
        stmt_a = select(text("*")).select_from(tbl_a).order_by(tbl_a.c.id)
        rows_a = connection.execute(stmt_a).fetchall()
        eq_(rows_a, [(42, "a"), (43, "b")])

        tbl_b = self.tables.tbl_b
        stmt_b = select(text("*")).select_from(tbl_b).order_by(tbl_b.c.id)
        rows_b = connection.execute(stmt_b).fetchall()
        eq_(rows_b, [(-5, "b"), (0, "a"), (42, "c")])

    def test_select_columns(self, connection):
        """Selecting just the identity column of ``tbl_a`` gives 42, 43."""
        id_col = self.tables.tbl_a.c.id
        fetched = connection.execute(
            select(id_col).order_by(id_col)
        ).fetchall()
        eq_(fetched, [(42,), (43,)])

    @testing.requires.identity_columns_standard
    def test_insert_always_error(self, connection):
        """Supplying an explicit id for the ALWAYS identity must raise."""

        def insert_explicit_id():
            connection.execute(
                self.tables.tbl_a.insert(),
                [{"id": 200, "desc": "a"}],
            )

        assert_raises((DatabaseError, ProgrammingError), insert_explicit_id)
class IdentityAutoincrementTest(fixtures.TablesTest):
    """A primary key declared with both ``Identity()`` and autoincrement."""

    __backend__ = True
    __requires__ = ("autoincrement_without_sequence",)

    @classmethod
    def define_tables(cls, metadata):
        """Create ``tbl`` whose pk combines Identity() and autoincrement."""
        id_col = Column(
            "id",
            Integer,
            Identity(),
            primary_key=True,
            autoincrement=True,
        )
        Table("tbl", metadata, id_col, Column("desc", String(100)))

    def test_autoincrement_with_identity(self, connection):
        """The first inserted row is expected to receive id 1."""
        tbl = self.tables.tbl
        connection.execute(tbl.insert(), {"desc": "row"})

        first_row = connection.execute(tbl.select()).first()
        eq_(first_row, (1, "row"))
class ExistsTest(fixtures.TablesTest):
    """``EXISTS`` used as the WHERE criteria of a literal select."""

    __sparse_driver_backend__ = True

    @classmethod
    def define_tables(cls, metadata):
        """Create ``stuff`` with an integer pk and a ``data`` string."""
        Table(
            "stuff",
            metadata,
            Column("id", Integer, primary_key=True),
            Column("data", String(50)),
        )

    @classmethod
    def insert_data(cls, connection):
        """Insert three "some data" rows and one "some other data" row."""
        rows = [{"id": pk, "data": "some data"} for pk in (1, 2, 3)]
        rows.append({"id": 4, "data": "some other data"})
        connection.execute(cls.tables.stuff.insert(), rows)

    def test_select_exists(self, connection):
        """A satisfied EXISTS lets the literal row through."""
        stuff = self.tables.stuff
        has_some_data = exists().where(stuff.c.data == "some data")
        stmt = select(literal(1)).where(has_some_data)

        eq_(connection.execute(stmt).fetchall(), [(1,)])

    def test_select_exists_false(self, connection):
        """An unsatisfied EXISTS yields an empty result."""
        stuff = self.tables.stuff
        has_no_data = exists().where(stuff.c.data == "no data")
        stmt = select(literal(1)).where(has_no_data)

        eq_(connection.execute(stmt).fetchall(), [])
class DistinctOnTest(AssertsCompiledSQL, fixtures.TablesTest):
    """Compilation of ``distinct(<column>)``; the test is marked
    ``fails_if`` on the ``supports_distinct_on`` requirement.
    """

    __sparse_driver_backend__ = True

    @testing.fails_if(testing.requires.supports_distinct_on)
    def test_distinct_on(self):
        """The column is dropped from DISTINCT and a deprecation is emitted."""
        stm = select("*").distinct(column("q")).select_from(table("foo"))
        expected_sql = "SELECT DISTINCT * FROM foo"

        with testing.expect_deprecated(
            "DISTINCT ON is currently supported only by the PostgreSQL "
        ):
            self.assert_compile(stm, expected_sql)
class IsOrIsNotDistinctFromTest(fixtures.TablesTest):
    """Round trips for ``is_distinct_from()`` / ``is_not_distinct_from()``
    over nullable integer columns.
    """

    __sparse_driver_backend__ = True
    __requires__ = ("supports_is_distinct_from",)

    @classmethod
    def define_tables(cls, metadata):
        """Create ``is_distinct_test`` with two nullable integer columns."""
        Table(
            "is_distinct_test",
            metadata,
            Column("id", Integer, primary_key=True),
            Column("col_a", Integer, nullable=True),
            Column("col_b", Integer, nullable=True),
        )

    @testing.combinations(
        ("both_int_different", 0, 1, 1),
        ("both_int_same", 1, 1, 0),
        ("one_null_first", None, 1, 1),
        ("one_null_second", 0, None, 1),
        ("both_null", None, None, 0),
        id_="iaaa",
        argnames="col_a_value, col_b_value, expected_row_count_for_is",
    )
    def test_is_or_is_not_distinct_from(
        self, col_a_value, col_b_value, expected_row_count_for_is, connection
    ):
        """Insert one row, then count matches for both operators.

        ``expected_row_count_for_is`` is the number of rows expected from
        the IS DISTINCT FROM query; the IS NOT DISTINCT FROM query is
        expected to return the complementary count (1 when that is 0,
        otherwise 0).
        """
        test_table = self.tables.is_distinct_test
        col_a, col_b = test_table.c.col_a, test_table.c.col_b

        single_row = {"id": 1, "col_a": col_a_value, "col_b": col_b_value}
        connection.execute(test_table.insert(), [single_row])

        distinct_rows = connection.execute(
            test_table.select().where(col_a.is_distinct_from(col_b))
        ).fetchall()
        eq_(len(distinct_rows), expected_row_count_for_is)

        if expected_row_count_for_is == 0:
            expected_row_count_for_is_not = 1
        else:
            expected_row_count_for_is_not = 0

        not_distinct_rows = connection.execute(
            test_table.select().where(col_a.is_not_distinct_from(col_b))
        ).fetchall()
        eq_(len(not_distinct_rows), expected_row_count_for_is_not)
class WindowFunctionTest(fixtures.TablesTest):
    """Round trips for ``func.max(...).over(...)`` window expressions."""

    __requires__ = ("window_functions",)
    __sparse_driver_backend__ = True

    @classmethod
    def define_tables(cls, metadata):
        """Create ``some_table`` with an integer pk and two integer columns."""
        Table(
            "some_table",
            metadata,
            Column("id", Integer, primary_key=True),
            Column("col1", Integer),
            Column("col2", Integer),
        )

    @classmethod
    def insert_data(cls, connection):
        """Insert rows 1..49 with ``col1 = id`` and ``col2 = id * 5``."""
        rows = [{"id": n, "col1": n, "col2": n * 5} for n in range(1, 50)]
        connection.execute(cls.tables.some_table.insert(), rows)

    def test_window(self, connection):
        """max(col2) ordered by col1 DESC, restricted to ``col1 < 20``.

        Every one of the 19 selected rows is expected to report 95.
        """
        some_table = self.tables.some_table
        running_max = func.max(some_table.c.col2).over(
            order_by=[some_table.c.col1.desc()]
        )
        stmt = select(running_max).where(some_table.c.col1 < 20)

        fetched = connection.execute(stmt).all()
        eq_(fetched, [(95,)] * 19)

    def test_window_rows_between(self, connection):
        """max(col2) over a ``rows=(-5, 0)`` frame ordered by col1."""
        some_table = self.tables.some_table

        # NOTE: the rows values are currently part of the cache key
        # rather than being handled as binds; this is issue #11515
        framed_max = func.max(some_table.c.col2).over(
            order_by=[some_table.c.col1],
            rows=(-5, 0),
        )

        fetched = connection.execute(select(framed_max)).all()
        eq_(fetched, [(col2,) for col2 in range(5, 250, 5)])
class BitwiseTest(fixtures.TablesTest):
    """Round trips for the ``bitwise_*`` column operators.

    The ``bitwise`` table holds ten rows with ``a`` in 0..9 and
    ``b = a + 1``.
    """

    __backend__ = True

    run_inserts = "once"
    run_deletes = "once"

    inserted_data = [{"a": n, "b": n + 1} for n in range(10)]

    @classmethod
    def define_tables(cls, metadata):
        """Create ``bitwise`` with two integer columns ``a`` and ``b``."""
        Table("bitwise", metadata, Column("a", Integer), Column("b", Integer))

    @classmethod
    def insert_data(cls, connection):
        """Insert the rows listed in ``inserted_data``."""
        connection.execute(cls.tables.bitwise.insert(), cls.inserted_data)

    @testing.combinations(
        (
            lambda a: a.bitwise_xor(5),
            [n for n in range(10) if n != 5],
            testing.requires.supports_bitwise_xor,
        ),
        (
            lambda a: a.bitwise_or(1),
            list(range(10)),
            testing.requires.supports_bitwise_or,
        ),
        (
            lambda a: a.bitwise_and(4),
            list(range(4, 8)),
            testing.requires.supports_bitwise_and,
        ),
        (
            lambda a: (a - 2).bitwise_not(),
            [0],
            testing.requires.supports_bitwise_not,
        ),
        (
            lambda a: a.bitwise_lshift(1),
            list(range(1, 10)),
            testing.requires.supports_bitwise_shift,
        ),
        (
            lambda a: a.bitwise_rshift(2),
            list(range(4, 10)),
            testing.requires.supports_bitwise_shift,
        ),
        argnames="case, expected",
    )
    def test_bitwise(self, case, expected, connection):
        """Select rows where the bitwise expression is greater than zero.

        :param case: lambda building the expression from column ``a``;
         it is resolved with ``testing.resolve_lambda``.
        :param expected: indexes into ``inserted_data`` of the rows the
         query should return, in order of ``a``.
        """
        bitwise = self.tables.bitwise
        col_a = bitwise.c.a

        bit_expr = testing.resolve_lambda(case, a=col_a)
        stmt = select(bitwise).where(bit_expr > 0).order_by(col_a)

        fetched = connection.execute(stmt).mappings().all()
        expected_rows = [self.inserted_data[idx] for idx in expected]
        eq_(fetched, expected_rows)