重建仓库,重新提交。

This commit is contained in:
2025-12-18 14:20:57 +08:00
parent 849f0f409f
commit e518f781ad
108 changed files with 8508 additions and 57 deletions

1462
Utils/AiUtils.py Normal file

File diff suppressed because it is too large Load Diff

356
Utils/ControlUtils.py Normal file
View File

@@ -0,0 +1,356 @@
import math
import random
import re
import time
from typing import Tuple, List
import tidevice
import wda
from wda import Client
from Entity.Variables import wdaFunctionPort
from Utils.AiUtils import AiUtils
from Utils.LogManager import LogManager
# 页面控制工具类
class ControlUtils(object):
# 获取设备上的app列表
@classmethod
def getDeviceAppList(self, udid):
device = tidevice.Device(udid)
# 获取已安装的应用列表
apps = []
for app in device.installation.iter_installed():
apps.append({
"name": app.get("CFBundleDisplayName", "Unknown"),
"bundleId": app.get("CFBundleIdentifier", "Unknown"),
"version": app.get("CFBundleShortVersionString", "Unknown"),
"path": app.get("Path", "Unknown")
})
# 筛选非系统级应用(过滤掉以 com.apple 开头的系统应用)
noSystemApps = [app for app in apps if not app["bundleId"].startswith("com.apple")]
return noSystemApps
# 打开Tik Tok
@classmethod
def openTikTok(cls, session: Client, udid):
apps = cls.getDeviceAppList(udid)
tk = ""
for app in apps:
if app.get("name", "") == "TikTok":
tk = app.get("bundleId", "")
currentApp = session.app_current()
if currentApp != tk:
session.app_start(tk)
# 关闭Tik Tok
@classmethod
def closeTikTok(cls, session: Client, udid):
apps = cls.getDeviceAppList(udid)
tk = ""
for app in apps:
if app.get("name", "") == "TikTok":
tk = app.get("bundleId", "")
session.app_stop(tk)
# 返回
@classmethod
def clickBack(cls, session: Client):
try:
back = session.xpath(
# ① 常见中文文案
"//*[@label='返回' or @label='返回上一屏幕']"
" | "
# ② 英文 / 内部 name / 图标 label 的按钮(仅限 Button且可见
"//XCUIElementTypeButton[@visible='true' and ("
"@name='Back' or @label='Back' or " # 英文
"@name='返回' or @label='返回' or " # 中文
"@label='返回上一屏幕' or " # 中文另一种
"@name='returnButton' or"
"@name='nav_bar_start_back' or " # 内部常见 name
"(@name='TTKProfileNavBarBaseItemComponent' and @label='IconChevronLeftOffsetLTR')" # 你给的特例
")]"
)
if back.exists:
back.click()
return True
elif session.xpath("//*[@name='nav_bar_start_back']").exists:
back = session.xpath("//*[@name='nav_bar_start_back']")
if back.exists:
back.click()
return True
elif session.xpath(
"//Window[1]/Other[1]/Other[1]/Other[1]/Other[1]/Other[1]/Other[1]/Other[1]/Other[1]/Other[1]/Other[1]/Other[1]/Other[1]/Other[1]").exists:
back = session.xpath(
"//Window[1]/Other[1]/Other[1]/Other[1]/Other[1]/Other[1]/Other[1]/Other[1]/Other[1]/Other[1]/Other[1]/Other[1]/Other[1]/Other[1]")
if back.exists:
back.click()
return True
elif session.xpath(
"(//XCUIElementTypeOther[@y='20' and @height='44']//XCUIElementTypeButton[@visible='true'])[1]").exists:
back = session.xpath(
"(//XCUIElementTypeOther[@y='20' and @height='44']//XCUIElementTypeButton[@visible='true'])[1]")
if back.exists:
back.click()
return True
else:
return False
except Exception as e:
print(e)
return False
@classmethod
def isClickBackEnabled(cls, session: Client):
try:
back = session.xpath(
# ① 常见中文文案
"//*[@label='返回' or @label='返回上一屏幕']"
" | "
# ② 英文 / 内部 name / 图标 label 的按钮(仅限 Button且可见
"//XCUIElementTypeButton[@visible='true' and ("
"@name='Back' or @label='Back' or " # 英文
"@name='返回' or @label='返回' or " # 中文
"@label='返回上一屏幕' or " # 中文另一种
"@name='returnButton' or"
"@name='nav_bar_start_back' or " # 内部常见 name
"(@name='TTKProfileNavBarBaseItemComponent' and @label='IconChevronLeftOffsetLTR')" # 你给的特例
")]"
)
if back.exists:
return True
elif session.xpath("//*[@name='nav_bar_start_back']").exists:
back = session.xpath("//*[@name='nav_bar_start_back']")
if back.exists:
return True
elif session.xpath(
"//Window[1]/Other[1]/Other[1]/Other[1]/Other[1]/Other[1]/Other[1]/Other[1]/Other[1]/Other[1]/Other[1]/Other[1]/Other[1]/Other[1]").exists:
back = session.xpath(
"//Window[1]/Other[1]/Other[1]/Other[1]/Other[1]/Other[1]/Other[1]/Other[1]/Other[1]/Other[1]/Other[1]/Other[1]/Other[1]/Other[1]")
if back.exists:
return True
elif session.xpath(
"(//XCUIElementTypeOther[@y='20' and @height='44']//XCUIElementTypeButton[@visible='true'])[1]").exists:
back = session.xpath(
"(//XCUIElementTypeOther[@y='20' and @height='44']//XCUIElementTypeButton[@visible='true'])[1]")
if back.exists:
return True
else:
return False
except Exception as e:
print(e)
return False
# 点赞
@classmethod
def clickLike(cls, session: Client, udid):
try:
from script.ScriptManager import ScriptManager
width, height, scale = ScriptManager.get_screen_info(udid)
if scale == 3.0:
x, y = AiUtils.findImageInScreen("add", udid)
if x > -1:
LogManager.method_info(f"点赞了,点赞的坐标是:{x // scale, y // scale + 50}", "关注打招呼", udid)
session.click(int(x // scale), int(y // scale + 50))
return True
else:
LogManager.method_info("没有找到目标", "关注打招呼", udid)
return False
else:
x, y = AiUtils.findImageInScreen("like1", udid)
if x > -1:
LogManager.method_info(f"点赞了,点赞的坐标是:{x // scale, y // scale}", "关注打招呼", udid)
session.click(int(x // scale), int(y // scale))
return True
else:
LogManager.method_info("没有找到目标", "关注打招呼", udid)
return False
except Exception as e:
LogManager.method_info(f"点赞出现异常,异常的原因:{e}", "关注打招呼", udid)
raise False
# 点击搜索
@classmethod
def clickSearch(cls, session: Client):
# obj = session.xpath("//*[@name='搜索']")
obj = session(xpath='//*[@name="搜索" or @label="搜索" or @name="Search" or @label="Search"]')
try:
if obj.exists:
obj.click()
return True
except Exception as e:
print(e)
return False
# 点击收件箱按钮
@classmethod
def clickMsgBox(cls, session: Client):
box = session.xpath("//XCUIElementTypeButton[name='a11y_vo_inbox']")
if box.exists:
box.click()
return True
else:
return False
# 获取主播详情页的第一个视频
@classmethod
def clickFirstVideoFromDetailPage(cls, session: Client):
videoCell = session.xpath(
'(//XCUIElementTypeCollectionView//XCUIElementTypeCell[.//XCUIElementTypeImage[@name="profile_video"]])[1]')
tab = session.xpath(
'//XCUIElementTypeButton[@name="TTKProfileTabVideoButton_0" or contains(@label,"作品") or contains(@name,"作品")]'
).get(timeout=5) # 某些版本 tab.value 可能就是数量;或者 tab.label 类似 “作品 7”
m = re.search(r"\d+", tab.label)
num = 0
if m:
# 判断当前的作品的数量
num = int(m.group())
print("作品数量为:", num)
if videoCell.exists:
videoCell.click()
# 点击视频
print("找到主页的第一个视频")
return True, num
else:
print("没有找到主页的第一个视频")
return False, num
@classmethod
def clickFollow(cls, session, aid):
# 1) 含“关注/已关注/Follow/Following”的首个 cell
cell_xpath = (
'(//XCUIElementTypeCollectionView[@name="TTKSearchCollectionComponent"]'
'//XCUIElementTypeCell[.//XCUIElementTypeButton[@name="关注" or @name="Follow" or @name="已关注" or @name="Following"]])[1]'
)
cell = session.xpath(cell_xpath).get(timeout=5)
# 2) 先试“用户信息 Button”label/name 里包含 aid
profile_btn_xpath = (
f'{cell_xpath}//XCUIElementTypeButton[contains(@label, "{aid}") or contains(@name, "{aid}")]'
)
try:
profile_btn = session.xpath(profile_btn_xpath).get(timeout=3)
profile_btn.click()
except wda.WDAElementNotFoundError:
# 3) 兜底:用“关注”按钮做锚点,向左偏移点击头像/用户名区域
follow_btn_xpath = (
f'{cell_xpath}//XCUIElementTypeButton[@name="关注" or @name="Follow" or @name="已关注" or @name="Following"]'
)
follow_btn = session.xpath(follow_btn_xpath).get(timeout=5)
rect = follow_btn.bounds
left_x = max(1, rect.x - 20)
center_y = rect.y + rect.height // 2
session.tap(left_x, center_y)
@classmethod
def userClickProfile(cls, session, aid):
try:
user_btn = session.xpath("(//XCUIElementTypeButton[@name='用户' and @visible='true'])[1]")
if user_btn.exists:
user_btn.click()
time.sleep(3)
follow_btn = session.xpath(
"(//XCUIElementTypeTable//XCUIElementTypeButton[@name='关注' or @name='已关注'])[1]"
).get(timeout=5)
if follow_btn:
x, y, w, h = follow_btn.bounds
# 垂直方向中心 + 随机 3~8 像素偏移
cy = int(y + h / 2 + random.randint(-8, 8))
# 横向往左偏移 80~120 像素之间的随机值
cx = int(x - random.randint(80, 120))
# 点击
session.tap(cx, cy)
return True
return False
except Exception as e:
print(e)
return False
@classmethod
def random_micro_swipe(
cls,
center_x: int,
center_y: int,
session,
points: int = 6,
duration_ms: int = 15,
) -> None:
"""
在 (center_x, center_y) 附近做 20px 左右的不规则微滑动。
使用 facebook-wda 的 session.swipe(x1, y1, x2, y2, duration) 接口。
"""
# 1. 随机方向
angle = random.uniform(0, 2 * math.pi)
length = random.uniform(18, 22) # 20px 左右
end_x = center_x + length * math.cos(angle)
end_y = center_y + length * math.sin(angle)
# 2. 限制在 20px 圆内(防止超出)
def clamp_to_circle(x, y, cx, cy, r):
dx = x - cx
dy = y - cy
if dx * dx + dy * dy > r * r:
scale = r / math.hypot(dx, dy)
x = cx + dx * scale
y = cy + dy * scale
return int(round(x)), int(round(y))
end_x, end_y = clamp_to_circle(end_x, end_y, center_x, center_y, 20)
# 3. 加入轻微噪声,制造“不规则”曲线
noise = 3 # 最大偏移像素
mid_count = points - 2
mid_points: List[Tuple[int, int]] = []
for i in range(1, mid_count + 1):
t = i / (mid_count + 1)
# 线性插值 + 垂直方向噪声
x = center_x * (1 - t) + end_x * t
y = center_y * (1 - t) + end_y * t
perp_angle = angle + math.pi / 2 # 垂直方向
offset = random.uniform(-noise, noise)
x += offset * math.cos(perp_angle)
y += offset * math.sin(perp_angle)
x, y = clamp_to_circle(x, y, center_x, center_y, 20)
mid_points.append((int(round(x)), int(round(y))))
# 4. 构造完整轨迹
trajectory: List[Tuple[int, int]] = (
[(center_x, center_y)] + mid_points + [(end_x, end_y)]
)
# 5. 使用 facebook-wda 的 swipe 接口(逐段 swipe
# 由于总时长太短,我们一次性 swipe 到终点,但用多点轨迹模拟
# facebook-wda 支持 swipe(x1, y1, x2, y2, duration)
# 我们直接用起点 -> 终点duration 用总时长
print("开始微滑动")
session.swipe(center_x, center_y, end_x, end_y, duration_ms / 1000)
print("随机微滑动:", trajectory)
# 向上滑动 脚本内部使用
@classmethod
def swipe_up(cls, client):
client.swipe(200, 350, 200, 250, 0.05)
# 向下滑动,脚本内使用
@classmethod
def swipe_down(cls, udid):
dev = wda.USBClient(udid, wdaFunctionPort)
dev.swipe(200, 250, 200, 350, 0.05)

271
Utils/CountryEnum.py Normal file
View File

@@ -0,0 +1,271 @@
class CountryLanguageMapper:
# 初始化一个字典,映射国家到语言代码
country_to_language = {
"中国大陆": "zh-CN",
"台湾": "zh-TW",
"香港": "zh-TW",
"澳门": "zh-TW",
"美国": "en",
"英国": "en",
"澳大利亚": "en",
"日本": "ja",
"韩国": "ko",
"俄罗斯": "ru",
"法国": "fr",
"德国": "de",
"意大利": "it",
"西班牙": "es",
"墨西哥": "es",
"巴西": "pt",
"葡萄牙": "pt",
"印度": "hi",
"泰国": "th",
"越南": "vi",
"马来西亚": "ms",
"印度尼西亚": "id",
"阿联酋": "ar",
"沙特阿拉伯": "ar",
"埃及": "ar",
"以色列": "he",
"缅甸": "my",
"斯里兰卡": "ta",
"巴基斯坦": "ur",
"孟加拉国": "bn",
"波兰": "pl",
"荷兰": "nl",
"罗马尼亚": "ro",
"土耳其": "tr",
"老挝": "lo",
"乌克兰": "uk",
"芬兰": "fi",
"南非": "af",
"阿尔巴尼亚": "sq",
"安道尔": "ca",
"安提瓜和巴布达": "en",
"阿根廷": "es",
"亚美尼亚": "hy",
"奥地利": "de",
"阿塞拜疆": "az",
"巴哈马": "en",
"巴林": "ar",
"巴巴多斯": "en",
"白俄罗斯": "be",
"比利时": "fr",
"伯利兹": "en",
"贝宁": "fr",
"不丹": "dz",
"玻利维亚": "es",
"波斯尼亚和黑塞哥维那": "bs",
"博茨瓦纳": "en",
"文莱": "ms",
"保加利亚": "bg",
"布基纳法索": "fr",
"布隆迪": "fr",
"柬埔寨": "km",
"喀麦隆": "fr",
"加拿大": "en",
"佛得角": "pt",
"开曼群岛": "en",
"中非共和国": "fr",
"乍得": "fr",
"智利": "es",
"中国": "zh-CN",
"圣诞岛": "en",
"科科斯群岛": "en",
"哥伦比亚": "es",
"科摩罗": "ar",
"刚果": "fr",
"库克群岛": "en",
"哥斯达黎加": "es",
"科特迪瓦": "fr",
"克罗地亚": "hr",
"古巴": "es",
"库拉索": "nl",
"塞浦路斯": "el",
"捷克": "cs",
"丹麦": "da",
"吉布提": "fr",
"多米尼克": "en",
"多米尼加共和国": "es",
"厄瓜多尔": "es",
"萨尔瓦多": "es",
"赤道几内亚": "es",
"厄立特里亚": "ti",
"爱沙尼亚": "et",
"埃斯瓦蒂尼": "en",
"埃塞俄比亚": "am",
"福克兰群岛": "en",
"法罗群岛": "fo",
"斐济": "en",
"法属圭亚那": "fr",
"法属波利尼西亚": "fr",
"法属南部领地": "fr",
"加蓬": "fr",
"冈比亚": "en",
"格鲁吉亚": "ka",
"加纳": "en",
"直布罗陀": "en",
"希腊": "el",
"格陵兰": "kl",
"格林纳达": "en",
"瓜德罗普": "fr",
"关岛": "en",
"危地马拉": "es",
"根西岛": "en",
"几内亚": "fr",
"几内亚比绍": "pt",
"圭亚那": "en",
"海地": "fr",
"赫德岛和麦克唐纳群岛": "en",
"梵蒂冈": "it",
"洪都拉斯": "es",
"中国香港特别行政区": "zh-TW",
"匈牙利": "hu",
"冰岛": "is",
"伊朗": "fa",
"伊拉克": "ar",
"爱尔兰": "en",
"曼岛": "en",
"牙买加": "en",
"泽西岛": "en",
"约旦": "ar",
"哈萨克斯坦": "kk",
"肯尼亚": "en",
"基里巴斯": "en",
"朝鲜": "ko",
"科威特": "ar",
"吉尔吉斯斯坦": "ky",
"拉脱维亚": "lv",
"黎巴嫩": "ar",
"莱索托": "en",
"利比里亚": "en",
"利比亚": "ar",
"列支敦士登": "de",
"立陶宛": "lt",
"卢森堡": "fr",
"中国澳门特别行政区": "zh-TW",
"马达加斯加": "fr",
"马拉维": "en",
"马尔代夫": "dv",
"马里": "fr",
"马耳他": "mt",
"马绍尔群岛": "en",
"马提尼克": "fr",
"毛里塔尼亚": "ar",
"毛里求斯": "en",
"马约特": "fr",
"密克罗尼西亚": "en",
"摩尔多瓦": "ro",
"摩纳哥": "fr",
"蒙古": "mn",
"黑山": "sr",
"蒙特塞拉特": "en",
"摩洛哥": "ar",
"莫桑比克": "pt",
"纳米比亚": "en",
"瑙鲁": "en",
"尼泊尔": "ne",
"新喀里多尼亚": "fr",
"新西兰": "en",
"尼加拉瓜": "es",
"尼日尔": "fr",
"尼日利亚": "en",
"纽埃": "en",
"诺福克岛": "en",
"北马其顿": "mk",
"北马里亚纳群岛": "en",
"挪威": "no",
"阿曼": "ar",
"帕劳": "en",
"巴勒斯坦": "ar",
"巴拿马": "es",
"巴布亚新几内亚": "en",
"巴拉圭": "es",
"秘鲁": "es",
"菲律宾": "tl",
"皮特凯恩群岛": "en",
"波多黎各": "es",
"卡塔尔": "ar",
"留尼汪": "fr",
"卢旺达": "rw",
"圣巴泰勒米": "fr",
"圣赫勒拿": "en",
"圣基茨和尼维斯": "en",
"圣卢西亚": "en",
"法属圣马丁": "fr",
"圣皮埃尔和密克隆": "fr",
"圣文森特和格林纳丁斯": "en",
"萨摩亚": "sm",
"圣马力诺": "it",
"圣多美和普林西比": "pt",
"塞内加尔": "fr",
"塞尔维亚": "sr",
"塞舌尔": "fr",
"塞拉利昂": "en",
"新加坡": "en",
"荷属圣马丁": "nl",
"斯洛伐克": "sk",
"斯洛文尼亚": "sl",
"所罗门群岛": "en",
"索马里": "so",
"南乔治亚岛和南桑威奇群岛": "en",
"南苏丹": "en",
"苏丹": "ar",
"苏里南": "nl",
"斯瓦尔巴群岛和扬马延岛": "no",
"瑞典": "sv",
"瑞士": "de",
"叙利亚": "ar",
"台湾省": "zh-TW",
"塔吉克斯坦": "tg",
"坦桑尼亚": "sw",
"东帝汶": "tet",
"多哥": "fr",
"托克劳": "en",
"汤加": "to",
"特立尼达和多巴哥": "en",
"突尼斯": "ar",
"土库曼斯坦": "tk",
"特克斯和凯科斯群岛": "en",
"图瓦卢": "en",
"乌干达": "en",
"美国本土外小岛屿": "en",
"乌拉圭": "es",
"乌兹别克斯坦": "uz",
"瓦努阿图": "bi",
"委内瑞拉": "es",
"英属维尔京群岛": "en",
"美属维尔京群岛": "en",
"瓦利斯和富图纳": "fr",
"西撒哈拉": "ar",
"也门": "ar",
"赞比亚": "en",
"津巴布韦": "en",
"阿富汗": "fa",
"阿尔及利亚": "ar",
"美属萨摩亚": "en",
"安哥拉": "pt",
"安圭拉": "en",
"南极洲": "en",
"百慕大": "en",
"荷属加勒比区": "nl",
"布韦岛": "no",
"英属印度洋领地": "en",
}
@classmethod
def get_language_code(cls, country):
return cls.country_to_language.get(country)
# 使用示例
if __name__ == "__main__":
mapper = CountryLanguageMapper()
countries = ['英国', '美国', '日本', '未知国家']
for country in countries:
code = mapper.get_language_code(country)
if code:
print(f"{country} 对应的语言代码是 {code}")
else:
print(f"没有找到 {country} 对应的语言代码")

View File

@@ -0,0 +1,135 @@
# support_deployer.py
import os
import re
import shutil
from pathlib import Path
from dataclasses import dataclass, field
from typing import Iterable, Optional
VERSION_RE = re.compile(r"^\d+(?:\.\d+)*$") # 15 / 15.6 / 16.7 / 16.7.1
def _find_support_root(hint: Optional[Path]) -> Optional[Path]:
"""
1) 优先:显式传入的 hint
2) 其次:环境变量 SUPPORT_DDI_DIR
3) 再次:从 __file__ 所在目录向上搜索 3 层,找名为 'SupportFiles' 的目录
"""
if hint and hint.exists():
return hint.resolve()
env = os.environ.get("SUPPORT_DDI_DIR")
if env:
p = Path(env).expanduser()
if p.exists():
return p.resolve()
here = Path(__file__).resolve().parent
for _ in range(4): # 当前目录 + 向上 3 层
cand = here / "SupportFiles"
if cand.exists():
return cand.resolve()
here = here.parent
return None
@dataclass
class DevDiskImageDeployer:
"""
同步 SupportFiles/<version>/ 或 SupportFiles/<version>.zip 到 ~/.tidevice/device-support
- 目录:复制为 ~/.tidevice/device-support/<version>/
- zip原样复制为 ~/.tidevice/device-support/<version>.zip (不解压)
- 已存在则跳过;如设置 overwrite=True 则覆盖
"""
project_support_root: Optional[Path] = None
cache_root: Optional[Path] = None
verbose: bool = True
dry_run: bool = False
overwrite: bool = False
_src_dir: Path = field(init=False, repr=False)
_cache_dir: Path = field(init=False, repr=False)
def __post_init__(self):
src = _find_support_root(self.project_support_root)
if src is None:
raise FileNotFoundError(
"未找到 SupportFiles 目录。"
"可传入 project_support_root或设置环境变量 SUPPORT_DDI_DIR"
"或确保在当前文件上层 3 级目录内存在名为 'SupportFiles' 的目录。"
)
self._src_dir = src
if self.cache_root is None:
self._cache_dir = Path.home() / ".tidevice" / "device-support"
else:
self._cache_dir = Path(self.cache_root).expanduser().resolve()
self._cache_dir.mkdir(parents=True, exist_ok=True)
if self.verbose:
print(f"[INFO] resolved SupportFiles = {self._src_dir}")
print(f"[INFO] cache_dir = {self._cache_dir}")
parent = self._src_dir.parent
try:
siblings = ", ".join(sorted(p.name for p in parent.iterdir() if p.is_dir()))
print(f"[INFO] SupportFiles parent = {parent}")
print(f"[INFO] siblings = {siblings}")
except Exception:
pass
def deploy_all(self):
entries = list(self._iter_version_entries(self._src_dir))
copied = skipped = 0
for p, kind, version in entries:
# kind: "dir" 或 "zip"
if kind == "dir":
dst = self._cache_dir / version
exists = dst.exists()
if exists and not self.overwrite:
skipped += 1
# if self.verbose:
# print(f"[SKIP] {dst} 已存在(目录)")
continue
if exists and self.overwrite and not self.dry_run:
shutil.rmtree(dst)
if self.verbose:
print(f"[COPY] DIR {p} -> {dst}")
if not self.dry_run:
shutil.copytree(p, dst)
copied += 1
elif kind == "zip":
dst = self._cache_dir / f"{version}.zip"
exists = dst.exists()
if exists and not self.overwrite:
skipped += 1
# if self.verbose:
# print(f"[SKIP] {dst} 已存在zip")
continue
if exists and self.overwrite and not self.dry_run:
dst.unlink()
if self.verbose:
print(f"[COPY] ZIP {p} -> {dst}")
if not self.dry_run:
# 用 copy2 保留 mtime 等元数据
shutil.copy2(p, dst)
copied += 1
if self.verbose:
print(f"[SUMMARY] copied={copied}, skipped={skipped}, total={copied+skipped}")
# -------- helpers --------
def _iter_version_entries(self, root: Path) -> Iterable[tuple[Path, str, str]]:
"""
迭代返回 (路径, 类型, 版本号)
- 目录:名称需匹配版本号
- zipstem去除后缀的名称需匹配版本号
"""
for p in sorted(root.iterdir()):
if p.is_dir() and VERSION_RE.match(p.name):
yield (p, "dir", p.name)
elif p.is_file() and p.suffix.lower() == ".zip" and VERSION_RE.match(p.stem):
yield (p, "zip", p.stem)

88
Utils/IOSAIStorage.py Normal file
View File

@@ -0,0 +1,88 @@
import json
import os
from pathlib import Path
class IOSAIStorage:
@staticmethod
def _get_iosai_dir() -> Path:
"""获取 C:/Users/<用户名>/IOSAI/ 目录"""
user_dir = Path.home()
iosai_dir = user_dir / "IOSAI"
iosai_dir.mkdir(parents=True, exist_ok=True)
return iosai_dir
@classmethod
def save(cls, data: dict | list, filename: str = "data.json", mode: str = "overwrite") -> Path:
file_path = cls._get_iosai_dir() / filename
file_path.parent.mkdir(parents=True, exist_ok=True)
def _load_json():
try:
return json.loads(file_path.read_text("utf-8"))
except Exception:
return {} if isinstance(data, dict) else []
if mode == "merge" and isinstance(data, dict):
old = _load_json()
if not isinstance(old, dict):
old = {}
old.update(data)
to_write = old
elif mode == "append" and isinstance(data, list):
old = _load_json()
if not isinstance(old, list):
old = []
old.extend(data)
to_write = old
else:
to_write = data # 覆盖
# 原子写入
tmp = file_path.with_suffix(file_path.suffix + ".tmp")
with open(tmp, "w", encoding="utf-8") as f:
json.dump(to_write, f, ensure_ascii=False, indent=2)
os.replace(tmp, file_path)
print(f"[IOSAIStorage] 已写入: {file_path}")
return file_path
@classmethod
def load(cls, filename: str = "data.json") -> dict | list | None:
"""
从 C:/Users/<用户名>/IOSAI/filename 读取数据
"""
file_path = cls._get_iosai_dir() / filename
if not file_path.exists():
print(f"[IOSAIStorage] 文件不存在: {file_path}")
return {}
try:
with open(file_path, "r", encoding="utf-8") as f:
return json.load(f)
except Exception as e:
print(f"[IOSAIStorage] 读取失败: {e}")
return {}
@classmethod
def overwrite(cls, data: dict | list, filename: str = "data.json") -> Path:
"""
强制覆盖写入数据到 C:/Users/<用户名>/IOSAI/filename
(无论是否存在,都会写入)
"""
file_path = cls._get_iosai_dir() / filename
try:
# "w" 模式本身就是覆盖,但这里单独做一个方法
with open(file_path, "w", encoding="utf-8") as f:
json.dump(data, f, ensure_ascii=False, indent=2)
print(f"[IOSAIStorage] 已覆盖写入: {file_path}")
except Exception as e:
print(f"[IOSAIStorage] 覆盖失败: {e}")
return file_path

262
Utils/JsonUtils.py Normal file
View File

@@ -0,0 +1,262 @@
import json
import os
from pathlib import Path
import portalocker as locker # ① 引入跨平台锁
class JsonUtils:
@staticmethod
def _normalize_filename(filename: str) -> str:
"""
确保文件名以 .json 结尾
"""
if not filename.endswith(".json"):
filename = f"{filename}.json"
return filename
@staticmethod
def _get_data_path(filename: str) -> str:
"""
根据文件名生成 data 目录下的完整路径
"""
filename = JsonUtils._normalize_filename(filename)
base_dir = os.path.abspath(os.path.dirname(os.path.dirname(__file__))) # 当前项目根目录
data_dir = os.path.join(base_dir, "data")
Path(data_dir).mkdir(parents=True, exist_ok=True) # 确保 data 目录存在
return os.path.join(data_dir, filename)
@staticmethod
def read_json(filename: str) -> dict:
"""
读取 JSON 文件,返回字典
如果文件不存在,返回空字典
"""
file_path = JsonUtils._get_data_path(filename)
try:
if not os.path.exists(file_path):
return {}
with open(file_path, "r", encoding="utf-8") as f:
data = json.load(f)
return data if isinstance(data, dict) else {}
except Exception as e:
print(f"读取 JSON 文件失败: {e}")
return {}
@staticmethod
def write_json(filename: str, data: dict, overwrite: bool = True) -> bool:
"""
将字典写入 JSON 文件
:param filename: 文件名(不用写后缀,自动补 .json
:param data: 要写入的字典
:param overwrite: True=覆盖写False=合并更新
"""
file_path = JsonUtils._get_data_path(filename)
try:
if not overwrite and os.path.exists(file_path):
with open(file_path, "r", encoding="utf-8") as f:
old_data = json.load(f)
if not isinstance(old_data, dict):
old_data = {}
old_data.update(data)
data = old_data
with open(file_path, "w", encoding="utf-8") as f:
json.dump(data, f, ensure_ascii=False, indent=4)
return True
except Exception as e:
print(f"写入 JSON 文件失败: {e}")
return False
@staticmethod
def update_json(filename: str, new_data: dict) -> bool:
"""
修改 JSON 文件:
- 如果 key 已存在,则修改其值
- 如果 key 不存在,则新增
"""
try:
data = JsonUtils.read_json(filename)
if not isinstance(data, dict):
data = {}
data.update(new_data)
return JsonUtils.write_json(filename, data)
except Exception as e:
print(f"更新 JSON 文件失败: {e}")
return False
@staticmethod
def delete_json_key(filename: str, key: str) -> bool:
"""
删除 JSON 文件中的某个 key
"""
try:
data = JsonUtils.read_json(filename)
if not isinstance(data, dict):
data = {}
if key in data:
del data[key]
return JsonUtils.write_json(filename, data)
except Exception as e:
print(f"删除 JSON key 失败: {e}")
return False
# "-------------------------------------------------"
@classmethod
def _read_json_list(cls, file_path: Path) -> list:
try:
if not file_path.exists():
return []
with file_path.open("r", encoding="utf-8") as f:
data = json.load(f)
return data if isinstance(data, list) else []
except Exception:
return []
@classmethod
def _write_json_list(cls, file_path: Path, data: list) -> None:
file_path.parent.mkdir(parents=True, exist_ok=True)
with file_path.open("w", encoding="utf-8") as f:
json.dump(data, f, ensure_ascii=False, indent=4)
# --- 新增:通用追加(不做字段校验) ---
# @classmethod
# def append_json_items(cls, items, filename="log/last_message.json"):
# """
# 将 dict 或 [dict, ...] 追加到 JSON 文件(数组)中;不校验字段。
# """
# file_path = Path(filename)
# data = cls._read_json_list(file_path)
#
# # 统一成 list
# if isinstance(items, dict):
# items = [items]
# elif not isinstance(items, list):
# # 既不是 dict 也不是 list直接忽略
# return
#
# # 只接受字典项
# items = [it for it in items if isinstance(it, dict)]
# if not items:
# return
#
# data.extend(items)
#
# # LogManager.method_info(filename,"路径")
# cls._write_json_list(file_path, data)
@classmethod
def append_json_items(cls, items, filename="log/last_message.json"):
file_path = Path(filename)
data = cls._read_json_list(file_path)
# 统一成 list
if isinstance(items, dict):
items = [items]
elif not isinstance(items, list):
return
# 只保留 sender 非空的字典
items = [
it for it in items
if isinstance(it, dict) and it.get("sender") != ""
]
if not items:
return
data.extend(items)
cls._write_json_list(file_path, data)
@classmethod
def update_json_items(cls, match: dict, patch: dict, filename="log/last_message.json", multi: bool = True) -> int:
"""
修改 JSON 文件(数组)中符合条件的项
:param match: 匹配条件(如 {"sender": "xxx"}
:param patch: 要修改/更新的字段(如 {"status": 1}
:param filename: JSON 文件路径
:param multi: True=修改所有匹配项False=只修改第一项
:return: 修改的条数
"""
file_path = Path(filename)
data = cls._read_json_list(file_path)
if not isinstance(match, dict) or not isinstance(patch, dict):
return 0
updated = 0
for idx, item in enumerate(data):
if not isinstance(item, dict):
continue
# 判断是否匹配
if all(item.get(k) == v for k, v in match.items()):
data[idx].update(patch)
updated += 1
if not multi:
break
if updated > 0:
cls._write_json_list(file_path, data)
return updated
@classmethod
def query_all_json_items(cls, filename="log/last_message.json"):
"""
读取 JSON 数组文件,过滤掉 sender 或 text 为空的记录
:param filename: 文件路径
:return: 有效记录列表,可能为空
"""
file_path = Path(filename)
data = cls._read_json_list(file_path)
if not isinstance(data, list):
return []
def _is_valid(d):
if not isinstance(d, dict):
return False
sender = d.get("sender") or ""
text = d.get("text") or ""
return (
isinstance(sender, str)
and isinstance(text, str)
and sender.strip() != ""
and text.strip() != ""
)
return [item for item in data if _is_valid(item)]
@classmethod
def delete_json_items(cls,
match: dict,
filename: str = "log/last_message.json",
multi: bool = True) -> int:
file_path = Path(filename)
with file_path.open('r+', encoding='utf-8') as f:
locker.lock(f, locker.LOCK_EX) # ② 加独占锁Windows/Linux 通用)
try:
data = json.load(f)
if not isinstance(match, dict):
return 0
deleted = 0
new_data = []
for item in data:
if isinstance(item, dict) and all(item.get(k) == v for k, v in match.items()):
if multi or deleted == 0: # 删多条 / 第一条
deleted += 1
continue
new_data.append(item)
if deleted:
f.seek(0)
json.dump(new_data, f, ensure_ascii=False, indent=2)
f.truncate()
return deleted
finally:
locker.unlock(f) # ③ 解锁

277
Utils/LogManager.py Normal file
View File

@@ -0,0 +1,277 @@
# -*- coding: utf-8 -*-
import datetime
import io
import logging
import os
import re
import sys
import shutil
import zipfile
from pathlib import Path
import requests
# ========= 全局:强制 UTF-8打包 EXE / 无控制台也生效) =========
def _force_utf8_everywhere():
os.environ.setdefault("PYTHONUTF8", "1")
os.environ.setdefault("PYTHONIOENCODING", "utf-8")
# windowed 模式下 stdout/stderr 可能没有 buffer这里做保护包装
try:
if getattr(sys.stdout, "buffer", None):
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding="utf-8", errors="replace")
except Exception:
pass
try:
if getattr(sys.stderr, "buffer", None):
sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding="utf-8", errors="replace")
except Exception:
pass
_force_utf8_everywhere()
class LogManager:
"""
设备级与“设备+方法”级日志管理:
- log/<udid>/info.log | warning.log | error.log
- log/<udid>/<method>.log
- 文件统一 UTF-8 编码,避免 GBK/CP936 导致的 emoji 报错
- 提供 clearLogs() 与 upload_all_logs()
"""
# 运行根目录:打包后取 exe 目录;源码运行取项目目录
if getattr(sys, "frozen", False):
projectRoot = os.path.dirname(sys.executable)
else:
projectRoot = os.path.dirname(os.path.dirname(__file__))
logDir = os.path.join(projectRoot, "log")
_method_loggers = {} # 缓存“设备+方法”的 logger
# ---------- 工具:安全文本/文件名 ----------
@staticmethod
def _safe_text(obj) -> str:
"""把任意对象安全转为可写字符串(避免因编码问题再次抛异常)"""
try:
if isinstance(obj, bytes):
return obj.decode("utf-8", "replace")
s = str(obj)
# 确保解码上屏不再出错
_ = s.encode("utf-8", "replace")
return s
except Exception:
try:
return repr(obj)
except Exception:
return "<unprintable>"
@classmethod
def _safe_filename(cls, name: str, max_len: int = 80) -> str:
"""
将方法名/udid等转成安全文件名
- 允许字母数字、点、下划线、连字符
- 保留常见 CJK 字符(中日韩)
- 其余替换为下划线;合并下划线;避免保留名;限长
"""
if not name:
return "unknown"
name = str(name).strip()
name = re.sub(r'[\\/:*?"<>|\r\n\t]+', '_', name) # Windows 非法字符
name = re.sub(
r'[^a-zA-Z0-9_.\-'
r'\u4e00-\u9fff' # 中
r'\u3040-\u30ff' # 日
r'\uac00-\ud7a3' # 韩
r']+', '_', name
)
name = re.sub(r'_+', '_', name).strip(' _.')
name = name or "unknown"
if re.fullmatch(r'(?i)(CON|PRN|AUX|NUL|COM[1-9]|LPT[1-9])', name):
name = f"_{name}"
return name[:max_len] or "unknown"
# ---------- 设备级固定文件info/warning/error ----------
@classmethod
def _setupLogger(cls, udid, name, logName, level=logging.INFO):
"""创建或获取 logger并绑定到设备目录下的固定文件info.log / warning.log / error.log"""
udid_key = cls._safe_filename(udid or "system")
deviceLogDir = os.path.join(cls.logDir, udid_key)
os.makedirs(deviceLogDir, exist_ok=True)
logFile = os.path.join(deviceLogDir, logName)
logger_name = f"{udid_key}_{name}"
logger = logging.getLogger(logger_name)
logger.setLevel(level)
logger.propagate = False # 不向根 logger 传播,避免重复
# 避免重复添加同一路径的 file handler
abs_target = os.path.abspath(logFile)
for h in logger.handlers:
if isinstance(h, logging.FileHandler) and getattr(h, "baseFilename", "") == abs_target:
return logger
fileHandler = logging.FileHandler(logFile, mode="a", encoding="utf-8")
formatter = logging.Formatter(
"%(asctime)s - %(name)s - %(levelname)s - %(message)s",
datefmt="%Y-%m-%d %H:%M:%S"
)
fileHandler.setFormatter(formatter)
logger.addHandler(fileHandler)
return logger
@classmethod
def info(cls, text, udid="system"):
msg = cls._safe_text(f"[{udid}] {text}")
cls._setupLogger(udid, "infoLogger", "info.log", level=logging.INFO).info(msg)
@classmethod
def warning(cls, text, udid="system"):
msg = cls._safe_text(f"[{udid}] {text}")
cls._setupLogger(udid, "warningLogger", "warning.log", level=logging.WARNING).warning(msg)
@classmethod
def error(cls, text, udid="system"):
msg = cls._safe_text(f"[{udid}] {text}")
cls._setupLogger(udid, "errorLogger", "error.log", level=logging.ERROR).error(msg)
# ---------- “设备+方法”独立文件:<udid>/<method>.log ----------
@classmethod
def _setupMethodLogger(cls, udid: str, method: str, level=logging.INFO):
"""
为某设备的某个方法单独创建 loggerlog/<udid>/<method>.log
"""
udid_key = cls._safe_filename(udid or "system")
method_key = cls._safe_filename(method or "general")
cache_key = (udid_key, method_key)
# 命中缓存
logger = cls._method_loggers.get(cache_key)
if logger:
return logger
deviceLogDir = os.path.join(cls.logDir, udid_key)
os.makedirs(deviceLogDir, exist_ok=True)
logFile = os.path.join(deviceLogDir, f"{method_key}.log")
logger_name = f"{udid_key}.{method_key}"
logger = logging.getLogger(logger_name)
logger.setLevel(level)
logger.propagate = False
abs_target = os.path.abspath(logFile)
for h in logger.handlers:
if isinstance(h, logging.FileHandler) and getattr(h, "baseFilename", "") == abs_target:
cls._method_loggers[cache_key] = logger
return logger
fileHandler = logging.FileHandler(logFile, mode="a", encoding="utf-8")
formatter = logging.Formatter(
"%(asctime)s - %(levelname)s - %(name)s - %(message)s",
datefmt="%Y-%m-%d %H:%M:%S"
)
fileHandler.setFormatter(formatter)
logger.addHandler(fileHandler)
cls._method_loggers[cache_key] = logger
return logger
@classmethod
def method_info(cls, text, method, udid="system"):
msg = cls._safe_text(f"[{udid}][{method}] {text}")
cls._setupMethodLogger(udid, method, level=logging.INFO).info(msg)
@classmethod
def method_warning(cls, text, method, udid="system"):
msg = cls._safe_text(f"[{udid}][{method}] {text}")
cls._setupMethodLogger(udid, method, level=logging.WARNING).warning(msg)
@classmethod
def method_error(cls, text, method, udid="system"):
msg = cls._safe_text(f"[{udid}][{method}] {text}")
cls._setupMethodLogger(udid, method, level=logging.ERROR).error(msg)
# ---------- 清空日志 ----------
@classmethod
def clearLogs(cls):
print("清空日志")
"""启动时清空 log 目录下所有文件"""
# 先关闭所有 logger 的文件句柄
for _, logger in logging.Logger.manager.loggerDict.items():
if isinstance(logger, logging.Logger):
for handler in list(logger.handlers):
try:
handler.close()
except Exception:
pass
logger.removeHandler(handler)
log_path = Path(cls.logDir)
if log_path.exists():
for item in log_path.iterdir():
try:
if item.is_file():
item.unlink(missing_ok=True) # py>=3.8
elif item.is_dir():
shutil.rmtree(item, ignore_errors=True)
except Exception:
# 不阻塞清理
pass
cls._method_loggers.clear()
# ---------- 上传所有日志(内存打包 zip ----------
@classmethod
def upload_all_logs(cls, server_url, token, userId, tenantId):
"""
将 log/ 目录下所有日志打包为 zip内存上传至服务器
- headers: {"vvtoken": <token>}
- form: {"tenantId": <tenantId>, "userId": <userId>, "file": <zip>}
返回 True/False
"""
try:
log_path = Path(cls.logDir)
if not log_path.exists():
logging.info("[upload_all_logs] 日志目录不存在:%s", log_path)
return False
# 文件名仅用于表单,不落盘,可包含安全字符
timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
filename = f"{timestamp}_logs.zip"
logging.info("[upload_all_logs] 打包文件名:%s", filename)
# 1) 内存中打包 zip
zip_buf = io.BytesIO()
with zipfile.ZipFile(zip_buf, "w", compression=zipfile.ZIP_DEFLATED) as zf:
for p in log_path.rglob("*"):
if p.is_file():
arcname = str(p.relative_to(log_path))
zf.write(p, arcname=arcname)
zip_bytes = zip_buf.getvalue()
# 2) 组织请求
headers = {"vvtoken": token} if token else {}
data = {"tenantId": tenantId, "userId": userId}
files = {"file": (filename, io.BytesIO(zip_bytes), "application/zip")}
# 3) 上传
resp = requests.post(server_url, headers=headers, data=data, files=files, timeout=120)
ok = False
try:
js = resp.json()
ok = bool(js.get("data"))
except Exception:
# 响应不是 JSON也许是纯文本降级按状态码判断
ok = resp.ok
if ok:
logging.info("[upload_all_logs] 上传成功:%s", server_url)
return True
else:
logging.error("[upload_all_logs] 上传失败status=%s, text=%s", resp.status_code, LogManager._safe_text(resp.text))
return False
except Exception as e:
logging.error("[upload_all_logs] 异常:%s", LogManager._safe_text(e))
return False

243
Utils/OCRUtils.py Normal file
View File

@@ -0,0 +1,243 @@
import os
import cv2
import numpy as np
from typing import List, Tuple, Union, Optional
from PIL import Image
ArrayLikeImage = Union[np.ndarray, str, Image.Image]
class OCRUtils:
@classmethod
def _to_gray(cls, img: ArrayLikeImage) -> np.ndarray:
"""
接受路径/np.ndarray/PIL.Image统一转为灰度 np.ndarray。
"""
# 路径
if isinstance(img, str):
arr = cv2.imread(img, cv2.IMREAD_GRAYSCALE)
if arr is None:
raise FileNotFoundError(f"图像加载失败,请检查路径: {img}")
return arr
# PIL.Image
if isinstance(img, Image.Image):
return cv2.cvtColor(np.array(img.convert("RGB")), cv2.COLOR_RGB2GRAY)
# numpy 数组
if isinstance(img, np.ndarray):
if img.ndim == 2:
return img # 已是灰度
if img.ndim == 3:
return cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
raise ValueError("不支持的图像维度(期望 2D 灰度或 3D BGR/RGB")
raise TypeError("large_image 类型必须是 str / np.ndarray / PIL.Image.Image")
@classmethod
def non_max_suppression(
cls,
boxes: List[List[float]],
scores: Optional[np.ndarray] = None,
overlapThresh: float = 0.5
) -> np.ndarray:
"""
boxes: [ [x1,y1,x2,y2], ... ]
scores: 每个框的置信度(用于“按分数做 NMS”。若为 None则退化为按 y2 排序的经典近似。
返回: 经过 NMS 保留的 boxes(int) ndarray形状 (N,4)
"""
if len(boxes) == 0:
return np.empty((0, 4), dtype=int)
boxes = np.asarray(boxes, dtype=np.float32)
x1, y1, x2, y2 = boxes.T
areas = (x2 - x1 + 1) * (y2 - y1 + 1)
if scores is None:
order = np.argsort(y2) # 经典写法
else:
scores = np.asarray(scores, dtype=np.float32)
order = np.argsort(scores)[::-1] # 分数从高到低
keep = []
while order.size > 0:
i = order[0] if scores is not None else order[-1]
keep.append(i)
rest = order[1:] if scores is not None else order[:-1]
xx1 = np.maximum(x1[i], x1[rest])
yy1 = np.maximum(y1[i], y1[rest])
xx2 = np.minimum(x2[i], x2[rest])
yy2 = np.minimum(y2[i], y2[rest])
w = np.maximum(0, xx2 - xx1 + 1)
h = np.maximum(0, yy2 - yy1 + 1)
inter = w * h
ovr = inter / areas[rest]
inds = np.where(ovr <= overlapThresh)[0]
order = rest[inds]
return boxes[keep].astype(int)
# @classmethod
# def find_template(
# cls,
# template_path: str,
# large_image: ArrayLikeImage,
# threshold: float = 0.8,
# overlapThresh: float = 0.5,
# return_boxes: bool = False
# ) -> Union[List[Tuple[int, int]], Tuple[List[Tuple[int, int]], np.ndarray]]:
# """
# 在 large_image 中查找 template_path 模板的位置。
# - large_image 可为文件路径、np.ndarray 或 PIL.Image
# - threshold: 模板匹配阈值TM_CCOEFF_NORMED
# - overlapThresh: NMS 重叠阈值
# - return_boxes: True 时同时返回保留的框数组 (N,4)
#
# 返回:
# centers 或 (centers, boxes)
# centers: [(cx, cy), ...]
# boxes: [[x1,y1,x2,y2], ...] (np.ndarray, int)
# """
# # 模板(灰度)
# template = cv2.imread(template_path, cv2.IMREAD_GRAYSCALE)
# if template is None:
# raise FileNotFoundError(f"模板图像加载失败,请检查路径: {template_path}")
#
# # 大图(灰度)
# gray = cls._to_gray(large_image)
#
# # 模板尺寸
# tw, th = template.shape[::-1]
#
# # 模板匹配(相关系数归一化)
# result = cv2.matchTemplate(gray, template, cv2.TM_CCOEFF_NORMED)
#
# # 阈值筛选
# ys, xs = np.where(result >= threshold)
# if len(xs) == 0:
# return ([], np.empty((0, 4), dtype=int)) if return_boxes else []
#
# # 收集候选框与分数
# boxes = []
# scores = []
# for (x, y) in zip(xs, ys):
# boxes.append([x, y, x + tw, y + th])
# scores.append(result[y, x])
#
# # 按分数做 NMS
# boxes_nms = cls.non_max_suppression(boxes, scores=np.array(scores), overlapThresh=overlapThresh)
#
# # 计算中心点
# centers = [((x1 + x2) // 2, (y1 + y2) // 2) for (x1, y1, x2, y2) in boxes_nms]
#
#
#
# if return_boxes:
# return centers, boxes_nms
#
#
# return centers
@classmethod
def find_template(
cls,
template_path: str,
large_image: ArrayLikeImage,
threshold: float = 0.8,
overlapThresh: float = 0.5,
return_boxes: bool = False
) -> Union[List[Tuple[int, int]], Tuple[List[Tuple[int, int]], np.ndarray]]:
"""
在 large_image 中查找 template_path 模板的位置。
- large_image 可为文件路径、np.ndarray 或 PIL.Image
- threshold: 模板匹配阈值TM_CCOEFF_NORMED
- overlapThresh: NMS 重叠阈值
- return_boxes: True 时同时返回保留的框数组 (N,4)
若检测结果为空,则在相同阈值下最多重试三次(共 3 次尝试)。
返回:
centers 或 (centers, boxes)
centers: [(cx, cy), ...]
boxes: [[x1,y1,x2,y2], ...] (np.ndarray, int)
"""
if not os.path.isfile(template_path):
print(f"模板文件不存在 → {template_path}")
raise FileNotFoundError(f"模板文件不存在 → {template_path}")
size = os.path.getsize(template_path)
if size == 0:
print(f"模板文件大小为 0 → {template_path} ")
raise ValueError(f"模板文件大小为 0 → {template_path}")
# 模板(灰度)
template = cv2.imread(template_path, cv2.IMREAD_GRAYSCALE)
if template is None:
raise FileNotFoundError(f"模板图像加载失败,请检查路径: {template_path}")
# 大图(灰度)
gray = cls._to_gray(large_image)
# 模板尺寸
tw, th = template.shape[::-1]
# 内部:执行一次匹配并返回 (centers, boxes_nms)
def _match_once(cur_threshold: float):
# 模板匹配(相关系数归一化)
result = cv2.matchTemplate(gray, template, cv2.TM_CCOEFF_NORMED)
# 阈值筛选
ys, xs = np.where(result >= cur_threshold)
if len(xs) == 0:
return [], np.empty((0, 4), dtype=int)
# 收集候选框与分数
boxes = []
scores = []
for (x, y) in zip(xs, ys):
boxes.append([int(x), int(y), int(x + tw), int(y + th)])
scores.append(float(result[y, x]))
# 按分数做 NMS
boxes_nms = cls.non_max_suppression(
boxes,
scores=np.asarray(scores, dtype=np.float32),
overlapThresh=overlapThresh
)
# 计算中心点(转为 Python int
centers = [(int((x1 + x2) // 2), int((y1 + y2) // 2))
for (x1, y1, x2, y2) in boxes_nms]
# 统一为 np.ndarray[int]
boxes_nms = np.asarray(boxes_nms, dtype=int)
return centers, boxes_nms
# ===== 重试控制(最多 3 次)=====
MAX_RETRIES = 3
THRESHOLD_DECAY = 0.0 # 如需越试越宽松,可改为 0.02~0.05;不需要则保持 0
MIN_THRESHOLD = 0.6
cur_threshold = float(threshold)
last_centers, last_boxes = [], np.empty((0, 4), dtype=int)
for attempt in range(MAX_RETRIES):
centers, boxes_nms = _match_once(cur_threshold)
if centers:
if return_boxes:
return centers, boxes_nms
return centers
# 记录最后一次(若最终失败按规范返回空)
last_centers, last_boxes = centers, boxes_nms
# 为下一次尝试准备(这里默认不衰减阈值;如需可打开 THRESHOLD_DECAY
if attempt < MAX_RETRIES - 1 and THRESHOLD_DECAY > 0.0:
cur_threshold = max(MIN_THRESHOLD, cur_threshold - THRESHOLD_DECAY)
# 全部尝试失败
if return_boxes:
return last_centers, last_boxes
return last_centers

147
Utils/Requester.py Normal file
View File

@@ -0,0 +1,147 @@
import requests
from Entity.Variables import prologueList, API_KEY
from Utils.IOSAIStorage import IOSAIStorage
from Utils.LogManager import LogManager
BaseUrl = "https://crawlclient.api.yolozs.com/api/common/"
class Requester():
comment = "comment"
prologue = "prologue"
@classmethod
def requestPrologue(cls, token):
try:
headers = {
"vvtoken": token,
}
url = BaseUrl + cls.prologue
result = requests.get(headers=headers, url=url, verify=False)
json = result.json()
data = json.get("data")
for i in data:
prologueList.append(i)
except Exception as e:
LogManager.method_error(f"获取requestPrologue失败,报错的原因:{e}", "获取requestPrologue异常")
# 翻译
@classmethod
def translation(cls, msg, country="英国"):
try:
if country == "":
country = "英国"
param = {
"msg": msg,
"country": country,
}
url = "https://ai.yolozs.com/translation"
result = requests.post(url=url, json=param, verify=False)
LogManager.info(f"翻译 请求的参数:{param}", "翻译")
LogManager.info(f"翻译,状态码:{result.status_code},服务器返回的内容:{result.text}", "翻译")
if result.status_code != 200:
LogManager.error(f"翻译失败,状态码:{result.status_code},服务器返回的内容:{result.text}")
return None
json = result.json()
data = json.get("data")
return data
except Exception as e:
LogManager.method_error(f"翻译失败,报错的原因:{e}", "翻译失败异常")
# 翻译
@classmethod
def translationToChinese(cls, msg):
try:
param = {
"msg": msg,
}
url = "https://ai.yolozs.com/translationToChinese"
result = requests.post(url=url, json=param, verify=False)
LogManager.info(f"翻译 请求的参数:{param}", "翻译")
LogManager.info(f"翻译,状态码:{result.status_code},服务器返回的内容:{result.text}", "翻译")
if result.status_code != 200:
LogManager.error(f"翻译失败,状态码:{result.status_code},服务器返回的内容:{result.text}")
return None
json = result.json()
data = json.get("data")
return data
except Exception as e:
LogManager.method_error(f"翻译失败,报错的原因:{e}", "翻译失败异常")
# ai聊天
@classmethod
def chatToAi(cls, param):
# aiConfig = JsonUtils.read_json("aiConfig")
aiConfig = IOSAIStorage.load("aiConfig.json")
agentName = aiConfig.get("agentName")
guildName = aiConfig.get("guildName")
contactTool = aiConfig.get("contactTool", "")
contact = aiConfig.get("contact", "")
age = aiConfig.get("age", 20)
sex = aiConfig.get("sex", "")
height = aiConfig.get("height", 160)
weight = aiConfig.get("weight", 55)
body_features = aiConfig.get("body_features", "")
nationality = aiConfig.get("nationality", "中国")
personality = aiConfig.get("personality", "")
strengths = aiConfig.get("strengths", "")
inputs = {
"name": agentName,
"Trade_union": guildName,
"contcat_method": contactTool,
"contcat_info": contact,
"age": age,
"sex": sex,
"height": height,
"weight": weight,
"body_features": body_features,
"nationality": nationality,
"personality": personality,
"strengths": strengths,
}
param["inputs"] = inputs
try:
# url = "https://ai.yolozs.com/chat"
url = "https://ai.yolozs.com/customchat"
result = requests.post(url=url, json=param, verify=False)
LogManager.method_info(f"ai聊天的参数{param}", "ai聊天")
print(f"ai聊天的参数{param}")
json = result.json()
data = json.get("answer", "")
session_id = json.get("conversation_id", "")
LogManager.method_info(f"ai聊天返回的内容{result.json()}", "ai聊天")
return data, session_id
except Exception as e:
LogManager.method_error(f"ai聊天失败,ai聊天出现异常,报错的原因:{e}", "ai聊天接口异常")

24
Utils/SubprocessKit.py Normal file
View File

@@ -0,0 +1,24 @@
import os
import subprocess
__all__ = ['check_output', 'popen', 'PIPE']
# 模块级单例,导入时只创建一次
if os.name == "nt":
_si = subprocess.STARTUPINFO()
_si.dwFlags |= subprocess.STARTF_USESHOWWINDOW
_si.wShowWindow = subprocess.SW_HIDE
else:
_si = None
PIPE = subprocess.PIPE
def check_output(cmd, **kw):
if os.name == "nt":
kw.setdefault('startupinfo', _si)
return subprocess.check_output(cmd, **kw)
def popen(*args, **kw):
if os.name == "nt":
kw.setdefault('startupinfo', _si)
return subprocess.Popen(*args, **kw)

327
Utils/TencentOCRUtils.py Normal file
View File

@@ -0,0 +1,327 @@
# -*- coding: utf-8 -*-
import base64
import hashlib
import hmac
import json
import os
import re
import socket
import time
from datetime import datetime, timezone
from http.client import HTTPSConnection
from typing import Any, Dict, List, Optional
Point = Dict[str, int]
ItemPolygon = Dict[str, int]
class TencentOCR:
"""腾讯云 OCR 封装,自动从环境变量或配置文件加载密钥"""
@staticmethod
def _load_secret() -> Dict[str, str]:
# 优先从环境变量读取
sid = "AKIDXw86q6D8pJYZOEvOm25wZy96oIZcQ1OX"
skey = "ye7MNAj4ub5PVO2TmriLkwtc8QTItGPO"
# 如果没有,就尝试从 ~/.tencent_ocr.json 加载
if not sid or not skey:
cfg_path = os.path.expanduser("~/.tencent_ocr.json")
if os.path.exists(cfg_path):
with open(cfg_path, "r", encoding="utf-8") as f:
cfg = json.load(f)
sid = sid or cfg.get("secret_id")
skey = skey or cfg.get("secret_key")
if not sid or not skey:
raise RuntimeError(
"❌ 未找到腾讯云 OCR 密钥,请设置环境变量 TENCENT_SECRET_ID / TENCENT_SECRET_KEY"
"或在用户目录下创建 ~/.tencent_ocr.json格式{\"secret_id\":\"...\",\"secret_key\":\"...\"}"
)
return {"secret_id": sid, "secret_key": skey}
@staticmethod
def _hmac_sha256(key: bytes, msg: str) -> bytes:
return hmac.new(key, msg.encode("utf-8"), hashlib.sha256).digest()
@staticmethod
def _strip_data_uri_prefix(b64: str) -> str:
if "," in b64 and b64.strip().lower().startswith("data:"):
return b64.split(",", 1)[1]
return b64
@staticmethod
def _now_ts_and_date():
ts = int(time.time())
date = datetime.fromtimestamp(ts, tz=timezone.utc).strftime("%Y-%m-%d")
return ts, date
@staticmethod
def recognize(
*,
image_path: Optional[str] = None,
image_bytes: Optional[bytes] = None,
image_url: Optional[str] = None,
region: Optional[str] = None,
token: Optional[str] = None,
action: str = "GeneralBasicOCR",
version: str = "2018-11-19",
service: str = "ocr",
host: str = "ocr.tencentcloudapi.com",
timeout: int = 15,
) -> Dict[str, Any]:
"""
调用腾讯云 OCR三选一image_path / image_bytes / image_url
自动加载密钥(优先环境变量 -> ~/.tencent_ocr.json
"""
# 读取密钥
sec = TencentOCR._load_secret()
secret_id = sec["secret_id"]
secret_key = sec["secret_key"]
assert sum(v is not None for v in (image_path, image_bytes, image_url)) == 1, \
"必须且只能提供 image_path / image_bytes / image_url 之一"
# 1. payload
payload: Dict[str, Any] = {}
if image_url:
payload["ImageUrl"] = image_url
else:
if image_bytes is None:
with open(image_path, "rb") as f:
image_bytes = f.read()
img_b64 = base64.b64encode(image_bytes).decode("utf-8")
img_b64 = TencentOCR._strip_data_uri_prefix(img_b64)
payload["ImageBase64"] = img_b64
payload_str = json.dumps(payload, ensure_ascii=False, separators=(",", ":"))
# 2. 参数准备
algorithm = "TC3-HMAC-SHA256"
http_method = "POST"
canonical_uri = "/"
canonical_querystring = ""
content_type = "application/json; charset=utf-8"
signed_headers = "content-type;host;x-tc-action"
timestamp, date = TencentOCR._now_ts_and_date()
credential_scope = f"{date}/{service}/tc3_request"
# 3. 规范请求串
canonical_headers = (
f"content-type:{content_type}\n"
f"host:{host}\n"
f"x-tc-action:{action.lower()}\n"
)
hashed_request_payload = hashlib.sha256(payload_str.encode("utf-8")).hexdigest()
canonical_request = (
f"{http_method}\n{canonical_uri}\n{canonical_querystring}\n"
f"{canonical_headers}\n{signed_headers}\n{hashed_request_payload}"
)
# 4. 签名
hashed_canonical_request = hashlib.sha256(canonical_request.encode("utf-8")).hexdigest()
string_to_sign = (
f"{algorithm}\n{timestamp}\n{credential_scope}\n{hashed_canonical_request}"
)
secret_date = TencentOCR._hmac_sha256(("TC3" + secret_key).encode("utf-8"), date)
secret_service = hmac.new(secret_date, service.encode("utf-8"), hashlib.sha256).digest()
secret_signing = hmac.new(secret_service, b"tc3_request", hashlib.sha256).digest()
signature = hmac.new(secret_signing, string_to_sign.encode("utf-8"), hashlib.sha256).hexdigest()
authorization = (
f"{algorithm} "
f"Credential={secret_id}/{credential_scope}, "
f"SignedHeaders={signed_headers}, "
f"Signature={signature}"
)
# 5. headers
headers = {
"Authorization": authorization,
"Content-Type": content_type,
"Host": host,
"X-TC-Action": action,
"X-TC-Timestamp": str(timestamp),
"X-TC-Version": version,
}
if region:
headers["X-TC-Region"] = region
if token:
headers["X-TC-Token"] = token
# 6. 发请求
try:
conn = HTTPSConnection(host, timeout=timeout)
conn.request("POST", "/", body=payload_str.encode("utf-8"), headers=headers)
resp = conn.getresponse()
raw = resp.read().decode("utf-8", errors="replace")
try:
data = json.loads(raw)
except Exception:
data = {"NonJSONBody": raw}
return {
"http_status": resp.status,
"http_reason": resp.reason,
"headers": dict(resp.getheaders()),
"body": data,
}
except socket.gaierror as e:
return {"error": "DNS_RESOLUTION_FAILED", "detail": str(e)}
except socket.timeout:
return {"error": "NETWORK_TIMEOUT", "detail": f"Timeout after {timeout}s"}
except Exception as e:
return {"error": "REQUEST_FAILED", "detail": str(e)}
finally:
try:
conn.close()
except Exception:
pass
@staticmethod
def _norm(s: str) -> str:
return (s or "").strip().lstrip("@").lower()
@staticmethod
def _rect_from_polygon(poly: List[Point]) -> Optional[ItemPolygon]:
if not poly:
return None
xs = [p["X"] for p in poly]
ys = [p["Y"] for p in poly]
return {"X": min(xs), "Y": min(ys), "Width": max(xs) - min(xs), "Height": max(ys) - min(ys)}
@classmethod
def find_last_name_bbox(cls, ocr: Dict[str, Any], name: str) -> Optional[Dict[str, Any]]:
"""
从 OCR JSON 中找到指定名字的“最后一次”出现并返回坐标信息。
:param ocr: 完整 OCR JSON含 Response.TextDetections
:param name: 前端传入的名字,比如 'lee39160'
:return: dict 或 None例如
{
"index": 21,
"text": "lee39160",
"item": {"X": 248, "Y": 1701, "Width": 214, "Height": 49},
"polygon": [...],
"center": {"x": 355.0, "y": 1725.5}
}
"""
dets = (ocr.get("body") or ocr).get("Response", {}).get("TextDetections", [])
if not dets or not name:
return None
target = cls._norm(name)
found = -1
# 从后往前找最后一个严格匹配
for i in range(len(dets) - 1, -1, -1):
txt = cls._norm(dets[i].get("DetectedText", ""))
if txt == target:
found = i
break
# 兜底:再匹配原始文本(可能带 @
if found == -1:
for i in range(len(dets) - 1, -1, -1):
raw = (dets[i].get("DetectedText") or "").strip().lower()
if raw.lstrip("@") == target:
found = i
break
if found == -1:
return None
det = dets[found]
item: Optional[ItemPolygon] = det.get("ItemPolygon")
poly: List[Point] = det.get("Polygon") or []
# 没有 ItemPolygon 就从 Polygon 算
if not item:
item = cls._rect_from_polygon(poly)
if not item:
return None
center = {"x": item["X"] + item["Width"] / 2.0, "y": item["Y"] + item["Height"] / 2.0}
return {
"index": found,
"text": det.get("DetectedText", ""),
"item": item,
"polygon": poly,
"center": center,
}
@staticmethod
def _get_detections(ocr: Dict[str, Any]) -> List[Dict[str, Any]]:
"""兼容含 body 层的 OCR 结构,提取 TextDetections 列表"""
return (ocr.get("body") or ocr).get("Response", {}).get("TextDetections", []) or []
@staticmethod
def _norm_txt(s: str) -> str:
"""清洗文本:去空格"""
return (s or "").strip()
@classmethod
def slice_texts_between(
cls,
ocr: Dict[str, Any],
start_keyword: str = "切换账号",
end_keyword: str = "添加账号",
*,
username_like: bool = False, # True 时只保留像用户名的文本
min_conf: int = 0 # 置信度下限
) -> List[Dict[str, Any]]:
"""
返回位于 start_keyword 与 end_keyword 之间的所有文本项(不含两端),
每项保留原始 DetectedText、Confidence、ItemPolygon 等信息。
"""
dets = cls._get_detections(ocr)
if not dets:
return []
# 找“切换账号”最后一次出现的下标
start_idx = -1
for i, d in enumerate(dets):
txt = cls._norm_txt(d.get("DetectedText", ""))
if txt == start_keyword:
start_idx = i
# 找“添加账号”第一次出现的下标
end_idx = -1
for i, d in enumerate(dets):
txt = cls._norm_txt(d.get("DetectedText", ""))
if txt == end_keyword:
end_idx = i
break
if start_idx == -1 or end_idx == -1 or end_idx <= start_idx:
return []
# 提取两者之间的内容
mid = []
for d in dets[start_idx + 1:end_idx]:
if int(d.get("Confidence", 0)) < min_conf:
continue
txt = cls._norm_txt(d.get("DetectedText", ""))
if not txt:
continue
mid.append(d)
if not username_like:
return mid
# 只保留像用户名的文本
pat = re.compile(r"^[A-Za-z0-9_.-]{3,}$")
filtered = [d for d in mid if pat.match(cls._norm_txt(d.get("DetectedText", "")))]
return filtered
if __name__ == "__main__":
result = TencentOCR.recognize(
image_path=r"C:\Users\zhangkai\Desktop\last-item\iosai\test.png",
action="GeneralAccurateOCR",
)
print(json.dumps(result, ensure_ascii=False, indent=2))

226
Utils/ThreadManager.py Normal file
View File

@@ -0,0 +1,226 @@
# -*- coding: utf-8 -*-
import threading
import ctypes
import inspect
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Dict, Optional, List, Tuple, Any
from Utils.LogManager import LogManager
def _raise_async_exception(tid: int, exc_type) -> int:
if not inspect.isclass(exc_type):
raise TypeError("exc_type must be a class")
return ctypes.pythonapi.PyThreadState_SetAsyncExc(
ctypes.c_long(tid), ctypes.py_object(exc_type)
)
def _kill_thread_by_tid(tid: Optional[int]) -> bool:
if tid is None:
return False
res = _raise_async_exception(tid, SystemExit)
if res == 0:
return False
if res > 1:
_raise_async_exception(tid, None)
raise SystemError("PyThreadState_SetAsyncExc affected multiple threads; reverted.")
return True
class ThreadManager:
"""
- add(udid, thread_or_target, *args, **kwargs) -> (code, msg)
- stop(udid, join_timeout=2.0, retries=5, wait_step=0.2) -> (code, msg) # 强杀
- batch_stop(udids, join_timeout_each=2.0, retries_each=5, wait_step_each=0.2) -> (code, msg)
- get_thread / get_tid / is_running / list_udids
"""
_threads: Dict[str, threading.Thread] = {}
_lock = threading.RLock()
# ========== 基础 ==========
@classmethod
def add(cls, udid: str, thread_or_target: Any, *args, **kwargs) -> Tuple[int, str]:
"""
兼容两种用法:
1) add(udid, t) # t 是 threading.Thread 实例
2) add(udid, target, *args, **kwargs) # target 是可调用
返回:(200, "创建任务成功") / (1001, "任务已存在") / (1001, "创建任务失败")
"""
with cls._lock:
exist = cls._threads.get(udid)
if exist and exist.is_alive():
return 1001, "任务已存在"
if isinstance(thread_or_target, threading.Thread):
t = thread_or_target
try:
t.daemon = True
except Exception:
pass
if not t.name:
t.name = f"task-{udid}"
# 包装 run退出时从表移除
orig_run = t.run
def run_wrapper():
try:
orig_run()
finally:
with cls._lock:
if cls._threads.get(udid) is t:
cls._threads.pop(udid, None)
t.run = run_wrapper # type: ignore
else:
target = thread_or_target
def _wrapper():
try:
target(*args, **kwargs)
finally:
with cls._lock:
cur = cls._threads.get(udid)
if cur is threading.current_thread():
cls._threads.pop(udid, None)
t = threading.Thread(target=_wrapper, daemon=True, name=f"task-{udid}")
try:
t.start()
except Exception:
return 1001, "创建任务失败"
cls._threads[udid] = t
# 保留你原有的创建成功日志
try:
LogManager.method_info(f"创建任务成功 [{udid}]线程ID={t.ident}", "task")
except Exception:
pass
return 200, "创建任务成功"
@classmethod
def get_thread(cls, udid: str) -> Optional[threading.Thread]:
with cls._lock:
return cls._threads.get(udid)
@classmethod
def get_tid(cls, udid: str) -> Optional[int]:
t = cls.get_thread(udid)
return t.ident if t else None
@classmethod
def is_running(cls, udid: str) -> bool:
t = cls.get_thread(udid)
return bool(t and t.is_alive())
@classmethod
def list_udids(cls) -> List[str]:
with cls._lock:
return list(cls._threads.keys())
# ========== 内部:强杀一次 ==========
@classmethod
def _stop_once(cls, udid: str, join_timeout: float, retries: int, wait_step: float) -> bool:
"""
对指定 udid 执行一次强杀流程;返回 True=已停止/不存在False=仍存活或被拒。
"""
with cls._lock:
t = cls._threads.get(udid)
if not t:
return True # 视为已停止
main_tid = threading.main_thread().ident
cur_tid = threading.get_ident()
if t.ident in (main_tid, cur_tid):
return False
try:
_kill_thread_by_tid(t.ident)
except Exception:
pass
if join_timeout < 0:
join_timeout = 0.0
t.join(join_timeout)
while t.is_alive() and retries > 0:
evt = threading.Event()
evt.wait(wait_step)
retries -= 1
dead = not t.is_alive()
if dead:
with cls._lock:
if cls._threads.get(udid) is t:
cls._threads.pop(udid, None)
return dead
# ========== 对外stop / batch_stop均返回二元组 ==========
@classmethod
def stop(cls, udid: str, join_timeout: float = 2.0,
retries: int = 5, wait_step: float = 0.2) -> Tuple[int, str]:
"""
强杀单个:返回 (200, "stopped") 或 (1001, "failed")
"""
ok = cls._stop_once(udid, join_timeout, retries, wait_step)
if ok:
return 200, "stopped"
else:
return 1001, "failed"
@classmethod
def batch_stop(cls, udids: List[str]) -> Tuple[int, str, List[str]]:
"""
并行批量停止(简化版):
- 只接收 udids 参数
- 其他参数写死join_timeout=2.0, retries=5, wait_step=0.2
- 所有设备同时执行,失败的重试 3 轮,每轮间隔 1 秒
- 返回:
(200, "停止任务成功", [])
(1001, "停止任务失败", [失败udid...])
"""
if not udids:
return 200, "停止任务成功", []
join_timeout = 2.0
retries = 5
wait_step = 0.2
retry_rounds = 3
round_interval = 1.0
def _stop_one(u: str) -> Tuple[str, bool]:
ok = cls._stop_once(u, join_timeout, retries, wait_step)
return u, ok
# === 第一轮:并行执行所有设备 ===
fail: List[str] = []
with ThreadPoolExecutor(max_workers=len(udids)) as pool:
futures = [pool.submit(_stop_one, u) for u in udids]
for f in as_completed(futures):
u, ok = f.result()
if not ok:
fail.append(u)
# === 对失败的设备重试 3 轮(每轮间隔 1 秒) ===
for _ in range(retry_rounds):
if not fail:
break
time.sleep(round_interval)
remain: List[str] = []
with ThreadPoolExecutor(max_workers=len(fail)) as pool:
futures = [pool.submit(_stop_one, u) for u in fail]
for f in as_completed(futures):
u, ok = f.result()
if not ok:
remain.append(u)
fail = remain
# === 返回结果 ===
if not fail:
return 200, "停止任务成功", []
else:
return 1001, "停止任务失败", fail