Files
iOSAI/Module/DeviceInfo.py

626 lines
24 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
"""
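Minimal, stable device supervisor (DeviceInfo) with verbose print logging.
- Every key step prints a message so a person can see how far execution got.
- Core logic is kept: watch devices going online/offline / start WDA / start iproxy / notify the front end.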
"""
import os
import time
import threading
import subprocess
import socket
from pathlib import Path
from typing import Dict, Optional, List, Any
import platform
import psutil
import http.client
import tidevice
import wda
from tidevice import Usbmux, ConnectionType
from tidevice._device import BaseDevice
from Entity.DeviceModel import DeviceModel
from Entity.Variables import WdaAppBundleId, wdaScreenPort, wdaFunctionPort
from Module.FlaskSubprocessManager import FlaskSubprocessManager
from Module.IOSActivator import IOSActivator
from Utils.LogManager import LogManager
def _monotonic() -> float:
    """Return the current reading of the monotonic clock, in seconds."""
    now = time.monotonic()
    return now
def _is_port_free(port: int, host: str = "127.0.0.1") -> bool:
    """Report whether a TCP socket can currently be bound to ``host:port``.

    Args:
        port: TCP port number to probe.
        host: Local address to bind the probe socket to.

    Returns:
        True when the bind succeeds, False when it is refused or the port
        number is out of range.
    """
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as probe:
        try:
            if os.name == "nt":
                # On Windows SO_REUSEADDR lets a bind succeed on a port that
                # another socket is already using, which would make this probe
                # report busy ports as free. Ask for exclusive use instead.
                exclusive = getattr(socket, "SO_EXCLUSIVEADDRUSE", None)
                if exclusive is not None:
                    probe.setsockopt(socket.SOL_SOCKET, exclusive, 1)
            else:
                # On POSIX SO_REUSEADDR only skips lingering TIME_WAIT entries;
                # a port with a live listener still refuses the bind.
                probe.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            probe.bind((host, port))
        except (OSError, OverflowError):
            # OverflowError: port outside 0-65535.
            return False
        return True
def _pick_free_port(low: int = 20000, high: int = 48000) -> int:
    """Pick a free local port: random probes first, then a sequential scan.

    The range can be overridden with the environment variables
    PORT_RANGE_LOW / PORT_RANGE_HIGH. A value that is not an integer is
    ignored and the corresponding argument value is kept.

    Args:
        low: Lower bound of the port range (inclusive).
        high: Upper bound of the port range (inclusive).

    Returns:
        A port for which ``_is_port_free`` returned True.

    Raises:
        RuntimeError: No port in the range could be bound.
    """
    import random

    try:
        low = int(os.getenv("PORT_RANGE_LOW", str(low)))
        high = int(os.getenv("PORT_RANGE_HIGH", str(high)))
    except ValueError:
        # Malformed override: keep whatever was parsed so far plus the defaults.
        pass
    # Guarantee a minimally useful range.
    if high - low < 100:
        high = low + 100

    # Random phase: up to 64 draws, so callers do not all start at the same port.
    tried = set()
    for _ in range(64):
        candidate = random.randint(low, high)
        if candidate in tried:
            continue
        tried.add(candidate)
        if _is_port_free(candidate):
            return candidate

    # Sequential fallback over the same inclusive range the random phase used.
    for candidate in range(low, high + 1):
        if candidate in tried:
            continue
        if _is_port_free(candidate):
            return candidate
    raise RuntimeError("未找到可用端口")
class DeviceInfo:
    # Singleton storage: the one shared instance and the lock guarding its creation.
    _instance = None
    _instance_lock = threading.Lock()

    def __new__(cls, *args, **kwargs):
        """Return the shared instance, creating it on first use."""
        if cls._instance:
            return cls._instance
        with cls._instance_lock:
            # Check again under the lock: another thread may have created
            # the instance while this one was waiting.
            if not cls._instance:
                cls._instance = super().__new__(cls)
        return cls._instance
# ---- Port allocation: a small "reserved pool" keeps concurrent callers off the same port ----
def _alloc_port(self) -> int:
    """Reserve and return a free local port.

    The range comes from PORT_RANGE_LOW / PORT_RANGE_HIGH (defaults
    20000-48000, both inclusive); values that are not integers fall back to
    the defaults. A port is handed out only if it is not in
    ``self._reserved_ports``, not already mapped in ``self._port_by_udid``
    and ``_is_port_free`` accepts it. The returned port is added to
    ``self._reserved_ports``; release it with ``_release_port``.

    Raises:
        RuntimeError: No port in the range could be reserved.
    """
    import random

    try:
        low = int(os.getenv("PORT_RANGE_LOW", "20000"))
        high = int(os.getenv("PORT_RANGE_HIGH", "48000"))
    except ValueError:
        low, high = 20000, 48000

    def _try_reserve(candidate: int) -> bool:
        # Check, probe and reserve inside ONE lock section so two threads can
        # never both conclude that the same port is available.
        with self._lock:
            if candidate in self._reserved_ports:
                return False
            if candidate in self._port_by_udid.values():
                return False
            if not _is_port_free(candidate):
                return False
            self._reserved_ports.add(candidate)
            return True

    # Random probes first, to reduce collisions between concurrent callers.
    for _ in range(128):
        candidate = random.randint(low, high)
        if _try_reserve(candidate):
            return candidate

    # Fallback: sequential scan over the same inclusive range.
    for candidate in range(low, high + 1):
        if _try_reserve(candidate):
            return candidate
    raise RuntimeError("端口分配失败:没有可用端口")
def _release_port(self, port: int):
with self._lock:
self._reserved_ports.discard(port)
# Seconds a newly seen device must have been visible before listen() adds it.
ADD_STABLE_SEC = float(os.environ.get("ADD_STABLE_SEC", "2.0"))
# Seconds a known device may be missing before listen() removes it.
REMOVE_GRACE_SEC = float(os.environ.get("REMOVE_GRACE_SEC", "6.0"))
# Default number of seconds to wait for WDA to answer /status.
WDA_READY_TIMEOUT = float(os.environ.get("WDA_READY_TIMEOUT", "35.0"))
def __init__(self) -> None:
    """Set up the supervisor state and start the iproxy health-check thread.

    ``__init__`` runs on every ``DeviceInfo()`` call even though ``__new__``
    always returns the same object, so the body is guarded by the
    ``_initialized`` flag and only runs once.

    Raises:
        FileNotFoundError: Propagated from ``_find_iproxy`` when the iproxy
            binary cannot be located.
    """
    if getattr(self, "_initialized", False):
        return
    self._lock = threading.RLock()
    self._models: Dict[str, DeviceModel] = {}         # udid -> device model
    self._iproxy: Dict[str, subprocess.Popen] = {}    # udid -> iproxy process
    self._port_by_udid: Dict[str, int] = {}           # udid -> local forwarded port
    self._reserved_ports: set[int] = set()            # ports handed out but not yet registered
    self._first_seen: Dict[str, float] = {}           # udid -> monotonic time first seen
    self._last_seen: Dict[str, float] = {}            # udid -> monotonic time last seen
    self._manager = FlaskSubprocessManager.get_instance()
    self._iproxy_path = self._find_iproxy()
    self._check_fail: Dict[str, int] = {}             # udid -> consecutive health-check failures
    self.MAX_DEVICES = 6
    LogManager.info("DeviceInfo 初始化完成", udid="system")
    print("[Init] DeviceInfo 初始化完成")
    threading.Thread(target=self.check_iproxy_ports).start()
    # Record that initialisation finished. Without this the guard above never
    # fires and every later DeviceInfo() call would wipe the state and start
    # another health-check thread.
    self._initialized = True
# =============== 核心端口连通性检测HTTP 方式) =================
def _is_local_port_open(self, port: int, udid: str, timeout: float = 5) -> bool:
"""
使用 HTTP 方式检测:对 http://127.0.0.1:port/status 发送 GET。
✅ 1xx~5xx 任意状态码都视作“HTTP 可达”WDA 常返回 200/404/401
✅ 超时改为默认 5 秒,更抗抖。
"""
if not isinstance(port, int) or port <= 0 or port > 65535:
LogManager.error("端口不可用(非法端口号)", udid=udid)
return False
try:
conn = http.client.HTTPConnection("127.0.0.1", int(port), timeout=timeout)
conn.request("GET", "/status")
resp = conn.getresponse()
_ = resp.read(128)
status = resp.status
conn.close()
if 100 <= status <= 599:
return True
else:
LogManager.error(f"HTTP状态码异常: {status}", udid=udid)
return False
except Exception as e:
LogManager.error(f"HTTP检测失败{e}", udid=udid)
return False
# =============== Health-check rounds: remove devices that stop answering =================
def check_iproxy_ports(self, connect_timeout: float = 3) -> None:
    """Periodically verify that each online device answers HTTP on its port.

    Runs forever. After an initial 20 s delay, every 10 s it probes each
    model whose ``type`` is 1 through ``_is_local_port_open``. A device is
    removed with ``_remove_device`` only after 3 consecutive failures; a
    success resets its failure counter.

    Args:
        connect_timeout: Timeout in seconds for each HTTP probe.
    """
    # Give the system and WDA some time to settle first.
    time.sleep(20)
    FAIL_THRESHOLD = 3   # consecutive failures before a device is removed
    INTERVAL_SEC = 10    # pause between rounds
    while True:
        # Snapshot under the lock: other threads mutate _models under it, and
        # an exception here would end this thread for good.
        with self._lock:
            snapshot = list(self._models.items())  # [(deviceId, DeviceModel), ...]
        for device_id, model in snapshot:
            try:
                # Only models with type == 1 are probed.
                if model.type != 1:
                    continue
                port = int(model.screenPort)
                if port <= 0 or port > 65535:
                    continue
                if self._is_local_port_open(port, udid=device_id, timeout=connect_timeout):
                    # Success resets the failure counter.
                    self._check_fail[device_id] = 0
                    continue
                # Count the failure.
                cnt = self._check_fail.get(device_id, 0) + 1
                self._check_fail[device_id] = cnt
                print(f"[iproxy-check] FAIL #{cnt} deviceId={device_id} port={port}")
                if cnt >= FAIL_THRESHOLD:
                    print(f"[iproxy-check] 连续失败{cnt}次,移除设备 deviceId={device_id} port={port}")
                    # Drop the counter, then remove the device.
                    self._check_fail.pop(device_id, None)
                    try:
                        self._remove_device(device_id)
                    except Exception as e:
                        print(f"[iproxy-check] _remove_device 异常 deviceId={device_id}: {e}")
            except Exception as e:
                print(f"[iproxy-check] 单设备检查异常: {e}")
        time.sleep(INTERVAL_SEC)
def listen(self):
    """Poll the USB device list once per second and add/remove devices.

    Runs forever. A device is added once it has been visible for at least
    ``ADD_STABLE_SEC`` and the number of online models is below
    ``MAX_DEVICES`` (earliest-seen devices first). A known device is removed
    once it has been missing for at least ``REMOVE_GRACE_SEC``.
    """
    LogManager.method_info("进入主循环", "listen", udid="system")
    print("[Listen] 开始监听设备上下线...")
    while True:
        try:
            usb = Usbmux().device_list()
            online = {d.udid for d in usb if d.conn_type == ConnectionType.USB}
        except Exception as e:
            LogManager.warning(f"[device_list] 异常:{e}", udid="system")
            print(f"[Listen] 获取设备列表异常: {e}")
            time.sleep(1)
            continue

        now = _monotonic()
        for u in online:
            self._first_seen.setdefault(u, now)
            self._last_seen[u] = now

        with self._lock:
            known = set(self._models.keys())
            current_online_count = sum(1 for m in self._models.values() if getattr(m, "type", 2) == 1)

        # ==== Rate-limited additions (at most MAX_DEVICES online) ====
        candidates = list(online - known)
        stable_candidates = [udid for udid in candidates
                             if (now - self._first_seen.get(udid, now)) >= self.ADD_STABLE_SEC]
        # First seen, first served.
        stable_candidates.sort(key=lambda u: self._first_seen.get(u, now))
        capacity = max(0, self.MAX_DEVICES - current_online_count)
        to_add = stable_candidates[:capacity] if capacity > 0 else []
        for udid in to_add:
            print(f"[Add] 检测到新设备: {udid}")
            try:
                self._add_device(udid)
            except Exception as e:
                LogManager.method_error(f"新增失败:{e}", "listen", udid=udid)
                print(f"[Add] 新增失败 {udid}: {e}")

        # ==== Handle devices that went offline ====
        for udid in list(known):
            if udid in online:
                continue
            last = self._last_seen.get(udid, 0.0)
            if (now - last) >= self.REMOVE_GRACE_SEC:
                print(f"[Remove] 检测到设备离线: {udid}")
                try:
                    self._remove_device(udid)
                except Exception as e:
                    LogManager.method_error(f"移除失败:{e}", "listen", udid=udid)
                    print(f"[Remove] 移除失败 {udid}: {e}")

        # ==== Forget devices that are neither visible nor tracked ====
        # Without this a re-plugged device keeps its old first-seen time and
        # skips the ADD_STABLE_SEC wait, and both dicts grow without bound.
        with self._lock:
            tracked = set(self._models.keys())
        for udid in list(self._first_seen):
            if udid not in online and udid not in tracked:
                self._first_seen.pop(udid, None)
                self._last_seen.pop(udid, None)

        time.sleep(1)
def _wait_wda_ready_on_port(self, udid: str, local_port: int, total_timeout_sec: Optional[float] = None) -> bool:
    """Wait until GET /status on the given local forwarded port succeeds.

    Polls every 0.5 s with a 1.8 s timeout per request.

    Args:
        udid: Device id, used only in printed messages.
        local_port: Local port to query on 127.0.0.1.
        total_timeout_sec: Overall time budget in seconds; defaults to
            ``self.WDA_READY_TIMEOUT`` when None.

    Returns:
        True as soon as a response with status 200-399 is received, False
        when the time budget runs out.
    """
    if total_timeout_sec is None:
        total_timeout_sec = self.WDA_READY_TIMEOUT
    deadline = _monotonic() + total_timeout_sec
    attempt = 0
    while _monotonic() < deadline:
        attempt += 1
        conn = None
        try:
            conn = http.client.HTTPConnection("127.0.0.1", local_port, timeout=1.8)
            conn.request("GET", "/status")
            resp = conn.getresponse()
            _ = resp.read(128)
            code = getattr(resp, "status", 0)
            ok = 200 <= code < 400
            print(f"[WDA] /status@{local_port}{attempt}次 code={code}, ok={ok} {udid}")
            if ok:
                return True
        except Exception as e:
            print(f"[WDA] /status@{local_port} 异常({attempt}): {e}")
        finally:
            # One connection is opened per attempt; close it so repeated
            # polling does not leave sockets behind.
            if conn is not None:
                conn.close()
        time.sleep(0.5)
    print(f"[WDA] /status@{local_port} 等待超时 {udid}")
    return False
def _add_device(self, udid: str):
    """Bring one device online: start WDA if needed, forward a port, register, notify.

    Steps, in order: skip if the device is already registered or the online
    cap is reached; skip if ``_trusted`` fails; probe WDA once and start it
    when the probe fails; read the screen size (with retries); re-check the
    cap; allocate a port and start iproxy; store the model and send it to the
    manager. Every skip path simply returns without raising.
    """
    print(f"[Add] 开始新增设备 {udid}")
    # ====== Cap protection: first check, done under the lock ======
    with self._lock:
        if udid in self._models:
            print(f"[Add] 设备已存在,跳过 {udid}")
            return
        current_online_count = sum(1 for m in self._models.values() if getattr(m, "type", 2) == 1)
        if current_online_count >= self.MAX_DEVICES:
            print(f"[Add] 已达设备上限 {self.MAX_DEVICES},忽略新增 {udid}")
            return
    if not self._trusted(udid):
        print(f"[Add] 未信任设备 {udid}, 跳过")
        return
    # Major iOS version; 0 when it cannot be read.
    try:
        dev = tidevice.Device(udid)
        major = int(dev.product_version.split(".")[0])
    except Exception:
        major = 0
    # Only start WDA when a single /status probe does not already succeed.
    if not self._wda_http_status_ok_once(udid):
        # NOTE(review): this condition excludes major == 17, yet the message
        # below mentions iOS17 — confirm whether ">= 17" was intended.
        if major > 17:
            print("进入iOS17设备的分支")
            out = IOSActivator().activate(udid)
            print("wda启动完成")
        else:
            print(f"[WDA] iOS<=17 启动 WDA app_start (port={wdaScreenPort})")
            dev = tidevice.Device(udid)
            dev.app_start(WdaAppBundleId)
            time.sleep(2)
        if not self._wait_wda_ready_http(udid, self.WDA_READY_TIMEOUT):
            print(f"[WDA] WDA 未在超时内就绪, 放弃新增 {udid}")
            return
    print(f"[WDA] WDA 就绪,准备获取屏幕信息 {udid}")
    time.sleep(0.5)
    # Screen size: one attempt plus up to 4 retries; zeros are tolerated.
    w, h, s = self._screen_info_with_timeout(udid, timeout=3.5)
    if not (w and h and s):
        for i in range(4):
            print(f"[Screen] 第{i + 1}次获取失败, 重试中... {udid}")
            time.sleep(0.6)
            w, h, s = self._screen_info_with_timeout(udid, timeout=3.5)
            if w and h and s:
                break
    if not (w and h and s):
        print(f"[Screen] 屏幕信息仍为空,继续添加 {udid}")
    # ====== Cap protection: second check right before allocating a port ======
    with self._lock:
        current_online_count = sum(1 for m in self._models.values() if getattr(m, "type", 2) == 1)
        if current_online_count >= self.MAX_DEVICES:
            print(f"[Add](二次检查)已达设备上限 {self.MAX_DEVICES},忽略新增 {udid}")
            return
    port = self._alloc_port()
    print(f"[iproxy] 准备启动 iproxy 映射 {port}->{wdaScreenPort}")
    proc = self._start_iproxy(udid, local_port=port)
    if not proc:
        # Give the reserved port back when the forwarder could not be started.
        self._release_port(port)
        print(f"[iproxy] 启动失败,放弃新增 {udid}")
        return
    # Register the device, its iproxy process and its port together.
    with self._lock:
        model = DeviceModel(deviceId=udid, screenPort=port, width=w, height=h, scale=s, type=1)
        model.ready = True
        self._models[udid] = model
        self._iproxy[udid] = proc
        self._port_by_udid[udid] = port
    print(f"[Manager] 准备发送设备数据到前端 {udid}")
    self._manager_send(model)
    print(f"[Add] 设备添加成功 {udid}, port={port}, {w}x{h}@{s}")
def _remove_device(self, udid: str):
"""
移除设备及其转发,通知上层(幂等)。
✅ 同时释放 self._reserved_ports 中可能残留的保留端口。
✅ 同时清理 _iproxy / _port_by_udid。
"""
print(f"[Remove] 正在移除设备 {udid}")
# --- 1. 锁内取出并清空字典 ---
with self._lock:
model = self._models.pop(udid, None)
proc = self._iproxy.pop(udid, None)
port = self._port_by_udid.pop(udid, None)
# --- 2. 杀进程 ---
try:
self._kill(proc)
except Exception as e:
print(f"[Remove] 杀进程异常 {udid}: {e}")
# --- 3. 释放“保留端口”(如果还在集合里)---
if isinstance(port, int) and port > 0:
try:
self._release_port(port)
except Exception as e:
print(f"[Remove] 释放保留端口异常 {udid}: {e}")
# --- 4. 构造下线模型并通知 ---
if model is None:
model = DeviceModel(deviceId=udid, screenPort=-1, width=0, height=0, scale=0.0, type=2)
model.type = 2
model.ready = False
model.screenPort = -1
try:
self._manager_send(model)
except Exception as e:
print(f"[Remove] 通知上层异常 {udid}: {e}")
# --- 5. 清理失败计数(健康检查用)---
try:
if hasattr(self, "_check_fail"):
self._check_fail.pop(udid, None)
except Exception:
pass
print(f"[Remove] 设备移除完成 {udid}")
def _trusted(self, udid: str) -> bool:
    """Return True when the device's "DeviceName" value can be read.

    Any exception raised while reading it is treated as "not trusted".
    """
    is_trusted = False
    try:
        BaseDevice(udid).get_value("DeviceName")
        print(f"[Trust] 设备 {udid} 已信任")
        is_trusted = True
    except Exception:
        print(f"[Trust] 设备 {udid} 未信任")
    return is_trusted
def _wda_http_status_ok_once(self, udid: str, timeout_sec: float = 1.8) -> bool:
    """Probe WDA /status once through a temporary iproxy; never raises.

    Allocates a temporary port, starts an iproxy to ``wdaScreenPort``, and
    sends up to two quick GET /status requests. The temporary process and
    port are always cleaned up.

    Args:
        udid: Device to probe.
        timeout_sec: Timeout in seconds for each HTTP request.

    Returns:
        True when a response with status 200-399 was received, False
        otherwise (including on any exception).
    """
    tmp_port = None
    proc = None
    try:
        tmp_port = self._alloc_port()  # may raise
        print(f"[WDA] 启动临时 iproxy 以检测 /status {udid}")
        proc = self._spawn_iproxy(udid, local_port=tmp_port, remote_port=wdaScreenPort)
        if not proc:
            print("[WDA] 启动临时 iproxy 失败")
            return False
        if not self._wait_until_listening(tmp_port, 3.0):
            print(f"[WDA] 临时端口未监听 {tmp_port}")
            return False
        # At most two quick probes.
        for i in (1, 2):
            conn = None
            try:
                conn = http.client.HTTPConnection("127.0.0.1", tmp_port, timeout=timeout_sec)
                conn.request("GET", "/status")
                resp = conn.getresponse()
                _ = resp.read(128)
                code = getattr(resp, "status", 0)
                ok = 200 <= code < 400
                print(f"[WDA] /status 第{i}次 code={code}, ok={ok}")
                if ok:
                    return True
            except Exception as e:
                print(f"[WDA] /status 异常({i}): {e}")
            finally:
                # This method is called in a polling loop; close each
                # connection instead of leaving it to the garbage collector.
                if conn is not None:
                    conn.close()
            time.sleep(0.25)
        return False
    except Exception as e:
        import traceback
        print(f"[WDA][probe] 异常:{e}\n{traceback.format_exc()}")
        return False
    finally:
        if proc:
            self._kill(proc)
        if tmp_port is not None:
            self._release_port(tmp_port)
def _wait_wda_ready_http(self, udid: str, total_timeout_sec: float) -> bool:
    """Repeat the single-shot WDA probe until it succeeds or time runs out.

    Pauses 0.6 s between probes. Returns True on the first successful
    probe, False once ``total_timeout_sec`` seconds have elapsed.
    """
    print(f"[WDA] 等待 WDA Ready (超时 {total_timeout_sec}s) {udid}")
    give_up_at = _monotonic() + total_timeout_sec
    while True:
        if not (_monotonic() < give_up_at):
            break
        ready = self._wda_http_status_ok_once(udid)
        if ready:
            print(f"[WDA] WDA 就绪 {udid}")
            return True
        time.sleep(0.6)
    print(f"[WDA] WDA 等待超时 {udid}")
    return False
def _screen_info(self, udid: str):
    """Return ``(width, height, scale)`` read through a WDA USB client.

    Returns ``(0, 0, 0.0)`` when anything goes wrong.
    """
    try:
        # Query window_size directly rather than c.home(), which may block.
        client = wda.USBClient(udid, wdaFunctionPort)
        dims = client.window_size()
        print(f"[Screen] 成功获取屏幕 {int(dims.width)}x{int(dims.height)} {udid}")
        width = int(dims.width)
        height = int(dims.height)
        scale = float(client.scale)
        return width, height, scale
    except Exception as e:
        print(f"[Screen] 获取屏幕信息异常: {e} {udid}")
        return 0, 0, 0.0
def _screen_info_with_timeout(self, udid: str, timeout: float = 3.5):
"""在线程里调用 _screen_info超时返回 0 值,防止卡死。"""
import threading
result = {"val": (0, 0, 0.0)}
done = threading.Event()
def _target():
try:
result["val"] = self._screen_info(udid)
finally:
done.set()
t = threading.Thread(target=_target, daemon=True)
t.start()
if not done.wait(timeout):
print(f"[Screen] 获取屏幕信息超时({timeout}s) {udid}")
return 0, 0, 0.0
return result["val"]
def _wait_until_listening(self, port: int, timeout: float) -> bool:
for to in (1.5, 2.5, 3.5):
deadline = _monotonic() + to
while _monotonic() < deadline:
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.settimeout(0.25)
if s.connect_ex(("127.0.0.1", port)) == 0:
print(f"[Port] 端口 {port} 已监听")
return True
time.sleep(0.05)
print(f"[Port] 端口 {port} 未监听")
return False
def _spawn_iproxy(self, udid: str, local_port: int, remote_port: int) -> Optional[subprocess.Popen]:
    """Start an iproxy child process forwarding local_port to remote_port.

    The child's stdout/stderr are appended to
    ``log/iproxy/{udid}_{local_port}.log``.

    Args:
        udid: Device the forwarder is bound to (passed via ``-u``).
        local_port: Port on this machine.
        remote_port: Port on the device.

    Returns:
        The started ``subprocess.Popen``, or None when the process could not
        be created.
    """
    creationflags = 0
    startupinfo = None
    if os.name == "nt":
        # Windows: no console window, and a separate process group.
        creationflags = getattr(subprocess, "CREATE_NO_WINDOW", 0x08000000) | \
                        getattr(subprocess, "CREATE_NEW_PROCESS_GROUP", 0x00000200)
        si = subprocess.STARTUPINFO()
        si.dwFlags |= subprocess.STARTF_USESHOWWINDOW
        si.wShowWindow = 0
        startupinfo = si
    cmd = [self._iproxy_path, "-u", udid, str(local_port), str(remote_port)]
    # Log file location.
    log_dir = Path("log/iproxy")
    try:
        log_dir.mkdir(parents=True, exist_ok=True)
    except Exception:
        # Best effort: if the directory is unusable, open() below fails and
        # that failure is reported.
        pass
    log_path = log_dir / f"{udid}_{local_port}.log"
    try:
        print(f"[iproxy] 启动进程 {cmd} (log={log_path})")
        # The child gets its own copy of the handle, so the parent's copy is
        # closed as soon as the process has been started (or failed to start).
        with open(log_path, "ab", buffering=0) as logfile:
            return subprocess.Popen(
                cmd,
                stdout=logfile,
                stderr=logfile,
                creationflags=creationflags,
                startupinfo=startupinfo,
            )
    except Exception as e:
        print(f"[iproxy] 创建进程失败: {e}")
        LogManager.error(f"[iproxy] 创建进程失败: {e}", "system")
        return None
def _start_iproxy(self, udid: str, local_port: int) -> Optional[subprocess.Popen]:
    """Start iproxy to ``wdaScreenPort`` and wait for the local port to listen.

    On success the port is removed from the reserved pool (so the pool does
    not keep growing) and the process is returned. Returns None when the
    process cannot be spawned or the port is not listening in time; in the
    latter case the process is killed first.
    """
    child = self._spawn_iproxy(udid, local_port=local_port, remote_port=wdaScreenPort)
    if not child:
        print(f"[iproxy] 启动失败 {udid}")
        return None

    listening = self._wait_until_listening(local_port, 3.0)
    if not listening:
        self._kill(child)
        print(f"[iproxy] 未监听, 已杀死 {udid}")
        return None

    # Listening now: the reservation is no longer needed.
    try:
        self._release_port(local_port)
    except Exception as e:
        print(f"[iproxy] 释放保留端口异常: {e}")
    print(f"[iproxy] 启动成功 port={local_port} {udid}")
    return child
def _kill(self, proc: Optional[subprocess.Popen]):
if not proc:
return
try:
p = psutil.Process(proc.pid)
p.terminate()
try:
p.wait(timeout=1.5)
except psutil.TimeoutExpired:
p.kill(); p.wait(timeout=1.5)
print(f"[Proc] 已结束进程 PID={proc.pid}")
except Exception as e:
print(f"[Proc] 结束进程异常: {e}")
def _manager_send(self, model: DeviceModel):
try:
self._manager.send(model.toDict())
print(f"[Manager] 已发送前端数据 {model.deviceId}")
except Exception as e:
print(f"[Manager] 发送异常: {e}")
def _find_iproxy(self) -> str:
env_path = os.getenv("IPROXY_PATH")
if env_path and Path(env_path).is_file():
print(f"[iproxy] 使用环境变量路径 {env_path}")
return env_path
base = Path(__file__).resolve().parent.parent
name = "iproxy.exe" if os.name == "nt" else "iproxy"
path = base / "resources" / "iproxy" / name
if path.is_file():
print(f"[iproxy] 使用默认路径 {path}")
return str(path)
raise FileNotFoundError(f"iproxy 不存在: {path}")