Files
iOSAI/Utils/ThreadManager.py

139 lines
5.6 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Dict, Tuple, List
from Utils.LogManager import LogManager
class ThreadManager:
_tasks: Dict[str, Dict] = {}
_lock = threading.Lock()
@classmethod
def add(cls, udid: str, thread: threading.Thread, event: threading.Event) -> Tuple[int, str]:
"""
添加一个线程到线程管理器。
:param udid: 设备的唯一标识符
:param thread: 线程对象
:param event: 用于控制线程退出的 Event 对象
:return: 状态码和信息
"""
with cls._lock:
if udid in cls._tasks and cls._tasks[udid].get("running", False):
LogManager.method_info(f"任务添加失败:设备 {udid} 已存在运行中的任务", method="task")
return 400, f"该设备中已存在任务 {udid}"
# 如果任务已经存在但已停止,清理旧任务记录
if udid in cls._tasks and not cls._tasks[udid].get("running", False):
LogManager.method_info(f"清理设备 {udid} 的旧任务记录", method="task")
del cls._tasks[udid]
# 添加新任务记录
cls._tasks[udid] = {
"thread": thread,
"event": event,
"running": True
}
LogManager.method_info(f"设备 {udid} 开始任务成功", method="task")
return 200, f"创建任务成功 {udid}"
@classmethod
def stop(cls, udid: str) -> Tuple[int, str]:
"""
停止指定设备的线程。
:param udid: 设备的唯一标识符
:return: 状态码和信息
"""
with cls._lock:
if udid not in cls._tasks or not cls._tasks[udid].get("running", False):
LogManager.method_info(f"任务停止失败:设备 {udid} 没有执行相关任务", method="task")
return 400, f"当前设备没有执行相关任务 {udid}"
task = cls._tasks[udid]
event = task["event"]
thread = task["thread"]
LogManager.method_info(f"设备 {udid} 的任务正在停止", method="task")
# 设置停止标志位
event.set()
# 等待线程结束
thread.join(timeout=5) # 可设置超时时间,避免阻塞
# 清理任务记录
del cls._tasks[udid] # 删除任务记录
LogManager.method_info(f"设备 {udid} 的任务停止成功", method="task")
return 200, f"当前任务停止成功 {udid}"
@classmethod
def batch_stop(cls, udids: List[str]) -> Tuple[int, List[str], str]:
"""
批量停止任务——瞬间下发停止信号,仍统计失败结果。
返回格式与原接口 100% 兼容:
(code, fail_list, msg)
code=200 全部成功
code=1001 部分/全部失败
"""
if not udids:
return 200, [], "无设备需要停止"
fail_list: List[str] = []
# ---------- 1. 瞬间置位 event ----------
with cls._lock:
for udid in udids:
task = cls._tasks.get(udid)
if not task or not task.get("running"):
# fail_list.append(f"设备{udid}停止失败:当前设备没有执行相关任务")
continue
task["event"].set() # 下发停止信号
# ---------- 2. 并发等 0.2 s 收尾 ----------
def _wait_and_clean(udid: str) -> Tuple[int, str]:
with cls._lock:
task = cls._tasks.get(udid)
if not task:
return 400, "任务记录已丢失"
thread = task["thread"]
# 第一次等 3 秒,让“分片睡眠”有机会退出
thread.join(timeout=3)
# 如果还活,再补 2 秒
if thread.is_alive():
thread.join(timeout=2)
# 最终仍活,记录日志但不硬杀,避免僵尸
with cls._lock:
cls._tasks.pop(udid, None)
if thread.is_alive():
LogManager.warning(f"[batch_stop] 线程 5s 未退出,已清理记录但线程仍跑 {udid}")
return 201, "已下发停止,线程超长任务未立即结束"
return 200, "已停止"
with ThreadPoolExecutor(max_workers=min(32, len(udids))) as executor:
future_map = {executor.submit(_wait_and_clean, udid): udid for udid in udids}
for future in as_completed(future_map):
udid = future_map[future]
try:
code, reason = future.result()
if code != 200:
fail_list.append(f"设备{udid}停止失败:{reason}")
except Exception as exc:
fail_list.append(f"设备{udid}停止异常:{exc}")
# ---------- 3. 返回兼容格式 ----------
if fail_list:
return 1001, fail_list, "部分设备停止失败"
return 200, [], "全部设备停止成功"
@classmethod
def is_task_running(cls, udid: str) -> bool:
"""
检查任务是否正在运行。
:param udid: 设备的唯一标识符
:return: True 表示任务正在运行False 表示没有任务运行
"""
with cls._lock:
is_running = cls._tasks.get(udid, {}).get("running", False)
LogManager.method_info(f"检查设备 {udid} 的任务状态:{'运行中' if is_running else '未运行'}", method="task")
return is_running