Files
n8n_Demo/scripts/safe_organize_images.py

402 lines
13 KiB
Python
Raw Normal View History

#!/usr/bin/env python3
"""
安全的图片整理脚本 - 包含备份和恢复功能
运行前会自动备份出问题可以恢复
"""
import os
import re
import shutil
import json
from pathlib import Path
from datetime import datetime
from typing import Dict, List
import sys
# 配置
BASE_PATH = Path("/Users/xiaoqi/Documents/Dev/Project/2025-09-08_n8nDEMO演示/data/订单班文档资料")
BACKUP_DIR = BASE_PATH.parent.parent / "backups" / f"backup_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
def main():
"""主函数"""
# 解析参数
if len(sys.argv) > 1:
if sys.argv[1] == '--restore':
restore_from_backup()
return
elif sys.argv[1] == '--test':
# 只处理文旅订单班作为测试
process_test_order()
return
elif sys.argv[1] == '--help':
print_help()
return
# 默认执行全部整理
organize_all_orders()
def print_help():
"""打印帮助信息"""
print("""
图片整理脚本 - 使用说明
========================
命令选项
python3 safe_organize_images.py # 处理所有订单班(会先备份)
python3 safe_organize_images.py --test # 只测试文旅订单班
python3 safe_organize_images.py --restore # 从最近的备份恢复
python3 safe_organize_images.py --help # 显示帮助
功能说明
1. 统一所有图片格式为 .jpg
2. 规范化图片命名设计图_01.jpg展示图_02.jpg
3. 将散落的图片移动到 notion文稿/image/ 目录
4. 更新 Markdown 文件中的图片引用
5. 生成图片映射文件和索引
安全特性
- 执行前自动备份所有文件
- 可以随时恢复到备份状态
- 先在文旅订单班测试确认无误后再处理全部
""")
def process_test_order():
"""只处理文旅订单班作为测试"""
print("=" * 60)
print("🧪 测试模式:只处理文旅订单班")
print("=" * 60)
order_dir = BASE_PATH / "文旅"
if not order_dir.exists():
print("❌ 文旅订单班目录不存在")
return
# 创建测试备份
test_backup = BASE_PATH.parent.parent / "backups" / f"test_backup_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
backup_order(order_dir, test_backup / "文旅")
print(f"✅ 已备份到: {test_backup}")
print("")
# 处理文旅订单班
notion_dir = order_dir / "notion文稿"
if notion_dir.exists():
process_single_order("文旅", notion_dir)
print("\n" + "=" * 60)
print("✅ 测试完成!")
print("请检查 文旅/notion文稿/image/ 目录")
print("如果满意,运行完整脚本处理所有订单班")
print("如果有问题,运行 --restore 恢复")
def organize_all_orders():
"""处理所有订单班"""
print("=" * 60)
print("🔧 开始整理所有订单班图片")
print("=" * 60)
# 先创建完整备份
print("\n📦 创建备份...")
create_full_backup()
print(f"✅ 备份完成: {BACKUP_DIR}")
# 订单班列表
order_classes = [
"文旅", "财经商贸", "食品", "智能开发", "智能制造",
"视觉设计", "交通物流", "土木", "大健康", "能源",
"化工", "环保"
]
processed = 0
errors = []
for order_class in order_classes:
order_dir = BASE_PATH / order_class
if not order_dir.exists():
print(f"⚠️ 跳过 {order_class}:目录不存在")
continue
notion_dir = order_dir / "notion文稿"
if not notion_dir.exists():
print(f"⚠️ 跳过 {order_class}notion文稿目录不存在")
continue
try:
process_single_order(order_class, notion_dir)
processed += 1
except Exception as e:
errors.append(f"{order_class}: {str(e)}")
print(f"{order_class} 处理失败: {e}")
# 打印总结
print("\n" + "=" * 60)
print("📊 处理完成")
print("=" * 60)
print(f"成功处理: {processed} 个订单班")
if errors:
print(f"失败: {len(errors)}")
for err in errors:
print(f" - {err}")
print(f"\n如需恢复,运行: python3 safe_organize_images.py --restore")
else:
print("✅ 全部成功!")
def process_single_order(order_class: str, notion_dir: Path):
"""处理单个订单班"""
print(f"\n处理 {order_class}...")
# 确保image目录存在
image_dir = notion_dir / "image"
image_dir.mkdir(exist_ok=True)
# 收集所有图片
all_images = []
# 查找所有图片文件
for ext in ['jpg', 'jpeg', 'png', 'gif', 'bmp', 'webp', 'JPG', 'JPEG', 'PNG']:
all_images.extend(notion_dir.glob(f"*.{ext}"))
all_images.extend(notion_dir.glob(f"image/*.{ext}"))
# 去重
all_images = list(set(all_images))
if not all_images:
print(f" 没有找到图片")
return
print(f" 找到 {len(all_images)} 张图片")
# 记录映射关系
image_mapping = {}
# 分类计数器
counters = {
"设计图": 0,
"展示图": 0,
"效果图": 0,
"流程图": 0,
"场景图": 0,
"图片": 0
}
# 处理每张图片
for img_path in sorted(all_images):
old_name = img_path.name
new_name = generate_standard_name(old_name, order_class, counters)
# 目标路径
new_path = image_dir / new_name
# 如果需要移动或重命名
if img_path != new_path:
# 处理文件名冲突
if new_path.exists():
# 生成唯一名称
base_name = new_name.rsplit('.', 1)[0]
for i in range(2, 100):
new_name = f"{base_name}_{i}.jpg"
new_path = image_dir / new_name
if not new_path.exists():
break
# 移动并转换格式
try:
# 简单的移动或复制
if img_path.suffix.lower() == '.jpg' and img_path.parent == image_dir:
# 只是重命名
shutil.move(str(img_path), str(new_path))
else:
# 需要移动到image目录
shutil.copy2(str(img_path), str(new_path))
if img_path.parent != image_dir:
img_path.unlink() # 删除原位置的文件
image_mapping[old_name] = new_name
print(f"{old_name}{new_name}")
except Exception as e:
print(f" ✗ 处理失败 {old_name}: {e}")
# 更新Markdown文件
if image_mapping:
update_markdown_files(notion_dir, image_mapping)
save_mapping(notion_dir, image_mapping)
# 创建图片索引
create_index(order_class, image_dir)
print(f" 完成!处理了 {len(image_mapping)} 张图片")
def generate_standard_name(filename: str, order_class: str, counters: Dict[str, int]) -> str:
"""生成标准化的文件名"""
name = Path(filename).stem.lower()
# 特殊处理文旅订单班的Whisk图片
if order_class == "文旅" and 'whisk' in name:
counters["设计图"] += 1
return f"设计图_{counters['设计图']:02d}.jpg"
# 根据关键词分类
if any(kw in name for kw in ['设计', 'design', 'whisk', 'cad', '3d', '三维', '渲染']):
counters["设计图"] += 1
return f"设计图_{counters['设计图']:02d}.jpg"
elif any(kw in name for kw in ['展示', 'display', 'show', '展台', '展位']):
counters["展示图"] += 1
return f"展示图_{counters['展示图']:02d}.jpg"
elif any(kw in name for kw in ['效果', 'effect', 'render']):
counters["效果图"] += 1
return f"效果图_{counters['效果图']:02d}.jpg"
elif any(kw in name for kw in ['流程', 'flow', 'process', '步骤']):
counters["流程图"] += 1
return f"流程图_{counters['流程图']:02d}.jpg"
elif any(kw in name for kw in ['场景', 'scene', '展会', '博览', '会场', '签到', '试驾', '小米']):
counters["场景图"] += 1
return f"场景图_{counters['场景图']:02d}.jpg"
elif name[0:1].isdigit():
counters["展示图"] += 1
return f"展示图_{counters['展示图']:02d}.jpg"
else:
counters["图片"] += 1
return f"图片_{counters['图片']:02d}.jpg"
def update_markdown_files(notion_dir: Path, image_mapping: Dict[str, str]):
"""更新Markdown文件中的图片引用"""
md_files = list(notion_dir.glob("*.md"))
for md_file in md_files:
try:
content = md_file.read_text(encoding='utf-8')
original_content = content
for old_name, new_name in image_mapping.items():
# 替换各种可能的引用格式
patterns = [
(f"!\\[([^\\]]*)\\]\\({re.escape(old_name)}\\)", f"![\\1](./image/{new_name})"),
(f"!\\[([^\\]]*)\\]\\(\\./image/{re.escape(old_name)}\\)", f"![\\1](./image/{new_name})"),
(f"!\\[([^\\]]*)\\]\\(image/{re.escape(old_name)}\\)", f"![\\1](./image/{new_name})"),
(f'src="{re.escape(old_name)}"', f'src="./image/{new_name}"'),
(f"src='{re.escape(old_name)}'", f"src='./image/{new_name}'"),
]
for pattern, replacement in patterns:
content = re.sub(pattern, replacement, content, flags=re.IGNORECASE)
# 如果内容变化,保存文件
if content != original_content:
md_file.write_text(content, encoding='utf-8')
print(f" ✓ 更新MD: {md_file.name}")
except Exception as e:
print(f" ✗ 更新MD失败 {md_file.name}: {e}")
def save_mapping(notion_dir: Path, image_mapping: Dict[str, str]):
"""保存映射文件"""
mapping_file = notion_dir / "图片映射.json"
with open(mapping_file, 'w', encoding='utf-8') as f:
json.dump(image_mapping, f, ensure_ascii=False, indent=2)
def create_index(order_class: str, image_dir: Path):
"""创建图片索引"""
images = sorted(image_dir.glob("*.jpg"))
if not images:
return
# 分类
categories = {
"设计图": [],
"展示图": [],
"效果图": [],
"流程图": [],
"场景图": [],
"其他": []
}
for img in images:
name = img.name
categorized = False
for cat in ["设计图", "展示图", "效果图", "流程图", "场景图"]:
if name.startswith(cat):
categories[cat].append(name)
categorized = True
break
if not categorized:
categories["其他"].append(name)
# 生成索引内容
content = f"# {order_class}订单班 - 图片索引\n\n"
content += f"**图片总数**: {len(images)}\n"
content += f"**更新时间**: {datetime.now().strftime('%Y-%m-%d %H:%M')}\n\n"
for cat, files in categories.items():
if files:
content += f"## {cat} ({len(files)}张)\n"
for file in files:
content += f"- {file}\n"
content += "\n"
# 保存索引
index_file = image_dir.parent / "图片索引.md"
index_file.write_text(content, encoding='utf-8')
def create_full_backup():
"""创建完整备份"""
BACKUP_DIR.mkdir(parents=True, exist_ok=True)
# 备份所有订单班
for order_dir in BASE_PATH.iterdir():
if order_dir.is_dir() and order_dir.name != '.DS_Store':
backup_order(order_dir, BACKUP_DIR / order_dir.name)
def backup_order(source_dir: Path, backup_dir: Path):
"""备份单个订单班"""
if source_dir.exists():
shutil.copytree(source_dir, backup_dir, dirs_exist_ok=True)
def restore_from_backup():
"""从备份恢复"""
backups_dir = BASE_PATH.parent.parent / "backups"
if not backups_dir.exists():
print("❌ 没有找到备份目录")
return
# 列出所有备份
backups = sorted([d for d in backups_dir.iterdir() if d.is_dir()])
if not backups:
print("❌ 没有找到任何备份")
return
print("可用的备份:")
for i, backup in enumerate(backups[-5:], 1): # 只显示最近5个
print(f"{i}. {backup.name}")
# 选择备份
choice = input("\n请选择要恢复的备份 (输入编号): ")
try:
backup_dir = backups[-5:][int(choice) - 1]
except:
print("❌ 无效的选择")
return
print(f"\n准备从 {backup_dir.name} 恢复...")
confirm = input("确认恢复?这将覆盖当前文件 (y/n): ")
if confirm.lower() != 'y':
print("取消恢复")
return
# 执行恢复
for order_dir in backup_dir.iterdir():
if order_dir.is_dir():
target_dir = BASE_PATH / order_dir.name
if target_dir.exists():
shutil.rmtree(target_dir)
shutil.copytree(order_dir, target_dir)
print(f"✅ 恢复 {order_dir.name}")
print("\n✅ 恢复完成!")
if __name__ == "__main__":
main()