Files
iOSAI/Module/DeviceInfo.py

438 lines
16 KiB
Python
Raw Normal View History

2025-08-15 20:04:59 +08:00
import os
2025-08-28 19:51:57 +08:00
import signal
2025-09-22 14:36:05 +08:00
import subprocess
2025-10-21 15:43:02 +08:00
import threading
2025-08-01 13:43:51 +08:00
import time
2025-09-17 22:23:57 +08:00
from concurrent.futures import ThreadPoolExecutor, as_completed
2025-08-15 20:04:59 +08:00
from pathlib import Path
2025-09-22 14:36:05 +08:00
from typing import Dict, Optional, List
2025-09-24 16:32:05 +08:00
import tidevice
2025-09-18 21:31:23 +08:00
import wda
2025-09-04 20:47:14 +08:00
from tidevice import Usbmux, ConnectionType
2025-09-08 13:48:21 +08:00
from tidevice._device import BaseDevice
2025-08-01 13:43:51 +08:00
from Entity.DeviceModel import DeviceModel
2025-10-21 15:43:02 +08:00
from Entity.Variables import WdaAppBundleId, wdaFunctionPort, wdaScreenPort
2025-08-01 13:43:51 +08:00
from Module.FlaskSubprocessManager import FlaskSubprocessManager
2025-10-21 15:43:02 +08:00
from Module.IOSActivator import IOSActivator
from Utils.LogManager import LogManager
2025-08-01 13:43:51 +08:00
2025-09-28 14:35:09 +08:00
import socket
import http.client
from collections import defaultdict
import psutil
2025-10-23 21:38:18 +08:00
import hashlib
2025-09-11 22:46:55 +08:00
2025-09-22 14:36:05 +08:00
class DeviceInfo:
2025-10-23 21:38:18 +08:00
REMOVE_GRACE_SEC = 5.0 # 设备离线宽限期(秒)
ADD_STABLE_SEC = 1.5 # 设备上线稳定期(秒)
ORPHAN_COOLDOWN = 3.0 # 拓扑变更后暂停孤儿清理(秒)
2025-08-01 13:43:51 +08:00
def __init__(self):
2025-09-22 14:36:05 +08:00
self._port = 9110
2025-09-23 20:17:33 +08:00
self._models: Dict[str, DeviceModel] = {}
self._procs: Dict[str, subprocess.Popen] = {}
2025-09-22 14:36:05 +08:00
self._manager = FlaskSubprocessManager.get_instance()
self._iproxy_path = self._find_iproxy()
self._pool = ThreadPoolExecutor(max_workers=6)
2025-09-28 14:35:09 +08:00
self._last_heal_check_ts = 0.0
self._heal_backoff: Dict[str, float] = defaultdict(float) # udid -> next_allowed_ts
2025-10-23 21:38:18 +08:00
# 并发保护 & 状态表
self._lock = threading.RLock()
self._port_by_udid: Dict[str, int] = {} # UDID -> local_port
self._pid_by_udid: Dict[str, int] = {} # UDID -> iproxy PID
# 抗抖:最近一次看到在线的时间 / 首次看到在线的时间
self._last_seen: Dict[str, float] = {} # udid -> ts
self._first_seen: Dict[str, float] = {} # udid -> ts(首次在线)
self._last_topology_change_ts = 0.0 # 最近一次“新增或真正移除”的时间
2025-09-22 14:36:05 +08:00
# ---------------- 主循环 ----------------
def listen(self):
2025-09-28 14:35:09 +08:00
orphan_gc_tick = 0
2025-09-15 22:40:45 +08:00
while True:
2025-10-23 21:38:18 +08:00
now = time.time()
try:
usb = Usbmux().device_list()
online_now = {d.udid for d in usb if d.conn_type == ConnectionType.USB}
except Exception as e:
# 如果拉设备列表失败,本轮不做增删(避免误杀)
LogManager.warning(f"device_list() 异常:{e}")
time.sleep(1)
continue
2025-09-28 14:35:09 +08:00
2025-10-23 21:38:18 +08:00
# 记录“看到”的时间戳
for u in online_now:
if u not in self._first_seen:
self._first_seen[u] = now
self._last_seen[u] = now
# 处理真正移除(连续缺席超过宽限期)
with self._lock:
known = set(self._models.keys())
for udid in list(known):
last = self._last_seen.get(udid, 0.0)
if udid not in online_now and (now - last) >= self.REMOVE_GRACE_SEC:
self._remove_device(udid) # 真正下线
self._last_topology_change_ts = now
# 处理真正新增(连续在线超过稳定期)
new_candidates = [u for u in online_now if u not in known]
to_add = [u for u in new_candidates if (now - self._first_seen.get(u, now)) >= self.ADD_STABLE_SEC]
if to_add:
futures = {self._pool.submit(self._add_device, u): u for u in to_add}
2025-09-22 14:36:05 +08:00
for f in as_completed(futures, timeout=30):
try:
f.result()
2025-10-23 21:38:18 +08:00
self._last_topology_change_ts = time.time()
2025-09-22 14:36:05 +08:00
except Exception as e:
LogManager.error(f"异步连接失败:{e}")
2025-09-28 14:35:09 +08:00
# 定期健康检查 + 自愈
self._check_and_heal_tunnels(interval=2.0)
2025-10-23 21:38:18 +08:00
# 每 10 次清理一次孤儿 iproxy但在拓扑变更后 N 秒暂停执行,避免插拔风暴期误杀)
2025-09-28 14:35:09 +08:00
orphan_gc_tick += 1
if orphan_gc_tick >= 10:
orphan_gc_tick = 0
2025-10-23 21:38:18 +08:00
if (time.time() - self._last_topology_change_ts) >= self.ORPHAN_COOLDOWN:
self._cleanup_orphan_iproxy()
2025-09-28 14:35:09 +08:00
2025-09-15 22:40:45 +08:00
time.sleep(1)
2025-10-23 21:38:18 +08:00
2025-09-22 14:36:05 +08:00
# ---------------- 新增设备 ----------------
def _add_device(self, udid: str):
if not self._trusted(udid):
return
2025-09-24 16:32:05 +08:00
r = self.startWda(udid)
if r is False:
LogManager.info("启动wda失败")
return
w, h, s = self._screen_info(udid)
if w == 0 or h == 0 or s == 0:
print("未获取到设备屏幕信息")
return
2025-10-21 16:55:40 +08:00
print("获取设备信息成功")
2025-10-23 21:38:18 +08:00
# 固定端口分配(加锁,避免竞态)
with self._lock:
port = self._alloc_port(udid)
2025-09-22 14:36:05 +08:00
proc = self._start_iproxy(udid, port)
if not proc:
2025-10-21 16:55:40 +08:00
print("启动iproxy失败")
2025-09-17 15:43:23 +08:00
return
2025-10-23 21:38:18 +08:00
with self._lock:
model = DeviceModel(deviceId=udid, screenPort=port,
width=w, height=h, scale=s, type=1)
model.ready = True
self._models[udid] = model
self._procs[udid] = proc
self._pid_by_udid[udid] = proc.pid
2025-10-21 16:55:40 +08:00
print("准备添加设备")
2025-09-22 14:36:05 +08:00
self._manager_send(model)
2025-10-23 21:38:18 +08:00
# ---------------- 移除设备(仅在宽限期后调用) ----------------
2025-09-22 14:36:05 +08:00
def _remove_device(self, udid: str):
2025-10-23 21:38:18 +08:00
with self._lock:
model = self._models.pop(udid, None)
proc = self._procs.pop(udid, None)
self._pid_by_udid.pop(udid, None)
# 不清 _port_by_udid端口下次仍复用前端更稳定
2025-09-22 14:36:05 +08:00
if not model:
return
2025-09-22 14:36:05 +08:00
model.type = 2
2025-10-23 21:38:18 +08:00
self._kill(proc)
2025-09-22 14:36:05 +08:00
self._manager_send(model)
2025-09-22 14:36:05 +08:00
# ---------------- 工具函数 ----------------
def _trusted(self, udid: str) -> bool:
try:
2025-09-22 14:36:05 +08:00
BaseDevice(udid).get_value("DeviceName")
return True
except Exception:
return False
2025-09-17 22:23:57 +08:00
2025-09-24 16:32:05 +08:00
def startWda(self, udid):
print("进入启动wda方法")
try:
dev = tidevice.Device(udid)
2025-10-21 15:43:02 +08:00
systemVersion = int(dev.product_version.split(".")[0])
# 判断运行wda的逻辑
if systemVersion > 17:
ios = IOSActivator()
threading.Thread(
target=ios.activate,
args=(udid,)
).start()
else:
dev.app_start(WdaAppBundleId)
2025-09-24 16:32:05 +08:00
print("启动wda成功")
time.sleep(3)
return True
except Exception as e:
print("启动wda遇到错误:", e)
return False
2025-09-22 14:36:05 +08:00
def _screen_info(self, udid: str):
2025-08-15 20:04:59 +08:00
try:
2025-10-21 15:43:02 +08:00
c = wda.USBClient(udid, wdaFunctionPort)
2025-09-24 16:32:05 +08:00
c.home()
2025-09-28 14:35:09 +08:00
size = c.window_size()
2025-09-22 14:36:05 +08:00
scale = c.scale
return int(size.width), int(size.height), float(scale)
2025-09-24 16:32:05 +08:00
except Exception as e:
2025-09-28 14:35:09 +08:00
print("获取设备信息遇到错误:", e)
2025-09-24 16:32:05 +08:00
return 0, 0, 0
2025-09-17 22:23:57 +08:00
2025-10-23 21:38:18 +08:00
# ---------------- 端口映射(保留你之前的“先杀后启”“隐藏黑窗”修复) ----------------
2025-09-22 14:36:05 +08:00
def _start_iproxy(self, udid: str, port: int) -> Optional[subprocess.Popen]:
2025-08-15 20:04:59 +08:00
try:
2025-10-23 21:38:18 +08:00
with self._lock:
old_pid = self._pid_by_udid.get(udid)
if old_pid:
self._kill_pid_gracefully(old_pid)
self._pid_by_udid.pop(udid, None)
time.sleep(0.2)
if not self._is_port_free(port):
port = self._pick_free_port(start=max(self._port, port))
creationflags = 0
startupinfo = None
if os.name == "nt":
creationflags = getattr(subprocess, "CREATE_NO_WINDOW", 0x08000000) | getattr(subprocess,
"CREATE_NEW_PROCESS_GROUP",
0x00000200)
si = subprocess.STARTUPINFO()
si.dwFlags |= subprocess.STARTF_USESHOWWINDOW
si.wShowWindow = 0
startupinfo = si
cmd = [self._iproxy_path, "-u", udid, str(port), str(wdaScreenPort)]
proc = subprocess.Popen(
cmd,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
creationflags=creationflags,
startupinfo=startupinfo
)
self._procs[udid] = proc
self._pid_by_udid[udid] = proc.pid
self._port_by_udid[udid] = port
return proc
2025-09-23 20:17:33 +08:00
except Exception as e:
print(e)
2025-09-22 14:36:05 +08:00
return None
2025-09-17 22:23:57 +08:00
2025-09-22 14:36:05 +08:00
def _kill(self, proc: Optional[subprocess.Popen]):
if not proc:
2025-09-20 20:07:16 +08:00
return
2025-09-11 22:46:55 +08:00
try:
2025-09-22 14:36:05 +08:00
proc.terminate()
proc.wait(timeout=2)
except Exception:
try:
os.kill(proc.pid, signal.SIGKILL)
except Exception:
pass
2025-09-20 20:07:16 +08:00
2025-10-23 21:38:18 +08:00
# ---------------- 端口分配(加锁 + 稳定端口) ----------------
def _alloc_port(self, udid: str) -> int:
"""
UDID 分配一个**稳定**的本地端口
- 同一 UDID 优先复用上次端口减少前端切换
- 初次分配使用 20000-45000 的稳定哈希起点向上探测空闲
"""
# 已有则直接复用
if udid in self._port_by_udid:
p = self._port_by_udid[udid]
if self._is_port_free(p):
return p
# 基于 UDID 计算稳定起点
h = int(hashlib.sha1(udid.encode("utf-8")).hexdigest(), 16)
start = 20000 + (h % 25000) # 20000~44999
# 避免和你类里默认的 9110 等端口冲突,向上找空闲
p = self._pick_free_port(start=start, limit=4000)
self._port_by_udid[udid] = p
return p
2025-09-20 20:07:16 +08:00
2025-09-22 14:36:05 +08:00
def _manager_send(self, model: DeviceModel):
2025-09-11 22:46:55 +08:00
try:
2025-09-22 14:36:05 +08:00
self._manager.send(model.toDict())
except Exception:
pass
2025-09-11 22:46:55 +08:00
2025-09-22 14:36:05 +08:00
def _find_iproxy(self) -> str:
base = Path(__file__).resolve().parent.parent
name = "iproxy.exe"
path = base / "resources" / "iproxy" / name
2025-09-23 20:17:33 +08:00
print(str(path))
2025-09-22 14:36:05 +08:00
if path.is_file():
return str(path)
raise FileNotFoundError(f"iproxy 不存在: {path}")
2025-09-16 15:31:55 +08:00
2025-10-23 21:38:18 +08:00
# ------------ Windows 专用:列出所有 iproxy 命令行(更安全) ------------
2025-09-22 14:36:05 +08:00
def _get_all_iproxy_cmdlines(self) -> List[str]:
lines: List[str] = []
2025-10-23 21:38:18 +08:00
live_pids = set()
with self._lock:
live_pids = set(self._pid_by_udid.values())
2025-09-28 14:35:09 +08:00
for p in psutil.process_iter(attrs=["name", "cmdline", "pid"]):
try:
name = (p.info.get("name") or "").lower()
if name != "iproxy.exe":
continue
2025-10-23 21:38:18 +08:00
# 跳过我们自己登记在册的 iproxy避免误杀
if p.info["pid"] in live_pids:
continue
2025-09-28 14:35:09 +08:00
cmdline = p.info.get("cmdline") or []
if not cmdline:
continue
if "-u" in cmdline:
cmd = " ".join(cmdline)
lines.append(f"{cmd} {p.info['pid']}")
except (psutil.NoSuchProcess, psutil.AccessDenied):
continue
2025-09-22 14:36:05 +08:00
return lines
# ------------ 杀孤儿 ------------
def _cleanup_orphan_iproxy(self):
2025-10-23 21:38:18 +08:00
live_udids = set()
live_pids = set()
with self._lock:
live_udids = set(self._models.keys())
live_pids = set(self._pid_by_udid.values())
2025-09-22 14:36:05 +08:00
for ln in self._get_all_iproxy_cmdlines():
parts = ln.split()
try:
udid = parts[parts.index('-u') + 1]
pid = int(parts[-1])
2025-10-23 21:38:18 +08:00
# 既不在我们的 PID 表里,且 UDID 不在线,才算孤儿
if pid not in live_pids and udid not in live_udids:
2025-09-22 14:36:05 +08:00
self._kill_pid_gracefully(pid)
LogManager.warning(f'扫到孤儿 iproxy已清理 {udid} PID={pid}')
except (ValueError, IndexError):
continue
2025-09-16 15:31:55 +08:00
2025-09-22 14:36:05 +08:00
# ------------ 按 PID 强杀 ------------
2025-09-16 15:31:55 +08:00
def _kill_pid_gracefully(self, pid: int):
try:
2025-09-28 14:35:09 +08:00
p = psutil.Process(pid)
p.terminate()
try:
p.wait(timeout=1.0)
except psutil.TimeoutExpired:
p.kill()
2025-09-16 15:31:55 +08:00
except Exception:
pass
2025-10-23 21:38:18 +08:00
# ------------ 端口工具 ------------
2025-09-28 14:35:09 +08:00
def _is_port_free(self, port: int) -> bool:
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
s.settimeout(0.2)
try:
s.bind(("127.0.0.1", port))
return True
except OSError:
return False
def _pick_free_port(self, start: int = None, limit: int = 2000) -> int:
2025-10-23 21:38:18 +08:00
"""从 start 起向上找一个空闲端口。(注意:调用方务必在 self._lock 下)"""
2025-09-28 14:35:09 +08:00
p = self._port if start is None else start
tried = 0
while tried < limit:
p += 1
tried += 1
if self._is_port_free(p):
self._port = p # 更新游标
return p
raise RuntimeError("未找到可用端口(扫描范围内)")
def _health_check_mjpeg(self, port: int, timeout: float = 1.0) -> bool:
"""
http://127.0.0.1:<port>/ 做非常轻量的探活
WDA mjpegServer(默认9100)通常根路径就会有 multipart/x-mixed-replace
"""
try:
conn = http.client.HTTPConnection("127.0.0.1", port, timeout=timeout)
conn.request("GET", "/")
resp = conn.getresponse()
alive = 200 <= resp.status < 400
try:
resp.read(256)
except Exception:
pass
conn.close()
return alive
except Exception:
return False
def _restart_iproxy(self, udid: str):
2025-10-23 21:38:18 +08:00
"""重启某个 udid 的 iproxy带退避"""
2025-09-28 14:35:09 +08:00
now = time.time()
next_allowed = self._heal_backoff[udid]
if now < next_allowed:
return # 处于退避窗口内,先不重启
2025-10-23 21:38:18 +08:00
with self._lock:
proc = self._procs.get(udid)
if proc:
self._kill(proc)
time.sleep(0.3)
model = self._models.get(udid)
if not model:
return
# 如果端口被别的进程占用了,换一个新端口并通知管理器
if not self._is_port_free(model.screenPort):
new_port = self._pick_free_port(start=max(self._port, model.screenPort))
model.screenPort = new_port
self._models[udid] = model
self._port_by_udid[udid] = new_port
self._manager_send(model) # 通知前端/上位机端口变化
proc2 = self._start_iproxy(udid, model.screenPort)
if not proc2:
# 启动失败,设置退避(逐步增加上限)
self._heal_backoff[udid] = now + 2.0
return
self._procs[udid] = proc2
self._pid_by_udid[udid] = proc2.pid
2025-09-28 14:35:09 +08:00
# 成功后缩短退避
self._heal_backoff[udid] = now + 0.5
def _check_and_heal_tunnels(self, interval: float = 2.0):
"""
定期巡检所有在线设备的本地映射端口是否活着不活就重启 iproxy
"""
now = time.time()
if now - self._last_heal_check_ts < interval:
return
self._last_heal_check_ts = now
2025-10-23 21:38:18 +08:00
# 读取时也加锁,避免与增删设备并发冲突
with self._lock:
items = list(self._models.items())
for udid, model in items:
2025-09-28 14:35:09 +08:00
port = model.screenPort
if port <= 0:
continue
ok = self._health_check_mjpeg(port, timeout=0.8)
if not ok:
LogManager.warning(f"端口失活准备自愈udid={udid} port={port}")
self._restart_iproxy(udid)