Files
iOSAI/Module/IOSActivator.py

474 lines
19 KiB
Python
Raw Normal View History

2025-10-21 15:43:02 +08:00
import os
import re
import sys
2025-10-23 18:53:22 +08:00
import socket
2025-10-21 15:43:02 +08:00
import subprocess
from typing import Optional
from Entity.Variables import WdaAppBundleId
class IOSActivator:
"""
2025-10-23 18:53:22 +08:00
轻量 iOS 激活器仅代码调用 - 打包安全版
1) 启动 `pymobiledevice3 remote tunneld`子进程常驻
2) 自动挂载 Developer Disk Image进程内调用 CLI
3) 设备隧道就绪后启动 WDA进程内调用 CLI
- 优先使用 `--rsd <host> <port>`支持 IPv6
- 失败再用 `PYMOBILEDEVICE3_TUNNEL=127.0.0.1:<port>` 回退 IPv4
2025-10-21 15:43:02 +08:00
"""
def __init__(self, python_executable: Optional[str] = None):
2025-10-23 18:53:22 +08:00
# 仅用于旧逻辑兼容;本版本已不依赖 self.python 去 -m 调 CLI
self.python = python_executable or None
# =========================================================================
# 内部工具:进程内调用 pymobiledevice3 CLI
# =========================================================================
def _pmd3_run(self, args: list[str], udid: str, extra_env: Optional[dict] = None) -> str:
import subprocess
launcher, env = self._resolve_pmd3_argv_and_env()
env["PYMOBILEDEVICE3_UDID"] = udid
env["PYTHONUNBUFFERED"] = "1"
env.setdefault("PYTHONIOENCODING", "utf-8")
if extra_env:
for k, v in extra_env.items():
if v is None:
env.pop(k, None)
else:
env[k] = str(v)
cmd = [*launcher, *args]
print("[pmd3]", " ".join(map(str, cmd)))
try:
2025-10-23 21:38:18 +08:00
return subprocess.check_output(
cmd,
text=True,
stderr=subprocess.STDOUT,
env=env,
**self._win_hidden_popen_kwargs()
) or ""
2025-10-23 18:53:22 +08:00
except subprocess.CalledProcessError as exc:
raise RuntimeError(exc.output or f"pymobiledevice3 执行失败,退出码 {exc.returncode}")
def _pmd3_run_subprocess(self, full_args: list[str], extra_env: Optional[dict] = None) -> str:
"""
兜底通过子进程执行 pymobiledevice3使用 _resolve_pmd3_argv_and_env()
"""
import subprocess
launcher, env = self._resolve_pmd3_argv_and_env()
if extra_env:
for k, v in extra_env.items():
if v is None:
env.pop(k, None)
else:
env[k] = str(v)
cmd = [*launcher, *full_args]
cmd = self._ensure_str_list(cmd)
print("[pmd3-subproc]", " ".join(cmd))
try:
2025-10-23 21:38:18 +08:00
out = subprocess.check_output(
cmd,
text=True,
stderr=subprocess.STDOUT,
env=env,
**self._win_hidden_popen_kwargs()
)
2025-10-23 18:53:22 +08:00
return out or ""
except subprocess.CalledProcessError as exc:
raise RuntimeError(exc.output or f"pymobiledevice3 子进程执行失败,代码 {exc.returncode}")
# =========================================================================
# 旧版环境依赖的替代方案:仅用于启动 tunneld 的子进程(需要常驻)
# =========================================================================
def _resolve_pmd3_argv_and_env(self):
import os, sys, shutil, subprocess
from pathlib import Path
2025-10-21 15:43:02 +08:00
env = os.environ.copy()
2025-10-23 18:53:22 +08:00
env["PYTHONUNBUFFERED"] = "1"
env.setdefault("PYTHONIOENCODING", "utf-8")
# 1) 明确使用 IOSAI_PYTHON如果上层已设置
prefer_py = env.get("IOSAI_PYTHON")
# 2) 构造一批候选路径(先根目录,再 Scripts
base = Path(sys.argv[0]).resolve()
base_dir = base.parent if base.is_file() else base
py_name = "python.exe" if os.name == "nt" else "python"
sidecar_candidates = [
base_dir / "python-rt" / py_name, # ✅ 嵌入式/你现在的摆放方式
base_dir / "python-rt" / "Scripts" / py_name, # 兼容虚拟环境式
base_dir.parent / "python-rt" / py_name,
base_dir.parent / "python-rt" / "Scripts" / py_name,
]
# 若外层已通过 IOSAI_PYTHON 指了路径,也放进候选
if prefer_py:
sidecar_candidates.insert(0, Path(prefer_py))
2025-10-21 15:43:02 +08:00
2025-10-23 18:53:22 +08:00
# 打印探测
for cand in sidecar_candidates:
print(f"[IOSAI] 🔎 probing sidecar at: {cand}")
if cand.is_file():
try:
out = subprocess.check_output(
[str(cand), "-c", "import pymobiledevice3;print('ok')"],
2025-10-23 21:38:18 +08:00
text=True,
stderr=subprocess.STDOUT,
env=env,
timeout=6,
**self._win_hidden_popen_kwargs()
2025-10-23 18:53:22 +08:00
)
if "ok" in out:
print(f"[IOSAI] ✅ sidecar selected: {cand}")
return ([str(cand), "-u", "-m", "pymobiledevice3"], env)
except Exception:
# 候选解释器存在但没装 pmd3就继续找下一个
pass
# 3) 回退:系统 PATH 里直接找 pymobiledevice3 可执行
exe = shutil.which("pymobiledevice3")
if exe:
print(f"[IOSAI] ✅ use PATH executable: {exe}")
return ([exe], env)
# 4) 兜底:从系统 Python 里找装了 pmd3 的解释器
py_candidates = []
base_exec = getattr(sys, "_base_executable", None)
if base_exec and os.path.isfile(base_exec):
py_candidates.append(base_exec)
for name in ("python3.exe", "python.exe", "py.exe", "python3", "python"):
p = shutil.which(name)
if p and p not in py_candidates:
py_candidates.append(p)
for py in py_candidates:
print(f"[IOSAI] 🔎 probing system python: {py}")
2025-10-21 15:43:02 +08:00
try:
out = subprocess.check_output(
2025-10-23 18:53:22 +08:00
[py, "-c", "import pymobiledevice3;print('ok')"],
2025-10-23 21:38:18 +08:00
text=True,
stderr=subprocess.STDOUT,
env=env,
timeout=6,
**self._win_hidden_popen_kwargs()
2025-10-21 15:43:02 +08:00
)
2025-10-23 18:53:22 +08:00
if "ok" in out:
print(f"[IOSAI] ✅ system python selected: {py}")
return ([py, "-u", "-m", "pymobiledevice3"], env)
except Exception:
continue
raise RuntimeError("未检测到可用的 pymobiledevice3建议携带 python-rt 或安装系统 Python+pmd3")
def _ensure_str_list(self, seq):
return [str(x) for x in seq]
2025-10-23 21:38:18 +08:00
def _win_hidden_popen_kwargs(self):
"""在 Windows 上隐藏子进程窗口;非 Windows 返回空参数。"""
if os.name != "nt":
return {}
import subprocess as _sp
si = _sp.STARTUPINFO()
si.dwFlags |= _sp.STARTF_USESHOWWINDOW
si.wShowWindow = 0 # SW_HIDE
return {
"startupinfo": si,
"creationflags": getattr(_sp, "CREATE_NO_WINDOW", 0x08000000),
}
2025-10-23 18:53:22 +08:00
# =========================================================================
# 功能函数-
# =========================================================================
def _auto_mount_developer_disk(self, udid: str, retries: int = 3, backoff_seconds: float = 2.0) -> None:
"""
使用进程内 CLIpymobiledevice3 mounter auto-mount带重试
"""
import time
last_err_text = ""
for i in range(max(1, retries)):
try:
out = self._pmd3_run(["mounter", "auto-mount"], udid)
2025-10-21 15:43:02 +08:00
if out:
for line in out.splitlines():
print(f"[mounter] {line}")
2025-10-23 18:53:22 +08:00
if "already mounted" in (out or "").lower():
2025-10-21 15:43:02 +08:00
print("[mounter] Developer disk image already mounted.")
2025-10-23 18:53:22 +08:00
else:
print("[mounter] Developer disk image mounted.")
return
except Exception as e:
last_err_text = str(e)
if i < retries - 1:
print(f"[mounter] attempt {i+1}/{retries} failed, retrying in {backoff_seconds}s ...")
2025-10-21 15:43:02 +08:00
try:
2025-10-23 18:53:22 +08:00
time.sleep(backoff_seconds)
2025-10-21 15:43:02 +08:00
except Exception:
pass
2025-10-23 18:53:22 +08:00
else:
raise RuntimeError(f"Auto-mount failed after {retries} attempts.\n{last_err_text}")
2025-10-21 15:43:02 +08:00
def _is_ipv4_host(self, host: str) -> bool:
2025-10-23 18:53:22 +08:00
import re as _re
return bool(_re.match(r"^\d{1,3}(\.\d{1,3}){3}$", host))
2025-10-21 15:43:02 +08:00
def _wait_for_rsd_ready(self, rsd_host: str, rsd_port: str, retries: int = 5, delay: float = 3.0) -> bool:
"""
探测 RSD 通道是否就绪直接尝试 TCP 连接
"""
port_int = int(rsd_port)
for i in range(1, retries + 1):
print(f"[rsd] Probing RSD {rsd_host}:{rsd_port} (attempt {i}/{retries}) ...")
try:
with socket.create_connection((rsd_host, port_int), timeout=2):
print("[rsd] ✅ RSD is reachable and ready.")
return True
except (socket.timeout, ConnectionRefusedError, OSError) as e:
print(f"[rsd] Not ready yet ({e}). Retrying...")
import time as _t
_t.sleep(delay)
print("[rsd] ❌ RSD did not become ready after retries.")
return False
def _launch_wda_via_rsd(self, bundle_id: str, rsd_host: str, rsd_port: str, udid: str) -> None:
"""
2025-10-23 18:53:22 +08:00
使用进程内 CLIpymobiledevice3 developer dvt launch <bundle> --rsd <host> <port>
2025-10-21 15:43:02 +08:00
"""
print(f"[wda] Launch via RSD {rsd_host}:{rsd_port}, bundle: {bundle_id}")
2025-10-23 18:53:22 +08:00
out = self._pmd3_run(
["developer", "dvt", "launch", bundle_id, "--rsd", rsd_host, rsd_port],
udid=udid,
)
2025-10-21 15:43:02 +08:00
if out:
for line in out.splitlines():
print(f"[wda] {line}")
print("[wda] Launch via RSD completed.")
def _launch_wda_via_http_tunnel(self, bundle_id: str, http_host: str, http_port: str, udid: str) -> None:
"""
退路通过 HTTP 网关端口设置 PYMOBILEDEVICE3_TUNNEL IPv4
2025-10-23 18:53:22 +08:00
使用进程内 CLI 启动 WDA
2025-10-21 15:43:02 +08:00
"""
if not self._is_ipv4_host(http_host):
raise RuntimeError(f"HTTP tunnel host must be IPv4, got {http_host}")
tunnel_endpoint = f"{http_host}:{http_port}"
print(f"[wda] Launch via HTTP tunnel {tunnel_endpoint}, bundle: {bundle_id}")
2025-10-23 18:53:22 +08:00
out = self._pmd3_run(
["developer", "dvt", "launch", bundle_id],
udid=udid,
extra_env={"PYMOBILEDEVICE3_TUNNEL": tunnel_endpoint},
)
2025-10-21 15:43:02 +08:00
if out:
for line in out.splitlines():
print(f"[wda] {line}")
print("[wda] Launch via HTTP tunnel completed.")
def _pick_available_port(self, base=49151, step=10) -> int:
"""挑选一个可用端口(避免重复)"""
for i in range(0, 1000, step):
port = base + i
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
if s.connect_ex(("127.0.0.1", port)) != 0:
return port
raise RuntimeError("No free port found for tunneld")
2025-10-23 18:53:22 +08:00
# =========================================================================
2025-10-21 15:43:02 +08:00
# 对外方法
2025-10-23 18:53:22 +08:00
# =========================================================================
2025-10-21 15:43:02 +08:00
def activate(
2025-10-23 18:53:22 +08:00
self,
udid: str,
wda_bundle_id: Optional[str] = WdaAppBundleId,
ready_timeout_sec: float = 120.0,
mount_retries: int = 3,
backoff_seconds: float = 2.0,
rsd_probe_retries: int = 5,
rsd_probe_delay_sec: float = 3.0,
pre_mount_first: bool = True,
2025-10-21 15:43:02 +08:00
) -> str:
"""
2025-10-23 18:53:22 +08:00
执行挂镜像(可选) -> 开隧道 -> 等待 RSD 就绪-> 启动 WDA
- 优先用 `--rsd` 启动
- 失败再用 HTTP 网关作为退路
2025-10-21 15:43:02 +08:00
"""
if not udid or not isinstance(udid, str):
raise ValueError("udid is required and must be a non-empty string")
print(f"[activate] UDID = {udid}")
2025-10-23 18:53:22 +08:00
# ⚠️ 检查管理员权限
if os.name == "nt":
import ctypes
try:
is_admin = ctypes.windll.shell32.IsUserAnAdmin() != 0
except Exception:
is_admin = False
if not is_admin:
print("[⚠️] 当前进程未以管理员身份运行tunneld 可能无法创建 USB 隧道。")
import time as _t
start_ts = _t.time()
# 1) (可选) 先挂载,避免 tidevice 并发 DDI 竞态
if pre_mount_first:
try:
self._auto_mount_developer_disk(udid, retries=mount_retries, backoff_seconds=backoff_seconds)
_t.sleep(2) # 稳定 DDI
except Exception as e:
print(f"[activate] 预挂载失败(继续尝试开隧道后再挂载一次):{e}")
# 2) 启动 tunneld 子进程
2025-10-21 15:43:02 +08:00
port = self._pick_available_port()
2025-10-23 18:53:22 +08:00
launcher, env2 = self._resolve_pmd3_argv_and_env()
env2["PYTHONUNBUFFERED"] = "1"
env2.setdefault("PYTHONIOENCODING", "utf-8")
env2["PYMOBILEDEVICE3_UDID"] = udid
cmd = self._ensure_str_list([*launcher, "remote", "tunneld", "--port", str(port)])
print("[activate] 使用命令启动隧道:", " ".join(cmd))
2025-10-21 15:43:02 +08:00
proc = subprocess.Popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
text=True,
2025-10-23 18:53:22 +08:00
bufsize=1,
universal_newlines=True,
env=env2,
2025-10-23 21:38:18 +08:00
**self._win_hidden_popen_kwargs()
2025-10-21 15:43:02 +08:00
)
captured: list[str] = []
http_host: Optional[str] = None
http_port: Optional[str] = None
rsd_host: Optional[str] = None
rsd_port: Optional[str] = None
device_tunnel_ready = False
wda_started = False
2025-10-23 18:53:22 +08:00
mount_done = pre_mount_first
2025-10-21 15:43:02 +08:00
HTTP_RE = re.compile(r"http://([0-9.]+):(\d+)")
RSD_CREATED_RE = re.compile(r"Created tunnel\s+--rsd\s+([^\s]+)\s+(\d+)")
2025-10-23 18:53:22 +08:00
RSD_FALLBACK_RE = re.compile(r"--rsd\s+(\S+?)[\s:](\d+)")
2025-10-21 15:43:02 +08:00
try:
assert proc.stdout is not None
for line in proc.stdout:
captured.append(line)
print(f"[tunneld] {line}", end="")
if proc.poll() is not None:
break
# 捕获 HTTP 网关端口
if http_port is None:
m = HTTP_RE.search(line)
if m:
http_host, http_port = m.group(1), m.group(2)
print(f"[tunneld] Tunnel API: {http_host}:{http_port}")
2025-10-23 18:53:22 +08:00
# ✅ 捕获 RSD仅识别当前 UDID 的行)
if self._line_is_for_udid(line, udid):
m = RSD_CREATED_RE.search(line) or RSD_FALLBACK_RE.search(line)
if m and not device_tunnel_ready:
rsd_host, rsd_port = m.group(1), m.group(2)
device_tunnel_ready = True
print(f"[tunneld] Device-level tunnel ready (RSD {rsd_host}:{rsd_port}).")
else:
continue # 其它设备的隧道日志忽略
# 当设备隧道准备好后启动 WDA
2025-10-21 15:43:02 +08:00
if (not wda_started) and wda_bundle_id and device_tunnel_ready:
try:
if not mount_done:
self._auto_mount_developer_disk(
udid, retries=mount_retries, backoff_seconds=backoff_seconds
)
2025-10-23 18:53:22 +08:00
_t.sleep(2)
2025-10-21 15:43:02 +08:00
mount_done = True
rsd_ok = False
if rsd_host and rsd_port:
rsd_ok = self._wait_for_rsd_ready(
rsd_host, rsd_port,
retries=rsd_probe_retries,
delay=rsd_probe_delay_sec,
)
2025-10-23 18:53:22 +08:00
2025-10-21 15:43:02 +08:00
if rsd_ok:
self._launch_wda_via_rsd(
bundle_id=wda_bundle_id,
2025-10-23 18:53:22 +08:00
rsd_host=rsd_host,
rsd_port=rsd_port,
2025-10-21 15:43:02 +08:00
udid=udid,
)
else:
if http_host and http_port:
self._launch_wda_via_http_tunnel(
bundle_id=wda_bundle_id,
http_host=http_host,
http_port=http_port,
udid=udid,
)
else:
raise RuntimeError("No valid tunnel endpoint for fallback.")
wda_started = True
except RuntimeError as exc:
print(str(exc), file=sys.stderr)
proc.terminate()
try:
proc.wait(timeout=5)
except subprocess.TimeoutExpired:
proc.kill()
raise
2025-10-23 18:53:22 +08:00
# 超时保护
2025-10-21 15:43:02 +08:00
if (not wda_started) and ready_timeout_sec > 0 and (_t.time() - start_ts > ready_timeout_sec):
print(f"[tunneld] Timeout waiting for device tunnel ({ready_timeout_sec}s). Aborting.")
proc.terminate()
try:
proc.wait(timeout=5)
except subprocess.TimeoutExpired:
proc.kill()
break
2025-10-23 18:53:22 +08:00
# 结束清理
2025-10-21 15:43:02 +08:00
try:
return_code = proc.wait(timeout=5)
except subprocess.TimeoutExpired:
proc.kill()
return_code = proc.returncode or -9
output = "".join(captured)
if return_code != 0 and not wda_started:
raise RuntimeError(f"tunneld exited with code {return_code}.\n{output}")
print("[activate] Done.")
return output
except KeyboardInterrupt:
print("\n[activate] Interrupted, cleaning up ...", file=sys.stderr)
try:
proc.terminate()
proc.wait(timeout=5)
except Exception:
try:
proc.kill()
except Exception:
pass
raise
2025-10-23 18:53:22 +08:00
def _line_is_for_udid(self, line: str, udid: str) -> bool:
"""日志行是否属于目标 UDID。"""
try:
return udid.lower() in (line or "").lower()
except Exception:
return False