Files
iOSAI/Flask/FlaskService.py

176 lines
5.4 KiB
Python
Raw Normal View History

2025-08-01 13:43:51 +08:00
import json
import os
import socket
import threading
from queue import Queue
import tidevice
import wda
from flask import Flask, request
from flask_cors import CORS
from Entity.ResultData import ResultData
from script.ScriptManager import ScriptManager
app = Flask(__name__)
CORS(app)
listData = []
dataQueue = Queue()
def start_socket_listener():
port = int(os.getenv('FLASK_COMM_PORT', 0))
print(f"Received port from environment: {port}")
if port <= 0:
print("⚠️ 未获取到通信端口跳过Socket监听")
return
try:
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
# 设置端口复用,避免端口被占用时无法绑定
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
# 尝试绑定端口
try:
s.bind(('127.0.0.1', port))
print(f"[INFO] Socket successfully bound to port {port}")
except Exception as bind_error:
print(f"[ERROR] ❌ 端口绑定失败: {bind_error}")
return
# 开始监听
s.listen()
print(f"[INFO] Socket listener started on port {port}, waiting for connections...")
while True:
try:
print(f"[INFO] Waiting for a new connection on port {port}...")
conn, addr = s.accept()
print(f"[INFO] Connection accepted from: {addr}")
raw_data = conn.recv(1024).decode('utf-8').strip()
print(f"[INFO] Raw data received: {raw_data}")
data = json.loads(raw_data)
print(f"[INFO] Parsed data: {data}")
dataQueue.put(data)
except Exception as conn_error:
print(f"[ERROR] ❌ 连接处理失败: {conn_error}")
except Exception as e:
print(f"[ERROR] ❌ Socket服务启动失败: {e}")
# 在独立线程中启动Socket服务
listener_thread = threading.Thread(target=start_socket_listener, daemon=True)
listener_thread.start()
# 获取设备列表
@app.route('/deviceList', methods=['GET'])
def deviceList():
while not dataQueue.empty():
obj = dataQueue.get()
type = obj["type"]
if type == 1:
listData.append(obj)
else:
for data in listData:
if data.get("deviceId") == obj.get("deviceId") and data.get("screenPort") == obj.get("screenPort"):
listData.remove(data)
return ResultData(data=listData).toJson()
# 获取设备应用列表
@app.route('/deviceAppList', methods=['POST'])
def deviceAppList():
param = request.get_json()
udid = param["udid"]
t = tidevice.Device(udid)
# 获取已安装的应用列表
apps = []
for app in t.installation.iter_installed():
apps.append({
"name": app.get("CFBundleDisplayName", "Unknown"),
"bundleId": app.get("CFBundleIdentifier", "Unknown"),
"version": app.get("CFBundleShortVersionString", "Unknown"),
"path": app.get("Path", "Unknown")
})
# 筛选非系统级应用(过滤掉以 com.apple 开头的系统应用)
non_system_apps = [app for app in apps if not app["bundleId"].startswith("com.apple")]
return ResultData(data=non_system_apps).toJson()
# 打开置顶app
@app.route('/launchApp', methods=['POST'])
def launchApp():
body = request.get_json()
udid = body.get("udid")
bundleId = body.get("bundleId")
t = tidevice.Device(udid)
t.app_start(bundleId)
return ResultData(data="").toJson()
# 回到首页
@app.route('/toHome', methods=['POST'])
def toHome():
body = request.get_json()
udid = body.get("udid")
client = wda.USBClient(udid)
client.home()
return ResultData(data="").toJson()
# 点击事件
@app.route('/tapAction', methods=['POST'])
def tapAction():
body = request.get_json()
udid = body.get("udid")
x = body.get("x")
y = body.get("y")
client = wda.USBClient(udid)
session = client.session()
session.appium_settings({"snapshotMaxDepth": 0})
session.tap(x, y)
return ResultData(data="").toJson()
# 拖拽事件
@app.route('/swipeAction', methods=['POST'])
def swipeAction():
body = request.get_json()
udid = body.get("udid")
direction = body.get("direction")
client = wda.USBClient(udid)
session = client.session()
session.appium_settings({"snapshotMaxDepth": 0})
if direction == 1:
session.swipe_up()
elif direction == 2:
session.swipe_left()
elif direction == 3:
session.swipe_down()
else:
session.swipe_right()
return ResultData(data="").toJson()
# 长按事件
@app.route('/longPressAction', methods=['POST'])
def longPressAction():
body = request.get_json()
udid = body.get("udid")
x = body.get("x")
y = body.get("y")
client = wda.USBClient(udid)
session = client.session()
session.appium_settings({"snapshotMaxDepth": 5})
session.tap_hold(x,y,1.0)
return ResultData(data="").toJson()
# 养号
@app.route('/growAccount', methods=['POST'])
def growAccount():
body = request.get_json()
udid = body.get("udid")
# 启动脚本
threading.Thread(target=ScriptManager.growAccount, args=(udid,)).start()
return ResultData(data="").toJson()
if __name__ == '__main__':
app.run("0.0.0.0", port=5000, debug=True, use_reloader=False)