Files
iOSAI/Utils/AiUtils.py
2025-09-19 19:39:32 +08:00

986 lines
37 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import json
import os
import shlex
import subprocess
import time
from pathlib import Path
import cv2
import numpy as np
import unicodedata
import wda
from Entity.Variables import WdaAppBundleId
from Utils.LogManager import LogManager
import xml.etree.ElementTree as ET
import re, html
from lxml import etree
from wda import Client
# 工具类
class AiUtils(object):
# 在屏幕中找到对应的图片
@classmethod
def findImageInScreen(cls, target, udid):
try:
# 加载原始图像和模板图像
image_path = AiUtils.imagePathWithName(udid, "bgv") # 替换为你的图像路径
template_path = AiUtils.imagePathWithName("", target) # 替换为你的模板路径
# 读取图像和模板,确保它们都是单通道灰度图
image = cv2.imread(image_path, cv2.IMREAD_GRAYSCALE)
template = cv2.imread(template_path, cv2.IMREAD_GRAYSCALE)
if image is None:
LogManager.error("加载背景图失败")
return -1, -1
if template is None:
LogManager.error("加载模板图失败")
return -1, -1
# 获取模板的宽度和高度
w, h = template.shape[::-1]
# 使用模板匹配方法
res = cv2.matchTemplate(image, template, cv2.TM_CCOEFF_NORMED)
threshold = 0.7 # 匹配度阈值,可以根据需要调整
loc = np.where(res >= threshold)
# 检查是否有匹配结果
if loc[0].size > 0:
# 取第一个匹配位置
pt = zip(*loc[::-1]).__next__() # 获取第一个匹配点的坐标
center_x = int(pt[0] + w // 2)
center_y = int(pt[1] + h // 2)
# print(f"第一个匹配到的小心心中心坐标: ({center_x}, {center_y})")
return center_x, center_y
else:
return -1, -1
except Exception as e:
LogManager.error(f"加载素材失败:{e}", udid)
print(e)
return -1, -1
# 使用正则查找字符串中的数字
@classmethod
def findNumber(cls, str):
# 使用正则表达式匹配数字
match = re.search(r'\d+', str)
if match:
return int(match.group()) # 将匹配到的数字转换为整数
return None # 如果没有找到数字,返回 None
# 选择截图
@classmethod
def screenshot(cls):
client = wda.USBClient("eca000fcb6f55d7ed9b4c524055214c26a7de7aa")
session = client.session()
image = session.screenshot()
image_path = "screenshot.png"
image.save(image_path)
image = cv2.imread(image_path)
# 如果图像过大,缩小显示
scale_percent = 50 # 缩小比例
width = int(image.shape[1] * scale_percent / 100)
height = int(image.shape[0] * scale_percent / 100)
dim = (width, height)
resized_image = cv2.resize(image, dim, interpolation=cv2.INTER_AREA)
# 创建一个窗口并显示缩小后的图像
cv2.namedWindow("Image")
cv2.imshow("Image", resized_image)
print("请在图像上选择爱心图标区域然后按Enter键确认。")
# 使用selectROI函数手动选择区域
roi = cv2.selectROI("Image", resized_image, showCrosshair=True, fromCenter=False)
# 将ROI坐标按原始图像尺寸放大
x, y, w, h = roi
x = int(x * image.shape[1] / resized_image.shape[1])
y = int(y * image.shape[0] / resized_image.shape[0])
w = int(w * image.shape[1] / resized_image.shape[1])
h = int(h * image.shape[0] / resized_image.shape[0])
# 根据选择的ROI提取爱心图标
if w > 0 and h > 0: # 确保选择的区域有宽度和高度
heart_icon = image[y:y + h, x:x + w]
# 转换为HSV颜色空间
hsv = cv2.cvtColor(heart_icon, cv2.COLOR_BGR2HSV)
# 定义红色的HSV范围
lower_red1 = np.array([0, 120, 70])
upper_red1 = np.array([10, 255, 255])
lower_red2 = np.array([170, 120, 70])
upper_red2 = np.array([180, 255, 255])
# 创建掩模
mask1 = cv2.inRange(hsv, lower_red1, upper_red1)
mask2 = cv2.inRange(hsv, lower_red2, upper_red2)
mask = mask1 + mask2
# 反转掩模,因为我们想要的是爱心图标,而不是背景
mask_inv = cv2.bitwise_not(mask)
# 应用掩模
heart_icon = cv2.bitwise_and(heart_icon, heart_icon, mask=mask_inv)
# 创建一个全透明的背景
height, width, channels = heart_icon.shape
roi = np.zeros((height, width, channels), dtype=np.uint8)
# 将爱心图标粘贴到透明背景上
for c in range(channels):
roi[:, :, c] = np.where(mask_inv == 255, heart_icon[:, :, c], roi[:, :, c])
# 图片名称
imgName = "temp.png"
# 保存结果
cv2.imwrite(imgName, roi)
# 显示结果
cv2.imshow("Heart Icon with Transparent Background", roi)
cv2.waitKey(0)
cv2.destroyAllWindows()
else:
print("未选择有效区域。")
# 根据名称获取图片完整地址
@classmethod
def imagePathWithName(cls, udid, name):
current_file_path = os.path.abspath(__file__)
# 获取当前文件所在的目录即script目录
current_dir = os.path.dirname(current_file_path)
# 由于script目录位于项目根目录下一级因此需要向上一级目录移动两次
project_root = os.path.abspath(os.path.join(current_dir, '..'))
# 构建资源文件的完整路径,向上两级目录,然后进入 resources 目录
resource_path = os.path.abspath(os.path.join(project_root, "resources", udid, name + ".png")).replace('/', '\\')
return resource_path
# 获取根目录
@classmethod
def getRootDir(cls):
current_file = os.path.abspath(__file__)
# 获取当前文件所在的目录
current_dir = os.path.dirname(current_file)
# 获取项目根目录(假设根目录是当前文件的父目录的父目录)
project_root = os.path.dirname(current_dir)
# 返回根目录
return project_root
# 创建一个以udid命名的目录
@classmethod
def makeUdidDir(cls, udid):
# 获取项目根目录
home = cls.getRootDir()
# 拼接 resources 目录的路径
resources_dir = os.path.join(home, "resources")
# 拼接 udid 目录的路径
udid_dir = os.path.join(resources_dir, udid)
# 检查 udid 目录是否存在,如果不存在则创建
if not os.path.exists(udid_dir):
try:
os.makedirs(udid_dir)
LogManager.info(f"目录 {udid_dir} 创建成功", udid)
print(f"目录 {udid_dir} 创建成功")
except Exception as e:
print(f"创建目录时出错: {e}")
LogManager.error(f"创建目录时出错: {e}", udid)
else:
LogManager.info(f"目录 {udid_dir} 已存在,跳过创建", udid)
print(f"目录 {udid_dir} 已存在,跳过创建")
# 查找首页按钮
# uuid 设备id
# click 是否点击该按钮
@classmethod
def findHomeButton(cls, udid="eca000fcb6f55d7ed9b4c524055214c26a7de7aa"):
client = wda.USBClient(udid)
session = client.session()
session.appium_settings({"snapshotMaxDepth": 10})
homeButton = session.xpath("//*[@label='首页']")
try:
if homeButton.label == "首页":
print("1.找到了")
return homeButton
else:
print("1.没找到")
return None
except Exception as e:
print(e)
return None
# 查找关闭按钮
@classmethod
def findLiveCloseButton(cls, udid="eca000fcb6f55d7ed9b4c524055214c26a7de7aa"):
client = wda.USBClient(udid)
session = client.session()
session.appium_settings({"snapshotMaxDepth": 10})
r = session.xpath("//XCUIElementTypeButton[@name='关闭屏幕']")
try:
if r.label == "关闭屏幕":
return r
else:
return None
except Exception as e:
print(e)
return None
# 获取直播间窗口数量
@classmethod
def count_add_by_xml(cls, session):
xml = session.source()
root = ET.fromstring(xml)
return sum(
1 for e in root.iter()
if e.get('type') in ('XCUIElementTypeButton', 'XCUIElementTypeImage')
and (e.get('name') == '添加' or e.get('label') == '添加')
and (e.get('visible') in (None, 'true'))
)
# 获取关注按钮
@classmethod
def getFollowButton(cls, session: Client):
# followButton = session.xpath("//Window[1]/Other[1]/Other[1]/Other[1]/Other[1]/Other[1]/Other[1]/Other[1]/Other[1]/Other[1]/Other[2]/Other[2]/Other[1]/Other[1]/Other[3]/Other[1]/Other[1]/Button[1]")
session.appium_settings({"snapshotMaxDepth": 20})
followButton = session.xpath('//XCUIElementTypeButton[@name="关注" or @label="关注"]')
if followButton.exists:
print("2.关注找到了")
LogManager.info("2.关注找到了")
return followButton
else:
print("2.关注没找到")
print("2.关注没找到")
return None
# 查找发消息按钮
@classmethod
def getSendMesageButton(cls, session: Client):
msgButton = session.xpath(
'//XCUIElementTypeButton['
'(@name="发消息" or @label="发消息" or '
'@name="发送 👋" or @label="发送 👋" or '
'@name="消息" or @label="消息")'
' and @visible="true"]'
)
if msgButton.exists:
print("3.发消息按钮找到了")
LogManager.info("3.发消息按钮找到了")
return msgButton
else:
print("3.发消息按钮没找到")
LogManager.info("3.发消息按钮没找到")
return None
# 获取当前屏幕上的节点
@classmethod
def getCurrentScreenSource(cls):
client = wda.USBClient("eca000fcb6f55d7ed9b4c524055214c26a7de7aa")
print(client.source())
# 查找app主页上的收件箱按钮
@classmethod
def getMsgBoxButton(cls, session: Client):
# box = session.xpath("//XCUIElementTypeButton[name='a11y_vo_inbox']")
box = session(xpath='//XCUIElementTypeButton[@name="a11y_vo_inbox"]')
if box.exists:
return box
else:
return None
# 获取收件箱中的未读消息数
@classmethod
def getUnReadMsgCount(cls, session: Client):
btn = cls.getMsgBoxButton(session)
print(f"btn:{btn}")
return cls.findNumber(btn.label)
@classmethod
def extract_messages_from_xml(cls, xml: str):
"""
解析 TikTok 聊天 XML返回当前屏幕可见的消息与时间分隔
[{"type":"time","text":"..."}, {"type":"msg","dir":"in|out","text":"..."}]
兼容 Table / CollectionView / ScrollView过滤系统提示/底部工具栏;可见性使用“重叠可视+容差”。
"""
if not isinstance(xml, str) or not xml.strip():
return []
try:
root = etree.fromstring(xml.encode("utf-8"))
except Exception:
return []
# ---------- 小工具 ----------
def get_text(el):
s = (el.get('label') or el.get('name') or el.get('value') or '') or ''
return html.unescape(s.strip())
def is_visible(el):
"""无 visible 属性按可见处理;有且为 'false' 才视为不可见。"""
v = el.get('visible')
return (v is None) or (v.lower() == 'true')
# ---------- 屏幕尺寸 ----------
app = root.xpath('/XCUIElementTypeApplication')
screen_w = cls.parse_float(app[0], 'width', 414.0) if app else 414.0
screen_h = cls.parse_float(app[0], 'height', 736.0) if app else 736.0
# ---------- 主容器探测(评分选择最像聊天区的容器) ----------
def pick_container():
cands = []
for xp, ctype in (
('//XCUIElementTypeTable', 'table'),
('//XCUIElementTypeCollectionView', 'collection'),
('//XCUIElementTypeScrollView', 'scroll'),
):
nodes = [n for n in root.xpath(xp) if is_visible(n)]
for n in nodes:
y = cls.parse_float(n, 'y', 0.0)
h = cls.parse_float(n, 'height', screen_h)
# Cell 数越多越像聊天列表;越靠中间越像
cells = n.xpath('.//XCUIElementTypeCell')
score = len(cells) * 10 - abs((y + h / 2) - screen_h / 2)
cands.append((score, n, ctype))
if cands:
cands.sort(key=lambda t: t[0], reverse=True)
return cands[0][1], cands[0][2]
return None, None
container, container_type = pick_container()
# ---------- 可视区area_top, area_bot ----------
if container is not None:
area_top = cls.parse_float(container, 'y', 0.0)
area_h = cls.parse_float(container, 'height', screen_h)
area_bot = area_top + area_h
else:
# 顶栏底缘作为上边界(选最靠上的宽>200的块
blocks = [n for n in root.xpath('//XCUIElementTypeOther[@y and @height and @width>="200"]') if
is_visible(n)]
area_top = 0.0
if blocks:
blocks.sort(key=lambda n: cls.parse_float(n, 'y', 0.0))
b = blocks[0]
area_top = cls.parse_float(b, 'y', 0.0) + cls.parse_float(b, 'height', 0.0)
# 输入框 TextView 顶边作为下边界
tvs = [n for n in root.xpath('//XCUIElementTypeTextView') if is_visible(n)]
if tvs:
tvs.sort(key=lambda n: cls.parse_float(n, 'y', 0.0))
area_bot = cls.parse_float(tvs[-1], 'y', screen_h)
else:
area_bot = screen_h
if area_bot - area_top < 100:
area_top, area_bot = 0.0, screen_h
def in_view(el) -> bool:
if not is_visible(el):
return False
y = cls.parse_float(el, 'y', -1e9)
h = cls.parse_float(el, 'height', 0.0)
by = y + h
tol = 8.0 # 容差,避免边缘误判
return not (by <= area_top + tol or y >= area_bot - tol)
# ---------- 时间分隔Header ----------
items = []
for t in root.xpath('//XCUIElementTypeStaticText[contains(@traits, "Header")]'):
if not in_view(t):
continue
txt = get_text(t)
if txt:
items.append({'type': 'time', 'text': txt, 'y': cls.parse_float(t, 'y', 0.0)})
# ---------- 系统提示/横幅过滤 ----------
EXCLUDES_LITERAL = {
'Heart', 'Lol', 'ThumbsUp',
'分享发布内容', '视频贴纸标签页', '双击发送表情', '贴纸',
}
SYSTEM_PATTERNS = [
r"(消息请求已被接受|你开始了和.*的聊天|你打开了这个与.*的聊天).*"
r"回复时接收通知", r"开启(私信)?通知", r"开启通知",
r"你打开了这个与 .* 的聊天。.*隐私",
r"在此用户接受你的消息请求之前,你最多只能发送 ?\d+ 条消息。?",
r"聊天消息条数已达上限,你将无法向该用户发送消息。?",
r"未发送$",
r"Turn on (DM|message|direct message)?\s*notifications",
r"Enable notifications",
r"Get notified when .* replies",
r"You opened this chat .* privacy",
r"Only \d+ message can be sent .* accepts .* request",
]
SYSTEM_RE = re.compile("|".join(SYSTEM_PATTERNS), re.IGNORECASE)
# 排除底部贴纸/GIF/分享栏(通常是位于底部、较矮的一排 CollectionView
def is_toolbar_like(o) -> bool:
txt = get_text(o)
if txt in EXCLUDES_LITERAL:
return True
y = cls.parse_float(o, 'y', 0.0)
h = cls.parse_float(o, 'height', 0.0)
near_bottom = (area_bot - (y + h)) < 48
is_short = h <= 40
return near_bottom and is_short
# ---------- 收集消息候选 ----------
msg_nodes = []
if container is not None:
# 容器内优先找 Cell 下的文本节点Other/StaticText/TextView
cand = container.xpath(
'.//XCUIElementTypeCell//*[self::XCUIElementTypeOther or self::XCUIElementTypeStaticText or self::XCUIElementTypeTextView]'
'[@y and (@name or @label or @value)]'
)
for o in cand:
if not in_view(o):
continue
if is_toolbar_like(o):
continue
txt = get_text(o)
if not txt or SYSTEM_RE.search(txt):
continue
msg_nodes.append(o)
else:
# 全局兜底:排除直接挂在 CollectionView底部工具栏下的节点
cand = root.xpath(
'//XCUIElementTypeOther[@y and (@name or @label or @value)]'
' | //XCUIElementTypeStaticText[@y and (@name or @label or @value)]'
' | //XCUIElementTypeTextView[@y and (@name or @label or @value)]'
)
for o in cand:
p = o.getparent()
if p is not None and p.get('type') == 'XCUIElementTypeCollectionView':
continue
if not in_view(o) or is_toolbar_like(o):
continue
txt = get_text(o)
if not txt or SYSTEM_RE.search(txt):
continue
msg_nodes.append(o)
# ---------- 方向判定 & 组装 ----------
for o in msg_nodes:
txt = get_text(o)
if not txt or txt in EXCLUDES_LITERAL:
continue
# 找所在 Cell用于查头像
cell = o.getparent()
while cell is not None and cell.get('type') != 'XCUIElementTypeCell':
cell = cell.getparent()
x = cls.parse_float(o, 'x', 0.0)
y = cls.parse_float(o, 'y', 0.0)
w = cls.parse_float(o, 'width', 0.0)
right_edge = x + w
direction = None
if cell is not None:
avatars = [a for a in cell.xpath(
'.//XCUIElementTypeButton[@visible="true" and (@name="图片头像" or @label="图片头像")]'
) if is_visible(a)]
if avatars:
ax = cls.parse_float(avatars[0], 'x', 0.0)
direction = 'in' if ax < (screen_w / 2) else 'out'
if direction is None:
direction = 'out' if right_edge > (screen_w * 0.75) else 'in'
items.append({'type': 'msg', 'dir': direction, 'text': txt, 'y': y})
# ---------- 排序 & 收尾 ----------
if items:
items.sort(key=lambda i: i.get('y', 0.0))
for it in items:
it.pop('y', None)
return items
@classmethod
def parse_float(cls, el, attr, default=0.0):
try:
v = el.get(attr)
if v is None:
return default
return float(v)
except Exception:
return default
# 从导航栏读取主播名称。找不到时返回空字符串
@classmethod
def get_navbar_anchor_name(cls, session, timeout: float = 2.0) -> str:
"""只从导航栏读取主播名称。找不到时返回空字符串。"""
# 可选:限制快照深度,提升解析速度/稳定性
try:
session.appium_settings({"snapshotMaxDepth": 22})
except Exception:
pass
def _text_of(el) -> str:
info = getattr(el, "info", {}) or {}
return (info.get("label") or info.get("name") or info.get("value") or "").strip()
def _clean_tail(s: str) -> str:
return re.sub(r"[,、,。.\s]+$", "", s).strip()
# 导航栏容器:从“返回”按钮向上找最近祖先,且该祖先内包含“更多/举报”(多语言兜底)
NAV_CONTAINER = (
"//XCUIElementTypeButton"
"[@name='返回' or @label='返回' or @name='Back' or @label='Back' or @name='戻る' or @label='戻る']"
"/ancestor::XCUIElementTypeOther"
"[ .//XCUIElementTypeButton"
" [@name='更多' or @label='更多' or @name='More' or @label='More' or "
" @name='その他' or @label='その他' or @name='詳細' or @label='詳細' or "
" @name='举报' or @label='举报' or @name='Report' or @label='Report' or "
" @name='報告' or @label='報告']"
"][1]"
)
# ① 优先:可访问的 Other自身有文本且不含子 Button更贴近 TikTok 的实现
XPATH_TITLE_OTHER = (
NAV_CONTAINER +
"//XCUIElementTypeOther[@accessible='true' and count(.//XCUIElementTypeButton)=0 "
" and (string-length(@name)>0 or string-length(@label)>0 or string-length(@value)>0)"
"][1]"
)
# ② 退路:第一个 StaticText
XPATH_TITLE_STATIC = (
NAV_CONTAINER +
"//XCUIElementTypeStaticText[string-length(@value)>0 or string-length(@label)>0 or string-length(@name)>0][1]"
)
# 尝试 ①
q = session.xpath(XPATH_TITLE_OTHER)
if q.wait(timeout):
t = _clean_tail(_text_of(q.get()))
if t:
return t
# 尝试 ②
q2 = session.xpath(XPATH_TITLE_STATIC)
if q2.wait(1.0):
t = _clean_tail(_text_of(q2.get()))
if t:
return t
return ""
# 检查字符串中是否包含中文
@classmethod
def contains_chinese(cls, text):
"""
判断字符串中是否包含中文字符。
参数:
text (str): 要检测的字符串。
返回:
bool: 如果字符串中包含中文,返回 True否则返回 False。
"""
# 使用正则表达式匹配中文字符
pattern = re.compile(r'[\u4e00-\u9fff]')
return bool(pattern.search(text))
@classmethod
def is_language(cls, text: str) -> bool:
if not text:
return False
for ch in text:
if unicodedata.category(ch).startswith("L"):
return True
return False
@classmethod
def _read_json_list(cls, file_path: Path) -> list:
"""读取为 list读取失败或不是 list 则返回空数组"""
if not file_path.exists():
return []
try:
with open(file_path, "r", encoding="utf-8") as f:
data = json.load(f)
return data if isinstance(data, list) else []
except Exception as e:
LogManager.error(f"[acList] 读取失败,将按空数组处理: {e}")
return []
@classmethod
def _write_json_list(cls, file_path: Path, data: list) -> None:
file_path.parent.mkdir(parents=True, exist_ok=True)
with open(file_path, "w", encoding="utf-8") as f:
json.dump(data, f, ensure_ascii=False, indent=2)
# @staticmethod
# def _normalize_anchor_items(items):
# """
# 规范化输入为 [{anchorId, country}] 的列表:
# - 允许传入:单个对象、对象列表、字符串(当 anchorId 用)
# - 过滤不合规项
# """
# result = []
# if items is None:
# return result
#
# if isinstance(items, dict):
# # 单个对象
# aid = items.get("anchorId")
# if aid:
# result.append({"anchorId": str(aid), "country": items.get("country", "")})
# return result
#
# if isinstance(items, list):
# for it in items:
# if isinstance(it, dict):
# aid = it.get("anchorId")
# if aid:
# result.append({"anchorId": str(aid), "country": it.get("country", "")})
# elif isinstance(it, str):
# result.append({"anchorId": it, "country": ""})
# return result
#
# if isinstance(items, str):
# result.append({"anchorId": items, "country": ""})
# return result
@staticmethod
def _normalize_anchor_items(items):
"""
规范化为 [{...}]
- 允许传入:单个对象、对象数组、字符串(当 anchorId 用)
- 保留原有字段,不限制只存 anchorId/country
- 字符串输入 → {"anchorId": xxx}
"""
result = []
if items is None:
return result
if isinstance(items, dict):
aid = items.get("anchorId")
if aid:
obj = dict(items) # 保留所有字段
result.append(obj)
return result
if isinstance(items, list):
for it in items:
if isinstance(it, dict):
aid = it.get("anchorId")
if aid:
obj = dict(it)
result.append(obj)
elif isinstance(it, str):
result.append({"anchorId": it})
return result
if isinstance(items, str):
result.append({"anchorId": items})
return result
# -------- 追加(对象数组平铺追加) --------
@classmethod
def save_aclist_flat_append(cls, acList, filename="log/acList.json"):
"""
将 anchor 对象数组平铺追加到 JSON 文件(数组)中。
文件固定写到 项目根目录/log/acList.json
"""
# 找到当前文件所在目录,回退到项目根目录
root_dir = Path(__file__).resolve().parent.parent # 根据实际层级调整
log_dir = root_dir
log_dir.mkdir(parents=True, exist_ok=True) # 确保 log 目录存在
file_path = log_dir / filename
data = cls._read_json_list(file_path)
# 规范化输入,确保都是 {anchorId, country}
to_add = cls._normalize_anchor_items(acList)
if not to_add:
LogManager.info("[acList] 传入为空或不合规,跳过写入")
return
data.extend(to_add)
cls._write_json_list(file_path, data)
LogManager.method_info(f"写入的路径是:{file_path}", "写入数据")
LogManager.info(f"[acList] 已追加 {len(to_add)} 条,当前总数={len(data)} -> {file_path}")
@classmethod
def pop_aclist_first(cls, filename="log/acList.json", mode="pop"):
"""
从 JSON 数组/对象(anchorList)中取出第一个 anchor 对象。
- mode="pop" : 取出并删除
- mode="move" : 取出并放到列表尾部
返回形如:{"anchorId": "...", "country": "...", ...}
"""
file_path = cls._resolve_path(filename)
if not file_path.exists():
return None
try:
raw = json.loads(file_path.read_text(encoding="utf-8-sig"))
except Exception as e:
LogManager.error(f"[pop_aclist_first] 读取失败: {e}")
return None
# 支持两种格式list 或 dict{anchorList:[...]}
if isinstance(raw, list):
data, wrapper = raw, None
elif isinstance(raw, dict) and isinstance(raw.get("anchorList"), list):
data, wrapper = raw["anchorList"], raw
else:
return None
if not data:
return None
# 取第一个
first = data.pop(0)
norm = cls._normalize_anchor_items(first)
first = norm[0] if norm else None
if first and mode == "move":
# 放到尾部
data.append(first)
# 写回
to_write = wrapper if wrapper is not None else data
file_path.write_text(
json.dumps(to_write, ensure_ascii=False, indent=2),
encoding="utf-8"
)
return first
@classmethod
def bulk_update_anchors(cls, updates, filename="log/acList.json", case_insensitive=False):
"""
批量修改(文件根必须是数组,沿用 _read_json_list 的约定)
- updates:
dict: {"id1": {...}, "id2": {...}}
list[dict]: [{"anchorId":"id1", ...}, {"anchorId":"id2", ...}]
- case_insensitive: True 时用小写比较 anchorId
返回: {"updated": <int>, "missing": [ids...], "file": "<实际命中的路径>"}
"""
def norm_id(x: str) -> str:
s = str(x).strip()
return s.lower() if case_insensitive else s
# ✅ 关键:使用你已有的 _resolve_path避免受 cwd 影响
file_path = cls._resolve_path(filename)
data = cls._read_json_list(file_path)
if not data:
return {"updated": 0, "missing": cls._collect_all_ids(updates), "file": str(file_path)}
# 1) 归一化 updates -> map[normalized_id] = patch
upd_map = {}
raw_ids = [] # 保留原始传入 id用于返回 missing 时回显
if isinstance(updates, dict):
for aid, patch in updates.items():
if aid and isinstance(patch, dict):
key = norm_id(aid)
raw_ids.append(str(aid))
patch = {k: v for k, v in patch.items() if k != "anchorId"}
if patch:
upd_map[key] = {**upd_map.get(key, {}), **patch}
elif isinstance(updates, list):
for it in updates:
if isinstance(it, dict) and it.get("anchorId"):
rid = str(it["anchorId"])
key = norm_id(rid)
raw_ids.append(rid)
patch = {k: v for k, v in it.items() if k != "anchorId"}
if patch:
upd_map[key] = {**upd_map.get(key, {}), **patch}
if not upd_map:
return {"updated": 0, "missing": [], "file": str(file_path)}
# 2) 建索引map[normalized_id] -> item
index = {}
for item in data:
if isinstance(item, dict) and "anchorId" in item:
key = norm_id(item.get("anchorId", ""))
if key:
index[key] = item
# 3) 执行更新
updated, seen = 0, set()
for key, patch in upd_map.items():
target = index.get(key)
if target is not None:
target.update(patch)
updated += 1
seen.add(key)
# 4) 写回
if updated > 0:
cls._write_json_list(file_path, data)
# 5) 计算未命中(按传入原始 ID 回显)
missing = []
for rid in raw_ids:
if norm_id(rid) not in seen:
missing.append(rid)
return {"updated": updated, "missing": missing, "file": str(file_path)}
@staticmethod
def _collect_all_ids(updates):
ids = []
if isinstance(updates, dict):
ids = [str(k) for k in updates.keys()]
elif isinstance(updates, list):
for it in updates:
if isinstance(it, dict) and it.get("anchorId"):
ids.append(str(it["anchorId"]))
return ids
@classmethod
def delete_anchors_by_ids(cls, ids: list[str], filename="log/acList.json") -> int:
"""
根据 anchorId 列表从根目录/log/acList.json 中删除匹配的 anchor。
返回删除数量。
"""
# 确保路径固定在根目录下的 log 文件夹
root_dir = Path(__file__).resolve().parent.parent
file_path = root_dir / "log" / filename
if not file_path.exists():
return 0
try:
with open(file_path, "r", encoding="utf-8") as f:
data = json.load(f)
if not isinstance(data, list):
return 0
except Exception as e:
LogManager.error(f"[delete_anchors_by_ids] 读取失败: {e}")
return 0
before = len(data)
# 保留不在 ids 里的对象
data = [d for d in data if isinstance(d, dict) and d.get("anchorId") not in ids]
deleted = before - len(data)
try:
file_path.parent.mkdir(parents=True, exist_ok=True)
with open(file_path, "w", encoding="utf-8") as f:
json.dump(data, f, ensure_ascii=False, indent=2)
except Exception as e:
LogManager.error(f"[delete_anchors_by_ids] 写入失败: {e}")
return deleted
# -------- 查看第一个(取出但不删除) --------
@staticmethod
def _resolve_path(p) -> Path:
p = Path(p)
if p.is_absolute():
return p
# 以项目根目录 (iOSAI) 为基准 —— script 的上一级
base = Path(__file__).resolve().parents[1]
return (base / p).resolve()
@classmethod
def peek_aclist_first(cls, filename="log/acList.json"):
file_path = cls._resolve_path(filename)
if not file_path.exists():
print(f"[peek] 文件不存在: {file_path}")
return None
try:
raw = file_path.read_text(encoding="utf-8-sig").strip()
if not raw:
return None
data = json.loads(raw)
arr = data if isinstance(data, list) else data.get("anchorList") if isinstance(data, dict) else None
if not arr:
return None
first = arr[0]
norm = cls._normalize_anchor_items(first)
return norm[0] if norm else None
except Exception as e:
print(f"[peek] 读取失败: {e}")
return None
@staticmethod
def run_tidevice_command(udid, action, bundle_id, timeout=30):
"""
执行tidevice命令的辅助函数
:param udid: 设备UDID
:param action: 动作类型 ('kill''launch')
:param bundle_id: 应用的Bundle ID
:param timeout: 命令执行超时时间(秒)
:return: (bool) 成功返回True失败返回False
"""
# 构建命令列表
cmd = ["tidevice", "--udid", udid, action, bundle_id]
try:
# 执行命令并捕获输出
result = subprocess.run(
cmd,
capture_output=True,
text=True,
timeout=timeout
)
# 检查命令是否成功执行返回码为0通常表示成功
if result.returncode == 0:
LogManager.info(f"Successfully {action}ed {bundle_id} on device {udid}.")
return True
else:
# 记录错误信息
LogManager.error(f"Failed to {action} {bundle_id} on device {udid}. Error: {result.stderr}")
return False
except subprocess.TimeoutExpired:
# 处理命令执行超时
LogManager.error(f"Command 'tidevice {action}' timed out after {timeout} seconds for device {udid}.")
return False
except FileNotFoundError:
# 处理tidevice命令未找到的情况通常意味着tidevice未安装或不在PATH中
LogManager.error("The 'tidevice' command was not found. Please ensure it is installed and in your system PATH.")
return False
except Exception as e:
# 捕获其他可能异常
LogManager.error(f"An unexpected error occurred while trying to {action} the app: {e}")
return False
@classmethod
def kill_wda(cls, udid, bundle_id="com.yolozsAgent.wda.xctrunner"):
"""
杀死指定设备上的WDA应用
:param udid: 设备UDID
:param bundle_id: WDA的Bundle ID默认为 com.yolozsAgent.wda.xctrunner
:return: (bool) 成功返回True失败返回False
"""
return cls.run_tidevice_command(udid, "kill", bundle_id)
@classmethod
def launch_wda(cls, udid, bundle_id="com.yolozsAgent.wda.xctrunner", timeout=60):
"""
启动指定设备上的WDA应用
:param udid: 设备UDID
:param bundle_id: WDA的Bundle ID默认为 com.yolozsAgent.wda.xctrunner
:param timeout: 启动命令超时时间默认为60秒启动可能较慢
:return: (bool) 成功返回True失败返回False
"""
return cls.run_tidevice_command(udid, "launch", bundle_id, timeout)