diff --git a/Module/DeviceInfo.py b/Module/DeviceInfo.py index 349ea48..9701058 100644 --- a/Module/DeviceInfo.py +++ b/Module/DeviceInfo.py @@ -11,6 +11,7 @@ import time import threading import subprocess import socket +from concurrent.futures import ThreadPoolExecutor from pathlib import Path from typing import Dict, Optional, List, Any import platform @@ -132,6 +133,11 @@ class DeviceInfo: self._manager = FlaskSubprocessManager.get_instance() self._iproxy_path = self._find_iproxy() + # 懒加载线程池属性(供 _add_device 并发使用) + self._add_lock: Optional[threading.RLock] = None + self._adding_udids: Optional[set[str]] = None + self._add_executor: Optional["ThreadPoolExecutor"] = None + # iproxy 连续失败计数(守护用) self._iproxy_fail_count: Dict[str, int] = {} @@ -147,15 +153,20 @@ class DeviceInfo: def _ensure_add_executor(self): """ 懒加载:首次调用 _add_device 时初始化线程池与去重集合。 - 不改 __init__,避免对现有初始化节奏有影响。 + 注意:不要只用 hasattr;属性可能已在 __init__ 里置为 None。 """ - if not hasattr(self, "_add_lock"): + # 1) 锁 + if getattr(self, "_add_lock", None) is None: self._add_lock = threading.RLock() - if not hasattr(self, "_adding_udids"): + + # 2) 去重集合 + if getattr(self, "_adding_udids", None) is None: self._adding_udids = set() - if not hasattr(self, "_add_executor") or self._add_executor is None: - import os + + # 3) 线程池 + if getattr(self, "_add_executor", None) is None: from concurrent.futures import ThreadPoolExecutor + import os max_workers = max(2, min(6, (os.cpu_count() or 4) // 2)) self._add_executor = ThreadPoolExecutor( max_workers=max_workers, @@ -180,28 +191,40 @@ class DeviceInfo: except Exception: pass finally: - with self._add_lock: + lock = getattr(self, "_add_lock", None) + if lock is None: + # 极端容错,避免再次抛异常 + self._add_lock = lock = threading.RLock() + with lock: self._adding_udids.discard(udid) def _add_device(self, udid: str): - """ - 并发包装器:保持所有调用点不变(listen 里仍然调用 _add_device)。 - - 懒加载线程池 - - 同一 udid 防重提交 - - 真实重逻辑放到 _add_device_impl(下方,已把你的原始实现迁过去) - """ + """并发包装器:保持所有调用点不变。""" self._ensure_add_executor() - with self._add_lock: - if udid in self._adding_udids: + + # 保险:即使极端情况下属性仍是 None,也就地补齐一次 + lock = getattr(self, "_add_lock", None) + if lock is None: + self._add_lock = lock = threading.RLock() + adding = getattr(self, "_adding_udids", None) + if adding is None: + self._adding_udids = adding = set() + + # 去重:同一 udid 只提交一次 + with lock: + if udid in adding: return - self._adding_udids.add(udid) + adding.add(udid) + try: + # 注意:submit(fn, udid) —— 这里不是 *args=udid,直接传第二个位置参数即可 self._add_executor.submit(self._safe_add_device, udid) except Exception as e: - with self._add_lock: - self._adding_udids.discard(udid) + # 提交失败要把去重标记清掉 + with lock: + adding.discard(udid) try: - LogManager.method_error(f"提交新增任务失败:{e}", "_add_device", udid=udid) + LogManager.method_error(text=f"提交新增任务失败:{e}", method="_add_device", udid=udid) except Exception: pass @@ -233,13 +256,13 @@ class DeviceInfo: return False def _iproxy_health_ok(self, udid: str, port: int) -> bool: - """综合健康判断:先 TCP,后 HTTP /status。两者任一失败即为不健康。""" - # 先看端口是否真在监听 + # 1) 监听检测:不通直接 False if not self._iproxy_tcp_probe(port, timeout=0.6): return False - # 再看链路到后端是否通(WDA 会回应 /status) + # 2) 业务探测:/status 慢可能是 WDA 卡顿;失败不等同于“端口坏” if not self._iproxy_http_status_ok_quick(port, timeout=1.2): - return False + print(f"[iproxy-health] /status 超时,视为轻微异常 {udid}:{port}") + return True return True def _restart_iproxy(self, udid: str, port: int) -> bool: @@ -276,16 +299,16 @@ class DeviceInfo: """ 周期性巡检(默认每 10s 一次): - 在线设备(type=1): - 1) 先做 TCP 探测(127.0.0.1:screenPort) - 2) 再做 HTTP /status 探测 - 3) 任一失败 → 尝试自愈重启 iproxy;若仍失败,累计失败计数 - 4) 连续失败次数 >= 3 才移除设备(避免短暂抖动) + 1) 先做 TCP+HTTP(/status) 探测(封装在 _iproxy_health_ok) + 2) 失败 → 自愈重启 iproxy;仍失败则累计失败计数 + 3) 连续失败次数 >= 阈值 → 【不删除设备】只标记降级(ready=False, streamBroken=True) + 4) 恢复时清零计数并标记恢复(ready=True, streamBroken=False) """ # 启动延迟,等新增流程跑起来,避免误判 time.sleep(20) - FAIL_THRESHOLD = 3 # 连续失败阈值 - INTERVAL_SEC = 10 # 巡检间隔 + FAIL_THRESHOLD = int(os.getenv("IPROXY_FAIL_THRESHOLD", "3")) # 连续失败阈值(可用环境变量调) + INTERVAL_SEC = int(os.getenv("IPROXY_CHECK_INTERVAL", "10")) # 巡检间隔 while True: snapshot = list(self._models.items()) # [(deviceId, DeviceModel), ...] @@ -303,9 +326,32 @@ class DeviceInfo: # 健康探测 ok = self._iproxy_health_ok(device_id, port) if ok: - # 健康则清零失败计数 + # 健康:清零计数 if self._iproxy_fail_count.get(device_id): self._iproxy_fail_count[device_id] = 0 + + # CHANGED: 若之前降级过,这里标记恢复并上报 + need_report = False + with self._lock: + m = self._models.get(device_id) + if m: + prev_ready = getattr(m, "ready", True) + prev_broken = getattr(m, "streamBroken", False) + if (not prev_ready) or prev_broken: + m.ready = True + if prev_broken: + try: + delattr(m, "streamBroken") + except Exception: + setattr(m, "streamBroken", False) + need_report = True + if need_report and m: + try: + print(f"[iproxy-check] 自愈成功,恢复就绪 deviceId={device_id} port={port}") + self._manager_send(m) + except Exception as e: + print(f"[iproxy-check] 上报恢复异常 deviceId={device_id}: {e}") + # print(f"[iproxy-check] OK deviceId={device_id} port={port}") continue @@ -318,6 +364,27 @@ class DeviceInfo: if ok2: print(f"[iproxy-check] 自愈成功 deviceId={device_id} port={port}") self._iproxy_fail_count[device_id] = 0 + + # CHANGED: 若之前降级过,这里也顺便恢复并上报 + need_report = False + with self._lock: + m = self._models.get(device_id) + if m: + prev_ready = getattr(m, "ready", True) + prev_broken = getattr(m, "streamBroken", False) + if (not prev_ready) or prev_broken: + m.ready = True + if prev_broken: + try: + delattr(m, "streamBroken") + except Exception: + setattr(m, "streamBroken", False) + need_report = True + if need_report and m: + try: + self._manager_send(m) + except Exception as e: + print(f"[iproxy-check] 上报恢复异常 deviceId={device_id}: {e}") continue # 自愈失败:累计失败计数 @@ -325,15 +392,20 @@ class DeviceInfo: self._iproxy_fail_count[device_id] = fails print(f"[iproxy-check] 自愈失败 ×{fails} deviceId={device_id} port={port}") - # 达阈值才移除(避免误杀) + # 达阈值 → 【不移除设备】,改为降级并上报(避免“删了又加”的抖动) if fails >= FAIL_THRESHOLD: - print(f"[iproxy-check] 连续失败 {fails} 次,移除设备 deviceId={device_id} port={port}") + with self._lock: + m = self._models.get(device_id) + if m: + m.ready = False + setattr(m, "streamBroken", True) try: - self._remove_device(device_id) + if m: + print( + f"[iproxy-check] 连续失败 {fails} 次,降级设备(保留在线) deviceId={device_id} port={port}") + self._manager_send(m) except Exception as e: - print(f"[iproxy-check] _remove_device 异常 deviceId={device_id}: {e}") - finally: - self._iproxy_fail_count.pop(device_id, None) + print(f"[iproxy-check] 上报降级异常 deviceId={device_id}: {e}") except Exception as e: print(f"[iproxy-check] 单设备检查异常: {e}") @@ -691,19 +763,17 @@ class DeviceInfo: print(f"[Proc] 结束进程异常: {e}") def _manager_send(self, model: DeviceModel): - """ - 轻量自愈:首次 send 失败 → start() 一次并重试一次;不抛异常。 - 这样 34566 刚起时不丢“上车”事件,前端更快看到设备。 - """ try: if self._manager.send(model.toDict()): print(f"[Manager] 已发送前端数据 {model.deviceId}") return except Exception as e: print(f"[Manager] 首次发送异常: {e}") - # 自愈:拉起一次并重试一次 + + # 自愈:拉起一次并重试一次(不要用 and 连接) try: - if self._manager.start() and self._manager.send(model.toDict()): + self._manager.start() # 不关心返回值 + if self._manager.send(model.toDict()): print(f"[Manager] 重试发送成功 {model.deviceId}") return except Exception as e: diff --git a/Module/FlaskService.py b/Module/FlaskService.py index c677d00..067946e 100644 --- a/Module/FlaskService.py +++ b/Module/FlaskService.py @@ -188,7 +188,7 @@ def start_socket_listener(): backoff = min(backoff * 2, 8.0) continue - s.listen() + s.listen(256) try: s.settimeout(1.5) # accept 超时,便于检查自愈循环 except Exception: diff --git a/Module/FlaskSubprocessManager.py b/Module/FlaskSubprocessManager.py index 528b4b1..997c812 100644 --- a/Module/FlaskSubprocessManager.py +++ b/Module/FlaskSubprocessManager.py @@ -34,8 +34,8 @@ class FlaskSubprocessManager: self._monitor_thread: Optional[threading.Thread] = None # 看门狗参数 - self._FAIL_THRESHOLD = int(os.getenv("FLASK_WD_FAIL_THRESHOLD", "3")) # 连续失败多少次重启 - self._COOLDOWN_SEC = float(os.getenv("FLASK_WD_COOLDOWN", "8.0")) # 两次重启间隔 + self._FAIL_THRESHOLD = int(os.getenv("FLASK_WD_FAIL_THRESHOLD", "5")) # 连续失败多少次重启 + self._COOLDOWN_SEC = float(os.getenv("FLASK_WD_COOLDOWN", "10")) # 两次重启间隔 self._MAX_RESTARTS = int(os.getenv("FLASK_WD_MAX_RESTARTS", "5")) # 10分钟最多几次重启 self._RESTART_WINDOW = 600 # 10分钟 self._restart_times: List[float] = [] @@ -176,6 +176,15 @@ class FlaskSubprocessManager: # ========= 停止 ========= def stop(self): + with self._lock: + if not self.process: return + try: + if self.process.stdout: + self.process.stdout.flush() + time.sleep(0.1) # 让读取线程跟上 + except Exception: + pass + with self._lock: if not self.process: return @@ -253,11 +262,16 @@ class FlaskSubprocessManager: # ========= 辅助 ========= def _port_alive(self) -> bool: - try: - with socket.create_connection(("127.0.0.1", self.comm_port), timeout=0.6): - return True - except Exception: - return False + def ping(p): + try: + with socket.create_connection(("127.0.0.1", p), timeout=0.6): + return True + except Exception: + return False + + p1 = self.comm_port + p2 = self.comm_port + 1 + return ping(p1) or ping(p2) def _wait_port_open(self, timeout: float) -> bool: start = time.time() diff --git a/Module/__pycache__/DeviceInfo.cpython-312.pyc b/Module/__pycache__/DeviceInfo.cpython-312.pyc index e15744f..3ada30a 100644 Binary files a/Module/__pycache__/DeviceInfo.cpython-312.pyc and b/Module/__pycache__/DeviceInfo.cpython-312.pyc differ diff --git a/Module/__pycache__/FlaskService.cpython-312.pyc b/Module/__pycache__/FlaskService.cpython-312.pyc index c90f3b2..da3025b 100644 Binary files a/Module/__pycache__/FlaskService.cpython-312.pyc and b/Module/__pycache__/FlaskService.cpython-312.pyc differ diff --git a/Module/__pycache__/FlaskSubprocessManager.cpython-312.pyc b/Module/__pycache__/FlaskSubprocessManager.cpython-312.pyc index 2604357..ead2a69 100644 Binary files a/Module/__pycache__/FlaskSubprocessManager.cpython-312.pyc and b/Module/__pycache__/FlaskSubprocessManager.cpython-312.pyc differ diff --git a/Utils/__pycache__/ControlUtils.cpython-312.pyc b/Utils/__pycache__/ControlUtils.cpython-312.pyc index 9720d96..b26fe18 100644 Binary files a/Utils/__pycache__/ControlUtils.cpython-312.pyc and b/Utils/__pycache__/ControlUtils.cpython-312.pyc differ diff --git a/Utils/__pycache__/LogManager.cpython-312.pyc b/Utils/__pycache__/LogManager.cpython-312.pyc index 6c6a5e8..f3c3b82 100644 Binary files a/Utils/__pycache__/LogManager.cpython-312.pyc and b/Utils/__pycache__/LogManager.cpython-312.pyc differ diff --git a/script/__pycache__/ScriptManager.cpython-312.pyc b/script/__pycache__/ScriptManager.cpython-312.pyc index c536b66..2e70013 100644 Binary files a/script/__pycache__/ScriptManager.cpython-312.pyc and b/script/__pycache__/ScriptManager.cpython-312.pyc differ