Files
iOSAI/Module/FlaskSubprocessManager.py
2025-08-01 13:43:51 +08:00

101 lines
3.9 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import subprocess
import threading
import atexit
import json
import os
import socket
import time
from typing import Optional, Union, Dict, List
class FlaskSubprocessManager:
_instance: Optional['FlaskSubprocessManager'] = None
_lock: threading.Lock = threading.Lock()
def __new__(cls):
with cls._lock:
if cls._instance is None:
cls._instance = super().__new__(cls)
cls._instance._init_manager()
return cls._instance
def _init_manager(self):
self.process: Optional[subprocess.Popen] = None
self.comm_port = self._find_available_port()
self._stop_event = threading.Event()
atexit.register(self.stop)
def _find_available_port(self):
"""动态获取可用端口"""
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.bind(('0.0.0.0', 0))
return s.getsockname()[1]
def start(self):
"""启动子进程Windows兼容方案"""
with self._lock:
if self.process is not None:
raise RuntimeError("子进程已在运行中!")
# 通过环境变量传递通信端口
env = os.environ.copy()
env['FLASK_COMM_PORT'] = str(self.comm_port)
self.process = subprocess.Popen(
['python', 'Flask/FlaskService.py'], # 启动一个子进程 FlaskService.py
stdin=subprocess.PIPE, # 标准输入流,用于向子进程发送数据
stdout=subprocess.PIPE, # 标准输出流,用于接收子进程的输出
stderr=subprocess.PIPE, # 标准错误流,用于接收子进程的错误信息
text=True, # 以文本模式打开流,否则以二进制模式打开
bufsize=1, # 缓冲区大小设置为 1表示行缓冲
encoding='utf-8', # 指定编码为 UTF-8确保控制台输出不会报错
env=env # 指定子进程的环境变量
)
print(f"Flask子进程启动 (PID: {self.process.pid}, 通信端口: {self.comm_port})")
# 将日志通过主进程输出
def print_output():
while True:
output = self.process.stdout.readline()
if not output:
break
print(output.strip())
while True:
error = self.process.stderr.readline()
if not error:
break
print(f"Error: {error.strip()}")
threading.Thread(target=print_output, daemon=True).start()
def send(self, data: Union[str, Dict, List]) -> bool:
"""通过Socket发送数据"""
try:
if not isinstance(data, str):
data = json.dumps(data)
# 等待子进程启动并准备好
time.sleep(1) # 延时1秒根据实际情况调整
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.connect(('127.0.0.1', self.comm_port))
s.sendall((data + "\n").encode('utf-8'))
return True
except ConnectionRefusedError:
print(f"连接被拒绝,确保子进程在端口 {self.comm_port} 上监听")
return False
except Exception as e:
print(f"发送失败: {e}")
return False
def stop(self):
with self._lock:
if self.process and self.process.poll() is None:
print(f"[INFO] Stopping Flask child process (PID: {self.process.pid})...")
self.process.terminate()
self.process.wait()
print("[INFO] Flask child process stopped.")
self._stop_event.set()
else:
print("[INFO] No Flask child process to stop.")
@classmethod
def get_instance(cls) -> 'FlaskSubprocessManager':
return cls()