ai_v/venv/Lib/site-packages/sqlalchemy/sql/traversals.py

1025 lines
33 KiB
Python
Raw Normal View History

feat(api): 实现图像生成及后台同步功能 - 新增图像生成接口,支持试用、积分和自定义API Key模式 - 实现生成图片结果异步上传至MinIO存储,带重试机制 - 优化积分预扣除和异常退还逻辑,保障用户积分准确 - 添加获取生成历史记录接口,支持时间范围和分页 - 提供本地字典配置接口,支持模型、比例、提示模板和尺寸 - 实现图片批量上传接口,支持S3兼容对象存储 feat(admin): 增加管理员角色管理与权限分配接口 - 实现角色列表查询、角色创建、更新及删除功能 - 增加权限列表查询接口 - 实现用户角色分配接口,便于统一管理用户权限 - 增加系统字典增删查改接口,支持分类过滤和排序 - 权限控制全面覆盖管理接口,保证安全访问 feat(auth): 完善用户登录注册及权限相关接口与页面 - 实现手机号验证码发送及校验功能,保障注册安全 - 支持手机号注册、登录及退出接口,集成日志记录 - 增加修改密码功能,验证原密码后更新 - 提供动态导航菜单接口,基于权限展示不同菜单 - 实现管理界面路由及日志、角色、字典管理页面访问权限控制 - 添加系统日志查询接口,支持关键词和等级筛选 feat(app): 初始化Flask应用并配置蓝图与数据库 - 创建应用程序工厂,加载配置,初始化数据库和Redis客户端 - 注册认证、API及管理员蓝图,整合路由 - 根路由渲染主页模板 - 应用上下文中自动创建数据库表,保证运行环境准备完毕 feat(database): 提供数据库创建与迁移支持脚本 - 新增数据库创建脚本,支持自动检测是否已存在 - 添加数据库表初始化脚本,支持创建和删除所有表 - 实现RBAC权限初始化,包含基础权限和角色创建 - 新增字段手动修复脚本,添加用户API Key和积分字段 - 强制迁移脚本支持清理连接和修复表结构,初始化默认数据及角色分配 feat(config): 新增系统配置参数 - 配置数据库、Redis、Session和MinIO相关参数 - 添加AI接口地址及试用Key配置 - 集成阿里云短信服务配置及开发模式相关参数 feat(extensions): 初始化数据库、Redis和MinIO客户端 - 创建全局SQLAlchemy数据库实例和Redis客户端 - 配置基于boto3的MinIO兼容S3客户端 chore(logs): 添加示例系统日志文件 - 记录用户请求、验证码发送成功与失败的日志信息
2026-01-12 00:53:31 +08:00
# sql/traversals.py
# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors
# <see AUTHORS file>
#
# This module is part of SQLAlchemy and is released under
# the MIT License: https://www.opensource.org/licenses/mit-license.php
# mypy: allow-untyped-defs, allow-untyped-calls
from __future__ import annotations
from collections import deque
import collections.abc as collections_abc
import itertools
from itertools import zip_longest
import operator
import typing
from typing import Any
from typing import Callable
from typing import Deque
from typing import Dict
from typing import Iterable
from typing import Optional
from typing import Set
from typing import Tuple
from typing import Type
from . import operators
from .cache_key import HasCacheKey
from .visitors import _TraverseInternalsType
from .visitors import anon_map
from .visitors import ExternallyTraversible
from .visitors import HasTraversalDispatch
from .visitors import HasTraverseInternals
from .. import util
from ..util import langhelpers
from ..util.typing import Self
# Sentinel that a ``compare_<visit_name>`` hook may return; the comparator's
# main loop then moves on to the next pair without walking this pair's
# ``_traverse_internals``.
SKIP_TRAVERSE = util.symbol("skip_traverse")
# Result values for comparison hooks.  The comparator loop tests hook results
# by identity (``is COMPARE_FAILED``); note these are the plain ``False`` /
# ``True`` singletons, so a hook returning ``left == right`` participates too.
COMPARE_FAILED = False
COMPARE_SUCCEEDED = True
def compare(obj1: Any, obj2: Any, **kw: Any) -> bool:
    """Return whether ``obj1`` and ``obj2`` compare as equivalent.

    The ``use_proxies`` keyword selects the strategy class:
    ``ColIdentityComparatorStrategy`` when it is truthy, otherwise
    ``TraversalComparatorStrategy``.  A fresh strategy instance is created
    for every call, and all keyword arguments (``use_proxies`` included) are
    forwarded unchanged to its ``compare()`` method.
    """
    strategy_cls: Type[TraversalComparatorStrategy] = (
        ColIdentityComparatorStrategy
        if kw.get("use_proxies", False)
        else TraversalComparatorStrategy
    )
    return strategy_cls().compare(obj1, obj2, **kw)
def _preconfigure_traversals(target_hierarchy: Type[Any]) -> None:
    """Pre-generate traversal helpers for the classes of a hierarchy.

    Every class yielded by ``util.walk_subclasses(target_hierarchy)`` that
    has both ``_generate_cache_attrs`` and ``_traverse_internals`` gets its
    cache attributes generated, followed by the copy-internals and
    get-children dispatch functions.
    """
    for cls in util.walk_subclasses(target_hierarchy):
        is_traversible = hasattr(cls, "_generate_cache_attrs") and hasattr(
            cls, "_traverse_internals"
        )
        if not is_traversible:
            continue

        cls._generate_cache_attrs()

        # same order as before: copy-internals first, then get-children
        for dispatcher, generated_name in (
            (_copy_internals, "_generated_copy_internals_traversal"),
            (_get_children, "_generated_get_children_traversal"),
        ):
            dispatcher.generate_dispatch(
                cls, cls._traverse_internals, generated_name
            )
class HasShallowCopy(HasTraverseInternals):
    """attribute-wide operations that are useful for classes that use
    __slots__ and therefore can't operate on their attributes in a dictionary.

    The copy / to-dict / from-dict helpers are generated as source code from
    the attribute names in ``_traverse_internals`` and are cached on the
    class the first time they are needed.
    """

    __slots__ = ()

    if typing.TYPE_CHECKING:

        def _generated_shallow_copy_traversal(self, other: Self) -> None: ...

        def _generated_shallow_from_dict_traversal(
            self, d: Dict[str, Any]
        ) -> None: ...

        def _generated_shallow_to_dict_traversal(self) -> Dict[str, Any]: ...

    @classmethod
    def _generate_shallow_copy(
        cls,
        internal_dispatch: _TraverseInternalsType,
        method_name: str,
    ) -> Callable[[Self, Self], None]:
        """Build a function copying each listed attribute onto ``other``.

        :param internal_dispatch: sequence of ``(attrname, visit symbol)``
         pairs; only the attribute names are used.
        :param method_name: name given to the generated function.
        :return: the generated ``(self, other)`` function.
        """
        assignments = [
            f" other.{attrname} = self.{attrname}"
            for attrname, _ in internal_dispatch
        ]
        # a "def" with no statements does not compile; fall back to a no-op
        # body when there are no attributes to copy
        code = "\n".join(assignments) if assignments else " pass"
        meth_text = f"def {method_name}(self, other):\n{code}\n"
        return langhelpers._exec_code_in_env(meth_text, {}, method_name)

    @classmethod
    def _generate_shallow_to_dict(
        cls,
        internal_dispatch: _TraverseInternalsType,
        method_name: str,
    ) -> Callable[[Self], Dict[str, Any]]:
        """Build a function returning the listed attributes as a dict.

        :param internal_dispatch: sequence of ``(attrname, visit symbol)``
         pairs; only the attribute names are used.
        :param method_name: name given to the generated function.
        :return: the generated ``(self)`` function; with no attributes it
         returns an empty dict.
        """
        code = ",\n".join(
            f" '{attrname}': self.{attrname}"
            for attrname, _ in internal_dispatch
        )
        meth_text = f"def {method_name}(self):\n return {{{code}}}\n"
        return langhelpers._exec_code_in_env(meth_text, {}, method_name)

    @classmethod
    def _generate_shallow_from_dict(
        cls,
        internal_dispatch: _TraverseInternalsType,
        method_name: str,
    ) -> Callable[[Self, Dict[str, Any]], None]:
        """Build a function assigning the listed attributes from a dict.

        :param internal_dispatch: sequence of ``(attrname, visit symbol)``
         pairs; only the attribute names are used.
        :param method_name: name given to the generated function.
        :return: the generated ``(self, d)`` function; it reads
         ``d[attrname]`` for every attribute, so a missing key raises
         ``KeyError`` when the generated function runs.
        """
        assignments = [
            f" self.{attrname} = d['{attrname}']"
            for attrname, _ in internal_dispatch
        ]
        # a "def" with no statements does not compile; fall back to a no-op
        # body when there are no attributes to assign
        code = "\n".join(assignments) if assignments else " pass"
        meth_text = f"def {method_name}(self, d):\n{code}\n"
        return langhelpers._exec_code_in_env(meth_text, {}, method_name)

    def _shallow_from_dict(self, d: Dict[str, Any]) -> None:
        """Assign this object's traversed attributes from ``d``."""
        cls = self.__class__

        shallow_from_dict: Callable[[HasShallowCopy, Dict[str, Any]], None]
        try:
            # cls.__dict__ holds only what was set on this exact class, so
            # every subclass ends up with its own generated function
            shallow_from_dict = cls.__dict__[
                "_generated_shallow_from_dict_traversal"
            ]
        except KeyError:
            shallow_from_dict = self._generate_shallow_from_dict(
                cls._traverse_internals,
                "_generated_shallow_from_dict_traversal",
            )

            cls._generated_shallow_from_dict_traversal = shallow_from_dict  # type: ignore  # noqa: E501

        shallow_from_dict(self, d)

    def _shallow_to_dict(self) -> Dict[str, Any]:
        """Return this object's traversed attributes as a new dict."""
        cls = self.__class__

        shallow_to_dict: Callable[[HasShallowCopy], Dict[str, Any]]
        try:
            # per-class cache; see _shallow_from_dict
            shallow_to_dict = cls.__dict__[
                "_generated_shallow_to_dict_traversal"
            ]
        except KeyError:
            shallow_to_dict = self._generate_shallow_to_dict(
                cls._traverse_internals, "_generated_shallow_to_dict_traversal"
            )

            cls._generated_shallow_to_dict_traversal = shallow_to_dict  # type: ignore  # noqa: E501
        return shallow_to_dict(self)

    def _shallow_copy_to(self, other: Self) -> None:
        """Copy this object's traversed attributes onto ``other``."""
        cls = self.__class__

        shallow_copy: Callable[[Self, Self], None]
        try:
            # per-class cache; see _shallow_from_dict
            shallow_copy = cls.__dict__["_generated_shallow_copy_traversal"]
        except KeyError:
            shallow_copy = self._generate_shallow_copy(
                cls._traverse_internals, "_generated_shallow_copy_traversal"
            )

            cls._generated_shallow_copy_traversal = shallow_copy  # type: ignore  # noqa: E501
        shallow_copy(self, other)

    def _clone(self, **kw: Any) -> Self:
        """Create a shallow copy.

        The new object is made with ``__new__`` (``__init__`` is not run)
        and then populated via :meth:`_shallow_copy_to`.  Keyword arguments
        are accepted but not used.
        """
        c = self.__class__.__new__(self.__class__)
        self._shallow_copy_to(c)
        return c
class GenerativeOnTraversal(HasShallowCopy):
    """Supplies Generative behavior but making use of traversals to shallow
    copy.

    .. seealso::

        :class:`sqlalchemy.sql.base.Generative`

    """

    __slots__ = ()

    def _generate(self) -> Self:
        """Return a new instance of the same class carrying a shallow copy
        of this object's traversed attributes (``__init__`` is not run)."""
        duplicate = self.__class__.__new__(self.__class__)
        self._shallow_copy_to(duplicate)
        return duplicate
def _clone(element, **kw):
return element._clone()
class HasCopyInternals(HasTraverseInternals):
    """Mixin providing ``_copy_internals()`` driven by
    ``_traverse_internals``; subclasses must supply ``_clone()``."""

    __slots__ = ()

    def _clone(self, **kw):
        """Must be overridden; this base version always raises."""
        raise NotImplementedError()

    def _copy_internals(
        self, *, omit_attrs: Iterable[str] = (), **kw: Any
    ) -> None:
        """Reassign internal elements to be clones of themselves.

        Called during a copy-and-traverse operation on newly
        shallow-copied elements to create a deep copy.

        The given clone function should be used, which may be applying
        additional transformations to the element (i.e. replacement
        traversal, cloned traversal, annotations).

        :param omit_attrs: attribute names to leave untouched.
        :param kw: forwarded to each per-attribute copy method.
        """
        try:
            internals = self._traverse_internals
        except AttributeError:
            # user-defined classes may not have a _traverse_internals
            return

        dispatched = _copy_internals.run_generated_dispatch(
            self, internals, "_generated_copy_internals_traversal"
        )
        for attrname, current, copy_meth in dispatched:
            # omitted attributes and empty (None) attributes are skipped
            if attrname in omit_attrs or current is None:
                continue

            replacement = copy_meth(attrname, self, current, **kw)
            # a None result means "keep the existing value"
            if replacement is not None:
                setattr(self, attrname, replacement)
class _CopyInternalsTraversal(HasTraversalDispatch):
    """Generate a _copy_internals internal traversal dispatch for classes
    with a _traverse_internals collection.

    Each ``visit_*`` method receives the attribute name, the owning object,
    the current attribute value and a ``clone`` callable, and returns the
    value that should replace the attribute.
    """

    def visit_clauseelement(
        self, attrname, parent, element, clone=_clone, **kw
    ):
        """Clone a single element."""
        return clone(element, **kw)

    def visit_clauseelement_list(
        self, attrname, parent, element, clone=_clone, **kw
    ):
        """Clone every member, returning a list."""
        return [clone(clause, **kw) for clause in element]

    def visit_clauseelement_tuple(
        self, attrname, parent, element, clone=_clone, **kw
    ):
        """Clone every member, returning a tuple."""
        return tuple([clone(clause, **kw) for clause in element])

    def visit_executable_options(
        self, attrname, parent, element, clone=_clone, **kw
    ):
        """Clone every option, returning a tuple."""
        return tuple([clone(clause, **kw) for clause in element])

    def visit_clauseelement_unordered_set(
        self, attrname, parent, element, clone=_clone, **kw
    ):
        """Clone every member, returning a set."""
        return {clone(clause, **kw) for clause in element}

    def visit_clauseelement_tuples(
        self, attrname, parent, element, clone=_clone, **kw
    ):
        """Clone a sequence of sequences into a list of tuples."""
        return [
            tuple(clone(tup_elem, **kw) for tup_elem in elem)
            for elem in element
        ]

    def visit_string_clauseelement_dict(
        self, attrname, parent, element, clone=_clone, **kw
    ):
        """Clone the values of a dict, keeping the keys as they are."""
        return {key: clone(value, **kw) for key, value in element.items()}

    def visit_setup_join_tuple(
        self, attrname, parent, element, clone=_clone, **kw
    ):
        """Clone ``(target, onclause, from_, flags)`` entries.

        ``None`` members stay ``None``; ``flags`` is passed through as is.
        """
        return tuple(
            (
                clone(target, **kw) if target is not None else None,
                clone(onclause, **kw) if onclause is not None else None,
                clone(from_, **kw) if from_ is not None else None,
                flags,
            )
            for (target, onclause, from_, flags) in element
        )

    def visit_memoized_select_entities(self, attrname, parent, element, **kw):
        """Handled the same way as a tuple of elements."""
        return self.visit_clauseelement_tuple(attrname, parent, element, **kw)

    def visit_dml_ordered_values(
        self, attrname, parent, element, clone=_clone, **kw
    ):
        """Clone a sequence of ``(key, value)`` pairs.

        Keys are cloned only when they expose ``__clause_element__``;
        values are always cloned.
        """
        # sequence of 2-tuples
        return [
            (
                (
                    clone(key, **kw)
                    if hasattr(key, "__clause_element__")
                    else key
                ),
                clone(value, **kw),
            )
            for key, value in element
        ]

    def visit_dml_values(self, attrname, parent, element, clone=_clone, **kw):
        """Clone a dict of values.

        Keys are cloned only when they expose ``__clause_element__``;
        values are always cloned.
        """
        return {
            (
                clone(key, **kw) if hasattr(key, "__clause_element__") else key
            ): clone(value, **kw)
            for key, value in element.items()
        }

    def visit_dml_multi_values(
        self, attrname, parent, element, clone=_clone, **kw
    ):
        """Clone a sequence of sequences of list/tuple/dict parameter sets.

        Only members exposing ``__clause_element__`` are cloned.  List and
        tuple parameter sets both come back as lists.

        :raises AssertionError: if a parameter set is not a list, tuple or
         dict.
        """
        # sequence of sequences, each sequence contains a list/dict/tuple

        def copy(elem):
            if isinstance(elem, (list, tuple)):
                return [
                    (
                        clone(value, **kw)
                        if hasattr(value, "__clause_element__")
                        else value
                    )
                    for value in elem
                ]
            elif isinstance(elem, dict):
                return {
                    (
                        clone(key, **kw)
                        if hasattr(key, "__clause_element__")
                        else key
                    ): (
                        clone(value, **kw)
                        if hasattr(value, "__clause_element__")
                        else value
                    )
                    for key, value in elem.items()
                }
            else:
                # TODO: use abc classes
                # raised explicitly rather than via "assert False": a bare
                # assert is stripped under "python -O", which would make
                # this branch silently return None
                raise AssertionError(
                    f"unsupported DML multi-values element type: "
                    f"{type(elem)!r}"
                )

        return [
            [copy(sub_element) for sub_element in sequence]
            for sequence in element
        ]

    def visit_propagate_attrs(
        self, attrname, parent, element, clone=_clone, **kw
    ):
        """Propagate attrs are returned unchanged (not cloned)."""
        return element


# module-level dispatcher instance used by HasCopyInternals._copy_internals
_copy_internals = _CopyInternalsTraversal()
def _flatten_clauseelement(element):
while hasattr(element, "__clause_element__") and not getattr(
element, "is_clause_element", False
):
element = element.__clause_element__()
return element
class _GetChildrenTraversal(HasTraversalDispatch):
    """Generate a _children_traversal internal traversal dispatch for classes
    with a _traverse_internals collection.

    Each ``visit_*`` method receives an attribute value and returns (or
    yields) the child elements found in it.
    """

    def visit_has_cache_key(self, element, **kw):
        # the GetChildren traversal refers explicitly to ClauseElement
        # structures.  Within these, a plain HasCacheKey is not a
        # ClauseElement, so don't include these.
        return ()

    def visit_clauseelement(self, element, **kw):
        """A single element is its own only child."""
        return (element,)

    def visit_clauseelement_list(self, element, **kw):
        """The list itself is the collection of children."""
        return element

    def visit_clauseelement_tuple(self, element, **kw):
        """The tuple itself is the collection of children."""
        return element

    def visit_clauseelement_tuples(self, element, **kw):
        """Flatten one level: the members of every inner sequence."""
        return itertools.chain.from_iterable(element)

    def visit_fromclause_canonical_column_collection(self, element, **kw):
        """No children are reported for this structure."""
        return ()

    def visit_string_clauseelement_dict(self, element, **kw):
        """The dict's values are the children."""
        return element.values()

    def visit_fromclause_ordered_set(self, element, **kw):
        """The set itself is the collection of children."""
        return element

    def visit_clauseelement_unordered_set(self, element, **kw):
        """The set itself is the collection of children."""
        return element

    def visit_setup_join_tuple(self, element, **kw):
        """Yield the members of each ``(target, onclause, from_, flags)``.

        Per entry the order is ``from_`` (when not ``None``), then the
        flattened target (unless it is a string), then the flattened
        onclause (unless it is ``None`` or a string).
        """
        for target, onclause, from_, flags in element:
            if from_ is not None:
                yield from_

            target_is_string = isinstance(target, str)
            if not target_is_string:
                yield _flatten_clauseelement(target)

            if onclause is None or isinstance(onclause, str):
                continue
            yield _flatten_clauseelement(onclause)

    def visit_memoized_select_entities(self, element, **kw):
        """Handled the same way as a tuple of elements."""
        return self.visit_clauseelement_tuple(element, **kw)

    def visit_dml_ordered_values(self, element, **kw):
        """Yield, per ``(key, value)`` pair, the key (only when it exposes
        ``__clause_element__``) followed by the value."""
        for key, value in element:
            if hasattr(key, "__clause_element__"):
                yield key
            yield value

    def visit_dml_values(self, element, **kw):
        """Yield the children of a dict of values.

        Values under plain keys come first, in sorted key order; then each
        expression key (one exposing ``__clause_element__``) followed by
        its value.
        """
        expr_keys = {
            key for key in element if hasattr(key, "__clause_element__")
        }
        plain_keys = expr_keys.symmetric_difference(element)

        for key in sorted(plain_keys):
            yield element[key]
        for key in expr_keys:
            yield key
            yield element[key]

    def visit_dml_multi_values(self, element, **kw):
        """No children are reported for this structure."""
        return ()

    def visit_propagate_attrs(self, element, **kw):
        """No children are reported for this structure."""
        return ()


# module-level dispatcher instance for the get-children traversal
_get_children = _GetChildrenTraversal()
@util.preload_module("sqlalchemy.sql.elements")
def _resolve_name_for_compare(element, name, anon_map, **kw):
    """Return ``name`` in a form suitable for comparison.

    Names that are ``_anonymous_label`` instances are resolved through
    ``name.apply_map(anon_map)``; any other name is returned unchanged.
    ``element`` and ``kw`` are accepted but not used.
    """
    anonymous_label_cls = util.preloaded.sql_elements._anonymous_label
    if not isinstance(name, anonymous_label_cls):
        return name
    return name.apply_map(anon_map)
class TraversalComparatorStrategy(HasTraversalDispatch, util.MemoizedSlots):
    """Compare two traversible structures attribute by attribute.

    :meth:`compare` drives a work queue of ``(left, right)`` pairs.  For
    each pair an optional ``compare_<visit_name>`` hook runs first; then the
    two ``_traverse_internals`` collections are walked in lockstep and each
    attribute pair is handed to the ``visit_*`` method selected by its visit
    symbol.  A ``visit_*`` method either returns ``COMPARE_FAILED`` (plain
    ``False``) to stop the comparison, or returns anything else (including
    ``None``) to continue; methods that need child elements compared push
    them onto ``self.stack``.

    ``self.anon_map`` is a pair of ``anon_map`` objects; index 0 is used for
    left-hand objects and index 1 for right-hand objects.
    """

    __slots__ = "stack", "cache", "anon_map"

    def __init__(self):
        # pairs still waiting to be compared
        self.stack: Deque[
            Tuple[
                Optional[ExternallyTraversible],
                Optional[ExternallyTraversible],
            ]
        ] = deque()
        # pairs already visited, so that a pair is not processed twice
        self.cache = set()

    def _memoized_attr_anon_map(self):
        return (anon_map(), anon_map())

    def compare(
        self,
        obj1: ExternallyTraversible,
        obj2: ExternallyTraversible,
        **kw: Any,
    ) -> bool:
        """Return True if ``obj1`` and ``obj2`` compare as equivalent.

        :param kw: forwarded to every hook and visit method.  The
         ``compare_annotations`` key (default False) controls whether
         attributes named ``_annotations`` take part in the comparison.
        """
        stack = self.stack
        cache = self.cache

        compare_annotations = kw.get("compare_annotations", False)

        stack.append((obj1, obj2))

        while stack:
            left, right = stack.popleft()

            if left is right:
                continue
            elif left is None or right is None:
                # we know they are different so no match
                return False
            elif (left, right) in cache:
                continue
            cache.add((left, right))

            visit_name = left.__visit_name__
            if visit_name != right.__visit_name__:
                return False

            meth = getattr(self, "compare_%s" % visit_name, None)

            if meth:
                attributes_compared = meth(left, right, **kw)
                if attributes_compared is COMPARE_FAILED:
                    return False
                elif attributes_compared is SKIP_TRAVERSE:
                    continue

                # attributes_compared is returned as a list of attribute
                # names that were "handled" by the comparison method above.
                # remaining attribute names in the _traverse_internals
                # will be compared.
            else:
                attributes_compared = ()

            for (
                (left_attrname, left_visit_sym),
                (right_attrname, right_visit_sym),
            ) in zip_longest(
                left._traverse_internals,
                right._traverse_internals,
                fillvalue=(None, None),
            ):
                if not compare_annotations and (
                    (left_attrname == "_annotations")
                    or (right_attrname == "_annotations")
                ):
                    continue

                if (
                    left_attrname != right_attrname
                    or left_visit_sym is not right_visit_sym
                ):
                    return False
                elif left_attrname in attributes_compared:
                    continue

                assert left_visit_sym is not None
                assert left_attrname is not None
                assert right_attrname is not None

                dispatch = self.dispatch(left_visit_sym)
                assert dispatch is not None, (
                    f"{self.__class__} has no dispatch for "
                    f"'{self._dispatch_lookup[left_visit_sym]}'"
                )
                left_child = operator.attrgetter(left_attrname)(left)
                right_child = operator.attrgetter(right_attrname)(right)
                # None on both sides matches; None on one side only does
                # not.  visit methods are therefore never dispatched with a
                # None value from here.
                if left_child is None:
                    if right_child is not None:
                        return False
                    else:
                        continue
                elif right_child is None:
                    return False

                comparison = dispatch(
                    left_attrname, left, left_child, right, right_child, **kw
                )
                if comparison is COMPARE_FAILED:
                    return False

        return True

    def compare_inner(self, obj1, obj2, **kw):
        """Compare two objects with a fresh comparator of the same class,
        i.e. with its own stack, cache and anon maps."""
        comparator = self.__class__()
        return comparator.compare(obj1, obj2, **kw)

    def visit_has_cache_key(
        self, attrname, left_parent, left, right_parent, right, **kw
    ):
        """Compare two objects by their generated cache keys."""
        if left._gen_cache_key(self.anon_map[0], []) != right._gen_cache_key(
            self.anon_map[1], []
        ):
            return COMPARE_FAILED

    def visit_propagate_attrs(
        self, attrname, left_parent, left, right_parent, right, **kw
    ):
        """Compare only the ``plugin_subject`` entries of the two mappings."""
        return self.compare_inner(
            left.get("plugin_subject", None), right.get("plugin_subject", None)
        )

    def visit_has_cache_key_list(
        self, attrname, left_parent, left, right_parent, right, **kw
    ):
        """Compare two sequences pairwise by generated cache key."""
        for lelem, relem in zip_longest(left, right, fillvalue=None):
            if lelem is None:
                if relem is not None:
                    return COMPARE_FAILED
                else:
                    continue
            elif relem is None:
                return COMPARE_FAILED

            if lelem._gen_cache_key(
                self.anon_map[0], []
            ) != relem._gen_cache_key(self.anon_map[1], []):
                return COMPARE_FAILED

    def visit_executable_options(
        self, attrname, left_parent, left, right_parent, right, **kw
    ):
        """Compare two option sequences pairwise.

        Options flagged ``_is_has_cache_key`` are represented by their
        cache key; other options are compared directly.
        """
        for lopt, ropt in zip_longest(left, right, fillvalue=None):
            if lopt is None:
                if ropt is not None:
                    return COMPARE_FAILED
                else:
                    continue
            elif ropt is None:
                return COMPARE_FAILED

            if (
                lopt._gen_cache_key(self.anon_map[0], [])
                if lopt._is_has_cache_key
                else lopt
            ) != (
                ropt._gen_cache_key(self.anon_map[1], [])
                if ropt._is_has_cache_key
                else ropt
            ):
                return COMPARE_FAILED

    def visit_clauseelement(
        self, attrname, left_parent, left, right_parent, right, **kw
    ):
        """Queue the two elements for comparison."""
        self.stack.append((left, right))

    def visit_fromclause_canonical_column_collection(
        self, attrname, left_parent, left, right_parent, right, **kw
    ):
        """Queue the columns pairwise; a length mismatch queues a ``None``
        partner, which fails in the main loop."""
        for lcol, rcol in zip_longest(left, right, fillvalue=None):
            self.stack.append((lcol, rcol))

    def visit_fromclause_derived_column_collection(
        self, attrname, left_parent, left, right_parent, right, **kw
    ):
        """Not compared."""
        pass

    def visit_string_clauseelement_dict(
        self, attrname, left_parent, left, right_parent, right, **kw
    ):
        """Require equal sorted keys and queue the values pairwise."""
        for lstr, rstr in zip_longest(
            sorted(left), sorted(right), fillvalue=None
        ):
            if lstr != rstr:
                return COMPARE_FAILED
            self.stack.append((left[lstr], right[rstr]))

    def visit_clauseelement_tuples(
        self, attrname, left_parent, left, right_parent, right, **kw
    ):
        """Queue the members of each pair of inner sequences."""
        for ltup, rtup in zip_longest(left, right, fillvalue=None):
            if ltup is None or rtup is None:
                return COMPARE_FAILED

            for lelem, relem in zip_longest(ltup, rtup, fillvalue=None):
                self.stack.append((lelem, relem))

    def visit_clauseelement_list(
        self, attrname, left_parent, left, right_parent, right, **kw
    ):
        """Queue the members pairwise."""
        for lelem, relem in zip_longest(left, right, fillvalue=None):
            self.stack.append((lelem, relem))

    def visit_clauseelement_tuple(
        self, attrname, left_parent, left, right_parent, right, **kw
    ):
        """Queue the members pairwise."""
        for lelem, relem in zip_longest(left, right, fillvalue=None):
            self.stack.append((lelem, relem))

    def _compare_unordered_sequences(self, seq1, seq2, **kw):
        """Return True if every member of ``seq1`` can be matched to a
        distinct member of ``seq2`` and both have the same length.

        Each member of ``seq1`` takes the first not-yet-matched member of
        ``seq2`` that compares equal via :meth:`compare_inner`.
        """
        if seq1 is None:
            return seq2 is None

        completed: Set[object] = set()
        # built once instead of once per member of seq1
        candidates = set(seq2)
        for clause in seq1:
            for other_clause in candidates.difference(completed):
                if self.compare_inner(clause, other_clause, **kw):
                    completed.add(other_clause)
                    break
        return len(completed) == len(seq1) == len(seq2)

    def visit_clauseelement_unordered_set(
        self, attrname, left_parent, left, right_parent, right, **kw
    ):
        """Compare two collections without regard to order."""
        return self._compare_unordered_sequences(left, right, **kw)

    def visit_fromclause_ordered_set(
        self, attrname, left_parent, left, right_parent, right, **kw
    ):
        """Queue the members pairwise."""
        for lelem, relem in zip_longest(left, right, fillvalue=None):
            self.stack.append((lelem, relem))

    def visit_string(
        self, attrname, left_parent, left, right_parent, right, **kw
    ):
        return left == right

    def visit_string_list(
        self, attrname, left_parent, left, right_parent, right, **kw
    ):
        return left == right

    def visit_string_multi_dict(
        self, attrname, left_parent, left, right_parent, right, **kw
    ):
        """Compare two dicts with sortable keys and mixed values.

        Keys must match in sorted order.  Values that are both
        ``HasCacheKey`` are compared by generated cache key; if only one
        side is, the comparison fails; otherwise ``!=`` is used.
        """
        for lk, rk in zip_longest(
            sorted(left.keys()), sorted(right.keys()), fillvalue=(None, None)
        ):
            if lk != rk:
                return COMPARE_FAILED

            lv, rv = left[lk], right[rk]

            # the type test applies to the values; testing the containing
            # dicts would never select the cache-key branch
            lhc = isinstance(lv, HasCacheKey)
            rhc = isinstance(rv, HasCacheKey)
            if lhc and rhc:
                if lv._gen_cache_key(
                    self.anon_map[0], []
                ) != rv._gen_cache_key(self.anon_map[1], []):
                    return COMPARE_FAILED
            elif lhc != rhc:
                return COMPARE_FAILED
            elif lv != rv:
                return COMPARE_FAILED

    def visit_multi(
        self, attrname, left_parent, left, right_parent, right, **kw
    ):
        """Compare by cache key when both are ``HasCacheKey``, fail when
        only one is, and use ``==`` otherwise."""
        lhc = isinstance(left, HasCacheKey)
        rhc = isinstance(right, HasCacheKey)
        if lhc and rhc:
            if left._gen_cache_key(
                self.anon_map[0], []
            ) != right._gen_cache_key(self.anon_map[1], []):
                return COMPARE_FAILED
        elif lhc != rhc:
            return COMPARE_FAILED
        else:
            return left == right

    def visit_anon_name(
        self, attrname, left_parent, left, right_parent, right, **kw
    ):
        """Compare names after resolving anonymous labels through each
        side's own anon map."""
        return _resolve_name_for_compare(
            left_parent, left, self.anon_map[0], **kw
        ) == _resolve_name_for_compare(
            right_parent, right, self.anon_map[1], **kw
        )

    def visit_boolean(
        self, attrname, left_parent, left, right_parent, right, **kw
    ):
        return left == right

    def visit_operator(
        self, attrname, left_parent, left, right_parent, right, **kw
    ):
        return left == right

    def visit_type(
        self, attrname, left_parent, left, right_parent, right, **kw
    ):
        return left._compare_type_affinity(right)

    def visit_plain_dict(
        self, attrname, left_parent, left, right_parent, right, **kw
    ):
        return left == right

    def visit_dialect_options(
        self, attrname, left_parent, left, right_parent, right, **kw
    ):
        return left == right

    def visit_annotations_key(
        self, attrname, left_parent, left, right_parent, right, **kw
    ):
        """Compare the parents' ``_annotations_cache_key`` when both sides
        are non-empty; otherwise compare the values with ``==``."""
        if left and right:
            return (
                left_parent._annotations_cache_key
                == right_parent._annotations_cache_key
            )
        else:
            return left == right

    def visit_with_context_options(
        self, attrname, left_parent, left, right_parent, right, **kw
    ):
        """Compare ``(fn, cache key)`` pairs by each function's code object
        and the key."""
        return tuple((fn.__code__, c_key) for fn, c_key in left) == tuple(
            (fn.__code__, c_key) for fn, c_key in right
        )

    def visit_plain_obj(
        self, attrname, left_parent, left, right_parent, right, **kw
    ):
        return left == right

    def visit_named_ddl_element(
        self, attrname, left_parent, left, right_parent, right, **kw
    ):
        """Compare two DDL elements by their ``name`` attribute.

        ``None`` matches only ``None``.
        """
        if left is None or right is None:
            # never reach the .name access with a None operand
            if left is right:
                return COMPARE_SUCCEEDED
            return COMPARE_FAILED

        return left.name == right.name

    def visit_prefix_sequence(
        self, attrname, left_parent, left, right_parent, right, **kw
    ):
        """Require equal strings in each ``(clause, string)`` pair and queue
        the clauses."""
        for (l_clause, l_str), (r_clause, r_str) in zip_longest(
            left, right, fillvalue=(None, None)
        ):
            if l_str != r_str:
                return COMPARE_FAILED
            else:
                self.stack.append((l_clause, r_clause))

    def visit_setup_join_tuple(
        self, attrname, left_parent, left, right_parent, right, **kw
    ):
        """Require equal flags per entry and queue target, onclause and
        from-object pairs."""
        # TODO: look at attrname for "legacy_join" and use different structure
        for (
            (l_target, l_onclause, l_from, l_flags),
            (r_target, r_onclause, r_from, r_flags),
        ) in zip_longest(left, right, fillvalue=(None, None, None, None)):
            if l_flags != r_flags:
                return COMPARE_FAILED
            self.stack.append((l_target, r_target))
            self.stack.append((l_onclause, r_onclause))
            self.stack.append((l_from, r_from))

    def visit_memoized_select_entities(
        self, attrname, left_parent, left, right_parent, right, **kw
    ):
        """Handled the same way as a tuple of elements."""
        return self.visit_clauseelement_tuple(
            attrname, left_parent, left, right_parent, right, **kw
        )

    def visit_table_hint_list(
        self, attrname, left_parent, left, right_parent, right, **kw
    ):
        """Compare two dicts keyed by ``(table, dialect name)``.

        Keys are paired after sorting on ``(table.fullname, dialect)``;
        dialects and hint values must be equal, and the tables are queued.
        """
        left_keys = sorted(left, key=lambda elem: (elem[0].fullname, elem[1]))
        right_keys = sorted(
            right, key=lambda elem: (elem[0].fullname, elem[1])
        )
        for (ltable, ldialect), (rtable, rdialect) in zip_longest(
            left_keys, right_keys, fillvalue=(None, None)
        ):
            if ldialect != rdialect:
                return COMPARE_FAILED
            elif left[(ltable, ldialect)] != right[(rtable, rdialect)]:
                return COMPARE_FAILED
            else:
                self.stack.append((ltable, rtable))

    def visit_statement_hint_list(
        self, attrname, left_parent, left, right_parent, right, **kw
    ):
        return left == right

    def visit_unknown_structure(
        self, attrname, left_parent, left, right_parent, right, **kw
    ):
        raise NotImplementedError()

    def visit_dml_ordered_values(
        self, attrname, left_parent, left, right_parent, right, **kw
    ):
        """Compare the keys of two sequences of ``(key, value)`` pairs."""
        # sequence of tuple pairs

        # NOTE(review): only the keys of each pair are compared here; the
        # values (lv / rv) are not looked at -- confirm this is intended.
        for (lk, lv), (rk, rv) in zip_longest(
            left, right, fillvalue=(None, None)
        ):
            if not self._compare_dml_values_or_ce(lk, rk, **kw):
                return COMPARE_FAILED

    def _compare_dml_values_or_ce(self, lv, rv, **kw):
        """Compare two DML keys or values, returning a bool.

        Both sides must agree on whether they expose
        ``__clause_element__``.  Objects that do are compared with
        :meth:`compare_inner`; objects that do not must first pass ``!=``
        and are then also handed to :meth:`compare_inner`.
        """
        lvce = hasattr(lv, "__clause_element__")
        rvce = hasattr(rv, "__clause_element__")
        if lvce != rvce:
            return False
        elif lvce and not self.compare_inner(lv, rv, **kw):
            return False
        elif not lvce and lv != rv:
            return False
        # NOTE(review): when lvce is true this repeats the compare_inner
        # call made above -- looks redundant, confirm before removing.
        elif not self.compare_inner(lv, rv, **kw):
            return False

        return True

    def visit_dml_values(
        self, attrname, left_parent, left, right_parent, right, **kw
    ):
        """Compare two parameter sets, each either a sequence or a dict.

        ``None`` on either side, differing lengths, or a sequence paired
        with a non-sequence all fail.
        """
        if left is None or right is None or len(left) != len(right):
            return COMPARE_FAILED

        if isinstance(left, collections_abc.Sequence):
            for lv, rv in zip(left, right):
                if not self._compare_dml_values_or_ce(lv, rv, **kw):
                    return COMPARE_FAILED
        elif isinstance(right, collections_abc.Sequence):
            return COMPARE_FAILED
        else:
            # dictionaries guaranteed to support insert ordering in
            # py37 so that we can compare the keys in order.  without
            # this, we can't compare SQL expression keys because we don't
            # know which key is which
            for (lk, lv), (rk, rv) in zip(left.items(), right.items()):
                if not self._compare_dml_values_or_ce(lk, rk, **kw):
                    return COMPARE_FAILED
                if not self._compare_dml_values_or_ce(lv, rv, **kw):
                    return COMPARE_FAILED

    def visit_dml_multi_values(
        self, attrname, left_parent, left, right_parent, right, **kw
    ):
        """Compare two sequences of sequences of parameter sets, using
        :meth:`visit_dml_values` for each pair of parameter sets."""
        for lseq, rseq in zip_longest(left, right, fillvalue=None):
            if lseq is None or rseq is None:
                return COMPARE_FAILED

            for ld, rd in zip_longest(lseq, rseq, fillvalue=None):
                if (
                    self.visit_dml_values(
                        attrname, left_parent, ld, right_parent, rd, **kw
                    )
                    is COMPARE_FAILED
                ):
                    return COMPARE_FAILED

    def compare_expression_clauselist(self, left, right, **kw):
        """Hook for expression clause lists.

        Operators must be identical.  For associative operators the clauses
        are compared without regard to order and reported as handled;
        otherwise only ``operator`` is reported as handled.
        """
        if left.operator is right.operator:
            if operators.is_associative(left.operator):
                if self._compare_unordered_sequences(
                    left.clauses, right.clauses, **kw
                ):
                    return ["operator", "clauses"]
                else:
                    return COMPARE_FAILED
            else:
                return ["operator"]
        else:
            return COMPARE_FAILED

    def compare_clauselist(self, left, right, **kw):
        """Hook for clause lists; same rules as expression clause lists."""
        return self.compare_expression_clauselist(left, right, **kw)

    def compare_binary(self, left, right, **kw):
        """Hook for binary expressions.

        Operators must be equal.  For commutative operators the operands
        may match in either order and are reported as handled; otherwise
        only ``operator`` and ``negate`` are reported as handled.
        """
        if left.operator == right.operator:
            if operators.is_commutative(left.operator):
                if (
                    self.compare_inner(left.left, right.left, **kw)
                    and self.compare_inner(left.right, right.right, **kw)
                ) or (
                    self.compare_inner(left.left, right.right, **kw)
                    and self.compare_inner(left.right, right.left, **kw)
                ):
                    return ["operator", "negate", "left", "right"]
                else:
                    return COMPARE_FAILED
            else:
                return ["operator", "negate"]
        else:
            return COMPARE_FAILED

    def compare_bindparam(self, left, right, **kw):
        """Hook for bound parameters.

        Returns the attribute names the main loop should skip, based on the
        ``compare_keys`` and ``compare_values`` keywords (both default to
        True).
        """
        compare_keys = kw.pop("compare_keys", True)
        compare_values = kw.pop("compare_values", True)

        if compare_values:
            omit = []
        else:
            # this means, "skip these, we already compared"
            omit = ["callable", "value"]

        if not compare_keys:
            omit.append("key")

        return omit
class ColIdentityComparatorStrategy(TraversalComparatorStrategy):
    """Comparator whose hooks decide column, label and table pairs by
    lineage / hash / identity instead of walking their attributes."""

    def compare_column_element(
        self, left, right, use_proxies=True, equivalents=(), **kw
    ):
        """Compare ColumnElements using proxies and equivalent collections.

        This is a comparison strategy specific to the ORM.

        :param use_proxies: when true, ``left.shares_lineage()`` against a
         candidate counts as a match.
        :param equivalents: optional mapping; when ``right`` is a key, the
         mapped collection is added to the candidates.
        :return: ``SKIP_TRAVERSE`` on a match, else ``COMPARE_FAILED``.
        """
        candidates = (right,)
        if equivalents and right in equivalents:
            candidates = equivalents[right].union(candidates)

        for candidate in candidates:
            if use_proxies and left.shares_lineage(candidate):
                return SKIP_TRAVERSE
            # the hash test is between left and right themselves, not the
            # current candidate
            if hash(left) == hash(right):
                return SKIP_TRAVERSE

        return COMPARE_FAILED

    def compare_column(self, left, right, **kw):
        """Columns use the column-element rules."""
        return self.compare_column_element(left, right, **kw)

    def compare_label(self, left, right, **kw):
        """Labels use the column-element rules."""
        return self.compare_column_element(left, right, **kw)

    def compare_table(self, left, right, **kw):
        # tables compare on identity, since it's not really feasible to
        # compare them column by column with the above rules
        if left is right:
            return SKIP_TRAVERSE
        return COMPARE_FAILED