Files
n8n_Demo/scripts/organize_images_main.py

391 lines
13 KiB
Python
Raw Normal View History

#!/usr/bin/env python3
"""
订单班图片整理主程序 v2.0
支持针对不同订单班单独执行或检查
"""
import os
import sys
import shutil
from pathlib import Path
from datetime import datetime
from typing import List, Optional
# 设置路径
BASE_PATH = Path("/Users/xiaoqi/Documents/Dev/Project/2025-09-08_n8nDEMO演示")
DATA_PATH = BASE_PATH / "data/订单班文档资料"
SCRIPT_PATH = BASE_PATH / "scripts"
# 预定义所有订单班
ALL_ORDER_CLASSES = [
"食品", "环保", "财经商贸", "视觉设计", "能源",
"交通物流", "智能制造", "文旅", "化工", "大健康",
"智能开发", "土木"
]
def print_banner():
"""打印标题"""
print("=" * 60)
print(" 订单班图片资源整理工具 v2.0")
print("=" * 60)
print()
def list_order_classes() -> List[str]:
"""列出所有可用的订单班"""
available = []
for order_dir in DATA_PATH.iterdir():
if order_dir.is_dir() and order_dir.name in ALL_ORDER_CLASSES:
available.append(order_dir.name)
return sorted(available)
def select_order_classes() -> List[str]:
"""选择要处理的订单班"""
available = list_order_classes()
print("
可用的订单班")
for i, name in enumerate(available, 1):
print(f" {i:2d}. {name}")
print(f" {len(available)+1:2d}. 全部选择")
print()
selection = input("请选择订单班(可输入多个编号,用逗号分隔,如: 1,3,5: ").strip()
if selection == str(len(available) + 1):
return available
try:
indices = [int(x.strip()) - 1 for x in selection.split(',')]
selected = [available[i] for i in indices if 0 <= i < len(available)]
return selected if selected else available
except:
print("输入无效,默认选择全部订单班")
return available
def backup_data(order_classes: Optional[List[str]] = None):
"""备份数据"""
print("
创建备份...")
backup_dir = BASE_PATH / "backups" / f"backup_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
backup_dir.mkdir(parents=True, exist_ok=True)
# 复制指定的订单班目录
dirs_to_backup = order_classes if order_classes else ALL_ORDER_CLASSES
for dir_name in dirs_to_backup:
order_dir = DATA_PATH / dir_name
if order_dir.exists() and order_dir.is_dir():
target = backup_dir / dir_name
shutil.copytree(order_dir, target)
print(f" 备份: {dir_name}")
print(f"✅ 备份完成: {backup_dir}")
return backup_dir
def run_script_for_orders(script_name: str, description: str, order_classes: List[str]):
"""为指定的订单班运行脚本"""
script_path = SCRIPT_PATH / script_name
if not script_path.exists():
print(f"⚠️ 跳过 {description}:脚本不存在")
return False
print(f"
{description}...")
# 这里可以根据需要修改脚本来支持特定订单班参数
# 暂时还是运行整体脚本,但只会影响选中的订单班
temp_script = create_temp_script_for_orders(script_name, order_classes)
if temp_script:
os.system(f'python3 "{temp_script}"')
os.remove(temp_script)
else:
os.system(f'python3 "{script_path}"')
return True
def create_temp_script_for_orders(script_name: str, order_classes: List[str]) -> Optional[Path]:
"""创建临时脚本,只处理指定的订单班"""
# 这是一个示例实现,根据实际脚本结构可能需要调整
return None
def validate_order_class(order_name: str):
"""验证单个订单班的图片"""
print(f"
检查 {order_name} ...")
order_dir = DATA_PATH / order_name
if not order_dir.exists():
print(f" ⚠️ 目录不存在")
return
# 检查图片目录
image_dir = order_dir / "notion文稿" / "image"
if not image_dir.exists():
print(f" ⚠️ 没有图片目录")
return
# 统计图片
jpg_files = list(image_dir.glob("*.jpg"))
png_files = list(image_dir.glob("*.png"))
jpeg_files = list(image_dir.glob("*.jpeg"))
total = len(jpg_files) + len(png_files) + len(jpeg_files)
print(f" 图片统计: {len(jpg_files)} JPG, {len(png_files)} PNG, {len(jpeg_files)} JPEG")
print(f" 总计: {total} 张图片")
# 检查命名规范
non_standard = []
for img in jpg_files:
if not (img.name.startswith("图片_") or
img.name.startswith("设计图_") or
img.name.startswith("场景图_") or
img.name.startswith("展示图_")):
non_standard.append(img.name)
if non_standard:
print(f" ⚠️ 非标准命名: {len(non_standard)}")
for name in non_standard[:5]: # 只显示前5个
print(f" - {name}")
if len(non_standard) > 5:
print(f" ... 还有 {len(non_standard)-5}")
else:
print(f" ✅ 所有图片命名规范")
# 检查MD文件中的图片引用
md_files = list((order_dir / "notion文稿").glob("*.md"))
if md_files:
import re
broken_refs = []
for md_file in md_files:
content = md_file.read_text(encoding='utf-8')
# 查找图片引用
img_refs = re.findall(r'!\[.*?\]\((.*?)\)', content)
for ref in img_refs:
if ref.startswith('image/'):
img_path = order_dir / "notion文稿" / ref
if not img_path.exists():
broken_refs.append((md_file.name, ref))
if broken_refs:
print(f" ⚠️ 损坏的图片引用: {len(broken_refs)}")
for md_name, ref in broken_refs[:3]:
print(f" - {md_name}: {ref}")
else:
print(f" ✅ 所有图片引用有效")
def quick_validate(order_classes: List[str]):
"""快速验证指定订单班的结果"""
print("
" + "=" * 60)
print("快速验证结果")
print("=" * 60)
# 统计图片数量
total_images = 0
for dir_name in order_classes:
order_dir = DATA_PATH / dir_name
if order_dir.exists() and order_dir.is_dir():
image_dir = order_dir / "notion文稿" / "image"
if image_dir.exists():
count = len(list(image_dir.glob("*.jpg")))
if count > 0:
print(f" {dir_name}: {count} 张图片")
total_images += count
print(f"
总计: {total_images} 张图片")
# 检查URL编码只在选定的目录中
print("
检查URL编码路径...")
import subprocess
url_paths_count = 0
for dir_name in order_classes:
order_dir = DATA_PATH / dir_name
if order_dir.exists():
result = subprocess.run(
['grep', '-r', '%[0-9A-F][0-9A-F]', str(order_dir)],
capture_output=True,
text=True
)
url_paths = [line for line in result.stdout.split('
')
if any(ext in line for ext in ['.jpg', '.jpeg', '.png'])]
if url_paths:
url_paths_count += len(url_paths)
print(f" {dir_name}: {len(url_paths)} 个URL编码路径")
if url_paths_count == 0:
print("✅ 没有URL编码路径")
def process_single_order_class(order_name: str, action: str):
"""处理单个订单班"""
print(f"
=== 处理 {order_name} ===")
order_dir = DATA_PATH / order_name
if not order_dir.exists():
print(f"⚠️ {order_name} 目录不存在")
return
if action == "validate":
validate_order_class(order_name)
elif action == "fix":
# 这里可以添加具体的修复逻辑
print(f"修复 {order_name} 的图片...")
# TODO: 实现针对单个订单班的修复
pass
elif action == "backup":
backup_data([order_name])
def main():
"""主函数"""
print_banner()
print("请选择操作模式:")
print("1. 处理所有订单班")
print("2. 选择特定订单班")
print("3. 单独检查每个订单班")
print("4. 快速验证")
print("5. 退出")
print()
mode = input("请输入选项 (1-5): ").strip()
if mode == "1":
# 处理所有订单班(原有逻辑)
print("
=== 处理所有订单班 ===")
print("
请选择执行操作")
print("1. 完整执行(推荐)")
print("2. 仅验证")
print("3. 仅修复路径")
print()
choice = input("请输入选项 (1-3): ").strip()
if choice == "1":
# 完整执行
print("
=== 完整执行模式 ===
")
# 询问是否备份
backup_choice = input("是否需要先备份?(y/n): ").strip().lower()
if backup_choice == 'y':
backup_data()
# 执行主要脚本
print("
开始处理图片...")
# 1. 处理所有订单班图片
run_script_for_orders("fix_all_orders_images.py", "统一图片格式和重命名", ALL_ORDER_CLASSES)
# 2. 修复Markdown路径
run_script_for_orders("fix_all_markdown_paths.py", "修复Markdown路径", ALL_ORDER_CLASSES)
# 3. 完成最终修复
run_script_for_orders("complete_fix_paths.py", "修复特殊路径问题", ALL_ORDER_CLASSES)
# 4. 验证结果
run_script_for_orders("final_validation.py", "验证所有路径", ALL_ORDER_CLASSES)
elif choice == "2":
# 仅验证
print("
=== 验证模式 ===")
run_script_for_orders("final_validation.py", "验证所有路径", ALL_ORDER_CLASSES)
quick_validate(ALL_ORDER_CLASSES)
elif choice == "3":
# 仅修复路径
print("
=== 修复路径模式 ===")
run_script_for_orders("fix_all_markdown_paths.py", "修复Markdown路径", ALL_ORDER_CLASSES)
run_script_for_orders("complete_fix_paths.py", "修复特殊路径问题", ALL_ORDER_CLASSES)
run_script_for_orders("final_validation.py", "验证修复结果", ALL_ORDER_CLASSES)
elif mode == "2":
# 选择特定订单班
print("
=== 选择特定订单班 ===")
selected = select_order_classes()
if not selected:
print("未选择任何订单班")
return
print(f"
已选择: {', '.join(selected)}")
print("
请选择操作")
print("1. 验证")
print("2. 修复")
print("3. 备份")
print("4. 完整处理")
print()
action = input("请输入选项 (1-4): ").strip()
if action == "1":
for order_name in selected:
validate_order_class(order_name)
elif action == "2":
print("
修复选定的订单班...")
# 可以在这里添加具体的修复逻辑
for order_name in selected:
process_single_order_class(order_name, "fix")
elif action == "3":
backup_data(selected)
elif action == "4":
backup_choice = input("是否需要先备份?(y/n): ").strip().lower()
if backup_choice == 'y':
backup_data(selected)
print("
开始处理选定的订单班...")
run_script_for_orders("fix_all_orders_images.py", "统一图片格式和重命名", selected)
run_script_for_orders("fix_all_markdown_paths.py", "修复Markdown路径", selected)
run_script_for_orders("complete_fix_paths.py", "修复特殊路径问题", selected)
quick_validate(selected)
elif mode == "3":
# 单独检查每个订单班
print("
=== 单独检查每个订单班 ===")
available = list_order_classes()
for order_name in available:
validate_order_class(order_name)
# 询问是否继续
if order_name != available[-1]: # 如果不是最后一个
cont = input("
按Enter继续输入q退出: ").strip()
if cont.lower() == 'q':
break
elif mode == "4":
# 快速验证
print("
=== 快速验证模式 ===")
selected = select_order_classes()
quick_validate(selected)
elif mode == "5":
print("退出程序")
sys.exit(0)
else:
print("无效选项")
sys.exit(1)
print("
" + "=" * 60)
print("✅ 执行完成!")
print("=" * 60)
if __name__ == "__main__":
main()