#!/usr/bin/env python3 """ 安全的图片整理脚本 - 包含备份和恢复功能 运行前会自动备份,出问题可以恢复 """ import os import re import shutil import json from pathlib import Path from datetime import datetime from typing import Dict, List import sys # 配置 BASE_PATH = Path("/Users/xiaoqi/Documents/Dev/Project/2025-09-08_n8nDEMO演示/data/订单班文档资料") BACKUP_DIR = BASE_PATH.parent.parent / "backups" / f"backup_{datetime.now().strftime('%Y%m%d_%H%M%S')}" def main(): """主函数""" # 解析参数 if len(sys.argv) > 1: if sys.argv[1] == '--restore': restore_from_backup() return elif sys.argv[1] == '--test': # 只处理文旅订单班作为测试 process_test_order() return elif sys.argv[1] == '--help': print_help() return # 默认执行全部整理 organize_all_orders() def print_help(): """打印帮助信息""" print(""" 图片整理脚本 - 使用说明 ======================== 命令选项: python3 safe_organize_images.py # 处理所有订单班(会先备份) python3 safe_organize_images.py --test # 只测试文旅订单班 python3 safe_organize_images.py --restore # 从最近的备份恢复 python3 safe_organize_images.py --help # 显示帮助 功能说明: 1. 统一所有图片格式为 .jpg 2. 规范化图片命名(设计图_01.jpg、展示图_02.jpg 等) 3. 将散落的图片移动到 notion文稿/image/ 目录 4. 更新 Markdown 文件中的图片引用 5. 生成图片映射文件和索引 安全特性: - 执行前自动备份所有文件 - 可以随时恢复到备份状态 - 先在文旅订单班测试,确认无误后再处理全部 """) def process_test_order(): """只处理文旅订单班作为测试""" print("=" * 60) print("🧪 测试模式:只处理文旅订单班") print("=" * 60) order_dir = BASE_PATH / "文旅" if not order_dir.exists(): print("❌ 文旅订单班目录不存在") return # 创建测试备份 test_backup = BASE_PATH.parent.parent / "backups" / f"test_backup_{datetime.now().strftime('%Y%m%d_%H%M%S')}" backup_order(order_dir, test_backup / "文旅") print(f"✅ 已备份到: {test_backup}") print("") # 处理文旅订单班 notion_dir = order_dir / "notion文稿" if notion_dir.exists(): process_single_order("文旅", notion_dir) print("\n" + "=" * 60) print("✅ 测试完成!") print("请检查 文旅/notion文稿/image/ 目录") print("如果满意,运行完整脚本处理所有订单班") print("如果有问题,运行 --restore 恢复") def organize_all_orders(): """处理所有订单班""" print("=" * 60) print("🔧 开始整理所有订单班图片") print("=" * 60) # 先创建完整备份 print("\n📦 创建备份...") create_full_backup() print(f"✅ 备份完成: {BACKUP_DIR}") # 订单班列表 order_classes = [ "文旅", "财经商贸", "食品", "智能开发", "智能制造", "视觉设计", "交通物流", "土木", "大健康", "能源", "化工", "环保" ] processed = 0 errors = [] for order_class in order_classes: order_dir = BASE_PATH / order_class if not order_dir.exists(): print(f"⚠️ 跳过 {order_class}:目录不存在") continue notion_dir = order_dir / "notion文稿" if not notion_dir.exists(): print(f"⚠️ 跳过 {order_class}:notion文稿目录不存在") continue try: process_single_order(order_class, notion_dir) processed += 1 except Exception as e: errors.append(f"{order_class}: {str(e)}") print(f"❌ {order_class} 处理失败: {e}") # 打印总结 print("\n" + "=" * 60) print("📊 处理完成") print("=" * 60) print(f"成功处理: {processed} 个订单班") if errors: print(f"失败: {len(errors)} 个") for err in errors: print(f" - {err}") print(f"\n如需恢复,运行: python3 safe_organize_images.py --restore") else: print("✅ 全部成功!") def process_single_order(order_class: str, notion_dir: Path): """处理单个订单班""" print(f"\n处理 {order_class}...") # 确保image目录存在 image_dir = notion_dir / "image" image_dir.mkdir(exist_ok=True) # 收集所有图片 all_images = [] # 查找所有图片文件 for ext in ['jpg', 'jpeg', 'png', 'gif', 'bmp', 'webp', 'JPG', 'JPEG', 'PNG']: all_images.extend(notion_dir.glob(f"*.{ext}")) all_images.extend(notion_dir.glob(f"image/*.{ext}")) # 去重 all_images = list(set(all_images)) if not all_images: print(f" 没有找到图片") return print(f" 找到 {len(all_images)} 张图片") # 记录映射关系 image_mapping = {} # 分类计数器 counters = { "设计图": 0, "展示图": 0, "效果图": 0, "流程图": 0, "场景图": 0, "图片": 0 } # 处理每张图片 for img_path in sorted(all_images): old_name = img_path.name new_name = generate_standard_name(old_name, order_class, counters) # 目标路径 new_path = image_dir / new_name # 如果需要移动或重命名 if img_path != new_path: # 处理文件名冲突 if new_path.exists(): # 生成唯一名称 base_name = new_name.rsplit('.', 1)[0] for i in range(2, 100): new_name = f"{base_name}_{i}.jpg" new_path = image_dir / new_name if not new_path.exists(): break # 移动并转换格式 try: # 简单的移动或复制 if img_path.suffix.lower() == '.jpg' and img_path.parent == image_dir: # 只是重命名 shutil.move(str(img_path), str(new_path)) else: # 需要移动到image目录 shutil.copy2(str(img_path), str(new_path)) if img_path.parent != image_dir: img_path.unlink() # 删除原位置的文件 image_mapping[old_name] = new_name print(f" ✓ {old_name} → {new_name}") except Exception as e: print(f" ✗ 处理失败 {old_name}: {e}") # 更新Markdown文件 if image_mapping: update_markdown_files(notion_dir, image_mapping) save_mapping(notion_dir, image_mapping) # 创建图片索引 create_index(order_class, image_dir) print(f" 完成!处理了 {len(image_mapping)} 张图片") def generate_standard_name(filename: str, order_class: str, counters: Dict[str, int]) -> str: """生成标准化的文件名""" name = Path(filename).stem.lower() # 特殊处理文旅订单班的Whisk图片 if order_class == "文旅" and 'whisk' in name: counters["设计图"] += 1 return f"设计图_{counters['设计图']:02d}.jpg" # 根据关键词分类 if any(kw in name for kw in ['设计', 'design', 'whisk', 'cad', '3d', '三维', '渲染']): counters["设计图"] += 1 return f"设计图_{counters['设计图']:02d}.jpg" elif any(kw in name for kw in ['展示', 'display', 'show', '展台', '展位']): counters["展示图"] += 1 return f"展示图_{counters['展示图']:02d}.jpg" elif any(kw in name for kw in ['效果', 'effect', 'render']): counters["效果图"] += 1 return f"效果图_{counters['效果图']:02d}.jpg" elif any(kw in name for kw in ['流程', 'flow', 'process', '步骤']): counters["流程图"] += 1 return f"流程图_{counters['流程图']:02d}.jpg" elif any(kw in name for kw in ['场景', 'scene', '展会', '博览', '会场', '签到', '试驾', '小米']): counters["场景图"] += 1 return f"场景图_{counters['场景图']:02d}.jpg" elif name[0:1].isdigit(): counters["展示图"] += 1 return f"展示图_{counters['展示图']:02d}.jpg" else: counters["图片"] += 1 return f"图片_{counters['图片']:02d}.jpg" def update_markdown_files(notion_dir: Path, image_mapping: Dict[str, str]): """更新Markdown文件中的图片引用""" md_files = list(notion_dir.glob("*.md")) for md_file in md_files: try: content = md_file.read_text(encoding='utf-8') original_content = content for old_name, new_name in image_mapping.items(): # 替换各种可能的引用格式 patterns = [ (f"!\\[([^\\]]*)\\]\\({re.escape(old_name)}\\)", f"![\\1](./image/{new_name})"), (f"!\\[([^\\]]*)\\]\\(\\./image/{re.escape(old_name)}\\)", f"![\\1](./image/{new_name})"), (f"!\\[([^\\]]*)\\]\\(image/{re.escape(old_name)}\\)", f"![\\1](./image/{new_name})"), (f'src="{re.escape(old_name)}"', f'src="./image/{new_name}"'), (f"src='{re.escape(old_name)}'", f"src='./image/{new_name}'"), ] for pattern, replacement in patterns: content = re.sub(pattern, replacement, content, flags=re.IGNORECASE) # 如果内容变化,保存文件 if content != original_content: md_file.write_text(content, encoding='utf-8') print(f" ✓ 更新MD: {md_file.name}") except Exception as e: print(f" ✗ 更新MD失败 {md_file.name}: {e}") def save_mapping(notion_dir: Path, image_mapping: Dict[str, str]): """保存映射文件""" mapping_file = notion_dir / "图片映射.json" with open(mapping_file, 'w', encoding='utf-8') as f: json.dump(image_mapping, f, ensure_ascii=False, indent=2) def create_index(order_class: str, image_dir: Path): """创建图片索引""" images = sorted(image_dir.glob("*.jpg")) if not images: return # 分类 categories = { "设计图": [], "展示图": [], "效果图": [], "流程图": [], "场景图": [], "其他": [] } for img in images: name = img.name categorized = False for cat in ["设计图", "展示图", "效果图", "流程图", "场景图"]: if name.startswith(cat): categories[cat].append(name) categorized = True break if not categorized: categories["其他"].append(name) # 生成索引内容 content = f"# {order_class}订单班 - 图片索引\n\n" content += f"**图片总数**: {len(images)} 张\n" content += f"**更新时间**: {datetime.now().strftime('%Y-%m-%d %H:%M')}\n\n" for cat, files in categories.items(): if files: content += f"## {cat} ({len(files)}张)\n" for file in files: content += f"- {file}\n" content += "\n" # 保存索引 index_file = image_dir.parent / "图片索引.md" index_file.write_text(content, encoding='utf-8') def create_full_backup(): """创建完整备份""" BACKUP_DIR.mkdir(parents=True, exist_ok=True) # 备份所有订单班 for order_dir in BASE_PATH.iterdir(): if order_dir.is_dir() and order_dir.name != '.DS_Store': backup_order(order_dir, BACKUP_DIR / order_dir.name) def backup_order(source_dir: Path, backup_dir: Path): """备份单个订单班""" if source_dir.exists(): shutil.copytree(source_dir, backup_dir, dirs_exist_ok=True) def restore_from_backup(): """从备份恢复""" backups_dir = BASE_PATH.parent.parent / "backups" if not backups_dir.exists(): print("❌ 没有找到备份目录") return # 列出所有备份 backups = sorted([d for d in backups_dir.iterdir() if d.is_dir()]) if not backups: print("❌ 没有找到任何备份") return print("可用的备份:") for i, backup in enumerate(backups[-5:], 1): # 只显示最近5个 print(f"{i}. {backup.name}") # 选择备份 choice = input("\n请选择要恢复的备份 (输入编号): ") try: backup_dir = backups[-5:][int(choice) - 1] except: print("❌ 无效的选择") return print(f"\n准备从 {backup_dir.name} 恢复...") confirm = input("确认恢复?这将覆盖当前文件 (y/n): ") if confirm.lower() != 'y': print("取消恢复") return # 执行恢复 for order_dir in backup_dir.iterdir(): if order_dir.is_dir(): target_dir = BASE_PATH / order_dir.name if target_dir.exists(): shutil.rmtree(target_dir) shutil.copytree(order_dir, target_dir) print(f"✅ 恢复 {order_dir.name}") print("\n✅ 恢复完成!") if __name__ == "__main__": main()