- 新增图像生成接口,支持试用、积分和自定义API Key模式 - 实现生成图片结果异步上传至MinIO存储,带重试机制 - 优化积分预扣除和异常退还逻辑,保障用户积分准确 - 添加获取生成历史记录接口,支持时间范围和分页 - 提供本地字典配置接口,支持模型、比例、提示模板和尺寸 - 实现图片批量上传接口,支持S3兼容对象存储 feat(admin): 增加管理员角色管理与权限分配接口 - 实现角色列表查询、角色创建、更新及删除功能 - 增加权限列表查询接口 - 实现用户角色分配接口,便于统一管理用户权限 - 增加系统字典增删查改接口,支持分类过滤和排序 - 权限控制全面覆盖管理接口,保证安全访问 feat(auth): 完善用户登录注册及权限相关接口与页面 - 实现手机号验证码发送及校验功能,保障注册安全 - 支持手机号注册、登录及退出接口,集成日志记录 - 增加修改密码功能,验证原密码后更新 - 提供动态导航菜单接口,基于权限展示不同菜单 - 实现管理界面路由及日志、角色、字典管理页面访问权限控制 - 添加系统日志查询接口,支持关键词和等级筛选 feat(app): 初始化Flask应用并配置蓝图与数据库 - 创建应用程序工厂,加载配置,初始化数据库和Redis客户端 - 注册认证、API及管理员蓝图,整合路由 - 根路由渲染主页模板 - 应用上下文中自动创建数据库表,保证运行环境准备完毕 feat(database): 提供数据库创建与迁移支持脚本 - 新增数据库创建脚本,支持自动检测是否已存在 - 添加数据库表初始化脚本,支持创建和删除所有表 - 实现RBAC权限初始化,包含基础权限和角色创建 - 新增字段手动修复脚本,添加用户API Key和积分字段 - 强制迁移脚本支持清理连接和修复表结构,初始化默认数据及角色分配 feat(config): 新增系统配置参数 - 配置数据库、Redis、Session和MinIO相关参数 - 添加AI接口地址及试用Key配置 - 集成阿里云短信服务配置及开发模式相关参数 feat(extensions): 初始化数据库、Redis和MinIO客户端 - 创建全局SQLAlchemy数据库实例和Redis客户端 - 配置基于boto3的MinIO兼容S3客户端 chore(logs): 添加示例系统日志文件 - 记录用户请求、验证码发送成功与失败的日志信息
361 lines
12 KiB
Python
361 lines
12 KiB
Python
from __future__ import annotations
|
|
|
|
from encodings.aliases import aliases
|
|
from hashlib import sha256
|
|
from json import dumps
|
|
from re import sub
|
|
from typing import Any, Iterator, List, Tuple
|
|
|
|
from .constant import RE_POSSIBLE_ENCODING_INDICATION, TOO_BIG_SEQUENCE
|
|
from .utils import iana_name, is_multi_byte_encoding, unicode_range
|
|
|
|
|
|
class CharsetMatch:
    """
    Represent a single encoding guess made against a byte payload.

    Bundles the raw bytes, the guessed IANA encoding name, the measured mess
    ("chaos") ratio, the language coherence matches and, lazily, the decoded
    string plus any sub-matches (other encodings that decode to the exact
    same string).
    """

    def __init__(
        self,
        payload: bytes,
        guessed_encoding: str,
        mean_mess_ratio: float,
        has_sig_or_bom: bool,
        languages: CoherenceMatches,
        decoded_payload: str | None = None,
        preemptive_declaration: str | None = None,
    ):
        self._payload: bytes = payload

        self._encoding: str = guessed_encoding
        self._mean_mess_ratio: float = mean_mess_ratio
        self._languages: CoherenceMatches = languages
        self._has_sig_or_bom: bool = has_sig_or_bom
        # Computed lazily by the 'alphabets' property, then cached here.
        self._unicode_ranges: list[str] | None = None

        # Sub-matches: equivalent encodings attached via add_submatch().
        self._leaves: list[CharsetMatch] = []
        self._mean_coherence_ratio: float = 0.0

        # Cache for output(): last re-encoded payload and its target encoding.
        self._output_payload: bytes | None = None
        self._output_encoding: str | None = None

        # Decoded form of the payload; built on first str(self) when None.
        self._string: str | None = decoded_payload

        # Encoding declaration found inside the payload itself, if any
        # (used by output() to patch the document header on re-encode).
        self._preemptive_declaration: str | None = preemptive_declaration

    def __eq__(self, other: object) -> bool:
        """
        Equality against another CharsetMatch compares encoding AND
        fingerprint; a plain str is compared against our encoding name
        after IANA normalization. Anything else is not equal.
        """
        if not isinstance(other, CharsetMatch):
            if isinstance(other, str):
                return iana_name(other) == self.encoding
            return False
        return self.encoding == other.encoding and self.fingerprint == other.fingerprint

    def __lt__(self, other: object) -> bool:
        """
        Implemented to make sorted available upon CharsetMatches items.
        """
        if not isinstance(other, CharsetMatch):
            raise ValueError

        chaos_difference: float = abs(self.chaos - other.chaos)

        coherence_difference: float = abs(self.coherence - other.coherence)

        # Below 1% difference --> Use Coherence
        if chaos_difference < 0.01 and coherence_difference > 0.02:
            return self.coherence > other.coherence
        elif chaos_difference < 0.01 and coherence_difference <= 0.02:
            # When having a difficult decision, use the result that decoded as many multi-byte as possible.
            # preserve RAM usage!
            if len(self._payload) >= TOO_BIG_SEQUENCE:
                # Large payload: skip the (decode-heavy) multi_byte_usage tie-break.
                return self.chaos < other.chaos
            return self.multi_byte_usage > other.multi_byte_usage

        # Default ordering: lower chaos sorts first (more probable match).
        return self.chaos < other.chaos

    @property
    def multi_byte_usage(self) -> float:
        # Fraction of the payload consumed by multi-byte code points:
        # 0.0 when every byte decodes to exactly one character.
        return 1.0 - (len(str(self)) / len(self.raw))

    def __str__(self) -> str:
        # Lazy Str Loading
        if self._string is None:
            self._string = str(self._payload, self._encoding, "strict")
        return self._string

    def __repr__(self) -> str:
        return f"<CharsetMatch '{self.encoding}' bytes({self.fingerprint})>"

    def add_submatch(self, other: CharsetMatch) -> None:
        """
        Attach another CharsetMatch as an equivalent sub-match of this one.
        Raises ValueError for non-CharsetMatch input or self-attachment.
        """
        if not isinstance(other, CharsetMatch) or other == self:
            raise ValueError(
                "Unable to add instance <{}> as a submatch of a CharsetMatch".format(
                    other.__class__
                )
            )

        other._string = None  # Unload RAM usage; dirty trick.
        self._leaves.append(other)

    @property
    def encoding(self) -> str:
        return self._encoding

    @property
    def encoding_aliases(self) -> list[str]:
        """
        Encoding name are known by many name, using this could help when searching for IBM855 when it's listed as CP855.
        """
        also_known_as: list[str] = []
        # 'aliases' maps alias -> canonical name; collect the counterpart
        # whichever side our encoding name appears on.
        for u, p in aliases.items():
            if self.encoding == u:
                also_known_as.append(p)
            elif self.encoding == p:
                also_known_as.append(u)
        return also_known_as

    @property
    def bom(self) -> bool:
        # True when a BOM or encoding signature was detected in the payload.
        return self._has_sig_or_bom

    @property
    def byte_order_mark(self) -> bool:
        # Alias of 'bom' kept for readability/back-compat.
        return self._has_sig_or_bom

    @property
    def languages(self) -> list[str]:
        """
        Return the complete list of possible languages found in decoded sequence.
        Usually not really useful. Returned list may be empty even if 'language' property return something != 'Unknown'.
        """
        return [e[0] for e in self._languages]

    @property
    def language(self) -> str:
        """
        Most probable language found in decoded sequence. If none were detected or inferred, the property will return
        "Unknown".
        """
        if not self._languages:
            # Trying to infer the language based on the given encoding
            # Its either English or we should not pronounce ourselves in certain cases.
            if "ascii" in self.could_be_from_charset:
                return "English"

            # doing it there to avoid circular import
            from charset_normalizer.cd import encoding_languages, mb_encoding_languages

            languages = (
                mb_encoding_languages(self.encoding)
                if is_multi_byte_encoding(self.encoding)
                else encoding_languages(self.encoding)
            )

            if len(languages) == 0 or "Latin Based" in languages:
                return "Unknown"

            return languages[0]

        return self._languages[0][0]

    @property
    def chaos(self) -> float:
        # Mean mess ratio measured during detection; __lt__ sorts lower first.
        return self._mean_mess_ratio

    @property
    def coherence(self) -> float:
        # Confidence ratio of the best language match, 0.0 when none found.
        if not self._languages:
            return 0.0
        return self._languages[0][1]

    @property
    def percent_chaos(self) -> float:
        # 'chaos' expressed as a percentage, rounded to 3 decimals.
        return round(self.chaos * 100, ndigits=3)

    @property
    def percent_coherence(self) -> float:
        # 'coherence' expressed as a percentage, rounded to 3 decimals.
        return round(self.coherence * 100, ndigits=3)

    @property
    def raw(self) -> bytes:
        """
        Original untouched bytes.
        """
        return self._payload

    @property
    def submatch(self) -> list[CharsetMatch]:
        # Matches previously attached via add_submatch().
        return self._leaves

    @property
    def has_submatch(self) -> bool:
        return len(self._leaves) > 0

    @property
    def alphabets(self) -> list[str]:
        """
        Sorted, de-duplicated Unicode range names covering the decoded string.
        Computed once then cached.
        """
        if self._unicode_ranges is not None:
            return self._unicode_ranges
        # list detected ranges
        detected_ranges: list[str | None] = [unicode_range(char) for char in str(self)]
        # filter and sort
        self._unicode_ranges = sorted(list({r for r in detected_ranges if r}))
        return self._unicode_ranges

    @property
    def could_be_from_charset(self) -> list[str]:
        """
        The complete list of encoding that output the exact SAME str result and therefore could be the originating
        encoding.
        This list does include the encoding available in property 'encoding'.
        """
        return [self._encoding] + [m.encoding for m in self._leaves]

    def output(self, encoding: str = "utf_8") -> bytes:
        """
        Method to get re-encoded bytes payload using given target encoding. Default to UTF-8.
        Any errors will be simply ignored by the encoder NOT replaced.
        """
        # Re-encode only when the cached payload targets a different encoding.
        if self._output_encoding is None or self._output_encoding != encoding:
            self._output_encoding = encoding
            decoded_string = str(self)
            if (
                self._preemptive_declaration is not None
                and self._preemptive_declaration.lower()
                not in ["utf-8", "utf8", "utf_8"]
            ):
                # The document declares its own (non-UTF-8) encoding: rewrite
                # that declaration in the first 8 KiB so it matches the new
                # target encoding.
                patched_header = sub(
                    RE_POSSIBLE_ENCODING_INDICATION,
                    lambda m: m.string[m.span()[0] : m.span()[1]].replace(
                        m.groups()[0],
                        iana_name(self._output_encoding).replace("_", "-"),  # type: ignore[arg-type]
                    ),
                    decoded_string[:8192],
                    count=1,
                )

                decoded_string = patched_header + decoded_string[8192:]

            self._output_payload = decoded_string.encode(encoding, "replace")

        return self._output_payload  # type: ignore

    @property
    def fingerprint(self) -> str:
        """
        Retrieve the unique SHA256 computed using the transformed (re-encoded) payload. Not the original one.
        """
        return sha256(self.output()).hexdigest()
|
|
|
|
|
|
class CharsetMatches:
    """
    Container with every CharsetMatch items ordered by default from most probable to the less one.
    Act like a list(iterable) but does not implements all related methods.
    """

    def __init__(self, results: list[CharsetMatch] | None = None):
        # The backing list is kept sorted (best candidate first) at all times.
        self._results: list[CharsetMatch] = sorted(results) if results else []

    def __iter__(self) -> Iterator[CharsetMatch]:
        return iter(self._results)

    def __getitem__(self, item: int | str) -> CharsetMatch:
        """
        Retrieve a single item either by its position or encoding name (alias may be used here).
        Raise KeyError upon invalid index or encoding not present in results.
        """
        if isinstance(item, int):
            return self._results[item]
        if isinstance(item, str):
            wanted = iana_name(item, False)
            for candidate in self._results:
                if wanted in candidate.could_be_from_charset:
                    return candidate
        raise KeyError

    def __len__(self) -> int:
        return len(self._results)

    def __bool__(self) -> bool:
        return bool(self._results)

    def append(self, item: CharsetMatch) -> None:
        """
        Insert a single match. Will be inserted accordingly to preserve sort.
        Can be inserted as a submatch.
        """
        if not isinstance(item, CharsetMatch):
            raise ValueError(
                "Cannot append instance '{}' to CharsetMatches".format(
                    str(item.__class__)
                )
            )
        # We should disable the submatch factoring when the input file is too heavy (conserve RAM usage)
        if len(item.raw) < TOO_BIG_SEQUENCE:
            for existing in self._results:
                if (
                    existing.fingerprint == item.fingerprint
                    and existing.chaos == item.chaos
                ):
                    # Equivalent result already present: fold it in as a submatch.
                    existing.add_submatch(item)
                    return
        self._results.append(item)
        self._results.sort()

    def best(self) -> CharsetMatch | None:
        """
        Simply return the first match. Strict equivalent to matches[0].
        """
        return self._results[0] if self._results else None

    def first(self) -> CharsetMatch | None:
        """
        Redundant method, call the method best(). Kept for BC reasons.
        """
        return self.best()
|
|
|
|
|
|
# A single language-coherence probe: (language name, confidence ratio).
CoherenceMatch = Tuple[str, float]
# List of coherence probes; CharsetMatch treats entry [0] as the most
# probable language (see CharsetMatch.language / CharsetMatch.coherence).
CoherenceMatches = List[CoherenceMatch]
|
|
|
|
|
|
class CliDetectionResult:
    """
    Serializable record produced for one inspected file path.

    Carries the detection outcome (encoding, aliases, language, alphabets)
    together with its quality metrics so the whole thing can be rendered
    as JSON via to_json().
    """

    def __init__(
        self,
        path: str,
        encoding: str | None,
        encoding_aliases: list[str],
        alternative_encodings: list[str],
        language: str,
        alphabets: list[str],
        has_sig_or_bom: bool,
        chaos: float,
        coherence: float,
        unicode_path: str | None,
        is_preferred: bool,
    ):
        # File location (and its unicode counterpart, when one exists).
        self.path: str = path
        self.unicode_path: str | None = unicode_path
        # Detection outcome.
        self.encoding: str | None = encoding
        self.encoding_aliases: list[str] = encoding_aliases
        self.alternative_encodings: list[str] = alternative_encodings
        self.language: str = language
        self.alphabets: list[str] = alphabets
        self.has_sig_or_bom: bool = has_sig_or_bom
        # Quality metrics and final selection flag.
        self.chaos: float = chaos
        self.coherence: float = coherence
        self.is_preferred: bool = is_preferred

    @property
    def __dict__(self) -> dict[str, Any]:  # type: ignore
        # Shadowing __dict__ with a property lets json.dumps see a plain
        # mapping with a stable, hand-picked key order.
        return dict(
            path=self.path,
            encoding=self.encoding,
            encoding_aliases=self.encoding_aliases,
            alternative_encodings=self.alternative_encodings,
            language=self.language,
            alphabets=self.alphabets,
            has_sig_or_bom=self.has_sig_or_bom,
            chaos=self.chaos,
            coherence=self.coherence,
            unicode_path=self.unicode_path,
            is_preferred=self.is_preferred,
        )

    def to_json(self) -> str:
        """Render this result as a pretty-printed, ASCII-safe JSON document."""
        as_mapping = self.__dict__
        return dumps(as_mapping, ensure_ascii=True, indent=4)
|