402 lines
13 KiB
Python
402 lines
13 KiB
Python
|
|
#!/usr/bin/env python3
|
|||
|
|
"""
|
|||
|
|
安全的图片整理脚本 - 包含备份和恢复功能
|
|||
|
|
运行前会自动备份,出问题可以恢复
|
|||
|
|
"""
|
|||
|
|
|
|||
|
|
import os
|
|||
|
|
import re
|
|||
|
|
import shutil
|
|||
|
|
import json
|
|||
|
|
from pathlib import Path
|
|||
|
|
from datetime import datetime
|
|||
|
|
from typing import Dict, List
|
|||
|
|
import sys
|
|||
|
|
|
|||
|
|
# 配置
|
|||
|
|
BASE_PATH = Path("/Users/xiaoqi/Documents/Dev/Project/2025-09-08_n8nDEMO演示/data/订单班文档资料")
|
|||
|
|
BACKUP_DIR = BASE_PATH.parent.parent / "backups" / f"backup_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
|
|||
|
|
|
|||
|
|
def main():
|
|||
|
|
"""主函数"""
|
|||
|
|
# 解析参数
|
|||
|
|
if len(sys.argv) > 1:
|
|||
|
|
if sys.argv[1] == '--restore':
|
|||
|
|
restore_from_backup()
|
|||
|
|
return
|
|||
|
|
elif sys.argv[1] == '--test':
|
|||
|
|
# 只处理文旅订单班作为测试
|
|||
|
|
process_test_order()
|
|||
|
|
return
|
|||
|
|
elif sys.argv[1] == '--help':
|
|||
|
|
print_help()
|
|||
|
|
return
|
|||
|
|
|
|||
|
|
# 默认执行全部整理
|
|||
|
|
organize_all_orders()
|
|||
|
|
|
|||
|
|
def print_help():
|
|||
|
|
"""打印帮助信息"""
|
|||
|
|
print("""
|
|||
|
|
图片整理脚本 - 使用说明
|
|||
|
|
========================
|
|||
|
|
|
|||
|
|
命令选项:
|
|||
|
|
python3 safe_organize_images.py # 处理所有订单班(会先备份)
|
|||
|
|
python3 safe_organize_images.py --test # 只测试文旅订单班
|
|||
|
|
python3 safe_organize_images.py --restore # 从最近的备份恢复
|
|||
|
|
python3 safe_organize_images.py --help # 显示帮助
|
|||
|
|
|
|||
|
|
功能说明:
|
|||
|
|
1. 统一所有图片格式为 .jpg
|
|||
|
|
2. 规范化图片命名(设计图_01.jpg、展示图_02.jpg 等)
|
|||
|
|
3. 将散落的图片移动到 notion文稿/image/ 目录
|
|||
|
|
4. 更新 Markdown 文件中的图片引用
|
|||
|
|
5. 生成图片映射文件和索引
|
|||
|
|
|
|||
|
|
安全特性:
|
|||
|
|
- 执行前自动备份所有文件
|
|||
|
|
- 可以随时恢复到备份状态
|
|||
|
|
- 先在文旅订单班测试,确认无误后再处理全部
|
|||
|
|
""")
|
|||
|
|
|
|||
|
|
def process_test_order():
|
|||
|
|
"""只处理文旅订单班作为测试"""
|
|||
|
|
print("=" * 60)
|
|||
|
|
print("🧪 测试模式:只处理文旅订单班")
|
|||
|
|
print("=" * 60)
|
|||
|
|
|
|||
|
|
order_dir = BASE_PATH / "文旅"
|
|||
|
|
if not order_dir.exists():
|
|||
|
|
print("❌ 文旅订单班目录不存在")
|
|||
|
|
return
|
|||
|
|
|
|||
|
|
# 创建测试备份
|
|||
|
|
test_backup = BASE_PATH.parent.parent / "backups" / f"test_backup_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
|
|||
|
|
backup_order(order_dir, test_backup / "文旅")
|
|||
|
|
|
|||
|
|
print(f"✅ 已备份到: {test_backup}")
|
|||
|
|
print("")
|
|||
|
|
|
|||
|
|
# 处理文旅订单班
|
|||
|
|
notion_dir = order_dir / "notion文稿"
|
|||
|
|
if notion_dir.exists():
|
|||
|
|
process_single_order("文旅", notion_dir)
|
|||
|
|
|
|||
|
|
print("\n" + "=" * 60)
|
|||
|
|
print("✅ 测试完成!")
|
|||
|
|
print("请检查 文旅/notion文稿/image/ 目录")
|
|||
|
|
print("如果满意,运行完整脚本处理所有订单班")
|
|||
|
|
print("如果有问题,运行 --restore 恢复")
|
|||
|
|
|
|||
|
|
def organize_all_orders():
|
|||
|
|
"""处理所有订单班"""
|
|||
|
|
print("=" * 60)
|
|||
|
|
print("🔧 开始整理所有订单班图片")
|
|||
|
|
print("=" * 60)
|
|||
|
|
|
|||
|
|
# 先创建完整备份
|
|||
|
|
print("\n📦 创建备份...")
|
|||
|
|
create_full_backup()
|
|||
|
|
print(f"✅ 备份完成: {BACKUP_DIR}")
|
|||
|
|
|
|||
|
|
# 订单班列表
|
|||
|
|
order_classes = [
|
|||
|
|
"文旅", "财经商贸", "食品", "智能开发", "智能制造",
|
|||
|
|
"视觉设计", "交通物流", "土木", "大健康", "能源",
|
|||
|
|
"化工", "环保"
|
|||
|
|
]
|
|||
|
|
|
|||
|
|
processed = 0
|
|||
|
|
errors = []
|
|||
|
|
|
|||
|
|
for order_class in order_classes:
|
|||
|
|
order_dir = BASE_PATH / order_class
|
|||
|
|
if not order_dir.exists():
|
|||
|
|
print(f"⚠️ 跳过 {order_class}:目录不存在")
|
|||
|
|
continue
|
|||
|
|
|
|||
|
|
notion_dir = order_dir / "notion文稿"
|
|||
|
|
if not notion_dir.exists():
|
|||
|
|
print(f"⚠️ 跳过 {order_class}:notion文稿目录不存在")
|
|||
|
|
continue
|
|||
|
|
|
|||
|
|
try:
|
|||
|
|
process_single_order(order_class, notion_dir)
|
|||
|
|
processed += 1
|
|||
|
|
except Exception as e:
|
|||
|
|
errors.append(f"{order_class}: {str(e)}")
|
|||
|
|
print(f"❌ {order_class} 处理失败: {e}")
|
|||
|
|
|
|||
|
|
# 打印总结
|
|||
|
|
print("\n" + "=" * 60)
|
|||
|
|
print("📊 处理完成")
|
|||
|
|
print("=" * 60)
|
|||
|
|
print(f"成功处理: {processed} 个订单班")
|
|||
|
|
|
|||
|
|
if errors:
|
|||
|
|
print(f"失败: {len(errors)} 个")
|
|||
|
|
for err in errors:
|
|||
|
|
print(f" - {err}")
|
|||
|
|
print(f"\n如需恢复,运行: python3 safe_organize_images.py --restore")
|
|||
|
|
else:
|
|||
|
|
print("✅ 全部成功!")
|
|||
|
|
|
|||
|
|
def process_single_order(order_class: str, notion_dir: Path):
|
|||
|
|
"""处理单个订单班"""
|
|||
|
|
print(f"\n处理 {order_class}...")
|
|||
|
|
|
|||
|
|
# 确保image目录存在
|
|||
|
|
image_dir = notion_dir / "image"
|
|||
|
|
image_dir.mkdir(exist_ok=True)
|
|||
|
|
|
|||
|
|
# 收集所有图片
|
|||
|
|
all_images = []
|
|||
|
|
|
|||
|
|
# 查找所有图片文件
|
|||
|
|
for ext in ['jpg', 'jpeg', 'png', 'gif', 'bmp', 'webp', 'JPG', 'JPEG', 'PNG']:
|
|||
|
|
all_images.extend(notion_dir.glob(f"*.{ext}"))
|
|||
|
|
all_images.extend(notion_dir.glob(f"image/*.{ext}"))
|
|||
|
|
|
|||
|
|
# 去重
|
|||
|
|
all_images = list(set(all_images))
|
|||
|
|
|
|||
|
|
if not all_images:
|
|||
|
|
print(f" 没有找到图片")
|
|||
|
|
return
|
|||
|
|
|
|||
|
|
print(f" 找到 {len(all_images)} 张图片")
|
|||
|
|
|
|||
|
|
# 记录映射关系
|
|||
|
|
image_mapping = {}
|
|||
|
|
|
|||
|
|
# 分类计数器
|
|||
|
|
counters = {
|
|||
|
|
"设计图": 0,
|
|||
|
|
"展示图": 0,
|
|||
|
|
"效果图": 0,
|
|||
|
|
"流程图": 0,
|
|||
|
|
"场景图": 0,
|
|||
|
|
"图片": 0
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
# 处理每张图片
|
|||
|
|
for img_path in sorted(all_images):
|
|||
|
|
old_name = img_path.name
|
|||
|
|
new_name = generate_standard_name(old_name, order_class, counters)
|
|||
|
|
|
|||
|
|
# 目标路径
|
|||
|
|
new_path = image_dir / new_name
|
|||
|
|
|
|||
|
|
# 如果需要移动或重命名
|
|||
|
|
if img_path != new_path:
|
|||
|
|
# 处理文件名冲突
|
|||
|
|
if new_path.exists():
|
|||
|
|
# 生成唯一名称
|
|||
|
|
base_name = new_name.rsplit('.', 1)[0]
|
|||
|
|
for i in range(2, 100):
|
|||
|
|
new_name = f"{base_name}_{i}.jpg"
|
|||
|
|
new_path = image_dir / new_name
|
|||
|
|
if not new_path.exists():
|
|||
|
|
break
|
|||
|
|
|
|||
|
|
# 移动并转换格式
|
|||
|
|
try:
|
|||
|
|
# 简单的移动或复制
|
|||
|
|
if img_path.suffix.lower() == '.jpg' and img_path.parent == image_dir:
|
|||
|
|
# 只是重命名
|
|||
|
|
shutil.move(str(img_path), str(new_path))
|
|||
|
|
else:
|
|||
|
|
# 需要移动到image目录
|
|||
|
|
shutil.copy2(str(img_path), str(new_path))
|
|||
|
|
if img_path.parent != image_dir:
|
|||
|
|
img_path.unlink() # 删除原位置的文件
|
|||
|
|
|
|||
|
|
image_mapping[old_name] = new_name
|
|||
|
|
print(f" ✓ {old_name} → {new_name}")
|
|||
|
|
except Exception as e:
|
|||
|
|
print(f" ✗ 处理失败 {old_name}: {e}")
|
|||
|
|
|
|||
|
|
# 更新Markdown文件
|
|||
|
|
if image_mapping:
|
|||
|
|
update_markdown_files(notion_dir, image_mapping)
|
|||
|
|
save_mapping(notion_dir, image_mapping)
|
|||
|
|
|
|||
|
|
# 创建图片索引
|
|||
|
|
create_index(order_class, image_dir)
|
|||
|
|
|
|||
|
|
print(f" 完成!处理了 {len(image_mapping)} 张图片")
|
|||
|
|
|
|||
|
|
def generate_standard_name(filename: str, order_class: str, counters: Dict[str, int]) -> str:
|
|||
|
|
"""生成标准化的文件名"""
|
|||
|
|
name = Path(filename).stem.lower()
|
|||
|
|
|
|||
|
|
# 特殊处理文旅订单班的Whisk图片
|
|||
|
|
if order_class == "文旅" and 'whisk' in name:
|
|||
|
|
counters["设计图"] += 1
|
|||
|
|
return f"设计图_{counters['设计图']:02d}.jpg"
|
|||
|
|
|
|||
|
|
# 根据关键词分类
|
|||
|
|
if any(kw in name for kw in ['设计', 'design', 'whisk', 'cad', '3d', '三维', '渲染']):
|
|||
|
|
counters["设计图"] += 1
|
|||
|
|
return f"设计图_{counters['设计图']:02d}.jpg"
|
|||
|
|
elif any(kw in name for kw in ['展示', 'display', 'show', '展台', '展位']):
|
|||
|
|
counters["展示图"] += 1
|
|||
|
|
return f"展示图_{counters['展示图']:02d}.jpg"
|
|||
|
|
elif any(kw in name for kw in ['效果', 'effect', 'render']):
|
|||
|
|
counters["效果图"] += 1
|
|||
|
|
return f"效果图_{counters['效果图']:02d}.jpg"
|
|||
|
|
elif any(kw in name for kw in ['流程', 'flow', 'process', '步骤']):
|
|||
|
|
counters["流程图"] += 1
|
|||
|
|
return f"流程图_{counters['流程图']:02d}.jpg"
|
|||
|
|
elif any(kw in name for kw in ['场景', 'scene', '展会', '博览', '会场', '签到', '试驾', '小米']):
|
|||
|
|
counters["场景图"] += 1
|
|||
|
|
return f"场景图_{counters['场景图']:02d}.jpg"
|
|||
|
|
elif name[0:1].isdigit():
|
|||
|
|
counters["展示图"] += 1
|
|||
|
|
return f"展示图_{counters['展示图']:02d}.jpg"
|
|||
|
|
else:
|
|||
|
|
counters["图片"] += 1
|
|||
|
|
return f"图片_{counters['图片']:02d}.jpg"
|
|||
|
|
|
|||
|
|
def update_markdown_files(notion_dir: Path, image_mapping: Dict[str, str]):
|
|||
|
|
"""更新Markdown文件中的图片引用"""
|
|||
|
|
md_files = list(notion_dir.glob("*.md"))
|
|||
|
|
|
|||
|
|
for md_file in md_files:
|
|||
|
|
try:
|
|||
|
|
content = md_file.read_text(encoding='utf-8')
|
|||
|
|
original_content = content
|
|||
|
|
|
|||
|
|
for old_name, new_name in image_mapping.items():
|
|||
|
|
# 替换各种可能的引用格式
|
|||
|
|
patterns = [
|
|||
|
|
(f"!\\[([^\\]]*)\\]\\({re.escape(old_name)}\\)", f""),
|
|||
|
|
(f"!\\[([^\\]]*)\\]\\(\\./image/{re.escape(old_name)}\\)", f""),
|
|||
|
|
(f"!\\[([^\\]]*)\\]\\(image/{re.escape(old_name)}\\)", f""),
|
|||
|
|
(f'src="{re.escape(old_name)}"', f'src="./image/{new_name}"'),
|
|||
|
|
(f"src='{re.escape(old_name)}'", f"src='./image/{new_name}'"),
|
|||
|
|
]
|
|||
|
|
|
|||
|
|
for pattern, replacement in patterns:
|
|||
|
|
content = re.sub(pattern, replacement, content, flags=re.IGNORECASE)
|
|||
|
|
|
|||
|
|
# 如果内容变化,保存文件
|
|||
|
|
if content != original_content:
|
|||
|
|
md_file.write_text(content, encoding='utf-8')
|
|||
|
|
print(f" ✓ 更新MD: {md_file.name}")
|
|||
|
|
except Exception as e:
|
|||
|
|
print(f" ✗ 更新MD失败 {md_file.name}: {e}")
|
|||
|
|
|
|||
|
|
def save_mapping(notion_dir: Path, image_mapping: Dict[str, str]):
|
|||
|
|
"""保存映射文件"""
|
|||
|
|
mapping_file = notion_dir / "图片映射.json"
|
|||
|
|
with open(mapping_file, 'w', encoding='utf-8') as f:
|
|||
|
|
json.dump(image_mapping, f, ensure_ascii=False, indent=2)
|
|||
|
|
|
|||
|
|
def create_index(order_class: str, image_dir: Path):
|
|||
|
|
"""创建图片索引"""
|
|||
|
|
images = sorted(image_dir.glob("*.jpg"))
|
|||
|
|
|
|||
|
|
if not images:
|
|||
|
|
return
|
|||
|
|
|
|||
|
|
# 分类
|
|||
|
|
categories = {
|
|||
|
|
"设计图": [],
|
|||
|
|
"展示图": [],
|
|||
|
|
"效果图": [],
|
|||
|
|
"流程图": [],
|
|||
|
|
"场景图": [],
|
|||
|
|
"其他": []
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
for img in images:
|
|||
|
|
name = img.name
|
|||
|
|
categorized = False
|
|||
|
|
for cat in ["设计图", "展示图", "效果图", "流程图", "场景图"]:
|
|||
|
|
if name.startswith(cat):
|
|||
|
|
categories[cat].append(name)
|
|||
|
|
categorized = True
|
|||
|
|
break
|
|||
|
|
if not categorized:
|
|||
|
|
categories["其他"].append(name)
|
|||
|
|
|
|||
|
|
# 生成索引内容
|
|||
|
|
content = f"# {order_class}订单班 - 图片索引\n\n"
|
|||
|
|
content += f"**图片总数**: {len(images)} 张\n"
|
|||
|
|
content += f"**更新时间**: {datetime.now().strftime('%Y-%m-%d %H:%M')}\n\n"
|
|||
|
|
|
|||
|
|
for cat, files in categories.items():
|
|||
|
|
if files:
|
|||
|
|
content += f"## {cat} ({len(files)}张)\n"
|
|||
|
|
for file in files:
|
|||
|
|
content += f"- {file}\n"
|
|||
|
|
content += "\n"
|
|||
|
|
|
|||
|
|
# 保存索引
|
|||
|
|
index_file = image_dir.parent / "图片索引.md"
|
|||
|
|
index_file.write_text(content, encoding='utf-8')
|
|||
|
|
|
|||
|
|
def create_full_backup():
|
|||
|
|
"""创建完整备份"""
|
|||
|
|
BACKUP_DIR.mkdir(parents=True, exist_ok=True)
|
|||
|
|
|
|||
|
|
# 备份所有订单班
|
|||
|
|
for order_dir in BASE_PATH.iterdir():
|
|||
|
|
if order_dir.is_dir() and order_dir.name != '.DS_Store':
|
|||
|
|
backup_order(order_dir, BACKUP_DIR / order_dir.name)
|
|||
|
|
|
|||
|
|
def backup_order(source_dir: Path, backup_dir: Path):
|
|||
|
|
"""备份单个订单班"""
|
|||
|
|
if source_dir.exists():
|
|||
|
|
shutil.copytree(source_dir, backup_dir, dirs_exist_ok=True)
|
|||
|
|
|
|||
|
|
def restore_from_backup():
|
|||
|
|
"""从备份恢复"""
|
|||
|
|
backups_dir = BASE_PATH.parent.parent / "backups"
|
|||
|
|
|
|||
|
|
if not backups_dir.exists():
|
|||
|
|
print("❌ 没有找到备份目录")
|
|||
|
|
return
|
|||
|
|
|
|||
|
|
# 列出所有备份
|
|||
|
|
backups = sorted([d for d in backups_dir.iterdir() if d.is_dir()])
|
|||
|
|
|
|||
|
|
if not backups:
|
|||
|
|
print("❌ 没有找到任何备份")
|
|||
|
|
return
|
|||
|
|
|
|||
|
|
print("可用的备份:")
|
|||
|
|
for i, backup in enumerate(backups[-5:], 1): # 只显示最近5个
|
|||
|
|
print(f"{i}. {backup.name}")
|
|||
|
|
|
|||
|
|
# 选择备份
|
|||
|
|
choice = input("\n请选择要恢复的备份 (输入编号): ")
|
|||
|
|
|
|||
|
|
try:
|
|||
|
|
backup_dir = backups[-5:][int(choice) - 1]
|
|||
|
|
except:
|
|||
|
|
print("❌ 无效的选择")
|
|||
|
|
return
|
|||
|
|
|
|||
|
|
print(f"\n准备从 {backup_dir.name} 恢复...")
|
|||
|
|
confirm = input("确认恢复?这将覆盖当前文件 (y/n): ")
|
|||
|
|
|
|||
|
|
if confirm.lower() != 'y':
|
|||
|
|
print("取消恢复")
|
|||
|
|
return
|
|||
|
|
|
|||
|
|
# 执行恢复
|
|||
|
|
for order_dir in backup_dir.iterdir():
|
|||
|
|
if order_dir.is_dir():
|
|||
|
|
target_dir = BASE_PATH / order_dir.name
|
|||
|
|
if target_dir.exists():
|
|||
|
|
shutil.rmtree(target_dir)
|
|||
|
|
shutil.copytree(order_dir, target_dir)
|
|||
|
|
print(f"✅ 恢复 {order_dir.name}")
|
|||
|
|
|
|||
|
|
print("\n✅ 恢复完成!")
|
|||
|
|
|
|||
|
|
if __name__ == "__main__":
|
|||
|
|
main()
|