Files
iOSAI/script/ScriptManager.py

699 lines
28 KiB
Python
Raw Normal View History

2025-08-06 22:11:33 +08:00
import random
2025-08-13 20:20:13 +08:00
import re
2025-08-08 22:08:10 +08:00
import threading
2025-08-06 22:11:33 +08:00
import time
2025-08-08 22:08:10 +08:00
from enum import Enum
2025-08-01 13:43:51 +08:00
import wda
import os
2025-08-05 15:41:20 +08:00
from Utils.AiUtils import AiUtils
2025-08-06 22:11:33 +08:00
from Utils.ControlUtils import ControlUtils
from Utils.LogManager import LogManager
2025-08-13 20:20:13 +08:00
from Entity.Variables import anchorList, removeModelFromAnchorList, prologueList, anchorWithSession
2025-08-12 18:53:06 +08:00
from Utils.Requester import Requester
2025-08-01 13:43:51 +08:00
# 脚本管理类
class ScriptManager():
2025-08-05 15:41:20 +08:00
# 单利对象
_instance = None # 类变量,用于存储单例实例
def __new__(cls):
# 如果实例不存在,则创建一个新实例
if cls._instance is None:
cls._instance = super(ScriptManager, cls).__new__(cls)
# 返回已存在的实例
return cls._instance
2025-08-01 13:43:51 +08:00
def __init__(self):
super().__init__()
2025-08-05 15:41:20 +08:00
self.initialized = True # 标记已初始化
2025-08-01 13:43:51 +08:00
# 养号
2025-08-06 22:11:33 +08:00
def growAccount(self, udid, event):
2025-08-01 13:43:51 +08:00
client = wda.USBClient(udid)
session = client.session()
2025-08-08 22:08:10 +08:00
session.appium_settings({"snapshotMaxDepth": 0})
# 先关闭Tik Tok
ControlUtils.closeTikTok(session, udid)
time.sleep(1)
2025-08-06 22:11:33 +08:00
2025-08-08 22:08:10 +08:00
# 重新打开Tik Tok
ControlUtils.openTikTok(session, udid)
2025-08-06 22:11:33 +08:00
time.sleep(3)
# 创建udid名称的目录
AiUtils.makeUdidDir(udid)
2025-08-06 22:11:33 +08:00
# 假设此时有个开关
while not event.is_set():
try:
img = client.screenshot()
filePath = f"resources/{udid}/bgv.png"
2025-08-06 22:11:33 +08:00
img.save(filePath)
LogManager.info("保存屏幕图像成功", udid)
print("保存了背景图")
time.sleep(1)
2025-08-06 22:11:33 +08:00
except Exception as e:
LogManager.error(e, udid)
2025-08-06 22:11:33 +08:00
print(e)
try:
# 判断视频类型
addX, addY = AiUtils.findImageInScreen("add", udid)
2025-08-11 22:06:48 +08:00
# 多次获取结果是否一致,如果有不一致的结果就切换视频
isSame = False
for i in range(2):
tx, ty = AiUtils.findImageInScreen("add", udid)
if addX == tx and addY == ty:
isSame = True
time.sleep(1)
else:
isSame = False
break
2025-08-06 22:11:33 +08:00
# 如果找到普通视频
if addX > 0 and isSame:
needLike = random.randint(0, 10)
# 查找首页按钮
homeButton = AiUtils.findHomeButton(udid)
if homeButton:
2025-08-07 21:37:46 +08:00
print("有首页按钮,查看视频")
videoTime = random.randint(5, 15)
time.sleep(videoTime)
2025-08-06 22:11:33 +08:00
# 百分之三的概率点赞
if needLike < 3:
print("点赞")
ControlUtils.clickLike(session, udid)
2025-08-06 22:11:33 +08:00
print("继续观看视频")
videoTime = random.randint(10, 30)
time.sleep(videoTime)
print("准备划到下一个视频")
client.swipe_up()
else:
2025-08-07 21:37:46 +08:00
print("找不到首页按钮。出错了")
else:
nextTime = random.randint(1, 5)
time.sleep(nextTime)
client.swipe_up()
except Exception as e:
print(f"发生异常:{e}")
client.swipe_up()
2025-08-06 22:11:33 +08:00
2025-08-08 22:08:10 +08:00
# 观看直播
2025-08-15 19:51:02 +08:00
def watchLiveForGrowth(self, udid, event, max_retries=None):
import time, random, wda
2025-08-08 22:08:10 +08:00
2025-08-15 19:51:02 +08:00
retry_count = 0
backoff_sec = 5 # 异常后冷却,避免频繁重启
2025-08-11 22:06:48 +08:00
while not event.is_set():
2025-08-15 19:51:02 +08:00
if max_retries is not None and retry_count >= max_retries:
LogManager.error(f"达到最大重试次数停止任务。retries={retry_count}", udid)
break
2025-08-11 22:06:48 +08:00
try:
2025-08-15 19:51:02 +08:00
# —— 每次重启都新建 client/session ——
client = wda.USBClient(udid)
session = client.session()
session.appium_settings({"snapshotMaxDepth": 15})
# 1) 先关再开
ControlUtils.closeTikTok(session, udid)
time.sleep(1)
ControlUtils.openTikTok(session, udid)
2025-08-11 22:06:48 +08:00
time.sleep(3)
2025-08-15 19:51:02 +08:00
# 2) 进入直播
live_button = session(xpath='//XCUIElementTypeButton[@name="直播"]')
if live_button.exists:
live_button.click()
else:
LogManager.error("无法找到直播间按钮 抛出异常 重新启动", udid)
# 抛出异常
raise Exception(f"找不到直播按钮,抛出异常 重新启动")
time.sleep(20)
# 3) 取分辨率;可选重建 session 规避句柄陈旧
size = session.window_size()
width, height = size.width, size.height
session = client.session()
# 4) 主循环:刷直播
while not event.is_set():
time.sleep(3)
2025-08-11 22:06:48 +08:00
2025-08-15 19:51:02 +08:00
# PK 直接划走
if session(xpath='//XCUIElementTypeOther[@name="kGBLInteractionViewMatchScoreBar"]').exists:
print("✅ 当前是 PK跳过")
session.swipe_up()
continue
2025-08-11 22:06:48 +08:00
2025-08-15 19:51:02 +08:00
# 计算直播显示窗口数量(主画面+连麦小窗)
count = AiUtils.count_add_by_xml(session)
print(f"检测到直播显示区域窗口数:{count}")
if count > 1:
print("❌ 多窗口(有人连麦/分屏),划走")
session.swipe_up()
continue
else:
print("✅ 单窗口,(20%概率)开始点赞")
# 随机点赞(仍保留中途保护)
if random.random() >= 0.90: # 你原来是 0.89,可自行调整概率
print("开始点赞")
for _ in range(random.randint(10, 30)):
# 中途转PK/连麦立即跳过
if session(xpath='//XCUIElementTypeOther[@name="kGBLInteractionViewMatchScoreBar"]').exists \
or AiUtils.count_add_by_xml(session) > 1:
print("❗ 中途发生 PK/连麦,跳过")
session.swipe_up()
break
x = width // 3 + random.randint(-10, 10)
y = height // 3 + random.randint(10, 20)
print("双击坐标:", x, y)
session.double_tap(x, y)
print("--------------------------------------------")
time.sleep(random.randint(180, 300))
2025-08-11 22:06:48 +08:00
session.swipe_up()
2025-08-15 19:51:02 +08:00
# 正常退出(外部 event 触发)
break
2025-08-11 22:06:48 +08:00
except Exception as e:
2025-08-15 19:51:02 +08:00
retry_count += 1
LogManager.error(f"watchLiveForGrowth 异常(第{retry_count}次):{repr(e)}", udid)
# 尝试轻量恢复一次,避免一些短暂性 session 失效
2025-08-11 22:06:48 +08:00
try:
2025-08-15 19:51:02 +08:00
client = wda.USBClient(udid)
_ = client.session()
2025-08-11 22:06:48 +08:00
except Exception:
2025-08-15 19:51:02 +08:00
pass
time.sleep(backoff_sec) # 冷却后整段流程重来
continue
# def watchLiveForGrowth(self, udid, event):
#
# client = wda.USBClient(udid)
# session = client.session()
#
# session.appium_settings({"snapshotMaxDepth": 15})
# # 先关闭Tik Tok
# ControlUtils.closeTikTok(session, udid)
# time.sleep(1)
#
# # 重新打开Tik Tok
# ControlUtils.openTikTok(session, udid)
# time.sleep(3)
# # 进入直播
# live_button = session(xpath='//XCUIElementTypeButton[@name="直播"]')
# if live_button.exists:
# live_button.click()
# else:
# LogManager.error(f"无法找到直播间按钮", udid)
# time.sleep(20)
#
# size = session.window_size()
# width, height = size.width, size.height
#
# # 可选:重新拉起 session规避偶发 Stale 会话
# session = client.session()
#
# while not event.is_set():
# try:
# time.sleep(3)
#
# # 如果处于 PK分数条直接划走
# if session(xpath='//XCUIElementTypeOther[@name="kGBLInteractionViewMatchScoreBar"]').exists:
# print("✅ 当前是 PK跳过")
# session.swipe_up()
# continue
#
# # 数直播显示区域窗口(主画面 + 连麦小窗)
# count = AiUtils.count_add_by_xml(session)
# print(f"检测到直播显示区域窗口数:{count}")
#
# if count > 1:
# print("❌ 多窗口(有人连麦/分屏),划走")
# session.swipe_up()
# continue
# else:
# print("✅ 单窗口(只有一个主播),(目前是20%概率)开始点赞")
#
# # 点赞仍保留中途转PK的保护
# if random.random() >= 0.89:
# print("开始点赞")
# for _ in range(random.randint(10, 30)):
# if session(xpath='//XCUIElementTypeOther[@name="kGBLInteractionViewMatchScoreBar"]').exists:
# print("❗ 中途开始 PK停止点赞并跳过")
# session.swipe_up()
# break
# if AiUtils.count_add_by_xml(session) > 1:
# print("❗ 中途开始 连麦,停止点赞并跳过")
# session.swipe_up()
# break
# x = width // 3 + random.randint(-10, 10)
# y = height // 3 + random.randint(10, 20)
# print("双击坐标:", x, y)
# session.double_tap(x, y)
#
# print("--------------------------------------------")
# time.sleep(random.randint(100, 300))
# session.swipe_up()
#
# except Exception as e:
# print("循环异常,重试:", repr(e))
# # 轻量恢复:重新获取 session避免因为快照或元素句柄失效卡死
# try:
# session = client.session()
# except Exception:
# time.sleep(2)
# session = client.session()
2025-08-11 22:06:48 +08:00
# 关注打招呼以及回复主播消息
2025-08-08 22:08:10 +08:00
def greetNewFollowers(self, udid, needReply, event):
client = wda.USBClient(udid)
session = client.session()
2025-08-15 19:51:02 +08:00
print(f"是否要自动回复消息:{needReply}")
2025-08-08 22:08:10 +08:00
# 先关闭Tik Tok
ControlUtils.closeTikTok(session, udid)
time.sleep(1)
# 重新打开Tik Tok
ControlUtils.openTikTok(session, udid)
time.sleep(3)
2025-08-12 18:53:06 +08:00
# 设置查找深度
session.appium_settings({"snapshotMaxDepth": 15})
2025-08-08 22:08:10 +08:00
# 点击搜索按钮
2025-08-11 22:06:48 +08:00
ControlUtils.clickSearch(session)
2025-08-14 14:46:52 +08:00
# 创建udid名称的目录
AiUtils.makeUdidDir(udid)
2025-08-11 22:06:48 +08:00
# 返回上一步
2025-08-12 22:03:08 +08:00
def goBack(count):
for i in range(count):
ControlUtils.clickBack(session)
time.sleep(2)
2025-08-11 22:06:48 +08:00
# 循环条件。1、 循环关闭 2、 数据处理完毕
while not event.is_set() or len(anchorList) > 0:
2025-08-12 22:03:08 +08:00
# 查找输入框
input = session.xpath('//XCUIElementTypeSearchField')
# 如果找到了输入框,就点击并且输入内容
input.click()
# 稍作停顿
time.sleep(1)
2025-08-12 18:53:06 +08:00
2025-08-11 22:06:48 +08:00
# 获取一个主播
anchor = anchorList[0]
aid = anchor.anchorId
2025-08-12 18:53:06 +08:00
anchorCountry = anchor.country
2025-08-11 22:06:48 +08:00
2025-08-12 18:53:06 +08:00
input.clear_text()
2025-08-11 22:06:48 +08:00
time.sleep(2)
# 输入主播id
2025-08-08 22:08:10 +08:00
input.set_text(aid + "\n")
# 切换UI查找深度
session.appium_settings({"snapshotMaxDepth": 25})
2025-08-11 22:06:48 +08:00
# 定位 "关注" 按钮 通过关注按钮的位置点击主播首页
2025-08-13 20:20:13 +08:00
follow_button = session.xpath("//XCUIElementTypeButton[@traits='Button' and @index='1']")
2025-08-12 22:03:08 +08:00
time.sleep(2)
if follow_button.exists:
2025-08-13 20:20:13 +08:00
print(follow_button.bounds)
# session.appium_settings({"snapshotMaxDepth": 10})
2025-08-11 22:06:48 +08:00
print("找到关注按钮!")
2025-08-13 20:20:13 +08:00
x = follow_button.bounds.x - 100
2025-08-11 22:06:48 +08:00
y = follow_button.bounds.y
2025-08-13 20:20:13 +08:00
print(x, y)
2025-08-11 22:06:48 +08:00
client.click(x, y)
2025-08-13 20:20:13 +08:00
print("进入主播首页啦")
2025-08-11 22:06:48 +08:00
else:
2025-08-12 22:03:08 +08:00
goBack(1)
removeModelFromAnchorList(anchor)
2025-08-11 22:06:48 +08:00
print("未找到关注按钮")
2025-08-12 22:03:08 +08:00
continue
2025-08-08 22:08:10 +08:00
time.sleep(3)
2025-08-13 20:20:13 +08:00
session.appium_settings({"snapshotMaxDepth": 25})
time.sleep(2)
2025-08-11 22:06:48 +08:00
# 找到并点击第一个视频
cellClickResult = ControlUtils.clickFirstVideoFromDetailPage(session)
2025-08-13 20:20:13 +08:00
time.sleep(2)
2025-08-08 22:08:10 +08:00
# 观看主播视频
def viewAnchorVideo():
2025-08-13 20:20:13 +08:00
print("开始查看视频,并且重新调整查询深度")
session.appium_settings({"snapshotMaxDepth": 5})
count = 3
2025-08-12 22:03:08 +08:00
while count != 0:
time.sleep(5)
2025-08-11 22:06:48 +08:00
img = client.screenshot()
time.sleep(1)
filePath = f"resources/{udid}/bgv.png"
img.save(filePath)
LogManager.info("保存屏幕图像成功", udid)
2025-08-12 22:03:08 +08:00
time.sleep(2)
# 查找add图标
2025-08-13 20:20:13 +08:00
r = ControlUtils.clickLike(session, udid)
# 点赞成功。
if r == True:
count -= 1
2025-08-11 22:06:48 +08:00
# 假装看几秒视频
2025-08-12 22:03:08 +08:00
time.sleep(5)
2025-08-14 13:49:28 +08:00
if count != 0:
client.swipe_up()
2025-08-12 22:03:08 +08:00
2025-08-13 20:20:13 +08:00
# 右滑返回
client.swipe_right()
2025-08-11 22:06:48 +08:00
2025-08-12 22:03:08 +08:00
# 如果打开视频失败。说明该主播没有视频
if cellClickResult == True:
2025-08-13 20:20:13 +08:00
# 观看主播视频
2025-08-14 15:40:17 +08:00
LogManager.info("去查看主播视频", udid)
2025-08-11 22:06:48 +08:00
viewAnchorVideo()
2025-08-13 20:20:13 +08:00
time.sleep(3)
LogManager.info("视频看完了,重置试图查询深度", udid)
2025-08-12 18:53:06 +08:00
session.appium_settings({"snapshotMaxDepth": 25})
2025-08-12 22:03:08 +08:00
# 点击关注按钮
followButton = AiUtils.getFollowButton(session)
2025-08-13 20:20:13 +08:00
if followButton is not None:
LogManager.info("找到关注按钮了", udid)
2025-08-12 22:03:08 +08:00
followButton.click()
else:
2025-08-13 20:20:13 +08:00
LogManager.info("没找到关注按钮", udid)
removeModelFromAnchorList(anchor)
goBack(3)
continue
2025-08-12 18:53:06 +08:00
time.sleep(2)
2025-08-13 20:20:13 +08:00
msgButton = AiUtils.getSendMesageButton(session)
2025-08-12 18:53:06 +08:00
time.sleep(2)
2025-08-13 20:20:13 +08:00
if msgButton is not None:
print("找到发消息按钮了")
# 进入聊天页面
msgButton.click()
else:
print("没有识别出发消息按钮")
removeModelFromAnchorList(anchor)
goBack(3)
continue
2025-08-11 22:06:48 +08:00
2025-08-13 20:20:13 +08:00
time.sleep(3)
2025-08-12 18:53:06 +08:00
# 查找聊天界面中的输入框节点
2025-08-13 20:20:13 +08:00
chatInput = session.xpath("//TextView")
2025-08-11 22:06:48 +08:00
if chatInput.exists:
2025-08-13 20:20:13 +08:00
print("找到输入框了, 准备发送一条打招呼消息")
# 准备打招呼的文案
2025-08-14 14:30:36 +08:00
text = random.choice(prologueList)
2025-08-13 20:20:13 +08:00
# 翻译成主播国家的语言
2025-08-14 14:30:36 +08:00
msg = Requester.translation(text, anchorCountry)
2025-08-12 18:53:06 +08:00
# 准备发送一条信息
chatInput.click()
time.sleep(2)
2025-08-13 20:20:13 +08:00
# 发送消息
2025-08-12 22:03:08 +08:00
chatInput.set_text(msg + "\n")
2025-08-12 18:53:06 +08:00
time.sleep(1)
2025-08-11 22:06:48 +08:00
else:
2025-08-13 20:20:13 +08:00
print("无法发送信息")
2025-08-14 14:30:36 +08:00
LogManager.error(f"给主播{anchor.anchorId} 发送消息失败", udid)
2025-08-13 20:20:13 +08:00
# 接着下一个主播
removeModelFromAnchorList(anchor)
goBack(4)
2025-08-11 22:06:48 +08:00
else:
2025-08-13 20:20:13 +08:00
print(f"{anchor.anchorId}:该主播没有视频")
2025-08-12 22:03:08 +08:00
# 删除当前数据
removeModelFromAnchorList(anchor)
2025-08-13 20:20:13 +08:00
goBack(3)
continue
2025-08-08 22:08:10 +08:00
2025-08-13 20:20:13 +08:00
# 设置查找深度
session.appium_settings({"snapshotMaxDepth": 15})
time.sleep(2)
2025-08-15 19:51:02 +08:00
print("即将要回复消息")
print(f"页面层级:{session.source()}")
2025-08-13 20:20:13 +08:00
if needReply:
print("如果需要回复主播消息。走此逻辑")
if AiUtils.getUnReadMsgCount(session) > 0:
2025-08-15 19:51:02 +08:00
print("监控回复消息")
2025-08-13 20:20:13 +08:00
# 执行回复消息逻辑
self.monitorMessages(session, udid)
2025-08-15 20:04:59 +08:00
# 判断是否有首页按钮
2025-08-13 20:20:13 +08:00
homeButton = AiUtils.findHomeButton(udid)
if homeButton.exists:
homeButton.click()
else:
ControlUtils.closeTikTok(session, udid)
time.sleep(2)
ControlUtils.openTikTok(session, udid)
time.sleep(3)
2025-08-15 19:51:02 +08:00
print("重新创建wda会话 防止wda会话失效")
client = wda.USBClient(udid)
session = client.session()
2025-08-13 20:20:13 +08:00
# 执行完成之后。继续点击搜索
session.appium_settings({"snapshotMaxDepth": 15})
# 点击搜索按钮
ControlUtils.clickSearch(session)
else:
session.appium_settings({"snapshotMaxDepth": 15})
# 点击搜索按钮
ControlUtils.clickSearch(session)
2025-08-15 20:04:59 +08:00
2025-08-14 15:40:17 +08:00
def replyMessages(self, udid, event):
client = wda.USBClient(udid)
session = client.session()
2025-08-13 20:20:13 +08:00
ControlUtils.closeTikTok(session, udid)
time.sleep(2)
2025-08-14 15:40:17 +08:00
ControlUtils.openTikTok(session, udid)
2025-08-13 20:20:13 +08:00
time.sleep(3)
2025-08-15 19:51:02 +08:00
while not event.is_set():
2025-08-14 15:40:17 +08:00
self.monitorMessages(session, udid)
2025-08-13 20:20:13 +08:00
2025-08-14 15:40:17 +08:00
# 检查未读消息并回复
def monitorMessages(self, session, udid):
2025-08-15 19:51:02 +08:00
LogManager.info("开始监控收件箱消息,", udid)
2025-08-13 20:20:13 +08:00
session.appium_settings({"snapshotMaxDepth": 7})
2025-08-15 19:51:02 +08:00
2025-08-13 20:20:13 +08:00
el = session(xpath='//XCUIElementTypeButton[@name="a11y_vo_inbox"]')
2025-08-15 19:51:02 +08:00
2025-08-13 20:20:13 +08:00
# 如果收件箱有消息 则进行点击
if el.exists:
m = re.search(r'(\d+)', el.label) # 抓到的第一个数字串
count = int(m.group(1)) if m else 0
if count:
el.click()
2025-08-15 19:51:02 +08:00
else:
LogManager.error(f"检测不到收件箱", udid)
2025-08-13 20:20:13 +08:00
time.sleep(3)
session.appium_settings({"snapshotMaxDepth": 22})
while True:
el = session(xpath='//XCUIElementTypeButton[@name="a11y_vo_inbox"]')
2025-08-15 19:51:02 +08:00
print("el", el)
if not el.exists:
LogManager.warning(f"检测不到收件箱", udid)
break
2025-08-13 20:20:13 +08:00
m = re.search(r'(\d+)', el.label) # 抓到的第一个数字串
count = int(m.group(1)) if m else 0
if not count:
2025-08-15 19:51:02 +08:00
LogManager.info(f"当前收件箱的总数量{count}", udid)
2025-08-13 20:20:13 +08:00
break
2025-08-15 19:51:02 +08:00
# 双击收件箱 定位到消息的位置
r = el.bounds # 可能是命名属性,也可能是 tuple
cx = int((r.x + r.width / 2) if hasattr(r, "x") else (r[0] + r[2] / 2))
cy = int((r.y + r.height / 2) if hasattr(r, "y") else (r[1] + r[3] / 2))
session.double_tap(cx, cy) # 可能抛异常:方法不存在
LogManager.info(f"双击收件箱 定位到信息", udid)
2025-08-13 20:20:13 +08:00
# 新粉丝
xp_new_fan_badge = (
"//XCUIElementTypeCell[.//XCUIElementTypeLink[@name='新粉丝']]"
"//XCUIElementTypeStaticText[string-length(@value)>0 and translate(@value,'0123456789','')='']"
)
# 活动
xp_activity_badge = (
"//XCUIElementTypeCell[.//XCUIElementTypeLink[@name='活动']]"
"//XCUIElementTypeStaticText[string-length(@value)>0 and translate(@value,'0123456789','')='']"
)
# 系统通知
xp_system_badge = (
"//XCUIElementTypeCell[.//XCUIElementTypeLink[@name='系统通知']]"
"//XCUIElementTypeStaticText[string-length(@value)>0 and translate(@value,'0123456789','')='']"
)
2025-08-15 19:51:02 +08:00
# 消息请求
xp_request_badge = (
"//XCUIElementTypeCell"
"[.//*[self::XCUIElementTypeLink or self::XCUIElementTypeStaticText]"
" [@name='消息请求' or @label='消息请求' or @value='消息请求']]"
"//XCUIElementTypeStaticText[string-length(@value)>0 and translate(@value,'0123456789','')='']"
)
2025-08-13 20:20:13 +08:00
# 用户消息
xp_badge_numeric = (
2025-08-15 19:51:02 +08:00
'//XCUIElementTypeOther['
' @name="AWEIMChatListCellUnreadCountViewComponent"'
' or @name="TikTokIMImpl.InboxCellUnreadCountViewBuilder"'
']//XCUIElementTypeStaticText[@value and translate(@value,"0123456789","")=""]'
2025-08-13 20:20:13 +08:00
)
try:
# 如果 2 秒内找不到,会抛异常
badge_text = session.xpath(xp_new_fan_badge).get(timeout=2.0)
val = (badge_text.info.get("value") or
badge_text.info.get("label") or
badge_text.info.get("name"))
2025-08-15 19:51:02 +08:00
LogManager.info(f"新粉丝未读数量:{val}", udid)
2025-08-13 20:20:13 +08:00
if badge_text:
badge_text.tap()
time.sleep(1)
ControlUtils.clickBack(session)
time.sleep(1)
except Exception:
2025-08-15 19:51:02 +08:00
LogManager.warning("当前屏幕没有找到 新粉丝 未读徽标数字", udid)
2025-08-13 20:20:13 +08:00
badge_text = None
try:
# 如果 2 秒内找不到,会抛异常
badge_text = session.xpath(xp_activity_badge).get(timeout=2.0)
val = (badge_text.info.get("value") or
badge_text.info.get("label") or
badge_text.info.get("name"))
2025-08-15 19:51:02 +08:00
LogManager.info(f"活动未读数量:{val}", udid)
2025-08-13 20:20:13 +08:00
if badge_text:
badge_text.tap()
time.sleep(1)
ControlUtils.clickBack(session)
time.sleep(1)
except Exception:
2025-08-15 19:51:02 +08:00
LogManager.warning("当前屏幕没有找到 活动 未读徽标数字", udid)
2025-08-13 20:20:13 +08:00
badge_text = None
try:
# 如果 2 秒内找不到,会抛异常
badge_text = session.xpath(xp_system_badge).get(timeout=2.0)
val = (badge_text.info.get("value") or
badge_text.info.get("label") or
badge_text.info.get("name"))
2025-08-15 19:51:02 +08:00
LogManager.info(f"系统通知未读数量:{val}", udid)
2025-08-13 20:20:13 +08:00
if badge_text:
badge_text.tap()
time.sleep(1)
ControlUtils.clickBack(session)
time.sleep(1)
except Exception:
2025-08-15 19:51:02 +08:00
LogManager.warning("当前屏幕没有找到 系统通知 未读徽标数字", udid)
badge_text = None
try:
# 如果 2 秒内找不到,会抛异常
badge_text = session.xpath(xp_request_badge).get(timeout=2.0)
val = (badge_text.info.get("value") or
badge_text.info.get("label") or
badge_text.info.get("name"))
LogManager.info(f"消息请求未读数量:{val}", udid)
if badge_text:
badge_text.tap()
time.sleep(1)
ControlUtils.clickBack(session)
time.sleep(1)
except Exception:
LogManager.warning("当前屏幕没有找到 消息请求 未读徽标数字", udid)
2025-08-13 20:20:13 +08:00
badge_text = None
try:
# 如果 2 秒内找不到,会抛异常
badge_text = session.xpath(xp_badge_numeric).get(timeout=2.0)
val = (badge_text.info.get("value") or
badge_text.info.get("label") or
badge_text.info.get("name"))
2025-08-15 19:51:02 +08:00
LogManager.info(f"用户未读数量:{val}", udid)
2025-08-13 20:20:13 +08:00
if badge_text:
badge_text.tap()
time.sleep(3)
xml = session.source()
msgs = AiUtils.extract_messages_from_xml(xml)
# 检测出对方发的最后一条信息
last_msg_text = next(item['text'] for item in reversed(msgs) if item['type'] == 'msg')
# 向ai发送信息
# 获取主播的名称
anchor_name = AiUtils.get_navbar_anchor_name(session)
# 找到输入框
2025-08-15 19:51:02 +08:00
# sel = session.xpath(
# "//XCUIElementTypeTextView[@name='消息...' or @label='消息...' or @value='消息...']")
sel = session.xpath("//TextView")
2025-08-13 20:20:13 +08:00
if anchor_name not in anchorWithSession:
# 如果是第一次发消息(没有sessionId的情况)
response = Requester.chatToAi({"msg": last_msg_text})
aiResult = response['result']
sessionId = response['session_id']
anchorWithSession[anchor_name] = sessionId
# 找到输入框输入ai返回出来的消息
if sel.exists:
sel.click() # 聚焦
time.sleep(1)
sel.clear_text()
sel.set_text(aiResult + "\n")
else:
# 如果不是第一次发消息证明存储的有sessionId
sessionId = anchorWithSession[anchor_name]
response = Requester.chatToAi({"msg": last_msg_text, "sid": sessionId})
aiResult = response['result']
if sel.exists:
sel.click() # 聚焦
time.sleep(1)
sel.clear_text()
sel.set_text(aiResult + "\n")
time.sleep(1)
# 返回
ControlUtils.clickBack(session)
except Exception:
2025-08-15 19:51:02 +08:00
LogManager.warning("当前屏幕没有找到 用户 未读徽标数字", udid)
2025-08-13 20:20:13 +08:00
badge_text = None
2025-08-08 22:08:10 +08:00
2025-08-12 18:53:06 +08:00
def test(self, udid):
client = wda.USBClient(udid)
2025-08-12 22:03:08 +08:00
session = client.session()
session.appium_settings({"snapshotMaxDepth": 10})
print(client.source())
2025-08-08 22:08:10 +08:00
2025-08-12 22:03:08 +08:00
# manager = ScriptManager()
2025-08-14 15:40:17 +08:00
# manager.test("2335890f77fb51322361bd46b85f7fd1311aed53")
# manager.growAccount("2335890f77fb51322361bd46b85f7fd1311aed53")