Files
iOSAI/Module/DeviceInfo.py

204 lines
6.9 KiB
Python
Raw Normal View History

2025-08-15 20:04:59 +08:00
import os
2025-08-28 19:51:57 +08:00
import signal
2025-09-22 14:36:05 +08:00
import subprocess
2025-08-01 13:43:51 +08:00
import time
2025-09-17 22:23:57 +08:00
from concurrent.futures import ThreadPoolExecutor, as_completed
2025-08-15 20:04:59 +08:00
from pathlib import Path
2025-09-22 14:36:05 +08:00
from typing import Dict, Optional, List
2025-09-24 16:32:05 +08:00
import tidevice
2025-09-18 21:31:23 +08:00
import wda
2025-09-04 20:47:14 +08:00
from tidevice import Usbmux, ConnectionType
2025-09-08 13:48:21 +08:00
from tidevice._device import BaseDevice
2025-08-01 13:43:51 +08:00
from Entity.DeviceModel import DeviceModel
2025-09-24 16:32:05 +08:00
from Entity.Variables import WdaAppBundleId
2025-08-01 13:43:51 +08:00
from Module.FlaskSubprocessManager import FlaskSubprocessManager
from Utils.LogManager import LogManager
2025-08-01 13:43:51 +08:00
2025-09-11 22:46:55 +08:00
2025-09-22 14:36:05 +08:00
class DeviceInfo:
    """Watch USB-attached iOS devices and keep one iproxy tunnel per device.

    ``listen()`` polls usbmux once per second.  For every newly seen USB
    device it checks that the device answers a lockdown query, launches the
    WDA app, reads the screen geometry through WDA, starts an ``iproxy``
    process forwarding a freshly allocated local port to device port 9100 and
    reports the resulting ``DeviceModel`` through ``FlaskSubprocessManager``.
    When a device disappears its iproxy process is stopped and the model is
    reported again with ``type = 2``.
    """

    def __init__(self):
        # Local import: threading is not among the module's top-level imports.
        import threading

        # Last port handed out; _alloc_port() increments before returning.
        self._port = 9110
        # _alloc_port() is called from pool workers, so guard the counter.
        self._port_lock = threading.Lock()
        self._models: Dict[str, DeviceModel] = {}
        self._procs: Dict[str, subprocess.Popen] = {}
        self._manager = FlaskSubprocessManager.get_instance()
        self._iproxy_path = self._find_iproxy()
        self._pool = ThreadPoolExecutor(max_workers=6)

    # ---------------- main loop ----------------
    def listen(self):
        """Poll for plugged/unplugged USB devices forever (never returns).

        Removals are handled synchronously in the calling thread; additions
        are fanned out to the thread pool and awaited for at most 30 seconds.
        Workers that exceed the timeout keep running and are remembered so the
        same device is not submitted a second time while they are in flight.
        """
        # Local import: on Python < 3.11 this is not the builtin TimeoutError.
        from concurrent.futures import TimeoutError as FuturesTimeoutError

        # udid -> future for workers that outlived a previous 30 s wait.
        pending = {}
        while True:
            try:
                online = {
                    d.udid
                    for d in Usbmux().device_list()
                    if d.conn_type == ConnectionType.USB
                }
            except Exception as e:
                # A transient usbmux failure must not kill the monitor loop.
                LogManager.error(f"获取设备列表失败:{e}")
                time.sleep(1)
                continue

            # Unplugged devices -- handled synchronously.
            for udid in list(self._models):
                if udid not in online:
                    self._remove_device(udid)

            # Reap stragglers that have finished since the last poll.
            for udid, fut in list(pending.items()):
                if fut.done():
                    del pending[udid]
                    err = fut.exception()
                    if err is not None:
                        LogManager.error(f"异步连接失败:{err}")

            # Newly plugged devices -- handled asynchronously.
            new = [
                u for u in online
                if u not in self._models and u not in pending
            ]
            if new:
                futures = {self._pool.submit(self._add_device, u): u for u in new}
                try:
                    for f in as_completed(futures, timeout=30):
                        try:
                            f.result()
                        except Exception as e:
                            LogManager.error(f"异步连接失败:{e}")
                except FuturesTimeoutError:
                    # Keep looping; just remember who is still being set up.
                    for f, u in futures.items():
                        if not f.done():
                            pending[u] = f
                            LogManager.error(f"设备连接超时:{u}")
            time.sleep(1)

    # ---------------- add device ----------------
    def _add_device(self, udid: str):
        """Bring one device online; runs inside a pool worker.

        Returns silently (after logging) if the device is not trusted, WDA
        cannot be started, the screen info is unavailable or iproxy fails to
        start.  On success the model and the iproxy process are recorded and
        the model is sent to the manager.
        """
        if not self._trusted(udid):
            return
        if self.startWda(udid) is False:
            LogManager.info("启动wda失败")
            return
        w, h, s = self._screen_info(udid)
        if w == 0 or h == 0 or s == 0:
            LogManager.error("未获取到设备屏幕信息")
            return
        port = self._alloc_port()
        proc = self._start_iproxy(udid, port)
        if not proc:
            return
        # NOTE(review): type=1 appears to mean "device added" -- confirm
        # against DeviceModel consumers.
        model = DeviceModel(deviceId=udid, screenPort=port,
                            width=w, height=h, scale=s, type=1)
        model.ready = True
        self._models[udid] = model
        self._procs[udid] = proc
        self._manager_send(model)

    # ---------------- remove device ----------------
    def _remove_device(self, udid: str):
        """Forget *udid*, stop its iproxy process and report it with type=2.

        Does nothing when the device is not currently tracked.
        """
        model = self._models.pop(udid, None)
        if not model:
            return
        model.type = 2
        self._kill(self._procs.pop(udid, None))
        self._manager_send(model)

    # ---------------- helpers ----------------
    def _trusted(self, udid: str) -> bool:
        """Return True when a ``DeviceName`` lockdown query on *udid* succeeds."""
        try:
            BaseDevice(udid).get_value("DeviceName")
            return True
        except Exception:
            return False

    def startWda(self, udid):
        """Launch the WDA app on *udid* and wait 3 seconds for it to come up.

        Returns True on success, False if tidevice raised anything.
        """
        LogManager.info("进入启动wda方法")
        try:
            dev = tidevice.Device(udid)
            LogManager.info("获取tidevice对象成功准备启动wda")
            dev.app_start(WdaAppBundleId)
            LogManager.info("启动wda成功")
            time.sleep(3)
            return True
        except Exception as e:
            LogManager.error(f"启动wda遇到错误: {e}")
            return False

    def _screen_info(self, udid: str):
        """Return ``(width, height, scale)`` read through WDA on port 8100.

        Presses the home button first, then queries the window size and the
        scale.  Returns ``(0, 0, 0)`` if any WDA call fails.
        """
        try:
            c = wda.USBClient(udid, 8100)
            c.home()
            size = c.window_size()
            scale = c.scale
            return int(size.width), int(size.height), float(scale)
        except Exception as e:
            LogManager.error(f"获取设备信息遇到错误: {e}")
            return 0, 0, 0

    def _start_iproxy(self, udid: str, port: int) -> Optional[subprocess.Popen]:
        """Spawn iproxy forwarding local *port* to device port 9100.

        Returns the ``Popen`` handle, or None if the process could not be
        started.
        """
        kw = {}
        # CREATE_NO_WINDOW (hides the console window) only exists on Windows.
        no_window = getattr(subprocess, "CREATE_NO_WINDOW", 0)
        if no_window:
            kw["creationflags"] = no_window
        try:
            return subprocess.Popen(
                [self._iproxy_path, "-u", udid, str(port), "9100"],
                stdout=subprocess.DEVNULL,
                stderr=subprocess.DEVNULL,
                **kw
            )
        except Exception as e:
            LogManager.error(f"iproxy 启动失败: {e}")
            return None

    def _kill(self, proc: Optional[subprocess.Popen]):
        """Stop *proc*: terminate, wait up to 2 s, then force-kill if needed.

        Accepts None (no-op).  Never raises.
        """
        if not proc:
            return
        try:
            proc.terminate()
            proc.wait(timeout=2)
            return
        except Exception:
            # Fall through to the hard kill below.
            pass
        try:
            # Popen.kill() works on every platform; signal.SIGKILL is not
            # defined on Windows, so os.kill(pid, SIGKILL) could never run there.
            proc.kill()
            proc.wait(timeout=2)
        except Exception:
            # Best effort: the process may already be gone.
            pass

    def _alloc_port(self) -> int:
        """Return the next unused local port number (thread-safe)."""
        with self._port_lock:
            self._port += 1
            return self._port

    def _manager_send(self, model: "DeviceModel"):
        """Send ``model.toDict()`` to the manager; failures are logged, not raised."""
        try:
            self._manager.send(model.toDict())
        except Exception as e:
            LogManager.error(f"发送设备信息失败:{e}")

    def _find_iproxy(self) -> str:
        """Return the path of ``resources/iproxy/iproxy.exe`` two levels up.

        Raises:
            FileNotFoundError: if the executable is not at that location.
        """
        base = Path(__file__).resolve().parent.parent
        name = "iproxy.exe"
        path = base / "resources" / "iproxy" / name
        LogManager.info(str(path))
        if path.is_file():
            return str(path)
        raise FileNotFoundError(f"iproxy 不存在: {path}")

    # ------------ Windows only: list every iproxy command line ------------
    def _get_all_iproxy_cmdlines(self) -> List[str]:
        """Return ``"<command line> <pid>"`` for each running iproxy.exe.

        Uses ``wmic``; returns an empty list when wmic fails or is not
        installed.
        """
        try:
            raw = subprocess.check_output(
                ['wmic', 'process', 'where', "name='iproxy.exe'",
                 'get', 'CommandLine,ProcessId', '/value'],
                stderr=subprocess.DEVNULL, text=True
            )
        except (subprocess.CalledProcessError, OSError):
            # OSError covers a missing wmic binary (FileNotFoundError).
            return []
        return self._parse_wmic_output(raw)

    @staticmethod
    def _parse_wmic_output(raw: str) -> List[str]:
        """Turn ``wmic ... /value`` text into ``"<command line> <pid>"`` entries.

        Only records whose command line contains ``-u`` are kept.  Blank lines
        are ignored rather than used as record separators, because wmic's
        ``\\r\\r\\n`` line endings become a blank line after *every* line in
        text mode.  A ``ProcessId=`` line is treated as the end of a record
        (assumes wmic lists the requested properties alphabetically, i.e.
        CommandLine before ProcessId -- TODO confirm on target systems).
        """
        entries: List[str] = []
        cmd = ''
        for line in raw.splitlines():
            line = line.strip()
            if line.startswith('CommandLine='):
                cmd = line[len('CommandLine='):].strip()
            elif line.startswith('ProcessId='):
                pid = line[len('ProcessId='):].strip()
                if cmd and pid and '-u' in cmd:
                    entries.append(f'{cmd} {pid}')
                # Reset so a record without a command line cannot inherit one.
                cmd = ''
        return entries

    # ------------ kill orphans ------------
    def _cleanup_orphan_iproxy(self):
        """Kill iproxy processes whose ``-u <udid>`` is not a tracked device."""
        live_udids = set(self._models.keys())
        for ln in self._get_all_iproxy_cmdlines():
            parts = ln.split()
            try:
                udid = parts[parts.index('-u') + 1]
                pid = int(parts[-1])
            except (ValueError, IndexError):
                # Entry without a usable "-u <udid>" pair or numeric PID.
                continue
            if udid not in live_udids:
                self._kill_pid_gracefully(pid)
                LogManager.warning(f'扫到孤儿 iproxy,已清理 {udid} PID={pid}')

    # ------------ kill by PID ------------
    def _kill_pid_gracefully(self, pid: int):
        """Send SIGTERM to *pid*, wait one second, then send a hard kill.

        Errors from ``os.kill`` (e.g. the process has already exited) are
        ignored.
        """
        # signal.SIGKILL is not defined on Windows; fall back to SIGTERM there.
        hard_kill = getattr(signal, "SIGKILL", signal.SIGTERM)
        try:
            os.kill(pid, signal.SIGTERM)
            time.sleep(1)
            os.kill(pid, hard_kill)
        except OSError:
            pass