108 lines
4.3 KiB
Python
108 lines
4.3 KiB
Python
import threading
|
||
from concurrent.futures import ThreadPoolExecutor, as_completed
|
||
from typing import Dict, Tuple, List
|
||
from Utils.LogManager import LogManager
|
||
|
||
|
||
class ThreadManager:
|
||
_tasks: Dict[str, Dict] = {}
|
||
_lock = threading.Lock()
|
||
|
||
@classmethod
|
||
def add(cls, udid: str, thread: threading.Thread, event: threading.Event) -> Tuple[int, str]:
|
||
"""
|
||
添加一个线程到线程管理器。
|
||
:param udid: 设备的唯一标识符
|
||
:param thread: 线程对象
|
||
:param event: 用于控制线程退出的 Event 对象
|
||
:return: 状态码和信息
|
||
"""
|
||
with cls._lock:
|
||
if udid in cls._tasks and cls._tasks[udid].get("running", False):
|
||
LogManager.method_info(f"任务添加失败:设备 {udid} 已存在运行中的任务", method="task")
|
||
return 400, f"该设备中已存在任务 {udid}"
|
||
|
||
# 如果任务已经存在但已停止,清理旧任务记录
|
||
if udid in cls._tasks and not cls._tasks[udid].get("running", False):
|
||
LogManager.method_info(f"清理设备 {udid} 的旧任务记录", method="task")
|
||
del cls._tasks[udid]
|
||
|
||
# 添加新任务记录
|
||
cls._tasks[udid] = {
|
||
"thread": thread,
|
||
"event": event,
|
||
"running": True
|
||
}
|
||
LogManager.method_info(f"设备 {udid} 开始任务成功", method="task")
|
||
return 200, f"创建任务成功 {udid}"
|
||
|
||
@classmethod
|
||
def stop(cls, udid: str) -> Tuple[int, str]:
|
||
"""
|
||
停止指定设备的线程。
|
||
:param udid: 设备的唯一标识符
|
||
:return: 状态码和信息
|
||
"""
|
||
with cls._lock:
|
||
if udid not in cls._tasks or not cls._tasks[udid].get("running", False):
|
||
LogManager.method_info(f"任务停止失败:设备 {udid} 没有执行相关任务", method="task")
|
||
return 400, f"当前设备没有执行相关任务 {udid}"
|
||
|
||
task = cls._tasks[udid]
|
||
event = task["event"]
|
||
thread = task["thread"]
|
||
|
||
LogManager.method_info(f"设备 {udid} 的任务正在停止", method="task")
|
||
|
||
# 设置停止标志位
|
||
event.set()
|
||
|
||
# 等待线程结束
|
||
thread.join(timeout=5) # 可设置超时时间,避免阻塞
|
||
|
||
# 清理任务记录
|
||
del cls._tasks[udid] # 删除任务记录
|
||
LogManager.method_info(f"设备 {udid} 的任务停止成功", method="task")
|
||
return 200, f"当前任务停止成功 {udid}"
|
||
|
||
@classmethod
|
||
def batch_stop(cls, udids: List[str]) -> Tuple[int, List[str], str]:
|
||
"""
|
||
批量停止任务,使用多线程并发执行。
|
||
:param udids: 待停止的设备唯一标识列表
|
||
:return: (code, fail_list, msg)
|
||
code=200 全部成功,fail_list=[]
|
||
code=1001 部分/全部失败,fail_list 为失败描述字符串列表
|
||
"""
|
||
if not udids:
|
||
return 200, [], "无设备需要停止"
|
||
|
||
fail_list: List[str] = []
|
||
|
||
with ThreadPoolExecutor(max_workers=min(32, len(udids))) as executor:
|
||
future_map = {executor.submit(cls.stop, udid): udid for udid in udids}
|
||
for future in as_completed(future_map):
|
||
udid = future_map[future]
|
||
try:
|
||
code, reason = future.result()
|
||
if code != 200:
|
||
fail_list.append(f"设备{udid}停止失败:{reason}")
|
||
except Exception as exc:
|
||
fail_list.append(f"设备{udid}停止异常:{exc}")
|
||
if fail_list:
|
||
return 1001, fail_list, "停止失败"
|
||
return 200, [], "全部设备停止成功"
|
||
|
||
|
||
@classmethod
|
||
def is_task_running(cls, udid: str) -> bool:
|
||
"""
|
||
检查任务是否正在运行。
|
||
:param udid: 设备的唯一标识符
|
||
:return: True 表示任务正在运行,False 表示没有任务运行
|
||
"""
|
||
with cls._lock:
|
||
is_running = cls._tasks.get(udid, {}).get("running", False)
|
||
LogManager.method_info(f"检查设备 {udid} 的任务状态:{'运行中' if is_running else '未运行'}", method="task")
|
||
return is_running
|