391 lines
13 KiB
Python
391 lines
13 KiB
Python
|
|
#!/usr/bin/env python3
|
|||
|
|
"""
|
|||
|
|
订单班图片整理主程序 v2.0
|
|||
|
|
支持针对不同订单班单独执行或检查
|
|||
|
|
"""
|
|||
|
|
|
|||
|
|
import os
|
|||
|
|
import sys
|
|||
|
|
import shutil
|
|||
|
|
from pathlib import Path
|
|||
|
|
from datetime import datetime
|
|||
|
|
from typing import List, Optional
|
|||
|
|
|
|||
|
|
# 设置路径
|
|||
|
|
BASE_PATH = Path("/Users/xiaoqi/Documents/Dev/Project/2025-09-08_n8nDEMO演示")
|
|||
|
|
DATA_PATH = BASE_PATH / "data/订单班文档资料"
|
|||
|
|
SCRIPT_PATH = BASE_PATH / "scripts"
|
|||
|
|
|
|||
|
|
# 预定义所有订单班
|
|||
|
|
ALL_ORDER_CLASSES = [
|
|||
|
|
"食品", "环保", "财经商贸", "视觉设计", "能源",
|
|||
|
|
"交通物流", "智能制造", "文旅", "化工", "大健康",
|
|||
|
|
"智能开发", "土木"
|
|||
|
|
]
|
|||
|
|
|
|||
|
|
def print_banner():
|
|||
|
|
"""打印标题"""
|
|||
|
|
print("=" * 60)
|
|||
|
|
print(" 订单班图片资源整理工具 v2.0")
|
|||
|
|
print("=" * 60)
|
|||
|
|
print()
|
|||
|
|
|
|||
|
|
def list_order_classes() -> List[str]:
|
|||
|
|
"""列出所有可用的订单班"""
|
|||
|
|
available = []
|
|||
|
|
for order_dir in DATA_PATH.iterdir():
|
|||
|
|
if order_dir.is_dir() and order_dir.name in ALL_ORDER_CLASSES:
|
|||
|
|
available.append(order_dir.name)
|
|||
|
|
return sorted(available)
|
|||
|
|
|
|||
|
|
def select_order_classes() -> List[str]:
|
|||
|
|
"""选择要处理的订单班"""
|
|||
|
|
available = list_order_classes()
|
|||
|
|
|
|||
|
|
print("
|
|||
|
|
可用的订单班:")
|
|||
|
|
for i, name in enumerate(available, 1):
|
|||
|
|
print(f" {i:2d}. {name}")
|
|||
|
|
print(f" {len(available)+1:2d}. 全部选择")
|
|||
|
|
print()
|
|||
|
|
|
|||
|
|
selection = input("请选择订单班(可输入多个编号,用逗号分隔,如: 1,3,5): ").strip()
|
|||
|
|
|
|||
|
|
if selection == str(len(available) + 1):
|
|||
|
|
return available
|
|||
|
|
|
|||
|
|
try:
|
|||
|
|
indices = [int(x.strip()) - 1 for x in selection.split(',')]
|
|||
|
|
selected = [available[i] for i in indices if 0 <= i < len(available)]
|
|||
|
|
return selected if selected else available
|
|||
|
|
except:
|
|||
|
|
print("输入无效,默认选择全部订单班")
|
|||
|
|
return available
|
|||
|
|
|
|||
|
|
def backup_data(order_classes: Optional[List[str]] = None):
|
|||
|
|
"""备份数据"""
|
|||
|
|
print("
|
|||
|
|
创建备份...")
|
|||
|
|
backup_dir = BASE_PATH / "backups" / f"backup_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
|
|||
|
|
backup_dir.mkdir(parents=True, exist_ok=True)
|
|||
|
|
|
|||
|
|
# 复制指定的订单班目录
|
|||
|
|
dirs_to_backup = order_classes if order_classes else ALL_ORDER_CLASSES
|
|||
|
|
for dir_name in dirs_to_backup:
|
|||
|
|
order_dir = DATA_PATH / dir_name
|
|||
|
|
if order_dir.exists() and order_dir.is_dir():
|
|||
|
|
target = backup_dir / dir_name
|
|||
|
|
shutil.copytree(order_dir, target)
|
|||
|
|
print(f" 备份: {dir_name}")
|
|||
|
|
|
|||
|
|
print(f"✅ 备份完成: {backup_dir}")
|
|||
|
|
return backup_dir
|
|||
|
|
|
|||
|
|
def run_script_for_orders(script_name: str, description: str, order_classes: List[str]):
|
|||
|
|
"""为指定的订单班运行脚本"""
|
|||
|
|
script_path = SCRIPT_PATH / script_name
|
|||
|
|
if not script_path.exists():
|
|||
|
|
print(f"⚠️ 跳过 {description}:脚本不存在")
|
|||
|
|
return False
|
|||
|
|
|
|||
|
|
print(f"
|
|||
|
|
{description}...")
|
|||
|
|
|
|||
|
|
# 这里可以根据需要修改脚本来支持特定订单班参数
|
|||
|
|
# 暂时还是运行整体脚本,但只会影响选中的订单班
|
|||
|
|
temp_script = create_temp_script_for_orders(script_name, order_classes)
|
|||
|
|
if temp_script:
|
|||
|
|
os.system(f'python3 "{temp_script}"')
|
|||
|
|
os.remove(temp_script)
|
|||
|
|
else:
|
|||
|
|
os.system(f'python3 "{script_path}"')
|
|||
|
|
return True
|
|||
|
|
|
|||
|
|
def create_temp_script_for_orders(script_name: str, order_classes: List[str]) -> Optional[Path]:
|
|||
|
|
"""创建临时脚本,只处理指定的订单班"""
|
|||
|
|
# 这是一个示例实现,根据实际脚本结构可能需要调整
|
|||
|
|
return None
|
|||
|
|
|
|||
|
|
def validate_order_class(order_name: str):
|
|||
|
|
"""验证单个订单班的图片"""
|
|||
|
|
print(f"
|
|||
|
|
检查 {order_name} ...")
|
|||
|
|
order_dir = DATA_PATH / order_name
|
|||
|
|
|
|||
|
|
if not order_dir.exists():
|
|||
|
|
print(f" ⚠️ 目录不存在")
|
|||
|
|
return
|
|||
|
|
|
|||
|
|
# 检查图片目录
|
|||
|
|
image_dir = order_dir / "notion文稿" / "image"
|
|||
|
|
if not image_dir.exists():
|
|||
|
|
print(f" ⚠️ 没有图片目录")
|
|||
|
|
return
|
|||
|
|
|
|||
|
|
# 统计图片
|
|||
|
|
jpg_files = list(image_dir.glob("*.jpg"))
|
|||
|
|
png_files = list(image_dir.glob("*.png"))
|
|||
|
|
jpeg_files = list(image_dir.glob("*.jpeg"))
|
|||
|
|
|
|||
|
|
total = len(jpg_files) + len(png_files) + len(jpeg_files)
|
|||
|
|
print(f" 图片统计: {len(jpg_files)} JPG, {len(png_files)} PNG, {len(jpeg_files)} JPEG")
|
|||
|
|
print(f" 总计: {total} 张图片")
|
|||
|
|
|
|||
|
|
# 检查命名规范
|
|||
|
|
non_standard = []
|
|||
|
|
for img in jpg_files:
|
|||
|
|
if not (img.name.startswith("图片_") or
|
|||
|
|
img.name.startswith("设计图_") or
|
|||
|
|
img.name.startswith("场景图_") or
|
|||
|
|
img.name.startswith("展示图_")):
|
|||
|
|
non_standard.append(img.name)
|
|||
|
|
|
|||
|
|
if non_standard:
|
|||
|
|
print(f" ⚠️ 非标准命名: {len(non_standard)} 个")
|
|||
|
|
for name in non_standard[:5]: # 只显示前5个
|
|||
|
|
print(f" - {name}")
|
|||
|
|
if len(non_standard) > 5:
|
|||
|
|
print(f" ... 还有 {len(non_standard)-5} 个")
|
|||
|
|
else:
|
|||
|
|
print(f" ✅ 所有图片命名规范")
|
|||
|
|
|
|||
|
|
# 检查MD文件中的图片引用
|
|||
|
|
md_files = list((order_dir / "notion文稿").glob("*.md"))
|
|||
|
|
if md_files:
|
|||
|
|
import re
|
|||
|
|
broken_refs = []
|
|||
|
|
for md_file in md_files:
|
|||
|
|
content = md_file.read_text(encoding='utf-8')
|
|||
|
|
# 查找图片引用
|
|||
|
|
img_refs = re.findall(r'!\[.*?\]\((.*?)\)', content)
|
|||
|
|
for ref in img_refs:
|
|||
|
|
if ref.startswith('image/'):
|
|||
|
|
img_path = order_dir / "notion文稿" / ref
|
|||
|
|
if not img_path.exists():
|
|||
|
|
broken_refs.append((md_file.name, ref))
|
|||
|
|
|
|||
|
|
if broken_refs:
|
|||
|
|
print(f" ⚠️ 损坏的图片引用: {len(broken_refs)} 个")
|
|||
|
|
for md_name, ref in broken_refs[:3]:
|
|||
|
|
print(f" - {md_name}: {ref}")
|
|||
|
|
else:
|
|||
|
|
print(f" ✅ 所有图片引用有效")
|
|||
|
|
|
|||
|
|
def quick_validate(order_classes: List[str]):
|
|||
|
|
"""快速验证指定订单班的结果"""
|
|||
|
|
print("
|
|||
|
|
" + "=" * 60)
|
|||
|
|
print("快速验证结果")
|
|||
|
|
print("=" * 60)
|
|||
|
|
|
|||
|
|
# 统计图片数量
|
|||
|
|
total_images = 0
|
|||
|
|
for dir_name in order_classes:
|
|||
|
|
order_dir = DATA_PATH / dir_name
|
|||
|
|
if order_dir.exists() and order_dir.is_dir():
|
|||
|
|
image_dir = order_dir / "notion文稿" / "image"
|
|||
|
|
if image_dir.exists():
|
|||
|
|
count = len(list(image_dir.glob("*.jpg")))
|
|||
|
|
if count > 0:
|
|||
|
|
print(f" {dir_name}: {count} 张图片")
|
|||
|
|
total_images += count
|
|||
|
|
|
|||
|
|
print(f"
|
|||
|
|
总计: {total_images} 张图片")
|
|||
|
|
|
|||
|
|
# 检查URL编码(只在选定的目录中)
|
|||
|
|
print("
|
|||
|
|
检查URL编码路径...")
|
|||
|
|
import subprocess
|
|||
|
|
url_paths_count = 0
|
|||
|
|
for dir_name in order_classes:
|
|||
|
|
order_dir = DATA_PATH / dir_name
|
|||
|
|
if order_dir.exists():
|
|||
|
|
result = subprocess.run(
|
|||
|
|
['grep', '-r', '%[0-9A-F][0-9A-F]', str(order_dir)],
|
|||
|
|
capture_output=True,
|
|||
|
|
text=True
|
|||
|
|
)
|
|||
|
|
url_paths = [line for line in result.stdout.split('
|
|||
|
|
')
|
|||
|
|
if any(ext in line for ext in ['.jpg', '.jpeg', '.png'])]
|
|||
|
|
if url_paths:
|
|||
|
|
url_paths_count += len(url_paths)
|
|||
|
|
print(f" {dir_name}: {len(url_paths)} 个URL编码路径")
|
|||
|
|
|
|||
|
|
if url_paths_count == 0:
|
|||
|
|
print("✅ 没有URL编码路径")
|
|||
|
|
|
|||
|
|
def process_single_order_class(order_name: str, action: str):
|
|||
|
|
"""处理单个订单班"""
|
|||
|
|
print(f"
|
|||
|
|
=== 处理 {order_name} ===")
|
|||
|
|
|
|||
|
|
order_dir = DATA_PATH / order_name
|
|||
|
|
if not order_dir.exists():
|
|||
|
|
print(f"⚠️ {order_name} 目录不存在")
|
|||
|
|
return
|
|||
|
|
|
|||
|
|
if action == "validate":
|
|||
|
|
validate_order_class(order_name)
|
|||
|
|
elif action == "fix":
|
|||
|
|
# 这里可以添加具体的修复逻辑
|
|||
|
|
print(f"修复 {order_name} 的图片...")
|
|||
|
|
# TODO: 实现针对单个订单班的修复
|
|||
|
|
pass
|
|||
|
|
elif action == "backup":
|
|||
|
|
backup_data([order_name])
|
|||
|
|
|
|||
|
|
def main():
|
|||
|
|
"""主函数"""
|
|||
|
|
print_banner()
|
|||
|
|
|
|||
|
|
print("请选择操作模式:")
|
|||
|
|
print("1. 处理所有订单班")
|
|||
|
|
print("2. 选择特定订单班")
|
|||
|
|
print("3. 单独检查每个订单班")
|
|||
|
|
print("4. 快速验证")
|
|||
|
|
print("5. 退出")
|
|||
|
|
print()
|
|||
|
|
|
|||
|
|
mode = input("请输入选项 (1-5): ").strip()
|
|||
|
|
|
|||
|
|
if mode == "1":
|
|||
|
|
# 处理所有订单班(原有逻辑)
|
|||
|
|
print("
|
|||
|
|
=== 处理所有订单班 ===")
|
|||
|
|
|
|||
|
|
print("
|
|||
|
|
请选择执行操作:")
|
|||
|
|
print("1. 完整执行(推荐)")
|
|||
|
|
print("2. 仅验证")
|
|||
|
|
print("3. 仅修复路径")
|
|||
|
|
print()
|
|||
|
|
|
|||
|
|
choice = input("请输入选项 (1-3): ").strip()
|
|||
|
|
|
|||
|
|
if choice == "1":
|
|||
|
|
# 完整执行
|
|||
|
|
print("
|
|||
|
|
=== 完整执行模式 ===
|
|||
|
|
")
|
|||
|
|
|
|||
|
|
# 询问是否备份
|
|||
|
|
backup_choice = input("是否需要先备份?(y/n): ").strip().lower()
|
|||
|
|
if backup_choice == 'y':
|
|||
|
|
backup_data()
|
|||
|
|
|
|||
|
|
# 执行主要脚本
|
|||
|
|
print("
|
|||
|
|
开始处理图片...")
|
|||
|
|
|
|||
|
|
# 1. 处理所有订单班图片
|
|||
|
|
run_script_for_orders("fix_all_orders_images.py", "统一图片格式和重命名", ALL_ORDER_CLASSES)
|
|||
|
|
|
|||
|
|
# 2. 修复Markdown路径
|
|||
|
|
run_script_for_orders("fix_all_markdown_paths.py", "修复Markdown路径", ALL_ORDER_CLASSES)
|
|||
|
|
|
|||
|
|
# 3. 完成最终修复
|
|||
|
|
run_script_for_orders("complete_fix_paths.py", "修复特殊路径问题", ALL_ORDER_CLASSES)
|
|||
|
|
|
|||
|
|
# 4. 验证结果
|
|||
|
|
run_script_for_orders("final_validation.py", "验证所有路径", ALL_ORDER_CLASSES)
|
|||
|
|
|
|||
|
|
elif choice == "2":
|
|||
|
|
# 仅验证
|
|||
|
|
print("
|
|||
|
|
=== 验证模式 ===")
|
|||
|
|
run_script_for_orders("final_validation.py", "验证所有路径", ALL_ORDER_CLASSES)
|
|||
|
|
quick_validate(ALL_ORDER_CLASSES)
|
|||
|
|
|
|||
|
|
elif choice == "3":
|
|||
|
|
# 仅修复路径
|
|||
|
|
print("
|
|||
|
|
=== 修复路径模式 ===")
|
|||
|
|
run_script_for_orders("fix_all_markdown_paths.py", "修复Markdown路径", ALL_ORDER_CLASSES)
|
|||
|
|
run_script_for_orders("complete_fix_paths.py", "修复特殊路径问题", ALL_ORDER_CLASSES)
|
|||
|
|
run_script_for_orders("final_validation.py", "验证修复结果", ALL_ORDER_CLASSES)
|
|||
|
|
|
|||
|
|
elif mode == "2":
|
|||
|
|
# 选择特定订单班
|
|||
|
|
print("
|
|||
|
|
=== 选择特定订单班 ===")
|
|||
|
|
selected = select_order_classes()
|
|||
|
|
|
|||
|
|
if not selected:
|
|||
|
|
print("未选择任何订单班")
|
|||
|
|
return
|
|||
|
|
|
|||
|
|
print(f"
|
|||
|
|
已选择: {', '.join(selected)}")
|
|||
|
|
|
|||
|
|
print("
|
|||
|
|
请选择操作:")
|
|||
|
|
print("1. 验证")
|
|||
|
|
print("2. 修复")
|
|||
|
|
print("3. 备份")
|
|||
|
|
print("4. 完整处理")
|
|||
|
|
print()
|
|||
|
|
|
|||
|
|
action = input("请输入选项 (1-4): ").strip()
|
|||
|
|
|
|||
|
|
if action == "1":
|
|||
|
|
for order_name in selected:
|
|||
|
|
validate_order_class(order_name)
|
|||
|
|
elif action == "2":
|
|||
|
|
print("
|
|||
|
|
修复选定的订单班...")
|
|||
|
|
# 可以在这里添加具体的修复逻辑
|
|||
|
|
for order_name in selected:
|
|||
|
|
process_single_order_class(order_name, "fix")
|
|||
|
|
elif action == "3":
|
|||
|
|
backup_data(selected)
|
|||
|
|
elif action == "4":
|
|||
|
|
backup_choice = input("是否需要先备份?(y/n): ").strip().lower()
|
|||
|
|
if backup_choice == 'y':
|
|||
|
|
backup_data(selected)
|
|||
|
|
|
|||
|
|
print("
|
|||
|
|
开始处理选定的订单班...")
|
|||
|
|
run_script_for_orders("fix_all_orders_images.py", "统一图片格式和重命名", selected)
|
|||
|
|
run_script_for_orders("fix_all_markdown_paths.py", "修复Markdown路径", selected)
|
|||
|
|
run_script_for_orders("complete_fix_paths.py", "修复特殊路径问题", selected)
|
|||
|
|
quick_validate(selected)
|
|||
|
|
|
|||
|
|
elif mode == "3":
|
|||
|
|
# 单独检查每个订单班
|
|||
|
|
print("
|
|||
|
|
=== 单独检查每个订单班 ===")
|
|||
|
|
available = list_order_classes()
|
|||
|
|
|
|||
|
|
for order_name in available:
|
|||
|
|
validate_order_class(order_name)
|
|||
|
|
|
|||
|
|
# 询问是否继续
|
|||
|
|
if order_name != available[-1]: # 如果不是最后一个
|
|||
|
|
cont = input("
|
|||
|
|
按Enter继续,输入q退出: ").strip()
|
|||
|
|
if cont.lower() == 'q':
|
|||
|
|
break
|
|||
|
|
|
|||
|
|
elif mode == "4":
|
|||
|
|
# 快速验证
|
|||
|
|
print("
|
|||
|
|
=== 快速验证模式 ===")
|
|||
|
|
selected = select_order_classes()
|
|||
|
|
quick_validate(selected)
|
|||
|
|
|
|||
|
|
elif mode == "5":
|
|||
|
|
print("退出程序")
|
|||
|
|
sys.exit(0)
|
|||
|
|
else:
|
|||
|
|
print("无效选项")
|
|||
|
|
sys.exit(1)
|
|||
|
|
|
|||
|
|
print("
|
|||
|
|
" + "=" * 60)
|
|||
|
|
print("✅ 执行完成!")
|
|||
|
|
print("=" * 60)
|
|||
|
|
|
|||
|
|
if __name__ == "__main__":
|
|||
|
|
main()
|