Files
iOSAI/Utils/ThreadManager.py

141 lines
5.0 KiB
Python
Raw Normal View History

2025-09-22 19:10:58 +08:00
import ctypes
2025-09-17 22:23:57 +08:00
import threading
2025-10-24 22:04:28 +08:00
import time
2025-09-19 19:39:32 +08:00
from typing import Dict, Tuple, List
2025-09-22 19:10:58 +08:00
2025-08-06 22:11:33 +08:00
from Utils.LogManager import LogManager
2025-10-24 22:04:28 +08:00
def _async_raise(tid: int, exc_type=KeyboardInterrupt) -> bool:
"""向指定线程抛异常(兜底方案)"""
try:
res = ctypes.pythonapi.PyThreadState_SetAsyncExc(
ctypes.c_long(tid), ctypes.py_object(exc_type)
)
if res == 0:
LogManager.method_info(f"线程 {tid} 不存在", "task")
return False
elif res > 1:
ctypes.pythonapi.PyThreadState_SetAsyncExc(ctypes.c_long(tid), 0)
LogManager.method_info(f"线程 {tid} 命中多个线程,已回滚", "task")
return False
return True
except Exception as e:
LogManager.method_error(f"强杀线程失败: {e}", "task")
return False
2025-09-22 19:10:58 +08:00
2025-09-17 22:23:57 +08:00
class ThreadManager:
2025-09-18 20:09:52 +08:00
_tasks: Dict[str, Dict] = {}
2025-10-24 22:04:28 +08:00
_lock = threading.RLock()
@classmethod
def _cleanup_if_dead(cls, udid: str):
"""如果任务线程已结束,清理占位"""
obj = cls._tasks.get(udid)
if obj and not obj["thread"].is_alive():
cls._tasks.pop(udid, None)
LogManager.method_info(f"检测到 [{udid}] 线程已结束,自动清理。", "task")
2025-08-06 22:11:33 +08:00
@classmethod
2025-10-24 22:04:28 +08:00
def add(cls, udid: str, thread: threading.Thread, event: threading.Event, force: bool = False) -> Tuple[int, str]:
2025-09-18 20:09:52 +08:00
with cls._lock:
2025-10-24 22:04:28 +08:00
cls._cleanup_if_dead(udid)
# 已存在任务还在运行
old = cls._tasks.get(udid)
if old and old["thread"].is_alive():
if not force:
return 1001, "当前设备已存在任务"
LogManager.method_info(f"[{udid}] 检测到旧任务,尝试强制停止", "task")
cls._force_stop_locked(udid)
# 启动新任务
try:
thread.start()
cls._tasks[udid] = {
"id": thread.ident,
"thread": thread,
"event": event,
"start_time": time.time(),
}
LogManager.method_info(f"创建任务成功 [{udid}]线程ID={thread.ident}", "task")
return 200, "创建成功"
except Exception as e:
LogManager.method_error(f"线程启动失败: {e}", "task")
return 1002, f"线程启动失败: {e}"
2025-08-06 22:11:33 +08:00
@classmethod
2025-10-24 22:04:28 +08:00
def stop(cls, udid: str, stop_timeout: float = 5.0, kill_timeout: float = 2.0) -> Tuple[int, str]:
"""安全停止单个任务"""
with cls._lock:
obj = cls._tasks.get(udid)
if not obj:
return 200, "任务不存在"
thread = obj["thread"]
event = obj["event"]
tid = obj["id"]
LogManager.method_info(f"请求停止 [{udid}] 线程ID={tid}", "task")
# 已经结束
if not thread.is_alive():
2025-09-22 19:10:58 +08:00
cls._tasks.pop(udid, None)
2025-10-24 22:04:28 +08:00
return 200, "已结束"
# 1. 协作式停止
try:
event.set()
except Exception as e:
LogManager.method_error(f"[{udid}] 设置停止事件失败: {e}", "task")
thread.join(timeout=stop_timeout)
if not thread.is_alive():
cls._tasks.pop(udid, None)
LogManager.method_info(f"[{udid}] 协作式停止成功", "task")
return 200, "已停止"
# 2. 强杀兜底
LogManager.method_info(f"[{udid}] 协作式超时,尝试强杀", "task")
if _async_raise(tid):
thread.join(timeout=kill_timeout)
if not thread.is_alive():
cls._tasks.pop(udid, None)
LogManager.method_info(f"[{udid}] 强杀成功", "task")
return 200, "已停止"
# 3. 最终兜底:标记释放占位
LogManager.method_error(f"[{udid}] 无法停止(线程可能卡死),已释放占位", "task")
cls._tasks.pop(udid, None)
return 206, "停止超时,线程可能仍在后台运行"
2025-09-17 22:23:57 +08:00
2025-09-19 19:39:32 +08:00
@classmethod
2025-10-24 22:04:28 +08:00
def _force_stop_locked(cls, udid: str):
"""内部用,带锁强制停止旧任务"""
obj = cls._tasks.get(udid)
if not obj:
return
2025-09-22 19:10:58 +08:00
try:
2025-10-24 22:04:28 +08:00
event = obj["event"]
event.set()
obj["thread"].join(timeout=2)
if obj["thread"].is_alive():
_async_raise(obj["id"])
obj["thread"].join(timeout=1)
2025-09-22 19:10:58 +08:00
except Exception as e:
2025-10-24 22:04:28 +08:00
LogManager.method_error(f"[{udid}] 强制停止失败: {e}", "task")
finally:
cls._tasks.pop(udid, None)
2025-09-19 19:39:32 +08:00
2025-09-18 20:09:52 +08:00
@classmethod
2025-10-24 22:04:28 +08:00
def batch_stop(cls, ids: List[str]) -> Tuple[int, str]:
failed = []
for udid in ids:
code, msg = cls.stop(udid)
if code != 200:
failed.append(udid)
if failed:
return 207, f"部分任务未成功停止: {failed}"
return 200, "全部停止成功"