ai_v/venv/Lib/site-packages/yarl/_parse.py

204 lines
7.0 KiB
Python
Raw Normal View History

feat(api): 实现图像生成及后台同步功能 - 新增图像生成接口,支持试用、积分和自定义API Key模式 - 实现生成图片结果异步上传至MinIO存储,带重试机制 - 优化积分预扣除和异常退还逻辑,保障用户积分准确 - 添加获取生成历史记录接口,支持时间范围和分页 - 提供本地字典配置接口,支持模型、比例、提示模板和尺寸 - 实现图片批量上传接口,支持S3兼容对象存储 feat(admin): 增加管理员角色管理与权限分配接口 - 实现角色列表查询、角色创建、更新及删除功能 - 增加权限列表查询接口 - 实现用户角色分配接口,便于统一管理用户权限 - 增加系统字典增删查改接口,支持分类过滤和排序 - 权限控制全面覆盖管理接口,保证安全访问 feat(auth): 完善用户登录注册及权限相关接口与页面 - 实现手机号验证码发送及校验功能,保障注册安全 - 支持手机号注册、登录及退出接口,集成日志记录 - 增加修改密码功能,验证原密码后更新 - 提供动态导航菜单接口,基于权限展示不同菜单 - 实现管理界面路由及日志、角色、字典管理页面访问权限控制 - 添加系统日志查询接口,支持关键词和等级筛选 feat(app): 初始化Flask应用并配置蓝图与数据库 - 创建应用程序工厂,加载配置,初始化数据库和Redis客户端 - 注册认证、API及管理员蓝图,整合路由 - 根路由渲染主页模板 - 应用上下文中自动创建数据库表,保证运行环境准备完毕 feat(database): 提供数据库创建与迁移支持脚本 - 新增数据库创建脚本,支持自动检测是否已存在 - 添加数据库表初始化脚本,支持创建和删除所有表 - 实现RBAC权限初始化,包含基础权限和角色创建 - 新增字段手动修复脚本,添加用户API Key和积分字段 - 强制迁移脚本支持清理连接和修复表结构,初始化默认数据及角色分配 feat(config): 新增系统配置参数 - 配置数据库、Redis、Session和MinIO相关参数 - 添加AI接口地址及试用Key配置 - 集成阿里云短信服务配置及开发模式相关参数 feat(extensions): 初始化数据库、Redis和MinIO客户端 - 创建全局SQLAlchemy数据库实例和Redis客户端 - 配置基于boto3的MinIO兼容S3客户端 chore(logs): 添加示例系统日志文件 - 记录用户请求、验证码发送成功与失败的日志信息
2026-01-12 00:53:31 +08:00
"""URL parsing utilities."""
import re
import unicodedata
from functools import lru_cache
from typing import Union
from urllib.parse import scheme_chars, uses_netloc
from ._quoters import QUOTER, UNQUOTER_PLUS
# Leading and trailing C0 control and space to be stripped per WHATWG spec.
# == "".join([chr(i) for i in range(0, 0x20 + 1)])
WHATWG_C0_CONTROL_OR_SPACE = (
"\x00\x01\x02\x03\x04\x05\x06\x07\x08\t\n\x0b\x0c\r\x0e\x0f\x10"
"\x11\x12\x13\x14\x15\x16\x17\x18\x19\x1a\x1b\x1c\x1d\x1e\x1f "
)
# Unsafe bytes to be removed per WHATWG spec
UNSAFE_URL_BYTES_TO_REMOVE = ["\t", "\r", "\n"]
USES_AUTHORITY = frozenset(uses_netloc)
SplitURLType = tuple[str, str, str, str, str]
def split_url(url: str) -> SplitURLType:
"""Split URL into parts."""
# Adapted from urllib.parse.urlsplit
# Only lstrip url as some applications rely on preserving trailing space.
# (https://url.spec.whatwg.org/#concept-basic-url-parser would strip both)
url = url.lstrip(WHATWG_C0_CONTROL_OR_SPACE)
for b in UNSAFE_URL_BYTES_TO_REMOVE:
if b in url:
url = url.replace(b, "")
scheme = netloc = query = fragment = ""
i = url.find(":")
if i > 0 and url[0] in scheme_chars:
for c in url[1:i]:
if c not in scheme_chars:
break
else:
scheme, url = url[:i].lower(), url[i + 1 :]
has_hash = "#" in url
has_question_mark = "?" in url
if url[:2] == "//":
delim = len(url) # position of end of domain part of url, default is end
if has_hash and has_question_mark:
delim_chars = "/?#"
elif has_question_mark:
delim_chars = "/?"
elif has_hash:
delim_chars = "/#"
else:
delim_chars = "/"
for c in delim_chars: # look for delimiters; the order is NOT important
wdelim = url.find(c, 2) # find first of this delim
if wdelim >= 0 and wdelim < delim: # if found
delim = wdelim # use earliest delim position
netloc = url[2:delim]
url = url[delim:]
has_left_bracket = "[" in netloc
has_right_bracket = "]" in netloc
if (has_left_bracket and not has_right_bracket) or (
has_right_bracket and not has_left_bracket
):
raise ValueError("Invalid IPv6 URL")
if has_left_bracket:
bracketed_host = netloc.partition("[")[2].partition("]")[0]
# Valid bracketed hosts are defined in
# https://www.rfc-editor.org/rfc/rfc3986#page-49
# https://url.spec.whatwg.org/
if bracketed_host and bracketed_host[0] == "v":
if not re.match(r"\Av[a-fA-F0-9]+\..+\Z", bracketed_host):
raise ValueError("IPvFuture address is invalid")
elif ":" not in bracketed_host:
raise ValueError("The IPv6 content between brackets is not valid")
if has_hash:
url, _, fragment = url.partition("#")
if has_question_mark:
url, _, query = url.partition("?")
if netloc and not netloc.isascii():
_check_netloc(netloc)
return scheme, netloc, url, query, fragment
def _check_netloc(netloc: str) -> None:
# Adapted from urllib.parse._checknetloc
# looking for characters like \u2100 that expand to 'a/c'
# IDNA uses NFKC equivalence, so normalize for this check
# ignore characters already included
# but not the surrounding text
n = netloc.replace("@", "").replace(":", "").replace("#", "").replace("?", "")
normalized_netloc = unicodedata.normalize("NFKC", n)
if n == normalized_netloc:
return
# Note that there are no unicode decompositions for the character '@' so
# its currently impossible to have test coverage for this branch, however if the
# one should be added in the future we want to make sure its still checked.
for c in "/?#@:": # pragma: no branch
if c in normalized_netloc:
raise ValueError(
f"netloc '{netloc}' contains invalid "
"characters under NFKC normalization"
)
@lru_cache # match the same size as urlsplit
def split_netloc(
netloc: str,
) -> tuple[Union[str, None], Union[str, None], Union[str, None], Union[int, None]]:
"""Split netloc into username, password, host and port."""
if "@" not in netloc:
username: Union[str, None] = None
password: Union[str, None] = None
hostinfo = netloc
else:
userinfo, _, hostinfo = netloc.rpartition("@")
username, have_password, password = userinfo.partition(":")
if not have_password:
password = None
if "[" in hostinfo:
_, _, bracketed = hostinfo.partition("[")
hostname, _, port_str = bracketed.partition("]")
_, _, port_str = port_str.partition(":")
else:
hostname, _, port_str = hostinfo.partition(":")
if not port_str:
return username or None, password, hostname or None, None
try:
port = int(port_str)
except ValueError:
raise ValueError("Invalid URL: port can't be converted to integer")
if not (0 <= port <= 65535):
raise ValueError("Port out of range 0-65535")
return username or None, password, hostname or None, port
def unsplit_result(
scheme: str, netloc: str, url: str, query: str, fragment: str
) -> str:
"""Unsplit a URL without any normalization."""
if netloc or (scheme and scheme in USES_AUTHORITY) or url[:2] == "//":
if url and url[:1] != "/":
url = f"{scheme}://{netloc}/{url}" if scheme else f"{scheme}:{url}"
else:
url = f"{scheme}://{netloc}{url}" if scheme else f"//{netloc}{url}"
elif scheme:
url = f"{scheme}:{url}"
if query:
url = f"{url}?{query}"
return f"{url}#{fragment}" if fragment else url
@lru_cache # match the same size as urlsplit
def make_netloc(
user: Union[str, None],
password: Union[str, None],
host: Union[str, None],
port: Union[int, None],
encode: bool = False,
) -> str:
"""Make netloc from parts.
The user and password are encoded if encode is True.
The host must already be encoded with _encode_host.
"""
if host is None:
return ""
ret = host
if port is not None:
ret = f"{ret}:{port}"
if user is None and password is None:
return ret
if password is not None:
if not user:
user = ""
elif encode:
user = QUOTER(user)
if encode:
password = QUOTER(password)
user = f"{user}:{password}"
elif user and encode:
user = QUOTER(user)
return f"{user}@{ret}" if user else ret
def query_to_pairs(query_string: str) -> list[tuple[str, str]]:
"""Parse a query given as a string argument.
Works like urllib.parse.parse_qsl with keep empty values.
"""
pairs: list[tuple[str, str]] = []
if not query_string:
return pairs
for k_v in query_string.split("&"):
k, _, v = k_v.partition("=")
pairs.append((UNQUOTER_PLUS(k), UNQUOTER_PLUS(v)))
return pairs