Files
iOSAI/Module/IOSActivator.py

500 lines
20 KiB
Python
Raw Normal View History

2025-10-24 22:04:28 +08:00
# -*- coding: utf-8 -*-
2025-10-21 15:43:02 +08:00
import os
import re
import sys
2025-10-24 22:04:28 +08:00
import atexit
import signal
2025-10-23 18:53:22 +08:00
import socket
2025-10-21 15:43:02 +08:00
import subprocess
2025-10-24 22:04:28 +08:00
from typing import Optional, List, Tuple, Dict, Set
2025-10-21 15:43:02 +08:00
from Entity.Variables import WdaAppBundleId
class IOSActivator:
"""
2025-10-24 22:04:28 +08:00
轻量 iOS 激活器仅代码调用 - 进程/网卡可控版
1) 启动 `pymobiledevice3 remote tunneld`可常驻/可一次性
2) 自动挂载 DDI
3) 隧道就绪后启动 WDA
4) 程序退出或 keep_tunnel=False 确保 tunneld 进程与虚拟网卡被清理
2025-10-21 15:43:02 +08:00
"""
2025-10-24 22:04:28 +08:00
# ---------- 正则 ----------
HTTP_RE = re.compile(r"http://([0-9.]+):(\d+)")
RSD_CREATED_RE = re.compile(r"Created tunnel\s+--rsd\s+([^\s]+)\s+(\d+)")
RSD_FALLBACK_RE = re.compile(r"--rsd\s+(\S+?)[\s:](\d+)")
IFACE_RE = re.compile(r"\b(pymobiledevice3-tunnel-[^\s/\\]+)\b", re.IGNORECASE)
2025-10-21 15:43:02 +08:00
def __init__(self, python_executable: Optional[str] = None):
2025-10-23 18:53:22 +08:00
self.python = python_executable or None
2025-10-24 22:04:28 +08:00
self._live_procs: Dict[str, subprocess.Popen] = {} # udid -> tunneld proc
self._live_ifaces: Dict[str, Set[str]] = {} # udid -> {iface names}
self._registered = False
# =============== 公共入口 ===============
def activate(
2025-10-27 21:44:16 +08:00
self,
udid: str,
wda_bundle_id: str = WdaAppBundleId,
ready_timeout_sec: float = 60.0,
pre_mount_first: bool = True,
mount_retries: int = 2,
backoff_seconds: float = 1.5,
keep_tunnel: bool = False,
broad_cleanup_on_exit: bool = True,
2025-10-24 22:04:28 +08:00
) -> str:
"""
2025-10-27 21:44:16 +08:00
Windows 简版不读任何 tunneld 日志也不做 RSD 解析
逻辑先探活 -> 开隧道 -> 直接用 HTTP 隧道端口反复尝试启动 WDA -> 探活成功即返回
2025-10-24 22:04:28 +08:00
"""
2025-10-27 21:44:16 +08:00
import time, ctypes, traceback
2025-10-24 22:04:28 +08:00
if not udid or not isinstance(udid, str):
raise ValueError("udid is required and must be a non-empty string")
2025-10-27 21:44:16 +08:00
print(f"[activate] UDID={udid}", flush=True)
# —— 管理员提示Windows 清理虚拟网卡常用)——
try:
if ctypes.windll.shell32.IsUserAnAdmin() == 0:
print("[⚠] 未以管理员运行:若需要移除虚拟网卡,可能失败。", flush=True)
except Exception:
pass
2025-10-24 22:04:28 +08:00
2025-10-27 21:44:16 +08:00
# —— 退出钩子(可选)——
try:
self._ensure_exit_hooks(broad_cleanup_on_exit=broad_cleanup_on_exit) # type: ignore[attr-defined]
except Exception as e:
print(f"[activate] _ensure_exit_hooks warn: {e}", flush=True)
# —— 小工具:探活 WDA —— #
def _wda_alive(timeout: float = 2.0) -> bool:
2025-10-24 22:04:28 +08:00
try:
2025-10-27 21:44:16 +08:00
if hasattr(self, "_wda_alive_now"):
return bool(self._wda_alive_now(udid, timeout=timeout)) # type: ignore[attr-defined]
if hasattr(self, "_wda_client"):
cli = self._wda_client(udid) # type: ignore[attr-defined]
if hasattr(cli, "wait_ready"):
return bool(cli.wait_ready(timeout=timeout))
2025-10-24 22:04:28 +08:00
except Exception:
2025-10-27 21:44:16 +08:00
return False
return False
2025-10-24 22:04:28 +08:00
2025-10-27 21:44:16 +08:00
# 0) 快路径WDA 已活
if _wda_alive(2.0):
print("[activate] WDA already alive, skip launching.", flush=True)
return "WDA already alive"
2025-10-24 22:04:28 +08:00
2025-10-27 21:44:16 +08:00
# 1) 预挂载(失败不致命)
if pre_mount_first and hasattr(self, "_auto_mount_developer_disk"):
2025-10-24 22:04:28 +08:00
try:
2025-10-27 21:44:16 +08:00
self._auto_mount_developer_disk(udid, retries=mount_retries,
backoff_seconds=backoff_seconds) # type: ignore[attr-defined]
time.sleep(1.5)
2025-10-24 22:04:28 +08:00
except Exception as e:
2025-10-27 21:44:16 +08:00
print(f"[activate] 预挂载失败(继续):{e}", flush=True)
2025-10-24 22:04:28 +08:00
2025-10-27 21:44:16 +08:00
# 2) 开隧道(关键:拿到 HTTP 端口即可;不读取任何 stdout/stderr
proc = None
http_host, http_port = "127.0.0.1", None
2025-10-24 22:04:28 +08:00
try:
2025-10-27 21:44:16 +08:00
ret = self._start_tunneld(udid) # type: ignore[attr-defined]
if isinstance(ret, tuple):
proc, http_port = ret[0], ret[1]
else:
proc = ret
if http_port is None:
# 若你的 _start_tunneld 固定端口,可在这里写死(例如 8100/某自定义端口)
raise RuntimeError("未获取到 HTTP 隧道端口_start_tunneld 未返回端口)")
except Exception:
# 即便开隧道失败,也再探活一次(可能本来就活)
if _wda_alive(2.0):
print("[activate] WDA already alive (tunnel start failed but OK).", flush=True)
return "WDA already alive"
raise
print(f"[tunneld] HTTP tunnel at {http_host}:{http_port}", flush=True)
# 3) 直接用 HTTP 隧道反复尝试启动 WDA + 探活
deadline = time.time() + (ready_timeout_sec if ready_timeout_sec > 0 else 60.0)
launched = False
2025-10-24 22:04:28 +08:00
2025-10-27 21:44:16 +08:00
try:
while time.time() < deadline:
# 已活则成功返回
if _wda_alive(1.5):
print("[activate] WDA detected alive.", flush=True)
launched = True
2025-10-24 22:04:28 +08:00
break
2025-10-27 21:44:16 +08:00
# 尝试发起一次 HTTP 启动(失败就下一轮重试)
try:
if hasattr(self, "_launch_wda_via_http_tunnel"):
self._launch_wda_via_http_tunnel( # type: ignore[attr-defined]
bundle_id=wda_bundle_id,
http_host=http_host,
http_port=str(http_port),
udid=udid,
)
except Exception as e:
# 仅打印,不中断;下一次循环再试
print(f"[activate] _launch_wda_via_http_tunnel error: {e}", flush=True)
# 启动后给一点时间让 WDA ready
for _ in range(3):
if _wda_alive(1.0):
launched = True
break
time.sleep(0.5)
if launched:
2025-10-24 22:04:28 +08:00
break
2025-10-27 21:44:16 +08:00
time.sleep(1.0) # 下一轮重试
if not launched:
raise RuntimeError(f"WDA not ready within {ready_timeout_sec}s via HTTP tunnel")
print("[activate] Done.", flush=True)
return f"http://{http_host}:{http_port}"
2025-10-24 22:04:28 +08:00
finally:
if not keep_tunnel:
2025-10-27 21:44:16 +08:00
try:
self.stop_tunnel(udid, broad_cleanup=broad_cleanup_on_exit) # type: ignore[attr-defined]
except Exception as e:
print(f"[activate] stop_tunnel warn: {e}", flush=True)
2025-10-24 22:04:28 +08:00
# =============== 外部可显式调用的清理 ===============
def stop_tunnel(self, udid: str, broad_cleanup: bool = True):
"""关闭某 UDID 的 tunneld并清理已知/残留的 pmd3 虚拟网卡。"""
proc = self._live_procs.pop(udid, None)
ifaces = self._live_ifaces.pop(udid, set())
# 1) 结束进程
if proc:
try:
proc.terminate()
proc.wait(timeout=5)
except subprocess.TimeoutExpired:
try:
proc.kill()
proc.wait(timeout=3)
except Exception:
pass
# 兜底:杀掉本进程树内可能残留的 tunneld
self._kill_stray_tunneld_children()
# 2) 清理网卡
try:
if os.name == "nt":
# 先按已知名精确删除
for name in sorted(ifaces):
self._win_remove_adapter(name)
# 宽匹配兜底
if broad_cleanup:
self._win_remove_all_pmd3_adapters()
else:
# *nix 基本不需要手动删,若有需要可在此处添加 ip link delete 等
pass
except Exception as e:
print(f"[cleanup] adapter cleanup error: {e}")
# =============== 内部:启动 tunneld ===============
def _start_tunneld(self, udid: str) -> Tuple[subprocess.Popen, int]:
port = self._pick_available_port()
launcher, env2 = self._resolve_pmd3_argv_and_env()
env2["PYTHONUNBUFFERED"] = "1"
env2.setdefault("PYTHONIOENCODING", "utf-8")
env2["PYMOBILEDEVICE3_UDID"] = udid
cmd = self._ensure_str_list([*launcher, "remote", "tunneld", "--port", str(port)])
print("[activate] 启动隧道:", " ".join(cmd))
proc = subprocess.Popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
text=True,
bufsize=1,
universal_newlines=True,
env=env2,
**self._win_hidden_popen_kwargs()
)
return proc, port
2025-10-23 18:53:22 +08:00
2025-10-24 22:04:28 +08:00
# =============== 退出/信号回收 ===============
def _ensure_exit_hooks(self, broad_cleanup_on_exit: bool):
if self._registered:
return
self._registered = True
def _on_exit():
# 逐个关闭存活隧道
for udid in list(self._live_procs.keys()):
try:
self.stop_tunnel(udid, broad_cleanup=broad_cleanup_on_exit)
except Exception:
pass
atexit.register(_on_exit)
def _signal_handler(signum, frame):
_on_exit()
# 默认行为:再次发出信号退出
try:
signal.signal(signum, signal.SIG_DFL)
except Exception:
pass
os.kill(os.getpid(), signum)
for sig in (signal.SIGINT, signal.SIGTERM):
try:
signal.signal(sig, _signal_handler)
except Exception:
pass # 某些环境不允许设置
# =============== Windows 虚拟网卡清理 ===============
def _win_remove_adapter(self, name: str):
"""按名删除一个虚拟网卡。"""
print(f"[cleanup] remove adapter: {name}")
# 先尝试 PowerShell需要管理员
ps = [
"powershell", "-NoProfile", "-ExecutionPolicy", "Bypass", "-Command",
f"$a=Get-NetAdapter -Name '{name}' -ErrorAction SilentlyContinue; "
f"if($a){{ Disable-NetAdapter -Name '{name}' -Confirm:$false -PassThru | Out-Null; "
f"Remove-NetAdapter -Name '{name}' -Confirm:$false -ErrorAction SilentlyContinue; }}"
]
try:
subprocess.run(ps, check=False, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, **self._win_hidden_popen_kwargs())
return
except Exception:
pass
# 兜底netsh 禁用
try:
subprocess.run(
["netsh", "interface", "set", "interface", name, "disable"],
check=False, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, **self._win_hidden_popen_kwargs()
)
except Exception:
pass
def _win_remove_all_pmd3_adapters(self):
"""宽匹配删除所有 pymobiledevice3-tunnel-* 网卡(防残留)。"""
print("[cleanup] sweeping all pymobiledevice3-tunnel-* adapters")
ps = [
"powershell", "-NoProfile", "-ExecutionPolicy", "Bypass", "-Command",
r"$nics=Get-NetAdapter -Name 'pymobiledevice3-tunnel-*' -ErrorAction SilentlyContinue; "
r"foreach($n in $nics){ Disable-NetAdapter -Name $n.Name -Confirm:$false -PassThru | Out-Null; "
r"Remove-NetAdapter -Name $n.Name -Confirm:$false -ErrorAction SilentlyContinue; }"
]
try:
subprocess.run(ps, check=False, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, **self._win_hidden_popen_kwargs())
except Exception:
pass
def _kill_stray_tunneld_children(self):
"""在当前进程空间内尽量清理残留的 tunneld 子进程。"""
import psutil
try:
me = psutil.Process(os.getpid())
for ch in me.children(recursive=True):
try:
cmd = " ".join(ch.cmdline()).lower()
except Exception:
cmd = ""
if "pymobiledevice3" in cmd and "remote" in cmd and "tunneld" in cmd:
try:
ch.terminate()
ch.wait(2)
except Exception:
try:
ch.kill()
except Exception:
pass
except Exception:
pass
# =============== 其它工具 & 你原有的方法(未改动核心逻辑) ===============
def _pmd3_run(self, args: List[str], udid: str, extra_env: Optional[dict] = None) -> str:
2025-10-23 18:53:22 +08:00
launcher, env = self._resolve_pmd3_argv_and_env()
env["PYMOBILEDEVICE3_UDID"] = udid
env["PYTHONUNBUFFERED"] = "1"
env.setdefault("PYTHONIOENCODING", "utf-8")
if extra_env:
for k, v in extra_env.items():
if v is None:
env.pop(k, None)
else:
env[k] = str(v)
cmd = [*launcher, *args]
print("[pmd3]", " ".join(map(str, cmd)))
try:
2025-10-23 21:38:18 +08:00
return subprocess.check_output(
cmd,
text=True,
stderr=subprocess.STDOUT,
env=env,
**self._win_hidden_popen_kwargs()
) or ""
2025-10-23 18:53:22 +08:00
except subprocess.CalledProcessError as exc:
raise RuntimeError(exc.output or f"pymobiledevice3 执行失败,退出码 {exc.returncode}")
2025-10-24 22:04:28 +08:00
def _ensure_str_list(self, seq):
return [str(x) for x in seq]
2025-10-23 18:53:22 +08:00
2025-10-24 22:04:28 +08:00
def _win_hidden_popen_kwargs(self):
if os.name != "nt":
return {}
si = subprocess.STARTUPINFO()
si.dwFlags |= subprocess.STARTF_USESHOWWINDOW
si.wShowWindow = 0 # SW_HIDE
return {
"startupinfo": si,
"creationflags": getattr(subprocess, "CREATE_NO_WINDOW", 0x08000000),
}
2025-10-23 18:53:22 +08:00
def _resolve_pmd3_argv_and_env(self):
2025-10-24 22:04:28 +08:00
import shutil, subprocess
2025-10-23 18:53:22 +08:00
from pathlib import Path
2025-10-21 15:43:02 +08:00
env = os.environ.copy()
2025-10-23 18:53:22 +08:00
env["PYTHONUNBUFFERED"] = "1"
env.setdefault("PYTHONIOENCODING", "utf-8")
prefer_py = env.get("IOSAI_PYTHON")
base = Path(sys.argv[0]).resolve()
base_dir = base.parent if base.is_file() else base
py_name = "python.exe" if os.name == "nt" else "python"
sidecar_candidates = [
2025-10-24 22:04:28 +08:00
base_dir / "python-rt" / py_name,
base_dir / "python-rt" / "Scripts" / py_name,
2025-10-23 18:53:22 +08:00
base_dir.parent / "python-rt" / py_name,
base_dir.parent / "python-rt" / "Scripts" / py_name,
]
if prefer_py:
sidecar_candidates.insert(0, Path(prefer_py))
2025-10-21 15:43:02 +08:00
2025-10-23 18:53:22 +08:00
for cand in sidecar_candidates:
print(f"[IOSAI] 🔎 probing sidecar at: {cand}")
if cand.is_file():
try:
out = subprocess.check_output(
[str(cand), "-c", "import pymobiledevice3;print('ok')"],
2025-10-24 22:04:28 +08:00
text=True, stderr=subprocess.STDOUT, env=env, timeout=6, **self._win_hidden_popen_kwargs()
2025-10-23 18:53:22 +08:00
)
if "ok" in out:
print(f"[IOSAI] ✅ sidecar selected: {cand}")
return ([str(cand), "-u", "-m", "pymobiledevice3"], env)
except Exception:
pass
exe = shutil.which("pymobiledevice3")
if exe:
print(f"[IOSAI] ✅ use PATH executable: {exe}")
return ([exe], env)
py_candidates = []
base_exec = getattr(sys, "_base_executable", None)
if base_exec and os.path.isfile(base_exec):
py_candidates.append(base_exec)
for name in ("python3.exe", "python.exe", "py.exe", "python3", "python"):
p = shutil.which(name)
if p and p not in py_candidates:
py_candidates.append(p)
for py in py_candidates:
print(f"[IOSAI] 🔎 probing system python: {py}")
2025-10-21 15:43:02 +08:00
try:
out = subprocess.check_output(
2025-10-23 18:53:22 +08:00
[py, "-c", "import pymobiledevice3;print('ok')"],
2025-10-24 22:04:28 +08:00
text=True, stderr=subprocess.STDOUT, env=env, timeout=6, **self._win_hidden_popen_kwargs()
2025-10-21 15:43:02 +08:00
)
2025-10-23 18:53:22 +08:00
if "ok" in out:
print(f"[IOSAI] ✅ system python selected: {py}")
return ([py, "-u", "-m", "pymobiledevice3"], env)
except Exception:
continue
raise RuntimeError("未检测到可用的 pymobiledevice3建议携带 python-rt 或安装系统 Python+pmd3")
2025-10-24 22:04:28 +08:00
# -------- DDI / RSD / 启动 WDA (与你原逻辑一致) --------
2025-10-23 18:53:22 +08:00
def _auto_mount_developer_disk(self, udid: str, retries: int = 3, backoff_seconds: float = 2.0) -> None:
2025-10-24 22:04:28 +08:00
import time as _t
last_err = ""
2025-10-23 18:53:22 +08:00
for i in range(max(1, retries)):
try:
out = self._pmd3_run(["mounter", "auto-mount"], udid)
2025-10-21 15:43:02 +08:00
if out:
for line in out.splitlines():
print(f"[mounter] {line}")
2025-10-23 18:53:22 +08:00
if "already mounted" in (out or "").lower():
2025-10-21 15:43:02 +08:00
print("[mounter] Developer disk image already mounted.")
2025-10-23 18:53:22 +08:00
else:
print("[mounter] Developer disk image mounted.")
return
except Exception as e:
2025-10-24 22:04:28 +08:00
last_err = str(e)
2025-10-23 18:53:22 +08:00
if i < retries - 1:
print(f"[mounter] attempt {i+1}/{retries} failed, retrying in {backoff_seconds}s ...")
2025-10-24 22:04:28 +08:00
_t.sleep(backoff_seconds)
2025-10-23 18:53:22 +08:00
else:
2025-10-24 22:04:28 +08:00
raise RuntimeError(f"Auto-mount failed after {retries} attempts.\n{last_err}")
2025-10-21 15:43:02 +08:00
def _is_ipv4_host(self, host: str) -> bool:
2025-10-24 22:04:28 +08:00
return bool(re.match(r"^\d{1,3}(\.\d{1,3}){3}$", host))
2025-10-21 15:43:02 +08:00
def _wait_for_rsd_ready(self, rsd_host: str, rsd_port: str, retries: int = 5, delay: float = 3.0) -> bool:
port_int = int(rsd_port)
for i in range(1, retries + 1):
print(f"[rsd] Probing RSD {rsd_host}:{rsd_port} (attempt {i}/{retries}) ...")
try:
with socket.create_connection((rsd_host, port_int), timeout=2):
print("[rsd] ✅ RSD is reachable and ready.")
return True
except (socket.timeout, ConnectionRefusedError, OSError) as e:
print(f"[rsd] Not ready yet ({e}). Retrying...")
import time as _t
_t.sleep(delay)
print("[rsd] ❌ RSD did not become ready after retries.")
return False
def _launch_wda_via_rsd(self, bundle_id: str, rsd_host: str, rsd_port: str, udid: str) -> None:
print(f"[wda] Launch via RSD {rsd_host}:{rsd_port}, bundle: {bundle_id}")
2025-10-24 22:04:28 +08:00
out = self._pmd3_run(["developer", "dvt", "launch", bundle_id, "--rsd", rsd_host, rsd_port], udid=udid)
2025-10-21 15:43:02 +08:00
if out:
for line in out.splitlines():
print(f"[wda] {line}")
print("[wda] Launch via RSD completed.")
def _launch_wda_via_http_tunnel(self, bundle_id: str, http_host: str, http_port: str, udid: str) -> None:
if not self._is_ipv4_host(http_host):
raise RuntimeError(f"HTTP tunnel host must be IPv4, got {http_host}")
tunnel_endpoint = f"{http_host}:{http_port}"
print(f"[wda] Launch via HTTP tunnel {tunnel_endpoint}, bundle: {bundle_id}")
2025-10-24 22:04:28 +08:00
out = self._pmd3_run(["developer", "dvt", "launch", bundle_id], udid=udid,
extra_env={"PYMOBILEDEVICE3_TUNNEL": tunnel_endpoint})
2025-10-21 15:43:02 +08:00
if out:
for line in out.splitlines():
print(f"[wda] {line}")
print("[wda] Launch via HTTP tunnel completed.")
2025-10-24 22:04:28 +08:00
# -------- 端口挑选 --------
2025-10-21 15:43:02 +08:00
def _pick_available_port(self, base=49151, step=10) -> int:
for i in range(0, 1000, step):
port = base + i
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
if s.connect_ex(("127.0.0.1", port)) != 0:
return port
raise RuntimeError("No free port found for tunneld")
2025-10-24 22:04:28 +08:00
# -------- UDID 过滤 --------
2025-10-23 18:53:22 +08:00
def _line_is_for_udid(self, line: str, udid: str) -> bool:
try:
return udid.lower() in (line or "").lower()
except Exception:
2025-10-24 22:04:28 +08:00
return False