diff --git a/Module/DeviceInfo.py b/Module/DeviceInfo.py index d24773d..349ea48 100644 --- a/Module/DeviceInfo.py +++ b/Module/DeviceInfo.py @@ -3,6 +3,8 @@ 极简稳定版设备监督器(DeviceInfo):加详细 print 日志 - 每个关键节点都会 print,便于人工观察执行到哪一步 - 保留核心逻辑:监听上下线 / 启动 WDA / 起 iproxy / 通知前端 + - 并发提速:_add_device 异步化(受控并发) + - iproxy 守护:本地端口 + /status 探活,不通则自愈重启;连续失败达阈值才移除 """ import os import time @@ -129,72 +131,214 @@ class DeviceInfo: self._last_seen: Dict[str, float] = {} self._manager = FlaskSubprocessManager.get_instance() self._iproxy_path = self._find_iproxy() + + # iproxy 连续失败计数(守护用) + self._iproxy_fail_count: Dict[str, int] = {} + LogManager.info("DeviceInfo 初始化完成", udid="system") print("[Init] DeviceInfo 初始化完成") - threading.Thread(target=self.check_iproxy_ports).start() + # iproxy 守护线程(端口+HTTP探活 → 自愈重启 → 达阈值才移除) + threading.Thread(target=self.check_iproxy_ports, daemon=True).start() - # =============== 核心:端口连通性检测(HTTP 方式) ================= - def _is_local_port_open(self, port: int, udid: str, timeout: float = 5) -> bool: - """ - 使用 HTTP 方式检测:向 http://127.0.0.1:port/ 发送一次 HEAD 请求。 - 只要建立连接并收到合法的 HTTP 响应(任意 1xx~5xx 状态码),即认为 HTTP 可达。 - 遇到连接失败、超时、协议不对等异常,视为不可用。 - """ - if not isinstance(port, int) or port <= 0 or port > 65535: - LogManager.error("端口不可用(非法端口号)", udid=udid) - return False + self._initialized = True # 标记已初始化 + # =============== 并发添加设备:最小改动(包装 _add_device) =============== + def _ensure_add_executor(self): + """ + 懒加载:首次调用 _add_device 时初始化线程池与去重集合。 + 不改 __init__,避免对现有初始化节奏有影响。 + """ + if not hasattr(self, "_add_lock"): + self._add_lock = threading.RLock() + if not hasattr(self, "_adding_udids"): + self._adding_udids = set() + if not hasattr(self, "_add_executor") or self._add_executor is None: + import os + from concurrent.futures import ThreadPoolExecutor + max_workers = max(2, min(6, (os.cpu_count() or 4) // 2)) + self._add_executor = ThreadPoolExecutor( + max_workers=max_workers, + thread_name_prefix="dev-add" + ) + try: + LogManager.info(f"[Init] Device add executor started, max_workers={max_workers}", udid="system") + except Exception: + pass + + def _safe_add_device(self, udid: str): + """ + 后台执行真正的新增实现(_add_device_impl): + - 任何异常只记日志,不抛出 + - 无论成功与否,都在 finally 里清理“正在添加”标记 + """ try: - # HEAD 更轻;若后端对 HEAD 不友好,可改为 "GET", "/" - conn = http.client.HTTPConnection("127.0.0.1", int(port), timeout=timeout) - conn.request("HEAD", "/") - resp = conn.getresponse() - status = resp.status - # 读到响应即可关闭 - conn.close() - # 任何合法 HTTP 状态码都说明“HTTP 服务在监听且可交互”,包括 404/401/403/5xx - if 100 <= status <= 599: - return True - else: - LogManager.error(f"HTTP状态码异常: {status}", udid=udid) - return False - + self._add_device_impl(udid) # ← 这是你原来的重逻辑(见下方) except Exception as e: - # 连接被拒绝、超时、不是HTTP协议正确响应(比如返回了非HTTP的字节流)都会到这里 - LogManager.error(f"HTTP检测失败:{e}", udid=udid) + try: + LogManager.method_error(f"_add_device_impl 异常:{e}", "_safe_add_device", udid=udid) + except Exception: + pass + finally: + with self._add_lock: + self._adding_udids.discard(udid) + + def _add_device(self, udid: str): + """ + 并发包装器:保持所有调用点不变(listen 里仍然调用 _add_device)。 + - 懒加载线程池 + - 同一 udid 防重提交 + - 真实重逻辑放到 _add_device_impl(下方,已把你的原始实现迁过去) + """ + self._ensure_add_executor() + with self._add_lock: + if udid in self._adding_udids: + return + self._adding_udids.add(udid) + try: + self._add_executor.submit(self._safe_add_device, udid) + except Exception as e: + with self._add_lock: + self._adding_udids.discard(udid) + try: + LogManager.method_error(f"提交新增任务失败:{e}", "_add_device", udid=udid) + except Exception: + pass + + # =============== iproxy 健康检查 / 自愈 =============== + def _iproxy_tcp_probe(self, port: int, timeout: float = 0.6) -> bool: + """快速 TCP 探测:能建立连接即认为本地监听正常。""" + try: + with socket.create_connection(("127.0.0.1", int(port)), timeout=timeout): + return True + except Exception: return False - # =============== 一轮检查:发现不通就移除 ================= + def _iproxy_http_status_ok_quick(self, port: int, timeout: float = 1.2) -> bool: + """ + 轻量 HTTP 探测:GET /status + - 成功返回 2xx/3xx 视为 OK + - 4xx/5xx 也说明链路畅通(服务可交互),这里统一认为 OK(避免误判) + """ + try: + conn = http.client.HTTPConnection("127.0.0.1", int(port), timeout=timeout) + conn.request("GET", "/status") + resp = conn.getresponse() + _ = resp.read(128) + code = getattr(resp, "status", 0) + conn.close() + # 任何能返回 HTTP 的,都说明“有服务可交互” + return 100 <= code <= 599 + except Exception: + return False + + def _iproxy_health_ok(self, udid: str, port: int) -> bool: + """综合健康判断:先 TCP,后 HTTP /status。两者任一失败即为不健康。""" + # 先看端口是否真在监听 + if not self._iproxy_tcp_probe(port, timeout=0.6): + return False + # 再看链路到后端是否通(WDA 会回应 /status) + if not self._iproxy_http_status_ok_quick(port, timeout=1.2): + return False + return True + + def _restart_iproxy(self, udid: str, port: int) -> bool: + """干净重启 iproxy:先杀旧的,再启动新的,并等待监听。""" + print(f"[iproxy-guard] 准备重启 iproxy {udid} on {port}") + proc = None + with self._lock: + old = self._iproxy.get(udid) + try: + if old: + self._kill(old) + except Exception as e: + print(f"[iproxy-guard] 杀旧进程异常 {udid}: {e}") + + # 重新拉起 + try: + proc = self._start_iproxy(udid, local_port=port) + except Exception as e: + print(f"[iproxy-guard] 重启失败 {udid}: {e}") + proc = None + + if not proc: + return False + + # 写回进程表 + with self._lock: + self._iproxy[udid] = proc + + print(f"[iproxy-guard] 重启成功 {udid} port={port}") + return True + + # =============== 一轮检查:先自愈,仍失败才考虑移除 ================= def check_iproxy_ports(self, connect_timeout: float = 3) -> None: + """ + 周期性巡检(默认每 10s 一次): + - 在线设备(type=1): + 1) 先做 TCP 探测(127.0.0.1:screenPort) + 2) 再做 HTTP /status 探测 + 3) 任一失败 → 尝试自愈重启 iproxy;若仍失败,累计失败计数 + 4) 连续失败次数 >= 3 才移除设备(避免短暂抖动) + """ + # 启动延迟,等新增流程跑起来,避免误判 time.sleep(20) + + FAIL_THRESHOLD = 3 # 连续失败阈值 + INTERVAL_SEC = 10 # 巡检间隔 + while True: snapshot = list(self._models.items()) # [(deviceId, DeviceModel), ...] for device_id, model in snapshot: try: - # 只处理在线且端口合法的设备 if model.type != 1: + # 离线设备清零计数 + self._iproxy_fail_count.pop(device_id, None) continue + port = int(model.screenPort) if port <= 0 or port > 65535: continue - ok = self._is_local_port_open(port, timeout=connect_timeout, udid=device_id) - if not ok: - print(f"[iproxy-check] 端口不可连,移除设备 deviceId={device_id} port={port}") + # 健康探测 + ok = self._iproxy_health_ok(device_id, port) + if ok: + # 健康则清零失败计数 + if self._iproxy_fail_count.get(device_id): + self._iproxy_fail_count[device_id] = 0 + # print(f"[iproxy-check] OK deviceId={device_id} port={port}") + continue + + # 第一次失败:尝试自愈重启 + print(f"[iproxy-check] 探活失败,准备自愈重启 deviceId={device_id} port={port}") + healed = self._restart_iproxy(device_id, port) + + # 重启后再探测一次 + ok2 = self._iproxy_health_ok(device_id, port) if healed else False + if ok2: + print(f"[iproxy-check] 自愈成功 deviceId={device_id} port={port}") + self._iproxy_fail_count[device_id] = 0 + continue + + # 自愈失败:累计失败计数 + fails = self._iproxy_fail_count.get(device_id, 0) + 1 + self._iproxy_fail_count[device_id] = fails + print(f"[iproxy-check] 自愈失败 ×{fails} deviceId={device_id} port={port}") + + # 达阈值才移除(避免误杀) + if fails >= FAIL_THRESHOLD: + print(f"[iproxy-check] 连续失败 {fails} 次,移除设备 deviceId={device_id} port={port}") try: - self._remove_device(device_id) # 这里面可安全地改 self._models + self._remove_device(device_id) except Exception as e: print(f"[iproxy-check] _remove_device 异常 deviceId={device_id}: {e}") - else: - # 心跳日志按需开启,避免刷屏 - # print(f"[iproxy-check] OK deviceId={device_id} port={port}") - pass + finally: + self._iproxy_fail_count.pop(device_id, None) except Exception as e: print(f"[iproxy-check] 单设备检查异常: {e}") - # 8秒间隔 - time.sleep(10) + + time.sleep(INTERVAL_SEC) def listen(self): LogManager.method_info("进入主循环", "listen", udid="system") @@ -221,7 +365,7 @@ class DeviceInfo: if (now - self._first_seen.get(udid, now)) >= self.ADD_STABLE_SEC: print(f"[Add] 检测到新设备: {udid}") try: - self._add_device(udid) + self._add_device(udid) # ← 并发包装器 except Exception as e: LogManager.method_error(f"新增失败:{e}", "listen", udid=udid) print(f"[Add] 新增失败 {udid}: {e}") @@ -265,65 +409,83 @@ class DeviceInfo: print(f"[WDA] /status@{local_port} 等待超时 {udid}") return False - - def _add_device(self, udid: str): + # ---------------- 原 _add_device 实现:整体改名为 _add_device_impl ---------------- + def _add_device_impl(self, udid: str): print(f"[Add] 开始新增设备 {udid}") if not self._trusted(udid): print(f"[Add] 未信任设备 {udid}, 跳过") return - try: - dev = tidevice.Device(udid) - major = int(dev.product_version.split(".")[0]) - except Exception: - major = 0 - - if not self._wda_http_status_ok_once(udid): - if major > 17: - print("进入iOS17设备的分支") - out = IOSActivator().activate(udid) - print("wda启动完成") - else: - print(f"[WDA] iOS<=17 启动 WDA app_start (port={wdaScreenPort})") - dev = tidevice.Device(udid) - dev.app_start(WdaAppBundleId) - time.sleep(2) - if not self._wait_wda_ready_http(udid, self.WDA_READY_TIMEOUT): - print(f"[WDA] WDA 未在超时内就绪, 放弃新增 {udid}") - return - - print(f"[WDA] WDA 就绪,准备获取屏幕信息 {udid}") - # 给 WDA 一点稳定时间,避免刚 ready 就查询卡住 - time.sleep(0.5) - # 带超时的屏幕信息获取,避免卡死在 USBClient 调用里 - w, h, s = self._screen_info_with_timeout(udid, timeout=3.5) - if not (w and h and s): - # 再做几次快速重试(带超时) - for i in range(4): - print(f"[Screen] 第{i + 1}次获取失败, 重试中... {udid}") - time.sleep(0.6) - w, h, s = self._screen_info_with_timeout(udid, timeout=3.5) - if w and h and s: - break - - if not (w and h and s): - print(f"[Screen] 屏幕信息仍为空,继续添加 {udid}") - + # 先分配一个“正式使用”的本地端口,并立即起 iproxy(只起这一回) port = self._alloc_port() - print(f"[iproxy] 准备启动 iproxy 映射 {port}->{wdaScreenPort}") + print(f"[iproxy] 准备启动 iproxy 映射 {port}->{wdaScreenPort} (正式)") proc = self._start_iproxy(udid, local_port=port) if not proc: self._release_port(port) print(f"[iproxy] 启动失败,放弃新增 {udid}") return + # 判断 WDA 是否已就绪;如果未就绪,按原逻辑拉起 WDA 并等到就绪 + try: + dev = tidevice.Device(udid) + major = int(dev.product_version.split(".")[0]) + except Exception: + major = 0 + + # 直接用“正式端口”探测 /status,避免再启一次临时 iproxy + if not self._wait_wda_ready_on_port(udid, local_port=port, total_timeout_sec=3.0): + # 如果还没起来,按你原逻辑拉起 WDA 再等 + if major > 17: + print("进入iOS17设备的分支") + try: + IOSActivator().activate(udid) + print("wda启动完成") + except Exception as e: + print(f"[WDA] iOS17 激活异常: {e}") + else: + print(f"[WDA] iOS<=17 启动 WDA app_start (port={wdaScreenPort})") + try: + dev = tidevice.Device(udid) + dev.app_start(WdaAppBundleId) + time.sleep(2) + except Exception as e: + print(f"[WDA] app_start 异常: {e}") + + if not self._wait_wda_ready_on_port(udid, local_port=port, total_timeout_sec=self.WDA_READY_TIMEOUT): + print(f"[WDA] WDA 未在超时内就绪, 放弃新增 {udid}") + # 清理已起的正式 iproxy + try: + self._kill(proc) + except Exception: + pass + self._release_port(port) + return + + print(f"[WDA] WDA 就绪,准备获取屏幕信息 {udid}") + time.sleep(0.5) + + # 带超时的屏幕信息获取(保留你原有容错/重试) + w, h, s = self._screen_info_with_timeout(udid, timeout=3.5) + if not (w and h and s): + for i in range(4): + print(f"[Screen] 第{i + 1}次获取失败, 重试中... {udid}") + time.sleep(0.6) + w, h, s = self._screen_info_with_timeout(udid, timeout=3.5) + if w and h and s: + break + if not (w and h and s): + print(f"[Screen] 屏幕信息仍为空,继续添加 {udid}") + + # 写入模型 & 发送前端 with self._lock: model = DeviceModel(deviceId=udid, screenPort=port, width=w, height=h, scale=s, type=1) model.ready = True self._models[udid] = model self._iproxy[udid] = proc self._port_by_udid[udid] = port + if hasattr(self, "_iproxy_fail_count"): + self._iproxy_fail_count[udid] = 0 print(f"[Manager] 准备发送设备数据到前端 {udid}") self._manager_send(model) @@ -343,6 +505,7 @@ class DeviceInfo: self._port_by_udid.pop(udid, None) self._first_seen.pop(udid, None) self._last_seen.pop(udid, None) + self._iproxy_fail_count.pop(udid, None) # --- 2. 锁外执行重操作 --- # 杀进程 @@ -528,11 +691,23 @@ class DeviceInfo: print(f"[Proc] 结束进程异常: {e}") def _manager_send(self, model: DeviceModel): + """ + 轻量自愈:首次 send 失败 → start() 一次并重试一次;不抛异常。 + 这样 34566 刚起时不丢“上车”事件,前端更快看到设备。 + """ try: - self._manager.send(model.toDict()) - print(f"[Manager] 已发送前端数据 {model.deviceId}") + if self._manager.send(model.toDict()): + print(f"[Manager] 已发送前端数据 {model.deviceId}") + return except Exception as e: - print(f"[Manager] 发送异常: {e}") + print(f"[Manager] 首次发送异常: {e}") + # 自愈:拉起一次并重试一次 + try: + if self._manager.start() and self._manager.send(model.toDict()): + print(f"[Manager] 重试发送成功 {model.deviceId}") + return + except Exception as e: + print(f"[Manager] 重试发送异常: {e}") def _find_iproxy(self) -> str: env_path = os.getenv("IPROXY_PATH") @@ -545,4 +720,4 @@ class DeviceInfo: if path.is_file(): print(f"[iproxy] 使用默认路径 {path}") return str(path) - raise FileNotFoundError(f"iproxy 不存在: {path}") + raise FileNotFoundError(f"iproxy 不存在: {path}") \ No newline at end of file diff --git a/Module/FlaskService.py b/Module/FlaskService.py index b0a1427..c677d00 100644 --- a/Module/FlaskService.py +++ b/Module/FlaskService.py @@ -103,39 +103,59 @@ def _apply_device_event(obj: Dict[str, Any]): # ============ 设备事件 socket 监听 ============ def _handle_conn(conn: socket.socket, addr): - """统一的连接处理函数(外部全局,避免内嵌函数覆盖)""" + """统一的连接处理函数(拆 JSON 行 → 正常化 type → 应用到 listData)""" try: with conn: + try: + conn.settimeout(3.0) # 避免永久阻塞 + except Exception: + pass + buffer = "" while True: - data = conn.recv(1024) - if not data: # 对端关闭 - break - buffer += data.decode('utf-8', errors='ignore') - # 按行切 JSON;发送端每条以 '\n' 结尾 - while True: - line, sep, buffer = buffer.partition('\n') - if not sep: + try: + data = conn.recv(1024) + if not data: # 对端关闭 break - line = line.strip() - if not line: - continue - try: - obj = json.loads(line) - except json.JSONDecodeError as e: - LogManager.warning(f"[SOCKET][WARN] 非法 JSON 丢弃: {line[:120]} err={e}") - continue - dev_id = obj.get("deviceId") - typ = _normalize_type(obj.get("type", 1)) - obj["type"] = typ - LogManager.info(f"[SOCKET][RECV] deviceId={dev_id} type={typ} keys={list(obj.keys())}") - _apply_device_event(obj) - LogManager.info(f"[SOCKET][APPLY] deviceId={dev_id} type={typ}") + buffer += data.decode('utf-8', errors='ignore') + + # 按行切 JSON;发送端每条以 '\n' 结尾 + while True: + line, sep, buffer = buffer.partition('\n') + if not sep: + break + line = line.strip() + if not line: + continue + try: + obj = json.loads(line) + except json.JSONDecodeError as e: + LogManager.warning(f"[SOCKET][WARN] 非法 JSON 丢弃: {line[:120]} err={e}") + continue + + dev_id = obj.get("deviceId") + typ = _normalize_type(obj.get("type", 1)) + obj["type"] = typ # 规范 1/0 + LogManager.info(f"[SOCKET][RECV] deviceId={dev_id} type={typ} keys={list(obj.keys())}") + + try: + _apply_device_event(obj) # ← 保持你的原设备增删逻辑 + LogManager.info(f"[SOCKET][APPLY] deviceId={dev_id} type={typ}") + except Exception as e: + # 单条业务异常不让线程死 + LogManager.error(f"[DEVICE][APPLY_EVT][ERROR] {e}") + + except (socket.timeout, ConnectionResetError, BrokenPipeError): + # 连接级异常:关闭该连接,回到 accept + break + except Exception as e: + LogManager.warning(f"[SOCKET][WARN] recv error: {e}") + break except Exception as e: LogManager.error(f"[SOCKET][ERROR] 连接处理异常: {e}") def start_socket_listener(): - """启动设备事件监听(与 HTTP 端口无关,走 FLASK_COMM_PORT)""" + """启动设备事件监听(仅走 FLASK_COMM_PORT;增强健壮性,不改业务)""" # 统一使用 FLASK_COMM_PORT,默认 34566 port = int(os.getenv('FLASK_COMM_PORT', 34566)) LogManager.info(f"Received port from environment: {port}") @@ -146,29 +166,64 @@ def start_socket_listener(): print("未获取到通信端口,跳过Socket监听") return - s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - - try: - s.bind(('127.0.0.1', port)) - print(f"[INFO] Socket successfully bound to port {port}") - LogManager.info(f"[INFO] Socket successfully bound to port {port}") - except Exception as bind_error: - print(f"[ERROR]端口绑定失败: {bind_error}") - LogManager.info(f"[ERROR]端口绑定失败: {bind_error}") - return - - s.listen() - LogManager.info(f"[INFO] Socket listener started on port {port}, waiting for connections...") - print(f"[INFO] Socket listener started on port {port}, waiting for connections...") - + backoff = 0.5 # 自愈退避,起于 0.5s,上限 8s while True: + s = None try: - conn, addr = s.accept() + s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + try: + s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + except Exception as e: + LogManager.warning(f"[SOCKET][WARN] setsockopt SO_REUSEADDR failed: {e}") + + try: + s.bind(('127.0.0.1', port)) + print(f"[INFO] Socket successfully bound to port {port}") + LogManager.info(f"[INFO] Socket successfully bound to port {port}") + except Exception as bind_error: + print(f"[ERROR]端口绑定失败: {bind_error}") + LogManager.info(f"[ERROR]端口绑定失败: {bind_error}") + # 绑定失败通常是端口未释放/竞争,退避后重试 + time.sleep(backoff) + backoff = min(backoff * 2, 8.0) + continue + + s.listen() + try: + s.settimeout(1.5) # accept 超时,便于检查自愈循环 + except Exception: + pass + + LogManager.info(f"[INFO] Socket listener started on port {port}, waiting for connections...") + print(f"[INFO] Socket listener started on port {port}, waiting for connections...") + # 监听成功 → 退避复位 + backoff = 0.5 + + while True: + try: + conn, addr = s.accept() + except socket.timeout: + # 定期“透气”,避免永久卡死;继续等待 + continue + except Exception as e: + # 发生 accept 级错误:重建 socket(进入外层 while 自愈) + LogManager.error(f"[ERROR] accept 失败: {e}") + break + + # 每个连接独立线程处理,保持你原来的做法 + threading.Thread(target=_handle_conn, args=(conn, addr), daemon=True).start() + except Exception as e: - LogManager.error(f"[ERROR] accept 失败: {e}") - continue - threading.Thread(target=_handle_conn, args=(conn, addr), daemon=True).start() + # 任意未兜住的异常,记录并进入退避自愈 + LogManager.error(f"[SOCKET][ERROR] 监听主循环异常: {e}") + time.sleep(backoff) + backoff = min(backoff * 2, 8.0) + finally: + try: + if s: + s.close() + except Exception: + pass # 独立线程启动 Socket 服务 + 看门狗 listener_thread = threading.Thread(target=start_socket_listener, daemon=True) diff --git a/Module/FlaskSubprocessManager.py b/Module/FlaskSubprocessManager.py index 94b83be..528b4b1 100644 --- a/Module/FlaskSubprocessManager.py +++ b/Module/FlaskSubprocessManager.py @@ -1,3 +1,4 @@ +# -*- coding: utf-8 -*- import subprocess import sys import threading @@ -14,8 +15,10 @@ from Utils.LogManager import LogManager class FlaskSubprocessManager: + """Flask 子进程守护 + 看门狗 + 稳定增强""" + _instance: Optional['FlaskSubprocessManager'] = None - _lock: threading.Lock = threading.Lock() + _lock = threading.Lock() def __new__(cls): with cls._lock: @@ -29,48 +32,75 @@ class FlaskSubprocessManager: self.comm_port = 34566 self._stop_event = threading.Event() self._monitor_thread: Optional[threading.Thread] = None - # 新增:启动前先把可能残留的 Flask 干掉 + + # 看门狗参数 + self._FAIL_THRESHOLD = int(os.getenv("FLASK_WD_FAIL_THRESHOLD", "3")) # 连续失败多少次重启 + self._COOLDOWN_SEC = float(os.getenv("FLASK_WD_COOLDOWN", "8.0")) # 两次重启间隔 + self._MAX_RESTARTS = int(os.getenv("FLASK_WD_MAX_RESTARTS", "5")) # 10分钟最多几次重启 + self._RESTART_WINDOW = 600 # 10分钟 + self._restart_times: List[float] = [] + self._fail_count = 0 + self._last_restart_time = 0.0 + + # Windows 隐藏子窗口启动参数 + self._si = None + if os.name == "nt": + si = subprocess.STARTUPINFO() + si.dwFlags |= subprocess.STARTF_USESHOWWINDOW + si.wShowWindow = 0 + self._si = si + self._kill_orphan_flask() atexit.register(self.stop) - LogManager.info("FlaskSubprocessManager 单例已初始化", udid="system") + self._log("info", "FlaskSubprocessManager 初始化完成") + # ========= 日志工具 ========= + def _log(self, level: str, msg: str, udid="system"): + """同时写 LogManager + 控制台""" + try: + if level == "info": + LogManager.info(msg, udid=udid) + elif level in ("warn", "warning"): + LogManager.warning(msg, udid=udid) + elif level == "error": + LogManager.error(msg, udid=udid) + else: + LogManager.info(msg, udid=udid) + except Exception: + pass + print(msg) + + # ========= 杀残留 Flask ========= def _kill_orphan_flask(self): - """根据端口 34566 把遗留进程全部杀掉""" try: if os.name == "nt": - # Windows - out = subprocess.check_output( - ["netstat", "-ano"], - text=True, startupinfo=self._si - ) + out = subprocess.check_output(["netstat", "-ano"], text=True, startupinfo=self._si) for line in out.splitlines(): if f"127.0.0.1:{self.comm_port}" in line and "LISTENING" in line: pid = int(line.strip().split()[-1]) if pid != os.getpid(): subprocess.run(["taskkill", "/F", "/PID", str(pid)], - startupinfo=self._si, - capture_output=True) + startupinfo=self._si, capture_output=True) + self._log("warn", f"[FlaskMgr] 杀死残留进程 PID={pid}") else: - # macOS / Linux - out = subprocess.check_output( - ["lsof", "-t", f"-iTCP:{self.comm_port}", "-sTCP:LISTEN"], - text=True - ) + out = subprocess.check_output(["lsof", "-t", f"-iTCP:{self.comm_port}", "-sTCP:LISTEN"], text=True) for pid in map(int, out.split()): if pid != os.getpid(): os.kill(pid, 9) + self._log("warn", f"[FlaskMgr] 杀死残留进程 PID={pid}") except Exception: pass - # ---------- 启动 ---------- + # ========= 启动 ========= def start(self): with self._lock: if self._is_alive(): - LogManager.warning("子进程已在运行,无需重复启动", udid="system") + self._log("warn", "[FlaskMgr] 子进程已在运行,无需重复启动") return env = os.environ.copy() env["FLASK_COMM_PORT"] = str(self.comm_port) + exe_path = Path(sys.executable).resolve() if exe_path.name.lower() in ("python.exe", "pythonw.exe"): exe_path = Path(sys.argv[0]).resolve() @@ -80,13 +110,20 @@ class FlaskSubprocessManager: cmd = [str(exe_path), "--role=flask"] cwd = str(exe_path.parent) else: - cmd = [sys.executable, "-u", "-m", "Module.Main", "--role=flask"] - cwd = str(Path(__file__).resolve().parent) + project_root = Path(__file__).resolve().parents[1] + candidates = [ + project_root / "Module" / "Main.py", + project_root / "Main.py", + ] + main_path = next((p for p in candidates if p.is_file()), None) + if main_path: + cmd = [sys.executable, "-u", str(main_path), "--role=flask"] + else: + cmd = [sys.executable, "-u", "-m", "Module.Main", "--role=flask"] + cwd = str(project_root) - LogManager.info(f"准备启动 Flask 子进程: {cmd} cwd={cwd}", udid="system") + self._log("info", f"[FlaskMgr] 启动命令: {cmd}, cwd={cwd}") - # 关键:不再自己 open 文件,直接走 LogManager - # 用 PIPE 捕获,再转存到 system 级日志 self.process = subprocess.Popen( cmd, stdin=subprocess.DEVNULL, @@ -98,112 +135,140 @@ class FlaskSubprocessManager: bufsize=1, env=env, cwd=cwd, - start_new_session=True + start_new_session=True, + startupinfo=self._si ) - # 守护线程:把子进程 stdout → LogManager.info/system threading.Thread(target=self._flush_stdout, daemon=True).start() - LogManager.info(f"Flask 子进程已启动,PID={self.process.pid},端口={self.comm_port}", udid="system") + self._log("info", f"[FlaskMgr] Flask 子进程已启动,PID={self.process.pid}") if not self._wait_port_open(timeout=10): - LogManager.error("等待端口监听超时,启动失败", udid="system") + self._log("error", "[FlaskMgr] 启动失败,端口未监听") self.stop() - raise RuntimeError("Flask 启动后 10 s 内未监听端口") + raise RuntimeError("Flask 启动后 10s 内未监听端口") - self._monitor_thread = threading.Thread(target=self._monitor, daemon=True) - self._monitor_thread.start() - LogManager.info("端口守护线程已启动", udid="system") + if not self._monitor_thread or not self._monitor_thread.is_alive(): + self._monitor_thread = threading.Thread(target=self._monitor, daemon=True) + self._monitor_thread.start() + self._log("info", "[FlaskWD] 守护线程已启动") - # ---------- 实时把子进程 stdout 刷到 system 日志 ---------- + # ========= stdout捕获 ========= def _flush_stdout(self): + if not self.process or not self.process.stdout: + return for line in iter(self.process.stdout.readline, ""): if line: - LogManager.info(line.rstrip(), udid="system") - # 同时输出到控制台 - print(line.rstrip()) # 打印到主进程的控制台 + self._log("info", line.rstrip()) self.process.stdout.close() - # ---------- 发送 ---------- + # ========= 发送 ========= def send(self, data: Union[str, Dict, List]) -> bool: if isinstance(data, (dict, list)): data = json.dumps(data, ensure_ascii=False) try: with socket.create_connection(("127.0.0.1", self.comm_port), timeout=3.0) as s: s.sendall((data + "\n").encode("utf-8")) - LogManager.info(f"数据已成功发送到 Flask 端口:{self.comm_port}", udid="system") - return True + self._log("info", f"[FlaskMgr] 数据已发送到端口 {self.comm_port}") + return True except Exception as e: - LogManager.error(f"发送失败:{e}", udid="system") + self._log("error", f"[FlaskMgr] 发送失败: {e}") return False - # ---------- 停止 ---------- + # ========= 停止 ========= def stop(self): with self._lock: if not self.process: return pid = self.process.pid - LogManager.info(f"正在停止 Flask 子进程 PID={pid}", udid="system") + self._log("info", f"[FlaskMgr] 正在停止子进程 PID={pid}") try: - # 1. 杀整棵树(Windows 也适用) parent = psutil.Process(pid) for child in parent.children(recursive=True): - child.kill() + try: + child.kill() + except Exception: + pass parent.kill() - gone, alive = psutil.wait_procs([parent] + parent.children(), timeout=3) - for p in alive: - p.kill() # 保险再补一刀 - self.process.wait() + parent.wait(timeout=3) except psutil.NoSuchProcess: pass except Exception as e: - LogManager.error(f"停止子进程异常:{e}", udid="system") + self._log("error", f"[FlaskMgr] 停止子进程异常: {e}") finally: self.process = None self._stop_event.set() - # ---------- 端口守护 ---------- + # ========= 看门狗 ========= def _monitor(self): - LogManager.info("守护线程开始运行,周期性检查端口存活", udid="system") - while not self._stop_event.wait(1.0): - if not self._port_alive(): - LogManager.error("检测到端口不通,准备重启 Flask", udid="system") + self._log("info", "[FlaskWD] 看门狗线程启动") + verbose = os.getenv("FLASK_WD_VERBOSE", "0") == "1" + last_ok = 0.0 + + while not self._stop_event.wait(2.0): + alive = self._port_alive() + if alive: + self._fail_count = 0 + if verbose and (time.time() - last_ok) >= 60: + self._log("info", f"[FlaskWD] OK {self.comm_port} alive") + last_ok = time.time() + continue + + self._fail_count += 1 + self._log("warn", f"[FlaskWD] 探测失败 {self._fail_count}/{self._FAIL_THRESHOLD}") + + if self._fail_count >= self._FAIL_THRESHOLD: + now = time.time() + if now - self._last_restart_time < self._COOLDOWN_SEC: + self._log("warn", "[FlaskWD] 冷却中,跳过重启") + continue + + # 限速:10分钟内超过MAX_RESTARTS则不再重启 + self._restart_times = [t for t in self._restart_times if now - t < self._RESTART_WINDOW] + if len(self._restart_times) >= self._MAX_RESTARTS: + self._log("error", f"[FlaskWD] 10分钟内重启次数过多({len(self._restart_times)}次),暂停看门狗") + break + + self._restart_times.append(now) + self._log("warn", "[FlaskWD] 端口不通,准备重启 Flask") + with self._lock: - if self.process and self.process.poll() is None: + try: self.stop() - try: - self.start() - from Module.DeviceInfo import DeviceInfo - # 重新发送设备相关数据到flask - info = DeviceInfo() - for model in info._models.keys(): - self.send(model) - except Exception as e: - LogManager.error(f"自动重启失败:{e}", udid="system") - time.sleep(2) - - # ---------- 辅助 ---------- - def _is_port_busy(self, port: int) -> bool: - with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: - s.settimeout(0.2) - return s.connect_ex(("127.0.0.1", port)) == 0 + time.sleep(1) + self.start() + self._fail_count = 0 + self._last_restart_time = now + self._log("info", "[FlaskWD] Flask 已成功重启") + from Module.DeviceInfo import DeviceInfo + info = DeviceInfo() + with info._lock: + for m in info._models.values(): + try: + self.send(m.toDict()) + except Exception: + pass + except Exception as e: + self._log("error", f"[FlaskWD] 自动重启失败: {e}") + time.sleep(3) + # ========= 辅助 ========= def _port_alive(self) -> bool: try: - with socket.create_connection(("127.0.0.1", self.comm_port), timeout=0.5): + with socket.create_connection(("127.0.0.1", self.comm_port), timeout=0.6): return True except Exception: return False def _wait_port_open(self, timeout: float) -> bool: - t0 = time.time() - while time.time() - t0 < timeout: + start = time.time() + while time.time() - start < timeout: if self._port_alive(): return True time.sleep(0.2) return False def _is_alive(self) -> bool: - return self.process is not None and self.process.poll() is None and self._port_alive() + return self.process and self.process.poll() is None and self._port_alive() @classmethod def get_instance(cls) -> 'FlaskSubprocessManager': diff --git a/Module/__pycache__/DeviceInfo.cpython-312.pyc b/Module/__pycache__/DeviceInfo.cpython-312.pyc index 0aeb4c2..e15744f 100644 Binary files a/Module/__pycache__/DeviceInfo.cpython-312.pyc and b/Module/__pycache__/DeviceInfo.cpython-312.pyc differ diff --git a/Module/__pycache__/FlaskService.cpython-312.pyc b/Module/__pycache__/FlaskService.cpython-312.pyc index 8be2a43..c90f3b2 100644 Binary files a/Module/__pycache__/FlaskService.cpython-312.pyc and b/Module/__pycache__/FlaskService.cpython-312.pyc differ diff --git a/Module/__pycache__/FlaskSubprocessManager.cpython-312.pyc b/Module/__pycache__/FlaskSubprocessManager.cpython-312.pyc index 5eb7486..2604357 100644 Binary files a/Module/__pycache__/FlaskSubprocessManager.cpython-312.pyc and b/Module/__pycache__/FlaskSubprocessManager.cpython-312.pyc differ diff --git a/Module/__pycache__/Main.cpython-312.pyc b/Module/__pycache__/Main.cpython-312.pyc index bdb6698..a238f0f 100644 Binary files a/Module/__pycache__/Main.cpython-312.pyc and b/Module/__pycache__/Main.cpython-312.pyc differ diff --git a/Utils/__pycache__/LogManager.cpython-312.pyc b/Utils/__pycache__/LogManager.cpython-312.pyc index 7f2af26..6c6a5e8 100644 Binary files a/Utils/__pycache__/LogManager.cpython-312.pyc and b/Utils/__pycache__/LogManager.cpython-312.pyc differ diff --git a/Utils/__pycache__/ThreadManager.cpython-312.pyc b/Utils/__pycache__/ThreadManager.cpython-312.pyc index d2834ff..4604693 100644 Binary files a/Utils/__pycache__/ThreadManager.cpython-312.pyc and b/Utils/__pycache__/ThreadManager.cpython-312.pyc differ diff --git a/script/__pycache__/ScriptManager.cpython-312.pyc b/script/__pycache__/ScriptManager.cpython-312.pyc index 6ba832b..c536b66 100644 Binary files a/script/__pycache__/ScriptManager.cpython-312.pyc and b/script/__pycache__/ScriptManager.cpython-312.pyc differ