Files
iOSAI/Utils/ThreadManager.py
2025-11-07 18:32:42 +08:00

635 lines
24 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# import ctypes
# import threading
# import time
# import os
# import signal
# from concurrent.futures import ThreadPoolExecutor, as_completed
# from typing import Dict, Tuple, List, Optional
#
# from Utils.LogManager import LogManager
#
# try:
# import psutil # 可选:用来级联杀子进程
# except Exception:
# psutil = None
#
#
# def _async_raise(tid: int, exc_type=SystemExit) -> bool:
# """向指定线程异步注入异常(仅对 Python 解释器栈可靠)"""
# if not tid:
# LogManager.method_error("强杀失败: 线程ID为空", "task")
# return False
# try:
# res = ctypes.pythonapi.PyThreadState_SetAsyncExc(
# ctypes.c_long(tid), ctypes.py_object(exc_type)
# )
# if res == 0:
# LogManager.method_info(f"线程 {tid} 不存在", "task")
# return False
# elif res > 1:
# ctypes.pythonapi.PyThreadState_SetAsyncExc(ctypes.c_long(tid), 0)
# LogManager.method_info(f"线程 {tid} 命中多个线程,已回滚", "task")
# return False
# return True
# except Exception as e:
# LogManager.method_error(f"强杀线程失败: {e}", "task")
# return False
#
#
# class ThreadManager:
# """
# 维持你的 add(udid, thread, event) 调用方式不变。
# - 线程统一设为 daemon
# - 停止:协作 -> 多次强杀注入 -> zombie 放弃占位
# - 可选:注册并级联杀掉业务里创建的外部子进程
# """
# _tasks: Dict[str, Dict] = {}
# _lock = threading.RLock()
#
# @classmethod
# def _cleanup_if_dead(cls, udid: str):
# obj = cls._tasks.get(udid)
# if obj:
# th = obj.get("thread")
# if th and not th.is_alive():
# cls._tasks.pop(udid, None)
# LogManager.method_info(f"检测到 [{udid}] 线程已结束,自动清理。", "task")
#
# @classmethod
# def register_child_pid(cls, udid: str, pid: int):
# """业务里如果起了 adb/scrcpy/ffmpeg 等外部进程,请在启动后调用这个登记,便于 stop 时一起杀掉。"""
# with cls._lock:
# obj = cls._tasks.get(udid)
# if not obj:
# return
# pids: set = obj.setdefault("child_pids", set())
# pids.add(int(pid))
# LogManager.method_info(f"[{udid}] 记录子进程 PID={pid}", "task")
#
# @classmethod
# def _kill_child_pids(cls, udid: str, child_pids: Optional[set]):
# if not child_pids:
# return
# for pid in list(child_pids):
# try:
# if psutil:
# if psutil.pid_exists(pid):
# proc = psutil.Process(pid)
# # 先温柔 terminate再等 0.5 秒,仍活则 kill并级联子进程
# for c in proc.children(recursive=True):
# try:
# c.terminate()
# except Exception:
# pass
# proc.terminate()
# gone, alive = psutil.wait_procs([proc], timeout=0.5)
# for a in alive:
# try:
# a.kill()
# except Exception:
# pass
# else:
# if os.name == "nt":
# os.system(f"taskkill /PID {pid} /T /F >NUL 2>&1")
# else:
# try:
# os.kill(pid, signal.SIGTERM)
# time.sleep(0.2)
# os.kill(pid, signal.SIGKILL)
# except Exception:
# pass
# LogManager.method_info(f"[{udid}] 已尝试结束子进程 PID={pid}", "task")
# except Exception as e:
# LogManager.method_error(f"[{udid}] 结束子进程 {pid} 异常: {e}", "task")
#
# @classmethod
# def add(cls, udid: str, thread: threading.Thread, event: threading.Event, force: bool = False) -> Tuple[int, str]:
# with cls._lock:
# cls._cleanup_if_dead(udid)
# old = cls._tasks.get(udid)
# if old and old["thread"].is_alive():
# if not force:
# return 1001, "当前设备已存在任务"
# LogManager.method_info(f"[{udid}] 检测到旧任务,尝试强制停止", "task")
# cls._force_stop_locked(udid)
#
# # 强制守护线程,防止进程被挂死
# try:
# thread.daemon = True
# except Exception:
# pass
#
# try:
# thread.start()
# # 等 ident 初始化
# for _ in range(20):
# if thread.ident:
# break
# time.sleep(0.02)
#
# cls._tasks[udid] = {
# "id": thread.ident,
# "thread": thread,
# "event": event,
# "start_time": time.time(),
# "state": "running",
# "child_pids": set(),
# }
# LogManager.method_info(f"创建任务成功 [{udid}]线程ID={thread.ident}", "task")
# return 200, "创建成功"
# except Exception as e:
# LogManager.method_error(f"线程启动失败: {e}", "task")
# return 1002, f"线程启动失败: {e}"
#
# @classmethod
# def stop(cls, udid: str, stop_timeout: float = 5.0, kill_timeout: float = 2.0) -> Tuple[int, str]:
# with cls._lock:
# obj = cls._tasks.get(udid)
# if not obj:
# return 200, "任务不存在"
#
# thread = obj["thread"]
# event = obj["event"]
# tid = obj["id"]
# child_pids = set(obj.get("child_pids") or [])
#
# LogManager.method_info(f"请求停止 [{udid}] 线程ID={tid}", "task")
#
# if not thread.is_alive():
# cls._tasks.pop(udid, None)
# return 200, "已结束"
#
# obj["state"] = "stopping"
#
# # 先把 event 打开,给协作退出的机会
# try:
# event.set()
# except Exception as e:
# LogManager.method_error(f"[{udid}] 设置停止事件失败: {e}", "task")
#
# def _wait_stop():
# # 高频窗口 1s很多 I/O 点会在这个窗口立刻感知到)
# t0 = time.time()
# while time.time() - t0 < 1.0 and thread.is_alive():
# time.sleep(0.05)
#
# # 子进程先收拾(避免后台外部程序继续卡死)
# cls._kill_child_pids(udid, child_pids)
#
# # 正常 join 窗口
# if thread.is_alive():
# thread.join(timeout=stop_timeout)
#
# # 仍活着 → 多次注入 SystemExit
# if thread.is_alive():
# LogManager.method_info(f"[{udid}] 协作超时 -> 尝试强杀注入", "task")
# for i in range(6):
# ok = _async_raise(tid, SystemExit)
# # 给解释器一些调度时间
# time.sleep(0.06)
# if not thread.is_alive():
# break
#
# # 最后等待 kill_timeout
# if thread.is_alive():
# thread.join(timeout=kill_timeout)
#
# with cls._lock:
# if not thread.is_alive():
# LogManager.method_info(f"[{udid}] 停止成功", "task")
# cls._tasks.pop(udid, None)
# else:
# # 彻底杀不掉:标记 zombie、释放占位
# LogManager.method_error(f"[{udid}] 停止失败(线程卡死),标记为 zombie释放占位", "task")
# obj = cls._tasks.get(udid)
# if obj:
# obj["state"] = "zombie"
# cls._tasks.pop(udid, None)
#
# threading.Thread(target=_wait_stop, daemon=True).start()
# return 200, "停止请求已提交"
#
# @classmethod
# def _force_stop_locked(cls, udid: str):
# """持锁情况下的暴力停止(用于 add(force=True) 覆盖旧任务)"""
# obj = cls._tasks.get(udid)
# if not obj:
# return
# th = obj["thread"]
# tid = obj["id"]
# event = obj["event"]
# child_pids = set(obj.get("child_pids") or [])
# try:
# try:
# event.set()
# except Exception:
# pass
# cls._kill_child_pids(udid, child_pids)
# th.join(timeout=1.5)
# if th.is_alive():
# for _ in range(6):
# _async_raise(tid, SystemExit)
# time.sleep(0.05)
# if not th.is_alive():
# break
# th.join(timeout=0.8)
# except Exception as e:
# LogManager.method_error(f"[{udid}] 强制停止失败: {e}", "task")
# finally:
# cls._tasks.pop(udid, None)
#
# @classmethod
# def status(cls, udid: str) -> Dict:
# with cls._lock:
# obj = cls._tasks.get(udid)
# if not obj:
# return {"exists": False}
# o = {
# "exists": True,
# "state": obj.get("state"),
# "start_time": obj.get("start_time"),
# "thread_id": obj.get("id"),
# "alive": obj["thread"].is_alive(),
# "child_pids": list(obj.get("child_pids") or []),
# }
# return o
#
# # @classmethod
# # def batch_stop(cls, ids: List[str]) -> Tuple[int, str]:
# # failed = []
# # with ThreadPoolExecutor(max_workers=4) as executor:
# # futures = {executor.submit(cls.stop, udid): udid for udid in ids}
# # for future in as_completed(futures):
# # udid = futures[future]
# # try:
# # code, msg = future.result()
# # except Exception as e:
# # LogManager.method_error(f"[{udid}] stop 调用异常: {e}", "task")
# # failed.append(udid)
# # continue
# # if code != 200:
# # failed.append(udid)
# # if failed:
# # return 207, f"部分任务停止失败: {failed}"
# # return 200, "全部停止请求已提交"
#
# @classmethod
# def batch_stop(cls, ids: List[str]) -> Tuple[int, str]:
# failed = []
# results = []
#
# with ThreadPoolExecutor(max_workers=4) as executor:
# futures = {executor.submit(cls.stop, udid): udid for udid in ids}
# for future in as_completed(futures):
# udid = futures[future]
# try:
# code, msg = future.result()
# results.append((udid, code, msg))
# except Exception as e:
# LogManager.method_error(f"[{udid}] stop 调用异常: {e}", "task")
# failed.append(udid)
# continue
# if code != 200:
# failed.append(udid)
#
# # 等待所有线程完全停止
# for udid, code, msg in results:
# if code == 200:
# obj = cls._tasks.get(udid)
# if obj:
# thread = obj["thread"]
# while thread.is_alive():
# time.sleep(0.1)
#
# if failed:
# return 207, f"部分任务停止失败: {failed}"
# return 200, "全部任务已成功停止"
import ctypes
import threading
import time
import os
import signal
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Dict, Tuple, List, Optional
# 假设 LogManager 存在
class MockLogManager:
@staticmethod
def method_error(msg, category):
print(f"[ERROR:{category}] {msg}")
@staticmethod
def method_info(msg, category):
print(f"[INFO:{category}] {msg}")
LogManager = MockLogManager
# from Utils.LogManager import LogManager # 恢复实际导入
try:
import psutil # 可选:用来级联杀子进程
except Exception:
psutil = None
def _async_raise(tid: int, exc_type=SystemExit) -> bool:
"""
向指定线程异步注入异常。
注意此方法在线程阻塞于C/OS调用如I/O等待时可能无效或延迟。
"""
if not tid:
LogManager.method_error("强杀失败: 线程ID为空", "task")
return False
try:
res = ctypes.pythonapi.PyThreadState_SetAsyncExc(
ctypes.c_long(tid), ctypes.py_object(exc_type)
)
if res == 0:
# 线程可能已经退出
return False
elif res > 1:
# 命中多个线程,非常罕见,回滚以防误杀
ctypes.pythonapi.PyThreadState_SetAsyncExc(ctypes.c_long(tid), 0)
LogManager.method_info(f"线程 {tid} 命中多个线程,已回滚", "task")
return False
return True
except Exception as e:
LogManager.method_error(f"强杀线程失败: {e}", "task")
return False
class ThreadManager:
"""
线程管理类支持协作停止、强制注入SystemExit和级联杀死外部子进程。
注意stop 方法已改为同步阻塞直到线程真正停止或被标记为zombie。
"""
_tasks: Dict[str, Dict] = {}
_lock = threading.RLock()
@classmethod
def _cleanup_if_dead(cls, udid: str):
obj = cls._tasks.get(udid)
if obj:
th = obj.get("thread")
if th and not th.is_alive():
cls._tasks.pop(udid, None)
LogManager.method_info(f"检测到 [{udid}] 线程已结束,自动清理。", "task")
@classmethod
def register_child_pid(cls, udid: str, pid: int):
"""登记外部子进程 PID便于 stop 时一起杀掉。"""
with cls._lock:
obj = cls._tasks.get(udid)
if not obj:
return
pids: set = obj.setdefault("child_pids", set())
# 确保 pid 是 int 类型
pids.add(int(pid))
LogManager.method_info(f"[{udid}] 记录子进程 PID={pid}", "task")
@classmethod
def _kill_child_pids(cls, udid: str, child_pids: Optional[set]):
"""终止所有已登记的外部子进程及其子进程(重要:用于解决 I/O 阻塞)。"""
if not child_pids:
return
# 创建一个副本,防止迭代过程中集合被修改
for pid in list(child_pids):
try:
if psutil:
if psutil.pid_exists(pid):
proc = psutil.Process(pid)
# 级联终止所有后代进程
for c in proc.children(recursive=True):
try:
c.terminate()
except Exception:
pass
# 先温柔 terminate
proc.terminate()
gone, alive = psutil.wait_procs([proc], timeout=0.5)
# 仍活则 kill
for a in alive:
try:
a.kill()
except Exception:
pass
else:
# 无 psutil 时的系统命令兜底
if os.name == "nt":
# /T 级联杀死子进程 /F 强制 /NUL 隐藏输出
os.system(f"taskkill /PID {pid} /T /F >NUL 2>&1")
else:
try:
os.kill(pid, signal.SIGTERM)
time.sleep(0.2)
os.kill(pid, signal.SIGKILL)
except Exception:
pass
LogManager.method_info(f"[{udid}] 已尝试结束子进程 PID={pid}", "task")
except Exception as e:
LogManager.method_error(f"[{udid}] 结束子进程 {pid} 异常: {e}", "task")
# 在同步停止模式下,这里不主动清理 set。
@classmethod
def add(cls, udid: str, thread: threading.Thread, event: threading.Event, force: bool = False) -> Tuple[int, str]:
with cls._lock:
cls._cleanup_if_dead(udid)
old = cls._tasks.get(udid)
if old and old["thread"].is_alive():
if not force:
return 1001, "当前设备已存在任务"
LogManager.method_info(f"[{udid}] 检测到旧任务,尝试强制停止", "task")
cls._force_stop_locked(udid)
# 强制守护线程,防止进程被挂死
try:
thread.daemon = True
except Exception:
pass
try:
thread.start()
# 等 ident 初始化
for _ in range(20):
if thread.ident:
break
time.sleep(0.02)
# 获取线程 ID
tid = thread.ident
cls._tasks[udid] = {
"id": tid,
"thread": thread,
"event": event,
"start_time": time.time(),
"state": "running",
"child_pids": set(),
}
LogManager.method_info(f"创建任务成功 [{udid}]线程ID={tid}", "task")
return 200, "创建成功"
except Exception as e:
LogManager.method_error(f"线程启动失败: {e}", "task")
return 1002, f"线程启动失败: {e}"
@classmethod
def stop(cls, udid: str, stop_timeout: float = 5.0, kill_timeout: float = 2.0) -> Tuple[int, str]:
"""同步阻塞请求停止任务直到线程真正停止或被标记为zombie。"""
# 1. 初始检查、状态设置和事件触发 (需要锁)
with cls._lock:
obj = cls._tasks.get(udid)
if not obj:
return 200, "任务不存在"
thread = obj["thread"]
event = obj["event"]
tid = obj["id"]
# 拷贝 child_pids以便在释放锁后使用
child_pids = set(obj.get("child_pids") or [])
LogManager.method_info(f"请求停止 [{udid}] 线程ID={tid}", "task")
if not thread.is_alive():
cls._tasks.pop(udid, None)
return 200, "已结束"
obj["state"] = "stopping"
# 先把 event 打开,给协作退出的机会
try:
event.set()
except Exception as e:
LogManager.method_error(f"[{udid}] 设置停止事件失败: {e}", "task")
# 锁已释放。以下执行耗时的阻塞操作。
# ----------------- 阻塞停止逻辑开始 -----------------
# 2. 预等待窗口 1s
t0 = time.time()
while time.time() - t0 < 1.0 and thread.is_alive():
time.sleep(0.05)
# 3. 子进程先收拾 (优先解决 I/O 阻塞)
cls._kill_child_pids(udid, child_pids)
# 4. 正常 join 窗口
if thread.is_alive():
thread.join(timeout=stop_timeout)
# 5. 仍活着 → 多次注入 SystemExit
if thread.is_alive():
LogManager.method_info(f"[{udid}] 协作超时 -> 尝试强杀注入", "task")
for i in range(6):
# 确保 tid 存在
if tid:
_async_raise(tid, SystemExit)
time.sleep(0.06)
if not thread.is_alive():
break
# 6. 最后等待 kill_timeout
if thread.is_alive():
thread.join(timeout=kill_timeout)
# ----------------- 阻塞停止逻辑结束 -----------------
# 7. 清理和返回结果 (需要重新加锁)
final_result_code: int = 500
final_result_msg: str = "停止失败(线程卡死)"
with cls._lock:
if not thread.is_alive():
LogManager.method_info(f"[{udid}] 停止成功", "task")
cls._tasks.pop(udid, None)
final_result_code = 200
final_result_msg = "停止成功"
else:
# 彻底杀不掉:标记 zombie、释放占位
LogManager.method_error(f"[{udid}] 停止失败(线程卡死),标记为 zombie释放占位", "task")
obj = cls._tasks.get(udid)
if obj:
obj["state"] = "zombie"
# 即使卡死,也移除任务记录,防止后续操作被阻塞
cls._tasks.pop(udid, None)
final_result_code = 500
final_result_msg = "停止失败(线程卡死)"
return final_result_code, final_result_msg
@classmethod
def _force_stop_locked(cls, udid: str):
"""持锁情况下的暴力停止(用于 add(force=True) 覆盖旧任务)"""
obj = cls._tasks.get(udid)
if not obj:
return
th = obj["thread"]
tid = obj["id"]
event = obj["event"]
child_pids = set(obj.get("child_pids") or [])
try:
try:
event.set()
except Exception:
pass
cls._kill_child_pids(udid, child_pids)
th.join(timeout=1.5)
if th.is_alive():
for _ in range(6):
if tid:
_async_raise(tid, SystemExit)
time.sleep(0.05)
if not th.is_alive():
break
th.join(timeout=0.8)
except Exception as e:
LogManager.method_error(f"[{udid}] 强制停止失败: {e}", "task")
finally:
cls._tasks.pop(udid, None)
@classmethod
def status(cls, udid: str) -> Dict:
with cls._lock:
obj = cls._tasks.get(udid)
if not obj:
return {"exists": False}
o = {
"exists": True,
"state": obj.get("state"),
"start_time": obj.get("start_time"),
"thread_id": obj.get("id"),
"alive": obj["thread"].is_alive(),
"child_pids": list(obj.get("child_pids") or []),
}
return o
@classmethod
def batch_stop(cls, ids: List[str]) -> Tuple[int, str]:
"""
批量停止任务。由于 stop 方法现在是同步阻塞的,此方法将等待所有线程完全停止后返回。
"""
failed = []
# 1. 并发发出所有停止请求 (现在是并发执行阻塞停止)
with ThreadPoolExecutor(max_workers=4) as executor:
futures = {executor.submit(cls.stop, udid): udid for udid in ids}
for future in as_completed(futures):
udid = futures[future]
try:
code, msg = future.result()
# 检查是否成功停止(状态码 200
if code != 200:
failed.append(f"{udid} ({msg})")
except Exception as e:
LogManager.method_error(f"[{udid}] stop 调用异常: {e}", "task")
failed.append(f"{udid} (异常)")
# 2. 返回结果
if failed:
# 返回 207 表示部分失败或全部失败
return 207, f"部分任务停止失败: {', '.join(failed)}"
# 返回 200 表示所有任务都已成功停止(因为 stop 方法是同步阻塞的)
return 200, "全部任务已成功停止"