Files
iOSAI/Module/DeviceInfo.py

289 lines
11 KiB
Python
Raw Normal View History

2025-08-15 20:04:59 +08:00
import os
2025-08-28 19:51:57 +08:00
import signal
2025-08-15 20:04:59 +08:00
import sys
2025-08-01 13:43:51 +08:00
import time
import wda
2025-08-15 20:04:59 +08:00
import threading
import subprocess
from pathlib import Path
from typing import List, Dict, Optional
2025-09-04 20:47:14 +08:00
from tidevice import Usbmux, ConnectionType
2025-09-08 13:48:21 +08:00
from tidevice._device import BaseDevice
2025-08-01 13:43:51 +08:00
from Entity.DeviceModel import DeviceModel
2025-08-14 15:51:17 +08:00
from Entity.Variables import WdaAppBundleId
2025-08-01 13:43:51 +08:00
from Module.FlaskSubprocessManager import FlaskSubprocessManager
from Utils.LogManager import LogManager
2025-08-01 13:43:51 +08:00
class Deviceinfo(object):
def __init__(self):
self.deviceIndex = 0
self.screenProxy = 9110
2025-08-15 20:04:59 +08:00
self.pidList: List[Dict] = []
self.deviceArray: List = []
2025-08-01 13:43:51 +08:00
self.manager = FlaskSubprocessManager.get_instance()
2025-08-15 20:04:59 +08:00
self.deviceModelList: List[DeviceModel] = []
2025-08-18 22:20:23 +08:00
self.maxDeviceCount = 6
2025-08-28 19:51:57 +08:00
self._lock = threading.Lock()
2025-09-08 13:48:21 +08:00
self._pending_udids = set()
2025-08-18 22:20:23 +08:00
try:
2025-09-08 13:48:21 +08:00
self.iproxy_path = self._iproxy_path()
2025-08-18 22:20:23 +08:00
self.iproxy_dir = self.iproxy_path.parent
os.environ["PATH"] = str(self.iproxy_dir) + os.pathsep + os.environ.get("PATH", "")
try:
os.add_dll_directory(str(self.iproxy_dir))
except Exception:
pass
self._creationflags = 0x08000000 if os.name == "nt" else 0
self._popen_kwargs = dict(
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
cwd=str(self.iproxy_dir),
shell=False,
text=True,
2025-08-28 15:46:17 +08:00
creationflags=self._creationflags,
2025-08-18 22:20:23 +08:00
encoding="utf-8",
bufsize=1,
)
def _spawn_iproxy(udid: str, local_port: int, remote_port: int = 9100) -> subprocess.Popen:
args = [str(self.iproxy_path), "-u", udid, str(local_port), str(remote_port)]
p = subprocess.Popen(args, **self._popen_kwargs)
def _pipe_to_log(name: str, stream):
try:
for line in iter(stream.readline, ''):
s = line.strip()
if s:
LogManager.info(f"[iproxy {name}] {s}", udid)
except Exception:
pass
try:
threading.Thread(target=_pipe_to_log, args=("STDOUT", p.stdout), daemon=True).start()
threading.Thread(target=_pipe_to_log, args=("STDERR", p.stderr), daemon=True).start()
except Exception:
pass
return p
2025-09-08 13:48:21 +08:00
self._spawn_iproxy = _spawn_iproxy
2025-08-18 22:20:23 +08:00
LogManager.info(f"iproxy 启动器已就绪,目录: {self.iproxy_dir}")
except Exception as e:
self.iproxy_path = None
self.iproxy_dir = None
self._spawn_iproxy = None
LogManager.error(f"初始化 iproxy 失败:{e}")
2025-08-01 13:43:51 +08:00
2025-08-15 20:04:59 +08:00
# ----------------------------
# 监听设备连接(死循环,内部捕获异常)
# ----------------------------
2025-08-01 13:43:51 +08:00
def startDeviceListener(self):
while True:
2025-08-15 20:04:59 +08:00
try:
lists = Usbmux().device_list()
except Exception as e:
2025-09-08 13:48:21 +08:00
LogManager.warning(
f"usbmuxd 连接失败: {e}。请确认已安装 iTunes/Apple Mobile Device Support并在手机上“信任此电脑”")
2025-08-15 20:04:59 +08:00
time.sleep(2)
continue
2025-09-08 13:48:21 +08:00
now_udids = {d.udid for d in lists if d.conn_type == ConnectionType.USB}
# 1. 处理“已插入但未信任”的设备,一旦信任就补连接
for udid in list(self._pending_udids):
if udid not in now_udids:
# 设备已拔出,从 pending 移除
self._pending_udids.discard(udid)
continue
if self.is_device_trusted(udid):
# 已信任,补连接
self._pending_udids.discard(udid)
self.screenProxy += 1
try:
self.connectDevice(udid)
# 补加入 deviceArray用 usbmux 对象)
for d in lists:
if d.udid == udid:
self.deviceArray.append(d)
break
except Exception as e:
LogManager.error(f"补连接设备失败 {udid}: {e}", udid)
# 2. 处理全新插入的设备
2025-08-01 13:43:51 +08:00
for device in lists:
2025-09-08 13:48:21 +08:00
if device.conn_type == ConnectionType.USB and device not in self.deviceArray and len(
self.deviceArray) < self.maxDeviceCount:
if not self.is_device_trusted(device.udid):
# 未信任,记入 pending下次循环再判
self._pending_udids.add(device.udid)
LogManager.warning("设备未信任,已记录,等待信任后自动连接", device.udid)
continue
# 已信任,直接走完整流程
2025-08-01 13:43:51 +08:00
self.screenProxy += 1
2025-08-15 20:04:59 +08:00
try:
self.connectDevice(device.udid)
self.deviceArray.append(device)
except Exception as e:
LogManager.error(f"连接设备失败 {device.udid}: {e}", device.udid)
2025-09-04 20:47:14 +08:00
2025-09-08 13:48:21 +08:00
# 3. 处理拔出
2025-08-15 20:04:59 +08:00
self._removeDisconnected(lists)
2025-08-01 13:43:51 +08:00
time.sleep(1)
2025-08-15 20:04:59 +08:00
# ----------------------------
2025-09-08 13:48:21 +08:00
# 判断设备是否已信任
# ----------------------------
def is_device_trusted(self, udid: str) -> bool:
try:
d = BaseDevice(udid)
d.get_value("DeviceName") # 任意读取一个值,失败即未信任
return True
except Exception:
return False
# ----------------------------
# 连接单台设备:先判断是否信任,再启动 WDA
2025-08-15 20:04:59 +08:00
# ----------------------------
def connectDevice(self, identifier: str):
2025-09-08 13:48:21 +08:00
if not self.is_device_trusted(identifier):
LogManager.warning("设备未信任,跳过 WDA 启动,等待信任后再试", identifier)
return
try:
d = wda.USBClient(identifier, 8100)
2025-08-15 20:04:59 +08:00
LogManager.info("启动 WDA 成功", identifier)
except Exception as e:
LogManager.error(f"启动 WDA 失败请检查手机是否已信任、WDA 是否正常。错误: {e}", identifier)
2025-09-08 13:48:21 +08:00
return
2025-08-13 20:20:13 +08:00
2025-08-15 20:04:59 +08:00
width, height, scale = 0, 0, 1.0
try:
2025-08-13 20:20:13 +08:00
size = d.window_size()
2025-08-15 20:04:59 +08:00
width, height = size.width, size.height
2025-08-13 20:20:13 +08:00
scale = d.scale
2025-08-15 20:04:59 +08:00
except Exception as e:
LogManager.warning(f"读取屏幕信息失败:{e}", identifier)
model = DeviceModel(identifier, self.screenProxy, width, height, scale, type=1)
self.deviceModelList.append(model)
try:
2025-08-13 20:20:13 +08:00
self.manager.send(model.toDict())
2025-08-15 20:04:59 +08:00
except Exception as e:
LogManager.warning(f"向前端发送设备模型失败:{e}", identifier)
2025-08-13 20:20:13 +08:00
2025-08-15 20:04:59 +08:00
try:
d.app_start(WdaAppBundleId)
d.home()
except Exception as e:
2025-08-15 20:04:59 +08:00
LogManager.warning(f"启动/切回桌面失败:{e}", identifier)
2025-08-06 22:11:33 +08:00
time.sleep(2)
2025-08-15 20:04:59 +08:00
2025-08-13 20:20:13 +08:00
target = self.relayDeviceScreenPort(identifier)
2025-08-28 19:51:57 +08:00
if target is not None:
with self._lock:
self.pidList.append({"target": target, "id": identifier})
2025-09-08 13:48:21 +08:00
# ----------------------------
# 以下方法未改动,省略以节省篇幅
# ----------------------------
2025-08-28 19:51:57 +08:00
def _terminate_proc(self, p: subprocess.Popen):
if not p:
return
if p.poll() is not None:
return
try:
2025-09-08 13:48:21 +08:00
p.terminate()
2025-08-28 19:51:57 +08:00
p.wait(timeout=3)
except Exception:
try:
if os.name == "posix":
try:
os.killpg(os.getpgid(p.pid), signal.SIGKILL)
except Exception:
p.kill()
else:
2025-09-08 13:48:21 +08:00
p.kill()
p.wait(timeout=2)
2025-08-28 19:51:57 +08:00
except Exception:
pass
2025-08-15 20:04:59 +08:00
def _removeDisconnected(self, current_list):
2025-08-28 19:51:57 +08:00
try:
2025-09-10 22:01:15 +08:00
# 当前在线的 deviceId 集合(即 UDID
now_device_ids = {d.udid for d in current_list if hasattr(d, 'udid')}
2025-09-10 16:15:47 +08:00
2025-09-10 22:01:15 +08:00
# 上一次记录的 deviceId 集合
prev_device_ids = {model.deviceId for model in self.deviceModelList}
2025-08-28 19:51:57 +08:00
except Exception as e:
2025-09-10 22:01:15 +08:00
LogManager.error(f"收集 deviceId 失败:{e}", "")
2025-08-28 19:51:57 +08:00
return
2025-09-10 16:15:47 +08:00
2025-09-10 22:01:15 +08:00
removed_device_ids = prev_device_ids - now_device_ids
if not removed_device_ids:
2025-08-28 19:51:57 +08:00
return
2025-08-15 20:04:59 +08:00
2025-08-28 19:51:57 +08:00
with self._lock:
2025-09-10 16:15:47 +08:00
# 清理 deviceModelList
for model in list(self.deviceModelList):
2025-09-10 22:01:15 +08:00
if model.deviceId in removed_device_ids:
2025-09-10 16:15:47 +08:00
model.type = 2
try:
self.manager.send(model.toDict())
except Exception as e:
LogManager.warning(f"发送下线事件失败:{e}", model.deviceId)
self.deviceModelList.remove(model)
2025-08-28 19:51:57 +08:00
2025-09-10 16:15:47 +08:00
# 清理 pidList
2025-08-28 19:51:57 +08:00
survivors = []
2025-09-10 16:15:47 +08:00
for item in self.pidList:
2025-09-10 22:01:15 +08:00
if item.get("id") in removed_device_ids:
2025-09-10 16:15:47 +08:00
p = item.get("target")
2025-08-15 20:04:59 +08:00
try:
2025-08-28 19:51:57 +08:00
self._terminate_proc(p)
except Exception as e:
2025-09-10 16:15:47 +08:00
LogManager.warning(f"关闭 iproxy 异常:{e}", item.get("id"))
2025-08-28 19:51:57 +08:00
else:
2025-09-10 16:15:47 +08:00
survivors.append(item)
2025-08-28 19:51:57 +08:00
self.pidList = survivors
2025-08-15 20:04:59 +08:00
2025-09-10 22:01:15 +08:00
# 清理 deviceArray也统一用 udid
self.deviceArray = [d for d in self.deviceArray if getattr(d, 'udid', None) not in removed_device_ids]
2025-08-28 19:51:57 +08:00
2025-09-10 22:01:15 +08:00
# >>> 新增:记录剩余设备数量 <<<
LogManager.info(f"设备拔出完成,当前剩余设备数:{len(self.deviceModelList)}", "")
for device_id in removed_device_ids:
LogManager.info("设备已拔出,清理完成(下线通知 + 端口映射关闭 + 状态移除)", device_id)
2025-08-01 13:43:51 +08:00
2025-08-15 20:04:59 +08:00
def _base_dir(self) -> Path:
if getattr(sys, "frozen", False):
return Path(sys.executable).resolve().parent
2025-09-08 13:48:21 +08:00
return Path(__file__).resolve().parents[1]
2025-08-15 20:04:59 +08:00
def _iproxy_path(self) -> Path:
2025-08-18 15:51:09 +08:00
exe = "iproxy.exe" if os.name == "nt" else "iproxy"
2025-08-15 22:24:41 +08:00
base = self._base_dir()
candidates = [
2025-09-08 13:48:21 +08:00
base / "resources" / "iproxy" / exe,
2025-08-15 22:24:41 +08:00
]
for p in candidates:
if p.exists():
return p
2025-08-18 22:20:23 +08:00
raise FileNotFoundError(f"iproxy not found, tried: {[str(c) for c in candidates]}")
2025-08-15 20:04:59 +08:00
def relayDeviceScreenPort(self, udid: str) -> Optional[subprocess.Popen]:
2025-08-18 22:20:23 +08:00
if not self._spawn_iproxy:
LogManager.error("iproxy 启动器未就绪,无法建立端口映射(初始化时未找到 iproxy", udid)
return None
try:
p = self._spawn_iproxy(udid, self.screenProxy, 9100)
2025-08-15 20:04:59 +08:00
LogManager.info(f"启动 iproxy 成功,本地 {self.screenProxy} -> 设备 9100", udid)
return p
2025-08-01 13:43:51 +08:00
except Exception as e:
2025-08-15 20:04:59 +08:00
LogManager.error(f"启动 iproxy 失败:{e}", udid)
return None