Files
iOSAI/Utils/JsonUtils.py

229 lines
7.5 KiB
Python
Raw Normal View History

2025-09-11 18:55:35 +08:00
import os
import json
from pathlib import Path
2025-09-11 21:14:57 +08:00
from Utils.LogManager import LogManager
2025-09-11 18:55:35 +08:00
class JsonUtils:
@staticmethod
def _normalize_filename(filename: str) -> str:
"""
确保文件名以 .json 结尾
"""
if not filename.endswith(".json"):
filename = f"{filename}.json"
return filename
@staticmethod
def _get_data_path(filename: str) -> str:
"""
根据文件名生成 data 目录下的完整路径
"""
filename = JsonUtils._normalize_filename(filename)
base_dir = os.path.abspath(os.path.dirname(os.path.dirname(__file__))) # 当前项目根目录
data_dir = os.path.join(base_dir, "data")
Path(data_dir).mkdir(parents=True, exist_ok=True) # 确保 data 目录存在
return os.path.join(data_dir, filename)
@staticmethod
def read_json(filename: str) -> dict:
"""
读取 JSON 文件返回字典
如果文件不存在返回空字典
"""
file_path = JsonUtils._get_data_path(filename)
try:
if not os.path.exists(file_path):
return {}
with open(file_path, "r", encoding="utf-8") as f:
data = json.load(f)
return data if isinstance(data, dict) else {}
except Exception as e:
print(f"读取 JSON 文件失败: {e}")
return {}
@staticmethod
def write_json(filename: str, data: dict, overwrite: bool = True) -> bool:
"""
将字典写入 JSON 文件
:param filename: 文件名不用写后缀自动补 .json
:param data: 要写入的字典
:param overwrite: True=覆盖写False=合并更新
"""
file_path = JsonUtils._get_data_path(filename)
try:
if not overwrite and os.path.exists(file_path):
with open(file_path, "r", encoding="utf-8") as f:
old_data = json.load(f)
if not isinstance(old_data, dict):
old_data = {}
old_data.update(data)
data = old_data
with open(file_path, "w", encoding="utf-8") as f:
json.dump(data, f, ensure_ascii=False, indent=4)
return True
except Exception as e:
print(f"写入 JSON 文件失败: {e}")
return False
@staticmethod
def update_json(filename: str, new_data: dict) -> bool:
"""
修改 JSON 文件
- 如果 key 已存在则修改其值
- 如果 key 不存在则新增
"""
try:
data = JsonUtils.read_json(filename)
if not isinstance(data, dict):
data = {}
data.update(new_data)
return JsonUtils.write_json(filename, data)
except Exception as e:
print(f"更新 JSON 文件失败: {e}")
return False
@staticmethod
def delete_json_key(filename: str, key: str) -> bool:
"""
删除 JSON 文件中的某个 key
"""
try:
data = JsonUtils.read_json(filename)
if not isinstance(data, dict):
data = {}
if key in data:
del data[key]
return JsonUtils.write_json(filename, data)
except Exception as e:
print(f"删除 JSON key 失败: {e}")
return False
2025-09-11 21:14:57 +08:00
# "-------------------------------------------------"
@classmethod
def _read_json_list(cls, file_path: Path) -> list:
try:
if not file_path.exists():
return []
with file_path.open("r", encoding="utf-8") as f:
data = json.load(f)
return data if isinstance(data, list) else []
except Exception:
return []
@classmethod
def _write_json_list(cls, file_path: Path, data: list) -> None:
file_path.parent.mkdir(parents=True, exist_ok=True)
with file_path.open("w", encoding="utf-8") as f:
json.dump(data, f, ensure_ascii=False, indent=4)
# --- 新增:通用追加(不做字段校验) ---
@classmethod
def append_json_items(cls, items, filename="log/last_message.json"):
"""
dict [dict, ...] 追加到 JSON 文件数组不校验字段
"""
file_path = Path(filename)
data = cls._read_json_list(file_path)
# 统一成 list
if isinstance(items, dict):
items = [items]
elif not isinstance(items, list):
# 既不是 dict 也不是 list直接忽略
return
# 只接受字典项
items = [it for it in items if isinstance(it, dict)]
if not items:
return
data.extend(items)
LogManager.method_info(filename,"路径")
cls._write_json_list(file_path, data)
@classmethod
def update_json_items(cls, match: dict, patch: dict, filename="log/last_message.json", multi: bool = True) -> int:
"""
修改 JSON 文件数组中符合条件的项
:param match: 匹配条件 {"sender": "xxx"}
:param patch: 要修改/更新的字段 {"status": 1}
:param filename: JSON 文件路径
:param multi: True=修改所有匹配项False=只修改第一项
:return: 修改的条数
"""
file_path = Path(filename)
data = cls._read_json_list(file_path)
if not isinstance(match, dict) or not isinstance(patch, dict):
return 0
updated = 0
for idx, item in enumerate(data):
if not isinstance(item, dict):
continue
# 判断是否匹配
if all(item.get(k) == v for k, v in match.items()):
data[idx].update(patch)
updated += 1
if not multi:
break
if updated > 0:
cls._write_json_list(file_path, data)
return updated
@classmethod
def query_all_json_items(cls, filename="log/last_message.json") -> list:
"""
查询 JSON 文件数组中的所有项
:param filename: JSON 文件路径
:return: list可能为空
"""
file_path = Path(filename)
print(file_path)
data = cls._read_json_list(file_path)
return data if isinstance(data, list) else []
@classmethod
def delete_json_items(cls, match: dict, filename="log/last_message.json", multi: bool = True) -> int:
"""
删除 JSON 文件数组中符合条件的项
:param match: 匹配条件 {"sender": "xxx"}
:param filename: JSON 文件路径
:param multi: True=删除所有匹配项False=只删除第一项
:return: 删除的条数
"""
file_path = Path(filename)
data = cls._read_json_list(file_path)
if not isinstance(match, dict):
return 0
deleted = 0
new_data = []
for item in data:
if not isinstance(item, dict):
continue
# 是否匹配
if all(item.get(k) == v for k, v in match.items()):
deleted += 1
if not multi and deleted > 0:
# 只删除一条 → 剩下的全保留
new_data.extend(data[data.index(item) + 1:])
break
continue
new_data.append(item)
if deleted > 0:
cls._write_json_list(file_path, new_data)
return deleted