Files
iOSAI/Module/DeviceInfo.py

112 lines
3.8 KiB
Python
Raw Normal View History

2025-08-01 13:43:51 +08:00
import subprocess
import threading
import time
import wda
from tidevice import Usbmux
from Entity.DeviceModel import DeviceModel
2025-08-14 15:51:17 +08:00
from Entity.Variables import WdaAppBundleId
2025-08-01 13:43:51 +08:00
from Module.FlaskSubprocessManager import FlaskSubprocessManager
from Utils.LogManager import LogManager
2025-08-01 13:43:51 +08:00
# Module-level lock object.
# NOTE(review): nothing in the code visible here acquires it - confirm whether
# other modules import and use it before removing.
threadLock = threading.Lock()
class Deviceinfo(object):
    """Track attached iOS devices and run a screen-port relay for each one.

    For every newly listed device the listener opens a WDA client, publishes
    a ``DeviceModel`` through the subprocess manager and starts an ``iproxy``
    process that relays a local port to the device.  When a device is no
    longer listed, its model is re-sent with ``type = 2`` and its relay
    process is killed.
    """

    def __init__(self):
        self.deviceIndex = 0
        # Local screen-casting port; bumped by one before each new device is
        # connected, so the first device gets 9111.
        self.screenProxy = 9110
        # One {"target": <relay process or 0>, "id": <udid>} dict per relay.
        self.pidList = []
        # Device entries returned by Usbmux().device_list() that have already
        # been handled.
        self.deviceArray = []
        # Shared subprocess manager; used here only through .send(dict).
        self.manager = FlaskSubprocessManager.get_instance()
        # DeviceModel objects that have been sent out for the front end.
        self.deviceModelList = []

    def startDeviceListener(self):
        """Poll usbmux once per second, handling plug and unplug events.

        This method never returns.
        NOTE(review): presumably run on a dedicated thread - confirm at the
        call site.
        """
        while True:
            lists = Usbmux().device_list()
            # Newly attached devices: reserve the next local port, connect,
            # then remember the device so it is not connected twice.  The
            # device is remembered even when connectDevice gives up early.
            for device in lists:
                if device not in self.deviceArray:
                    self.screenProxy += 1
                    self.connectDevice(device.udid)
                    self.deviceArray.append(device)
            # Devices that vanished since the previous poll.
            self._removeDisconnected(lists)
            time.sleep(1)

    def _removeDisconnected(self, lists):
        """Forget every tracked device that is missing from *lists*.

        For each missing udid: mark its model(s) with ``type = 2`` and send
        them through the manager, kill its relay process(es), and drop the
        device from ``self.deviceArray``.

        The three lists are rebuilt and assigned back in place rather than
        edited with ``remove()`` during iteration, which would silently skip
        the element following each removed one.

        Args:
            lists: the device entries returned by the current poll.
        """
        goneUdids = {
            device.udid for device in set(self.deviceArray) - set(lists)
        }
        if not goneUdids:
            return

        # Publish the removal and drop the matching models.
        keptModels = []
        for model in self.deviceModelList:
            if model.deviceId in goneUdids:
                model.type = 2
                self.manager.send(model.toDict())
            else:
                keptModels.append(model)
        self.deviceModelList[:] = keptModels

        # Stop the port relays that belonged to the removed devices.
        keptRelays = []
        for entry in self.pidList:
            if entry["id"] in goneUdids:
                # relayDeviceScreenPort returns 0 when the relay could not be
                # started, so only call kill() when there is a real process.
                kill = getattr(entry["target"], "kill", None)
                if callable(kill):
                    kill()
            else:
                keptRelays.append(entry)
        self.pidList[:] = keptRelays

        # Finally forget the devices themselves.
        self.deviceArray[:] = [
            device for device in self.deviceArray
            if device.udid not in goneUdids
        ]

    def connectDevice(self, identifier):
        """Open WDA on a device, publish its model and start its port relay.

        Args:
            identifier: udid of the device to connect.

        If creating the WDA client, reading the screen metrics or sending the
        model raises, the failure is logged and the method returns without
        starting a relay.
        """
        try:
            d = wda.USBClient(identifier, 8100)
            LogManager.info("启动wda成功", identifier)
            size = d.window_size()
            width = size.width
            height = size.height
            scale = d.scale
            # Build the model with the port reserved by the listener and
            # publish it (type=1 on connect; removal re-sends it with type=2).
            model = DeviceModel(identifier, self.screenProxy, width, height, scale, type=1)
            self.deviceModelList.append(model)
            self.manager.send(model.toDict())
        except Exception:
            LogManager.error("启动wda失败。请检查wda是否正常", identifier)
            return

        # NOTE(review): the calls below are outside the try block, so an
        # exception here propagates to the caller (the listener loop).
        d.app_start(WdaAppBundleId)
        d.home()
        time.sleep(2)
        target = self.relayDeviceScreenPort(identifier)
        self.pidList.append({
            "target": target,
            "id": identifier,
        })

    def relayDeviceScreenPort(self, udid):
        """Start ``iproxy.exe`` relaying ``self.screenProxy`` to device port 9100.

        Args:
            udid: udid of the device whose port is relayed.

        Returns:
            The ``subprocess.Popen`` handle of the relay, or ``0`` when the
            process could not be started.
        """
        try:
            # Argument list without a shell: the returned handle is iproxy
            # itself, so kill() stops the relay instead of an intermediate
            # shell, and the udid is never interpreted by a shell.
            command = ["iproxy.exe", "-u", str(udid), str(self.screenProxy), "9100"]
            # Start the process without showing a console window.
            startupinfo = subprocess.STARTUPINFO()
            startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
            startupinfo.wShowWindow = subprocess.SW_HIDE
            return subprocess.Popen(command, startupinfo=startupinfo)
        except Exception as e:
            LogManager.error(f"转发设备端口失败: {e}", udid)
            return 0