Files
iOSAI/Utils/ThreadManager.py

226 lines
7.4 KiB
Python
Raw Normal View History

# -*- coding: utf-8 -*-
2025-11-07 18:32:42 +08:00
2025-09-17 22:23:57 +08:00
import threading
import ctypes
import inspect
2025-10-24 22:04:28 +08:00
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Dict, Optional, List, Tuple, Any
2025-11-07 18:32:42 +08:00
from Utils.LogManager import LogManager
2025-11-07 18:32:42 +08:00
2025-08-06 22:11:33 +08:00
def _raise_async_exception(tid: int, exc_type) -> int:
if not inspect.isclass(exc_type):
raise TypeError("exc_type must be a class")
return ctypes.pythonapi.PyThreadState_SetAsyncExc(
ctypes.c_long(tid), ctypes.py_object(exc_type)
)
2025-08-06 22:11:33 +08:00
2025-10-29 16:56:34 +08:00
def _kill_thread_by_tid(tid: Optional[int]) -> bool:
if tid is None:
2025-10-25 01:18:41 +08:00
return False
res = _raise_async_exception(tid, SystemExit)
if res == 0:
2025-10-24 22:04:28 +08:00
return False
if res > 1:
_raise_async_exception(tid, None)
raise SystemError("PyThreadState_SetAsyncExc affected multiple threads; reverted.")
return True
2025-10-24 22:04:28 +08:00
2025-09-22 19:10:58 +08:00
2025-09-17 22:23:57 +08:00
class ThreadManager:
2025-10-29 16:56:34 +08:00
"""
- add(udid, thread_or_target, *args, **kwargs) -> (code, msg)
- stop(udid, join_timeout=2.0, retries=5, wait_step=0.2) -> (code, msg) # 强杀
- batch_stop(udids, join_timeout_each=2.0, retries_each=5, wait_step_each=0.2) -> (code, msg)
- get_thread / get_tid / is_running / list_udids
2025-10-29 16:56:34 +08:00
"""
_threads: Dict[str, threading.Thread] = {}
2025-10-24 22:04:28 +08:00
_lock = threading.RLock()
# ========== 基础 ==========
2025-10-24 22:04:28 +08:00
@classmethod
def add(cls, udid: str, thread_or_target: Any, *args, **kwargs) -> Tuple[int, str]:
"""
兼容两种用法
1) add(udid, t) # t 是 threading.Thread 实例
2) add(udid, target, *args, **kwargs) # target 是可调用
返回(200, "创建任务成功") / (1001, "任务已存在") / (1001, "创建任务失败")
"""
with cls._lock:
exist = cls._threads.get(udid)
if exist and exist.is_alive():
return 1001, "任务已存在"
if isinstance(thread_or_target, threading.Thread):
t = thread_or_target
try:
t.daemon = True
except Exception:
pass
if not t.name:
t.name = f"task-{udid}"
# 包装 run退出时从表移除
orig_run = t.run
def run_wrapper():
try:
orig_run()
finally:
with cls._lock:
if cls._threads.get(udid) is t:
cls._threads.pop(udid, None)
t.run = run_wrapper # type: ignore
else:
target = thread_or_target
def _wrapper():
try:
target(*args, **kwargs)
finally:
with cls._lock:
cur = cls._threads.get(udid)
if cur is threading.current_thread():
cls._threads.pop(udid, None)
t = threading.Thread(target=_wrapper, daemon=True, name=f"task-{udid}")
try:
t.start()
except Exception:
return 1001, "创建任务失败"
cls._threads[udid] = t
# 保留你原有的创建成功日志
try:
LogManager.method_info(f"创建任务成功 [{udid}]线程ID={t.ident}", "task")
except Exception:
pass
return 200, "创建任务成功"
2025-10-29 16:56:34 +08:00
@classmethod
def get_thread(cls, udid: str) -> Optional[threading.Thread]:
2025-10-29 16:56:34 +08:00
with cls._lock:
return cls._threads.get(udid)
2025-10-29 16:56:34 +08:00
@classmethod
def get_tid(cls, udid: str) -> Optional[int]:
t = cls.get_thread(udid)
return t.ident if t else None
@classmethod
def is_running(cls, udid: str) -> bool:
t = cls.get_thread(udid)
return bool(t and t.is_alive())
2025-11-07 18:32:42 +08:00
2025-08-06 22:11:33 +08:00
@classmethod
def list_udids(cls) -> List[str]:
2025-09-18 20:09:52 +08:00
with cls._lock:
return list(cls._threads.keys())
2025-10-29 16:56:34 +08:00
# ========== 内部:强杀一次 ==========
2025-08-06 22:11:33 +08:00
@classmethod
def _stop_once(cls, udid: str, join_timeout: float, retries: int, wait_step: float) -> bool:
"""
对指定 udid 执行一次强杀流程返回 True=已停止/不存在False=仍存活或被拒
"""
2025-10-24 22:04:28 +08:00
with cls._lock:
t = cls._threads.get(udid)
2025-10-24 22:04:28 +08:00
if not t:
return True # 视为已停止
2025-10-24 22:04:28 +08:00
main_tid = threading.main_thread().ident
cur_tid = threading.get_ident()
if t.ident in (main_tid, cur_tid):
return False
2025-10-24 22:04:28 +08:00
try:
_kill_thread_by_tid(t.ident)
except Exception:
pass
2025-10-24 22:04:28 +08:00
if join_timeout < 0:
join_timeout = 0.0
t.join(join_timeout)
2025-10-29 16:56:34 +08:00
while t.is_alive() and retries > 0:
evt = threading.Event()
evt.wait(wait_step)
retries -= 1
2025-10-25 01:18:41 +08:00
dead = not t.is_alive()
if dead:
with cls._lock:
if cls._threads.get(udid) is t:
cls._threads.pop(udid, None)
return dead
2025-09-17 22:23:57 +08:00
# ========== 对外stop / batch_stop均返回二元组 ==========
2025-09-19 19:39:32 +08:00
2025-10-29 16:56:34 +08:00
@classmethod
def stop(cls, udid: str, join_timeout: float = 2.0,
retries: int = 5, wait_step: float = 0.2) -> Tuple[int, str]:
"""
强杀单个返回 (200, "stopped") (1001, "failed")
"""
ok = cls._stop_once(udid, join_timeout, retries, wait_step)
if ok:
return 200, "stopped"
else:
return 1001, "failed"
2025-10-29 16:56:34 +08:00
2025-09-18 20:09:52 +08:00
@classmethod
def batch_stop(cls, udids: List[str]) -> Tuple[int, str, List[str]]:
2025-11-07 18:32:42 +08:00
"""
并行批量停止简化版
- 只接收 udids 参数
- 其他参数写死join_timeout=2.0, retries=5, wait_step=0.2
- 所有设备同时执行失败的重试 3 每轮间隔 1
- 返回:
(200, "停止任务成功", [])
(1001, "停止任务失败", [失败udid...])
2025-11-07 18:32:42 +08:00
"""
if not udids:
return 200, "停止任务成功", []
join_timeout = 2.0
retries = 5
wait_step = 0.2
retry_rounds = 3
round_interval = 1.0
def _stop_one(u: str) -> Tuple[str, bool]:
ok = cls._stop_once(u, join_timeout, retries, wait_step)
return u, ok
# === 第一轮:并行执行所有设备 ===
fail: List[str] = []
with ThreadPoolExecutor(max_workers=len(udids)) as pool:
futures = [pool.submit(_stop_one, u) for u in udids]
for f in as_completed(futures):
u, ok = f.result()
if not ok:
fail.append(u)
# === 对失败的设备重试 3 轮(每轮间隔 1 秒) ===
for _ in range(retry_rounds):
if not fail:
break
time.sleep(round_interval)
remain: List[str] = []
with ThreadPoolExecutor(max_workers=len(fail)) as pool:
futures = [pool.submit(_stop_one, u) for u in fail]
for f in as_completed(futures):
u, ok = f.result()
if not ok:
remain.append(u)
fail = remain
# === 返回结果 ===
if not fail:
return 200, "停止任务成功", []
else:
return 1001, "停止任务失败", fail