Files
iOSAI/Module/FlaskSubprocessManager.py

270 lines
9.1 KiB
Python
Raw Permalink Normal View History

2025-11-05 17:07:51 +08:00
# -*- coding: utf-8 -*-
2025-08-01 13:43:51 +08:00
import atexit
import json
import os
import socket
2025-11-19 17:23:41 +08:00
import subprocess
import sys
import threading
2025-08-01 13:43:51 +08:00
import time
2025-08-15 20:04:59 +08:00
from pathlib import Path
2025-08-01 13:43:51 +08:00
from typing import Optional, Union, Dict, List
2025-11-25 18:13:02 +08:00
2025-08-20 13:48:32 +08:00
from Utils.LogManager import LogManager
2025-08-01 13:43:51 +08:00
class FlaskSubprocessManager:
    """Singleton supervisor for the Flask child process.

    - A single watchdog thread is the only monitoring point.
    - Liveness = the child has not exited AND a TCP probe of its port(s)
      succeeds.
    - The child is restarted when it exits, or after
      ``_restart_fail_threshold`` consecutive failed port probes; restarts are
      rate-limited by a cooldown and a per-window cap.
    - Works both frozen (the packaged exe re-launches itself with
      ``--role=flask``) and from source (``Module/Main.py --role=flask``).
    - After a restart, the current DeviceInfo models are pushed to the child
      again once its port accepts connections.
    """

    _instance = None
    _lock = threading.RLock()

    def __new__(cls):
        with cls._lock:
            if cls._instance is None:
                inst = super().__new__(cls)
                # Publish the singleton only once it is fully initialized, so a
                # failure in _initialize() does not leave a half-built instance.
                inst._initialize()
                cls._instance = inst
            return cls._instance

    # ========================= initialization =========================
    def _initialize(self):
        """Set up state, register the atexit hook and clear stale listeners."""
        self.process: Optional[subprocess.Popen] = None
        # Port handed to the child via FLASK_COMM_PORT; send() talks to it and
        # _port_alive() probes it (and the next port).
        self.comm_port = 34566

        self._watchdog_running = False
        # Set by stop() to shut the watchdog down; cleared again by start().
        self._stop_event = threading.Event()

        self._restart_cooldown = 5            # minimum seconds between two restart attempts
        self._restart_fail_threshold = 3      # consecutive failed port probes before a restart
        self._restart_fail_count = 0
        self._restart_window = 600            # sliding window for the restart cap (10 minutes)
        self._restart_limit = 5               # max restart attempts within the window
        self._restart_record: List[float] = []  # time.monotonic() stamps of restart attempts
        self._limit_logged = False            # the cap message is logged once per saturation
        # Seconds after a spawn during which port probes are skipped, so a
        # slow-starting child is not restarted before it can bind its port.
        self._startup_grace = 10
        # Max seconds to wait for comm_port after a restart before pushing the snapshot.
        self._port_wait_timeout = 15
        self._started_at = 0.0

        if os.name == "nt":
            # Launch helper/child processes with their window hidden (SW_HIDE == 0).
            si = subprocess.STARTUPINFO()
            si.dwFlags |= subprocess.STARTF_USESHOWWINDOW
            si.wShowWindow = 0
            self._si = si
        else:
            self._si = None

        atexit.register(self.stop)
        self._kill_orphans()

        LogManager.info("FlaskSubprocessManager 初始化完成", udid="flask")

    # ========================= helpers =========================
    def _log(self, level, msg):
        """Echo *msg* to stdout and forward it to LogManager.

        ``level`` is "info" or "warn"; anything else is logged as an error.
        """
        try:
            print(msg)
        except Exception:
            # stdout may be missing or unable to encode the text. This runs on
            # the stdout reader thread, which must keep draining the child's
            # pipe, so a console problem must not propagate.
            pass
        if level == "info":
            LogManager.info(msg, udid="flask")
        elif level == "warn":
            LogManager.warning(msg, udid="flask")
        else:
            LogManager.error(msg, udid="flask")

    def _kill_orphans(self):
        """Best-effort: kill leftover processes LISTENING on 127.0.0.1:comm_port.

        Windows only (parses ``netstat -ano``, whose last column is the owning
        PID); a no-op on other platforms. Failures are logged, not raised.
        """
        if os.name != "nt":
            return
        target = f"127.0.0.1:{self.comm_port}"
        try:
            # startupinfo keeps a console window from flashing up in GUI builds.
            out = subprocess.check_output(
                ["netstat", "-ano"], text=True, errors="replace", startupinfo=self._si
            )
        except Exception as e:
            self._log("warn", f"[FlaskMgr] netstat 调用失败: {e}")
            return

        pids = set()
        for line in out.splitlines():
            if target in line and "LISTENING" in line:
                try:
                    pids.add(int(line.split()[-1]))
                except ValueError:
                    # One unparsable row must not abort the whole scan.
                    continue
        pids.discard(0)
        pids.discard(os.getpid())

        for pid in pids:
            try:
                subprocess.run(
                    ["taskkill", "/F", "/PID", str(pid)],
                    capture_output=True,
                    startupinfo=self._si,
                )
            except Exception:
                continue
            self._log("warn", f"[FlaskMgr] 杀死残留 Flask 实例 PID={pid}")

    @staticmethod
    def _probe(port, timeout=0.4):
        """Return True if a TCP connection to 127.0.0.1:*port* succeeds within *timeout*."""
        try:
            with socket.create_connection(("127.0.0.1", port), timeout=timeout):
                return True
        except (OSError, OverflowError):
            return False

    def _port_alive(self):
        """Return True if comm_port OR comm_port + 1 accepts connections.

        The original note calls these the Flask and Quart ports; one live port
        is enough to treat the child as healthy.
        """
        return self._probe(self.comm_port) or self._probe(self.comm_port + 1)

    def _wait_for_port(self, timeout: float) -> bool:
        """Poll comm_port until it accepts a connection or *timeout* seconds pass.

        Returns False early if stop() is called or the current child has exited.
        """
        deadline = time.monotonic() + timeout
        while True:
            if self._probe(self.comm_port):
                return True
            proc = self.process
            if proc is None or proc.poll() is not None:
                return False
            if time.monotonic() >= deadline:
                return False
            if self._stop_event.wait(0.3):
                return False

    # ========================= start =========================
    def start(self):
        """Launch the Flask child (unless already running) and ensure the watchdog runs.

        Also re-arms supervision after a previous stop().
        """
        with self._lock:
            self._stop_event.clear()
            self._spawn()
            # Only one watchdog at a time; the loop clears this flag itself on exit.
            if not self._watchdog_running:
                threading.Thread(target=self._watchdog_loop, daemon=True).start()
                self._watchdog_running = True

    def _spawn(self):
        """Start the child process unless one is already running (takes the RLock)."""
        with self._lock:
            if self.process and self.process.poll() is None:
                self._log("warn", "[FlaskMgr] Flask 已在运行,跳过")
                return

            # Environment for the child: tells it which port to serve on.
            env = os.environ.copy()
            env["FLASK_COMM_PORT"] = str(self.comm_port)

            # A frozen build (e.g. Nuitka) sets sys.frozen; running the .py
            # directly with python does not.
            if getattr(sys, "frozen", False):
                # Packaged exe: re-launch ourselves in the flask role.
                exe = Path(sys.executable).resolve()
                cmd = [str(exe), "--role=flask"]
                cwd = str(exe.parent)
            else:
                # Source checkout: run Module/Main.py with the current interpreter.
                project_root = Path(__file__).resolve().parents[1]
                main_py = project_root / "Module" / "Main.py"
                cmd = [sys.executable, "-u", str(main_py), "--role=flask"]
                cwd = str(project_root)

            self._log("info", f"[FlaskMgr] 启动 Flask: {cmd}")
            proc = subprocess.Popen(
                cmd,
                stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT,
                text=True,
                # Undecodable bytes must not kill the reader thread (which would
                # stop draining the pipe and eventually block the child).
                errors="replace",
                env=env,
                cwd=cwd,
                bufsize=1,
                startupinfo=self._si,
                start_new_session=True,
            )
            self.process = proc
            self._started_at = time.monotonic()
            self._restart_fail_count = 0

            # Drain this specific process's output asynchronously into the log.
            threading.Thread(target=self._read_stdout, args=(proc,), daemon=True).start()

            self._log("info", f"[FlaskMgr] Flask 子进程已启动 PID={proc.pid}")

    def _read_stdout(self, proc: Optional[subprocess.Popen] = None):
        """Pump *proc*'s merged stdout/stderr into the log until EOF.

        Defaults to the current process when *proc* is not given. The stream
        is closed once reading ends.
        """
        if proc is None:
            proc = self.process
        stream = proc.stdout if proc is not None else None
        if stream is None:
            return
        try:
            for line in iter(stream.readline, ""):
                self._log("info", f"[Flask] {line.rstrip()}")
        except (OSError, ValueError):
            # Pipe torn down underneath us (process killed / stream closed).
            pass
        finally:
            try:
                stream.close()
            except Exception:
                pass

    # ========================= stop =========================
    def stop(self):
        """Stop supervising and terminate the child process (also the atexit hook).

        The stop event is set first so the watchdog cannot respawn the child
        after it has been stopped. Call start() again to resume.
        """
        with self._lock:
            self._stop_event.set()
            self._stop_process()

    def _stop_process(self):
        """Terminate (then kill, if needed) the current child without touching the watchdog."""
        with self._lock:
            proc = self.process
            if not proc:
                return
            try:
                proc.terminate()
            except Exception:
                pass
            try:
                proc.wait(timeout=3)
            except Exception:
                pass
            if proc.poll() is None:
                try:
                    proc.kill()
                except Exception:
                    pass
                try:
                    # Reap the killed child so it does not linger as a zombie.
                    proc.wait(timeout=3)
                except Exception:
                    pass
            self._log("warn", "[FlaskMgr] 已停止 Flask 子进程")
            self.process = None

    # ========================= watchdog =========================
    def _watchdog_loop(self):
        """Single monitoring loop; runs until stop() sets the stop event."""
        self._log("info", "[FlaskWD] 看门狗已启动")
        while True:
            # Sleep between checks, but wake immediately when stop() is called.
            self._stop_event.wait(1.2)
            with self._lock:
                if self._stop_event.is_set():
                    # Exit and clear the flag atomically w.r.t. start(): either
                    # start() re-armed us before this check, or it will see the
                    # flag cleared and launch a fresh watchdog.
                    self._watchdog_running = False
                    return
                proc = self.process
                started_at = self._started_at

            # 1) child exited (or was never started)
            if proc is None or proc.poll() is not None:
                self._log("error", "[FlaskWD] Flask 子进程退出,准备重启")
                self._restart()
                continue

            # Give a freshly spawned child time to bind its port.
            if time.monotonic() - started_at < self._startup_grace:
                continue

            # 2) port unreachable
            if not self._port_alive():
                self._restart_fail_count += 1
                self._log("warn", f"[FlaskWD] 端口检测失败 {self._restart_fail_count}/"
                                  f"{self._restart_fail_threshold}")
                if self._restart_fail_count >= self._restart_fail_threshold:
                    self._restart()
                continue

            # 3) healthy
            self._restart_fail_count = 0

    # ========================= restart =========================
    def _restart(self):
        """Restart the child, subject to the per-window cap and the cooldown."""
        now = time.monotonic()
        # Keep only attempts still inside the sliding window.
        self._restart_record = [t for t in self._restart_record if now - t < self._restart_window]
        if len(self._restart_record) >= self._restart_limit:
            # Log once per saturation instead of on every watchdog tick.
            if not self._limit_logged:
                self._log("error", "[FlaskWD] 10 分钟内重启次数太多,暂停监控")
                self._limit_logged = True
            return
        self._limit_logged = False

        if self._restart_record and now - self._restart_record[-1] < self._restart_cooldown:
            self._log("warn", "[FlaskWD] 冷却中,暂不重启")
            return

        self._log("warn", "[FlaskWD] >>> 重启 Flask 子进程 <<<")
        # Record the attempt before trying it, so failing restarts are also
        # subject to the cooldown and the cap instead of retrying every tick.
        self._restart_record.append(now)
        self._restart_fail_count = 0

        try:
            self._stop_process()
            # Brief pause before relaunching; bail out if stop() is called meanwhile.
            if self._stop_event.wait(1):
                return
            with self._lock:
                # Re-check under the lock: stop() may have run since the wait.
                if self._stop_event.is_set():
                    return
                self._spawn()
        except Exception as e:
            self._log("error", f"[FlaskWD] 重启失败: {e}")
            return

        # Re-sync device state only once comm_port accepts connections; right
        # after spawning, the child may not have bound it yet.
        if self._wait_for_port(self._port_wait_timeout):
            self._push_snapshot()
        elif not self._stop_event.is_set():
            self._log("warn", "[FlaskWD] 等待 Flask 端口超时,跳过设备快照推送")

    # ========================= device snapshot =========================
    def _push_snapshot(self):
        """Re-send every DeviceInfo model to the (re)started child. Best-effort."""
        try:
            from Module.DeviceInfo import DeviceInfo
            info = DeviceInfo()
            # Copy under DeviceInfo's lock, send outside it: send() does
            # blocking network I/O that should not stall other lock users.
            with info._lock:
                payloads = [m.toDict() for m in info._models.values()]
            for payload in payloads:
                self.send(payload)
        except Exception as e:
            self._log("warn", f"[FlaskWD] 推送设备快照失败: {e}")

    # ========================= send =========================
    def send(self, data: Union[str, Dict]) -> bool:
        """Send one newline-terminated message to 127.0.0.1:comm_port.

        Dicts are JSON-encoded with non-ASCII kept as-is; the text is sent as
        UTF-8 over a fresh connection. Returns True on success and False on any
        connection/send failure (or a non-str payload).
        """
        if isinstance(data, dict):
            data = json.dumps(data, ensure_ascii=False)
        try:
            payload = (data + "\n").encode("utf-8")
            with socket.create_connection(("127.0.0.1", self.comm_port), timeout=2) as s:
                s.sendall(payload)
            return True
        except Exception:
            return False

    @classmethod
    def get_instance(cls):
        """Return the process-wide singleton (same as calling the class)."""
        return cls()