Files
iOSAI/Utils/ThreadManager.py

635 lines
24 KiB
Python
Raw Normal View History

2025-11-07 18:32:42 +08:00
# import ctypes
# import threading
# import time
# import os
# import signal
# from concurrent.futures import ThreadPoolExecutor, as_completed
# from typing import Dict, Tuple, List, Optional
#
# from Utils.LogManager import LogManager
#
# try:
# import psutil # 可选:用来级联杀子进程
# except Exception:
# psutil = None
#
#
# def _async_raise(tid: int, exc_type=SystemExit) -> bool:
# """向指定线程异步注入异常(仅对 Python 解释器栈可靠)"""
# if not tid:
# LogManager.method_error("强杀失败: 线程ID为空", "task")
# return False
# try:
# res = ctypes.pythonapi.PyThreadState_SetAsyncExc(
# ctypes.c_long(tid), ctypes.py_object(exc_type)
# )
# if res == 0:
# LogManager.method_info(f"线程 {tid} 不存在", "task")
# return False
# elif res > 1:
# ctypes.pythonapi.PyThreadState_SetAsyncExc(ctypes.c_long(tid), 0)
# LogManager.method_info(f"线程 {tid} 命中多个线程,已回滚", "task")
# return False
# return True
# except Exception as e:
# LogManager.method_error(f"强杀线程失败: {e}", "task")
# return False
#
#
# class ThreadManager:
# """
# 维持你的 add(udid, thread, event) 调用方式不变。
# - 线程统一设为 daemon
# - 停止:协作 -> 多次强杀注入 -> zombie 放弃占位
# - 可选:注册并级联杀掉业务里创建的外部子进程
# """
# _tasks: Dict[str, Dict] = {}
# _lock = threading.RLock()
#
# @classmethod
# def _cleanup_if_dead(cls, udid: str):
# obj = cls._tasks.get(udid)
# if obj:
# th = obj.get("thread")
# if th and not th.is_alive():
# cls._tasks.pop(udid, None)
# LogManager.method_info(f"检测到 [{udid}] 线程已结束,自动清理。", "task")
#
# @classmethod
# def register_child_pid(cls, udid: str, pid: int):
# """业务里如果起了 adb/scrcpy/ffmpeg 等外部进程,请在启动后调用这个登记,便于 stop 时一起杀掉。"""
# with cls._lock:
# obj = cls._tasks.get(udid)
# if not obj:
# return
# pids: set = obj.setdefault("child_pids", set())
# pids.add(int(pid))
# LogManager.method_info(f"[{udid}] 记录子进程 PID={pid}", "task")
#
# @classmethod
# def _kill_child_pids(cls, udid: str, child_pids: Optional[set]):
# if not child_pids:
# return
# for pid in list(child_pids):
# try:
# if psutil:
# if psutil.pid_exists(pid):
# proc = psutil.Process(pid)
# # 先温柔 terminate再等 0.5 秒,仍活则 kill并级联子进程
# for c in proc.children(recursive=True):
# try:
# c.terminate()
# except Exception:
# pass
# proc.terminate()
# gone, alive = psutil.wait_procs([proc], timeout=0.5)
# for a in alive:
# try:
# a.kill()
# except Exception:
# pass
# else:
# if os.name == "nt":
# os.system(f"taskkill /PID {pid} /T /F >NUL 2>&1")
# else:
# try:
# os.kill(pid, signal.SIGTERM)
# time.sleep(0.2)
# os.kill(pid, signal.SIGKILL)
# except Exception:
# pass
# LogManager.method_info(f"[{udid}] 已尝试结束子进程 PID={pid}", "task")
# except Exception as e:
# LogManager.method_error(f"[{udid}] 结束子进程 {pid} 异常: {e}", "task")
#
# @classmethod
# def add(cls, udid: str, thread: threading.Thread, event: threading.Event, force: bool = False) -> Tuple[int, str]:
# with cls._lock:
# cls._cleanup_if_dead(udid)
# old = cls._tasks.get(udid)
# if old and old["thread"].is_alive():
# if not force:
# return 1001, "当前设备已存在任务"
# LogManager.method_info(f"[{udid}] 检测到旧任务,尝试强制停止", "task")
# cls._force_stop_locked(udid)
#
# # 强制守护线程,防止进程被挂死
# try:
# thread.daemon = True
# except Exception:
# pass
#
# try:
# thread.start()
# # 等 ident 初始化
# for _ in range(20):
# if thread.ident:
# break
# time.sleep(0.02)
#
# cls._tasks[udid] = {
# "id": thread.ident,
# "thread": thread,
# "event": event,
# "start_time": time.time(),
# "state": "running",
# "child_pids": set(),
# }
# LogManager.method_info(f"创建任务成功 [{udid}]线程ID={thread.ident}", "task")
# return 200, "创建成功"
# except Exception as e:
# LogManager.method_error(f"线程启动失败: {e}", "task")
# return 1002, f"线程启动失败: {e}"
#
# @classmethod
# def stop(cls, udid: str, stop_timeout: float = 5.0, kill_timeout: float = 2.0) -> Tuple[int, str]:
# with cls._lock:
# obj = cls._tasks.get(udid)
# if not obj:
# return 200, "任务不存在"
#
# thread = obj["thread"]
# event = obj["event"]
# tid = obj["id"]
# child_pids = set(obj.get("child_pids") or [])
#
# LogManager.method_info(f"请求停止 [{udid}] 线程ID={tid}", "task")
#
# if not thread.is_alive():
# cls._tasks.pop(udid, None)
# return 200, "已结束"
#
# obj["state"] = "stopping"
#
# # 先把 event 打开,给协作退出的机会
# try:
# event.set()
# except Exception as e:
# LogManager.method_error(f"[{udid}] 设置停止事件失败: {e}", "task")
#
# def _wait_stop():
# # 高频窗口 1s很多 I/O 点会在这个窗口立刻感知到)
# t0 = time.time()
# while time.time() - t0 < 1.0 and thread.is_alive():
# time.sleep(0.05)
#
# # 子进程先收拾(避免后台外部程序继续卡死)
# cls._kill_child_pids(udid, child_pids)
#
# # 正常 join 窗口
# if thread.is_alive():
# thread.join(timeout=stop_timeout)
#
# # 仍活着 → 多次注入 SystemExit
# if thread.is_alive():
# LogManager.method_info(f"[{udid}] 协作超时 -> 尝试强杀注入", "task")
# for i in range(6):
# ok = _async_raise(tid, SystemExit)
# # 给解释器一些调度时间
# time.sleep(0.06)
# if not thread.is_alive():
# break
#
# # 最后等待 kill_timeout
# if thread.is_alive():
# thread.join(timeout=kill_timeout)
#
# with cls._lock:
# if not thread.is_alive():
# LogManager.method_info(f"[{udid}] 停止成功", "task")
# cls._tasks.pop(udid, None)
# else:
# # 彻底杀不掉:标记 zombie、释放占位
# LogManager.method_error(f"[{udid}] 停止失败(线程卡死),标记为 zombie释放占位", "task")
# obj = cls._tasks.get(udid)
# if obj:
# obj["state"] = "zombie"
# cls._tasks.pop(udid, None)
#
# threading.Thread(target=_wait_stop, daemon=True).start()
# return 200, "停止请求已提交"
#
# @classmethod
# def _force_stop_locked(cls, udid: str):
# """持锁情况下的暴力停止(用于 add(force=True) 覆盖旧任务)"""
# obj = cls._tasks.get(udid)
# if not obj:
# return
# th = obj["thread"]
# tid = obj["id"]
# event = obj["event"]
# child_pids = set(obj.get("child_pids") or [])
# try:
# try:
# event.set()
# except Exception:
# pass
# cls._kill_child_pids(udid, child_pids)
# th.join(timeout=1.5)
# if th.is_alive():
# for _ in range(6):
# _async_raise(tid, SystemExit)
# time.sleep(0.05)
# if not th.is_alive():
# break
# th.join(timeout=0.8)
# except Exception as e:
# LogManager.method_error(f"[{udid}] 强制停止失败: {e}", "task")
# finally:
# cls._tasks.pop(udid, None)
#
# @classmethod
# def status(cls, udid: str) -> Dict:
# with cls._lock:
# obj = cls._tasks.get(udid)
# if not obj:
# return {"exists": False}
# o = {
# "exists": True,
# "state": obj.get("state"),
# "start_time": obj.get("start_time"),
# "thread_id": obj.get("id"),
# "alive": obj["thread"].is_alive(),
# "child_pids": list(obj.get("child_pids") or []),
# }
# return o
#
# # @classmethod
# # def batch_stop(cls, ids: List[str]) -> Tuple[int, str]:
# # failed = []
# # with ThreadPoolExecutor(max_workers=4) as executor:
# # futures = {executor.submit(cls.stop, udid): udid for udid in ids}
# # for future in as_completed(futures):
# # udid = futures[future]
# # try:
# # code, msg = future.result()
# # except Exception as e:
# # LogManager.method_error(f"[{udid}] stop 调用异常: {e}", "task")
# # failed.append(udid)
# # continue
# # if code != 200:
# # failed.append(udid)
# # if failed:
# # return 207, f"部分任务停止失败: {failed}"
# # return 200, "全部停止请求已提交"
#
# @classmethod
# def batch_stop(cls, ids: List[str]) -> Tuple[int, str]:
# failed = []
# results = []
#
# with ThreadPoolExecutor(max_workers=4) as executor:
# futures = {executor.submit(cls.stop, udid): udid for udid in ids}
# for future in as_completed(futures):
# udid = futures[future]
# try:
# code, msg = future.result()
# results.append((udid, code, msg))
# except Exception as e:
# LogManager.method_error(f"[{udid}] stop 调用异常: {e}", "task")
# failed.append(udid)
# continue
# if code != 200:
# failed.append(udid)
#
# # 等待所有线程完全停止
# for udid, code, msg in results:
# if code == 200:
# obj = cls._tasks.get(udid)
# if obj:
# thread = obj["thread"]
# while thread.is_alive():
# time.sleep(0.1)
#
# if failed:
# return 207, f"部分任务停止失败: {failed}"
# return 200, "全部任务已成功停止"
2025-09-22 19:10:58 +08:00
import ctypes
2025-09-17 22:23:57 +08:00
import threading
2025-10-24 22:04:28 +08:00
import time
2025-10-29 16:56:34 +08:00
import os
import signal
2025-10-25 01:18:41 +08:00
from concurrent.futures import ThreadPoolExecutor, as_completed
2025-10-29 16:56:34 +08:00
from typing import Dict, Tuple, List, Optional
2025-09-22 19:10:58 +08:00
2025-11-07 18:32:42 +08:00
# 假设 LogManager 存在
class MockLogManager:
@staticmethod
def method_error(msg, category):
print(f"[ERROR:{category}] {msg}")
@staticmethod
def method_info(msg, category):
print(f"[INFO:{category}] {msg}")
LogManager = MockLogManager
# from Utils.LogManager import LogManager # 恢复实际导入
2025-08-06 22:11:33 +08:00
2025-10-29 16:56:34 +08:00
try:
import psutil # 可选:用来级联杀子进程
except Exception:
psutil = None
2025-08-06 22:11:33 +08:00
2025-10-29 16:56:34 +08:00
def _async_raise(tid: int, exc_type=SystemExit) -> bool:
2025-11-07 18:32:42 +08:00
"""
向指定线程异步注入异常
注意此方法在线程阻塞于C/OS调用如I/O等待时可能无效或延迟
"""
2025-10-25 01:18:41 +08:00
if not tid:
2025-10-29 16:56:34 +08:00
LogManager.method_error("强杀失败: 线程ID为空", "task")
2025-10-25 01:18:41 +08:00
return False
2025-10-24 22:04:28 +08:00
try:
res = ctypes.pythonapi.PyThreadState_SetAsyncExc(
ctypes.c_long(tid), ctypes.py_object(exc_type)
)
if res == 0:
2025-11-07 18:32:42 +08:00
# 线程可能已经退出
2025-10-24 22:04:28 +08:00
return False
elif res > 1:
2025-11-07 18:32:42 +08:00
# 命中多个线程,非常罕见,回滚以防误杀
2025-10-24 22:04:28 +08:00
ctypes.pythonapi.PyThreadState_SetAsyncExc(ctypes.c_long(tid), 0)
LogManager.method_info(f"线程 {tid} 命中多个线程,已回滚", "task")
return False
return True
except Exception as e:
LogManager.method_error(f"强杀线程失败: {e}", "task")
return False
2025-09-22 19:10:58 +08:00
2025-09-17 22:23:57 +08:00
class ThreadManager:
2025-10-29 16:56:34 +08:00
"""
2025-11-07 18:32:42 +08:00
线程管理类支持协作停止强制注入SystemExit和级联杀死外部子进程
注意stop 方法已改为同步阻塞直到线程真正停止或被标记为zombie
2025-10-29 16:56:34 +08:00
"""
2025-09-18 20:09:52 +08:00
_tasks: Dict[str, Dict] = {}
2025-10-24 22:04:28 +08:00
_lock = threading.RLock()
@classmethod
def _cleanup_if_dead(cls, udid: str):
obj = cls._tasks.get(udid)
2025-10-29 16:56:34 +08:00
if obj:
th = obj.get("thread")
if th and not th.is_alive():
cls._tasks.pop(udid, None)
LogManager.method_info(f"检测到 [{udid}] 线程已结束,自动清理。", "task")
@classmethod
def register_child_pid(cls, udid: str, pid: int):
2025-11-07 18:32:42 +08:00
"""登记外部子进程 PID便于 stop 时一起杀掉。"""
2025-10-29 16:56:34 +08:00
with cls._lock:
obj = cls._tasks.get(udid)
if not obj:
return
pids: set = obj.setdefault("child_pids", set())
2025-11-07 18:32:42 +08:00
# 确保 pid 是 int 类型
2025-10-29 16:56:34 +08:00
pids.add(int(pid))
LogManager.method_info(f"[{udid}] 记录子进程 PID={pid}", "task")
@classmethod
def _kill_child_pids(cls, udid: str, child_pids: Optional[set]):
2025-11-07 18:32:42 +08:00
"""终止所有已登记的外部子进程及其子进程(重要:用于解决 I/O 阻塞)。"""
2025-10-29 16:56:34 +08:00
if not child_pids:
return
2025-11-07 18:32:42 +08:00
# 创建一个副本,防止迭代过程中集合被修改
2025-10-29 16:56:34 +08:00
for pid in list(child_pids):
try:
if psutil:
if psutil.pid_exists(pid):
proc = psutil.Process(pid)
2025-11-07 18:32:42 +08:00
# 级联终止所有后代进程
2025-10-29 16:56:34 +08:00
for c in proc.children(recursive=True):
try:
c.terminate()
except Exception:
pass
2025-11-07 18:32:42 +08:00
# 先温柔 terminate
2025-10-29 16:56:34 +08:00
proc.terminate()
gone, alive = psutil.wait_procs([proc], timeout=0.5)
2025-11-07 18:32:42 +08:00
# 仍活则 kill
2025-10-29 16:56:34 +08:00
for a in alive:
try:
a.kill()
except Exception:
pass
else:
2025-11-07 18:32:42 +08:00
# 无 psutil 时的系统命令兜底
2025-10-29 16:56:34 +08:00
if os.name == "nt":
2025-11-07 18:32:42 +08:00
# /T 级联杀死子进程 /F 强制 /NUL 隐藏输出
2025-10-29 16:56:34 +08:00
os.system(f"taskkill /PID {pid} /T /F >NUL 2>&1")
else:
try:
os.kill(pid, signal.SIGTERM)
time.sleep(0.2)
os.kill(pid, signal.SIGKILL)
except Exception:
pass
LogManager.method_info(f"[{udid}] 已尝试结束子进程 PID={pid}", "task")
except Exception as e:
LogManager.method_error(f"[{udid}] 结束子进程 {pid} 异常: {e}", "task")
2025-08-06 22:11:33 +08:00
2025-11-07 18:32:42 +08:00
# 在同步停止模式下,这里不主动清理 set。
2025-08-06 22:11:33 +08:00
@classmethod
2025-10-24 22:04:28 +08:00
def add(cls, udid: str, thread: threading.Thread, event: threading.Event, force: bool = False) -> Tuple[int, str]:
2025-09-18 20:09:52 +08:00
with cls._lock:
2025-10-24 22:04:28 +08:00
cls._cleanup_if_dead(udid)
old = cls._tasks.get(udid)
if old and old["thread"].is_alive():
if not force:
return 1001, "当前设备已存在任务"
LogManager.method_info(f"[{udid}] 检测到旧任务,尝试强制停止", "task")
cls._force_stop_locked(udid)
2025-10-29 16:56:34 +08:00
# 强制守护线程,防止进程被挂死
try:
thread.daemon = True
except Exception:
pass
2025-10-24 22:04:28 +08:00
try:
thread.start()
2025-10-29 16:56:34 +08:00
# 等 ident 初始化
for _ in range(20):
2025-10-25 01:18:41 +08:00
if thread.ident:
break
2025-10-29 16:56:34 +08:00
time.sleep(0.02)
2025-10-25 01:18:41 +08:00
2025-11-07 18:32:42 +08:00
# 获取线程 ID
tid = thread.ident
2025-10-24 22:04:28 +08:00
cls._tasks[udid] = {
2025-11-07 18:32:42 +08:00
"id": tid,
2025-10-24 22:04:28 +08:00
"thread": thread,
"event": event,
"start_time": time.time(),
2025-10-29 16:56:34 +08:00
"state": "running",
"child_pids": set(),
2025-10-24 22:04:28 +08:00
}
2025-11-07 18:32:42 +08:00
LogManager.method_info(f"创建任务成功 [{udid}]线程ID={tid}", "task")
2025-10-24 22:04:28 +08:00
return 200, "创建成功"
except Exception as e:
LogManager.method_error(f"线程启动失败: {e}", "task")
return 1002, f"线程启动失败: {e}"
2025-08-06 22:11:33 +08:00
@classmethod
2025-10-24 22:04:28 +08:00
def stop(cls, udid: str, stop_timeout: float = 5.0, kill_timeout: float = 2.0) -> Tuple[int, str]:
2025-11-07 18:32:42 +08:00
"""同步阻塞请求停止任务直到线程真正停止或被标记为zombie。"""
# 1. 初始检查、状态设置和事件触发 (需要锁)
2025-10-24 22:04:28 +08:00
with cls._lock:
obj = cls._tasks.get(udid)
if not obj:
return 200, "任务不存在"
thread = obj["thread"]
event = obj["event"]
tid = obj["id"]
2025-11-07 18:32:42 +08:00
# 拷贝 child_pids以便在释放锁后使用
2025-10-29 16:56:34 +08:00
child_pids = set(obj.get("child_pids") or [])
2025-10-24 22:04:28 +08:00
LogManager.method_info(f"请求停止 [{udid}] 线程ID={tid}", "task")
if not thread.is_alive():
2025-09-22 19:10:58 +08:00
cls._tasks.pop(udid, None)
2025-10-24 22:04:28 +08:00
return 200, "已结束"
2025-10-29 16:56:34 +08:00
obj["state"] = "stopping"
# 先把 event 打开,给协作退出的机会
2025-10-24 22:04:28 +08:00
try:
event.set()
except Exception as e:
LogManager.method_error(f"[{udid}] 设置停止事件失败: {e}", "task")
2025-11-07 18:32:42 +08:00
# 锁已释放。以下执行耗时的阻塞操作。
# ----------------- 阻塞停止逻辑开始 -----------------
2025-10-27 21:44:16 +08:00
2025-11-07 18:32:42 +08:00
# 2. 预等待窗口 1s
t0 = time.time()
while time.time() - t0 < 1.0 and thread.is_alive():
time.sleep(0.05)
2025-10-29 16:56:34 +08:00
2025-11-07 18:32:42 +08:00
# 3. 子进程先收拾 (优先解决 I/O 阻塞)
cls._kill_child_pids(udid, child_pids)
2025-10-24 22:04:28 +08:00
2025-11-07 18:32:42 +08:00
# 4. 正常 join 窗口
if thread.is_alive():
thread.join(timeout=stop_timeout)
# 5. 仍活着 → 多次注入 SystemExit
if thread.is_alive():
LogManager.method_info(f"[{udid}] 协作超时 -> 尝试强杀注入", "task")
for i in range(6):
# 确保 tid 存在
if tid:
_async_raise(tid, SystemExit)
time.sleep(0.06)
if not thread.is_alive():
break
# 6. 最后等待 kill_timeout
2025-10-29 16:56:34 +08:00
if thread.is_alive():
2025-11-07 18:32:42 +08:00
thread.join(timeout=kill_timeout)
2025-10-29 16:56:34 +08:00
2025-11-07 18:32:42 +08:00
# ----------------- 阻塞停止逻辑结束 -----------------
# 7. 清理和返回结果 (需要重新加锁)
final_result_code: int = 500
final_result_msg: str = "停止失败(线程卡死)"
2025-10-25 01:18:41 +08:00
2025-11-07 18:32:42 +08:00
with cls._lock:
if not thread.is_alive():
LogManager.method_info(f"[{udid}] 停止成功", "task")
cls._tasks.pop(udid, None)
final_result_code = 200
final_result_msg = "停止成功"
else:
# 彻底杀不掉:标记 zombie、释放占位
LogManager.method_error(f"[{udid}] 停止失败(线程卡死),标记为 zombie释放占位", "task")
obj = cls._tasks.get(udid)
if obj:
obj["state"] = "zombie"
# 即使卡死,也移除任务记录,防止后续操作被阻塞
2025-10-29 16:56:34 +08:00
cls._tasks.pop(udid, None)
2025-11-07 18:32:42 +08:00
final_result_code = 500
final_result_msg = "停止失败(线程卡死)"
2025-10-24 22:04:28 +08:00
2025-11-07 18:32:42 +08:00
return final_result_code, final_result_msg
2025-09-17 22:23:57 +08:00
2025-09-19 19:39:32 +08:00
@classmethod
2025-10-24 22:04:28 +08:00
def _force_stop_locked(cls, udid: str):
2025-10-29 16:56:34 +08:00
"""持锁情况下的暴力停止(用于 add(force=True) 覆盖旧任务)"""
2025-10-24 22:04:28 +08:00
obj = cls._tasks.get(udid)
if not obj:
return
2025-10-29 16:56:34 +08:00
th = obj["thread"]
tid = obj["id"]
event = obj["event"]
child_pids = set(obj.get("child_pids") or [])
2025-09-22 19:10:58 +08:00
try:
2025-10-29 16:56:34 +08:00
try:
event.set()
except Exception:
pass
cls._kill_child_pids(udid, child_pids)
th.join(timeout=1.5)
if th.is_alive():
for _ in range(6):
2025-11-07 18:32:42 +08:00
if tid:
_async_raise(tid, SystemExit)
2025-10-29 16:56:34 +08:00
time.sleep(0.05)
if not th.is_alive():
break
th.join(timeout=0.8)
2025-09-22 19:10:58 +08:00
except Exception as e:
2025-10-24 22:04:28 +08:00
LogManager.method_error(f"[{udid}] 强制停止失败: {e}", "task")
finally:
cls._tasks.pop(udid, None)
2025-09-19 19:39:32 +08:00
2025-10-29 16:56:34 +08:00
@classmethod
def status(cls, udid: str) -> Dict:
with cls._lock:
obj = cls._tasks.get(udid)
if not obj:
return {"exists": False}
o = {
"exists": True,
"state": obj.get("state"),
"start_time": obj.get("start_time"),
"thread_id": obj.get("id"),
"alive": obj["thread"].is_alive(),
"child_pids": list(obj.get("child_pids") or []),
}
return o
2025-09-18 20:09:52 +08:00
@classmethod
2025-10-24 22:04:28 +08:00
def batch_stop(cls, ids: List[str]) -> Tuple[int, str]:
2025-11-07 18:32:42 +08:00
"""
批量停止任务由于 stop 方法现在是同步阻塞的此方法将等待所有线程完全停止后返回
"""
2025-10-24 22:04:28 +08:00
failed = []
2025-11-07 18:32:42 +08:00
# 1. 并发发出所有停止请求 (现在是并发执行阻塞停止)
2025-10-25 01:18:41 +08:00
with ThreadPoolExecutor(max_workers=4) as executor:
futures = {executor.submit(cls.stop, udid): udid for udid in ids}
for future in as_completed(futures):
2025-10-27 21:44:16 +08:00
udid = futures[future]
try:
code, msg = future.result()
2025-11-07 18:32:42 +08:00
# 检查是否成功停止(状态码 200
if code != 200:
failed.append(f"{udid} ({msg})")
2025-10-27 21:44:16 +08:00
except Exception as e:
LogManager.method_error(f"[{udid}] stop 调用异常: {e}", "task")
2025-11-07 18:32:42 +08:00
failed.append(f"{udid} (异常)")
2025-11-07 18:32:42 +08:00
# 2. 返回结果
2025-10-24 22:04:28 +08:00
if failed:
2025-11-07 18:32:42 +08:00
# 返回 207 表示部分失败或全部失败
return 207, f"部分任务停止失败: {', '.join(failed)}"
# 返回 200 表示所有任务都已成功停止(因为 stop 方法是同步阻塞的)
return 200, "全部任务已成功停止"