Files
iOSAI/Module/IOSActivator.py
2025-10-21 15:43:02 +08:00

301 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import random
import re
import socket
import sys
import subprocess
from typing import Optional
from Entity.Variables import WdaAppBundleId
class IOSActivator:
    """
    Lightweight iOS activator, meant to be driven from code only.

    Workflow:
      1) Start `pymobiledevice3 remote tunneld` for the UDID passed in.
      2) Auto-mount the Developer Disk Image.
      3) Launch WDA once the device tunnel is ready:
         - prefer a direct `--rsd <host> <port>` connection (IPv6 capable);
         - otherwise fall back to `PYMOBILEDEVICE3_TUNNEL=<host>:<port>`
           (IPv4 only, e.g. 127.0.0.1:<port>).
    """

    def __init__(self, python_executable: Optional[str] = None):
        """Remember which interpreter is used to run `python -m pymobiledevice3`.

        Args:
            python_executable: Path to a Python interpreter. When None or
                empty, the interpreter running this code (`sys.executable`)
                is used.
        """
        if python_executable:
            self.python = python_executable
        else:
            self.python = sys.executable
# --------------------------
# Internal helpers
# --------------------------
def _auto_mount_developer_disk(self, udid: str, retries: int = 3, backoff_seconds: float = 2.0) -> None:
"""使用 `pymobiledevice3 mounter auto-mount` 为指定 UDID 挂载开发者镜像(带重试)。"""
env = os.environ.copy()
env["PYMOBILEDEVICE3_UDID"] = udid
max_attempts = max(1, int(retries))
last_err = None
for i in range(max_attempts):
try:
out = subprocess.check_output(
[self.python, "-m", "pymobiledevice3", "mounter", "auto-mount"],
text=True,
stderr=subprocess.STDOUT,
env=env,
)
if out:
for line in out.splitlines():
print(f"[mounter] {line}")
print("[mounter] Developer disk image mounted.")
return
except subprocess.CalledProcessError as exc:
lowered = (exc.output or "").lower()
if "already mounted" in lowered:
print("[mounter] Developer disk image already mounted.")
return
last_err = exc
if i < max_attempts - 1:
print(f"[mounter] attempt {i+1}/{max_attempts} failed, retrying in {backoff_seconds}s ...")
try:
import time as _t
_t.sleep(backoff_seconds)
except Exception:
pass
msg = last_err.output if isinstance(last_err, subprocess.CalledProcessError) else str(last_err)
raise RuntimeError(f"Auto-mount failed after {max_attempts} attempts: {msg}")
def _is_ipv4_host(self, host: str) -> bool:
return bool(re.match(r"^\d{1,3}(\.\d{1,3}){3}$", host))
def _wait_for_rsd_ready(self, rsd_host: str, rsd_port: str, retries: int = 5, delay: float = 3.0) -> bool:
"""
探测 RSD 通道是否就绪:直接尝试 TCP 连接。
"""
port_int = int(rsd_port)
for i in range(1, retries + 1):
print(f"[rsd] Probing RSD {rsd_host}:{rsd_port} (attempt {i}/{retries}) ...")
try:
with socket.create_connection((rsd_host, port_int), timeout=2):
print("[rsd] ✅ RSD is reachable and ready.")
return True
except (socket.timeout, ConnectionRefusedError, OSError) as e:
print(f"[rsd] Not ready yet ({e}). Retrying...")
import time as _t
_t.sleep(delay)
print("[rsd] ❌ RSD did not become ready after retries.")
return False
def _launch_wda_via_rsd(self, bundle_id: str, rsd_host: str, rsd_port: str, udid: str) -> None:
"""
使用 `--rsd <host> <port>` 直连设备隧道来启动 WDA推荐路径IPv4/IPv6 都 OK
不设置 PYMOBILEDEVICE3_TUNNEL避免 IPv6 解析问题。
"""
print(f"[wda] Launch via RSD {rsd_host}:{rsd_port}, bundle: {bundle_id}")
env = os.environ.copy()
env["PYMOBILEDEVICE3_UDID"] = udid
args = [
self.python, "-m", "pymobiledevice3",
"developer", "dvt", "launch", bundle_id,
"--rsd", rsd_host, rsd_port,
]
try:
out = subprocess.check_output(args, text=True, stderr=subprocess.STDOUT, env=env)
except subprocess.CalledProcessError as exc:
raise RuntimeError(f"WDA launch via RSD failed: {exc.output}") from exc
if out:
for line in out.splitlines():
print(f"[wda] {line}")
print("[wda] Launch via RSD completed.")
def _launch_wda_via_http_tunnel(self, bundle_id: str, http_host: str, http_port: str, udid: str) -> None:
"""
退路:通过 HTTP 网关端口设置 PYMOBILEDEVICE3_TUNNEL仅 IPv4
"""
if not self._is_ipv4_host(http_host):
raise RuntimeError(f"HTTP tunnel host must be IPv4, got {http_host}")
tunnel_endpoint = f"{http_host}:{http_port}"
print(f"[wda] Launch via HTTP tunnel {tunnel_endpoint}, bundle: {bundle_id}")
env = os.environ.copy()
env["PYMOBILEDEVICE3_TUNNEL"] = tunnel_endpoint
env["PYMOBILEDEVICE3_UDID"] = udid
args = [self.python, "-m", "pymobiledevice3", "developer", "dvt", "launch", bundle_id]
try:
out = subprocess.check_output(args, text=True, stderr=subprocess.STDOUT, env=env)
except subprocess.CalledProcessError as exc:
raise RuntimeError(f"WDA launch via HTTP tunnel failed: {exc.output}") from exc
if out:
for line in out.splitlines():
print(f"[wda] {line}")
print("[wda] Launch via HTTP tunnel completed.")
def _pick_available_port(self, base=49151, step=10) -> int:
"""挑选一个可用端口(避免重复)"""
for i in range(0, 1000, step):
port = base + i
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
if s.connect_ex(("127.0.0.1", port)) != 0:
return port
raise RuntimeError("No free port found for tunneld")
# --------------------------
# Public API
# --------------------------
def activate(
    self,
    udid: str,
    wda_bundle_id: Optional[str] = WdaAppBundleId,
    ready_timeout_sec: float = 120.0,
    mount_retries: int = 3,
    backoff_seconds: float = 2.0,
    rsd_probe_retries: int = 5,
    rsd_probe_delay_sec: float = 3.0,
) -> str:
    """Open a tunnel, mount the developer image and launch WDA.

    Sequence: start `pymobiledevice3 remote tunneld` -> wait for the
    "Created tunnel --rsd <host> <port>" line -> auto-mount the Developer
    Disk Image -> launch WDA. The RSD endpoint is probed with a TCP connect
    and used first; if the probe or the RSD launch fails, the HTTP tunnel
    endpoint parsed from tunneld's output is used as a fallback.

    The method keeps streaming tunneld's output after WDA has been started
    and only returns once tunneld exits, so it blocks for the lifetime of
    the tunnel.

    NOTE: the readiness timeout is only evaluated when tunneld prints a
    line; a completely silent tunneld is not interrupted by it.

    Args:
        udid: Device UDID (non-empty string).
        wda_bundle_id: Bundle id of the WDA app. When falsy, no mount or
            launch is attempted.
        ready_timeout_sec: Abort if WDA has not been started after this many
            seconds; values <= 0 disable the check.
        mount_retries: Attempts for the developer image mount.
        backoff_seconds: Pause between mount attempts.
        rsd_probe_retries: Attempts for the RSD TCP probe.
        rsd_probe_delay_sec: Pause between RSD probe attempts.

    Returns:
        Everything tunneld printed, concatenated into one string.

    Raises:
        ValueError: If `udid` is empty or not a string.
        RuntimeError: If mounting or launching fails, or if tunneld exits
            with a non-zero code before WDA was started (this includes the
            readiness timeout).
    """
    import time

    if not udid or not isinstance(udid, str):
        raise ValueError("udid is required and must be a non-empty string")
    print(f"[activate] UDID = {udid}")

    env = os.environ.copy()
    env["PYMOBILEDEVICE3_UDID"] = udid

    # 1) Start the tunnel daemon as a long-running child process.
    port = self._pick_available_port()
    cmd = [self.python, "-m", "pymobiledevice3", "remote", "tunneld", "--port", str(port)]
    proc = subprocess.Popen(
        cmd,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        text=True,
        env=env,
    )

    http_re = re.compile(r"http://([0-9.]+):(\d+)")
    rsd_created_re = re.compile(r"Created tunnel\s+--rsd\s+([^\s]+)\s+(\d+)")

    captured: list[str] = []
    http_host: Optional[str] = None
    http_port: Optional[str] = None
    rsd_host: Optional[str] = None
    rsd_port: Optional[str] = None
    wda_started = False
    start_ts = time.monotonic()

    try:
        # Always true because stdout=PIPE was requested; narrows the type.
        assert proc.stdout is not None
        for line in proc.stdout:
            captured.append(line)
            print(f"[tunneld] {line}", end="")
            if proc.poll() is not None:
                break

            # Remember the HTTP gateway endpoint (first match only).
            if http_port is None:
                m = http_re.search(line)
                if m:
                    http_host, http_port = m.group(1), m.group(2)
                    print(f"[tunneld] Tunnel API: {http_host}:{http_port}")

            # Remember the device RSD endpoint (may be IPv6).
            m = rsd_created_re.search(line)
            if m:
                rsd_host, rsd_port = m.group(1), m.group(2)
                print(f"[tunneld] Device-level tunnel ready (RSD {rsd_host}:{rsd_port}).")

            # 2) + 3) Once the device tunnel exists: mount, then launch WDA.
            if (not wda_started) and wda_bundle_id and rsd_host and rsd_port:
                try:
                    self._auto_mount_developer_disk(
                        udid, retries=mount_retries, backoff_seconds=backoff_seconds
                    )
                    self._launch_wda_with_fallback(
                        udid=udid,
                        bundle_id=wda_bundle_id,
                        rsd_host=rsd_host,
                        rsd_port=rsd_port,
                        http_host=http_host,
                        http_port=http_port,
                        rsd_probe_retries=rsd_probe_retries,
                        rsd_probe_delay_sec=rsd_probe_delay_sec,
                    )
                except RuntimeError as exc:
                    # tunneld itself is stopped by the outer handler.
                    print(str(exc), file=sys.stderr)
                    raise
                wda_started = True

            # Timeout guard while WDA has not been started yet.
            timed_out = ready_timeout_sec > 0 and (time.monotonic() - start_ts > ready_timeout_sec)
            if (not wda_started) and timed_out:
                print(f"[tunneld] Timeout waiting for device tunnel ({ready_timeout_sec}s). Aborting.")
                self._terminate_tunneld(proc)
                break

        # Collect the exit code; force the child down if it lingers.
        try:
            return_code = proc.wait(timeout=5)
        except subprocess.TimeoutExpired:
            proc.kill()
            return_code = proc.wait()  # reap it so no zombie is left behind

        output = "".join(captured)
        if return_code != 0 and not wda_started:
            raise RuntimeError(f"tunneld exited with code {return_code}.\n{output}")
        print("[activate] Done.")
        return output
    except KeyboardInterrupt:
        print("\n[activate] Interrupted, cleaning up ...", file=sys.stderr)
        self._terminate_tunneld(proc)
        raise
    except BaseException:
        # Any other failure must not leave tunneld running in the background.
        self._terminate_tunneld(proc)
        raise
    finally:
        if proc.stdout is not None:
            proc.stdout.close()

def _launch_wda_with_fallback(
    self,
    udid: str,
    bundle_id: str,
    rsd_host: str,
    rsd_port: str,
    http_host: Optional[str],
    http_port: Optional[str],
    rsd_probe_retries: int,
    rsd_probe_delay_sec: float,
) -> None:
    """Launch WDA via RSD; fall back to the HTTP tunnel endpoint on failure.

    Raises:
        RuntimeError: If no launch path succeeded. When no HTTP endpoint is
            known, the RSD launch error (if any) is re-raised.
    """
    rsd_error: Optional[RuntimeError] = None
    rsd_ok = self._wait_for_rsd_ready(
        rsd_host, rsd_port,
        retries=rsd_probe_retries,
        delay=rsd_probe_delay_sec,
    )
    if rsd_ok:
        try:
            self._launch_wda_via_rsd(
                bundle_id=bundle_id,
                rsd_host=rsd_host,
                rsd_port=rsd_port,
                udid=udid,
            )
            return
        except RuntimeError as exc:
            rsd_error = exc
            print(str(exc), file=sys.stderr)

    # RSD not ready or RSD launch failed: use the HTTP gateway (IPv4 only).
    if http_host and http_port:
        self._launch_wda_via_http_tunnel(
            bundle_id=bundle_id,
            http_host=http_host,
            http_port=http_port,
            udid=udid,
        )
        return
    if rsd_error is not None:
        raise rsd_error
    raise RuntimeError("No valid tunnel endpoint for fallback.")

def _terminate_tunneld(self, proc: "subprocess.Popen[str]") -> None:
    """Stop the tunneld child: terminate, wait up to 5 s, then kill and reap."""
    try:
        proc.terminate()
        proc.wait(timeout=5)
    except subprocess.TimeoutExpired:
        proc.kill()
        proc.wait()