Files
Agent-n8n/scripts/safe_organize_images.py
Yep_Q 67f5dfbe50 feat: 实现多订单班支持系统
主要功能:
- 修改RequirementModal支持12个订单班选择
- 添加OrderClassIconMap图标映射组件
- Store中添加selectedOrderClass状态管理
- WorkflowPage支持传递orderClass参数
- web_result添加URL参数切换功能
- 创建order-class-handler.js动态处理页面主题

技术改进:
- 创建软链接关联订单班数据目录
- 生成wenlu.json和food.json数据结构
- 删除重复的web_result目录
- 添加测试页面test-order-class.html

影响范围:
- 展会策划系统现支持12个订单班
- 结果展示页面自动适配不同订单班主题
- 用户可选择不同行业生成对应方案

🤖 Generated with Claude Code

Co-Authored-By: Claude <noreply@anthropic.com>
2025-09-29 10:02:15 +08:00

402 lines
13 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
安全的图片整理脚本 - 包含备份和恢复功能
运行前会自动备份,出问题可以恢复
"""
import os
import re
import shutil
import json
from pathlib import Path
from datetime import datetime
from typing import Dict, List
import sys
# 配置
BASE_PATH = Path("/Users/xiaoqi/Documents/Dev/Project/2025-09-08_n8nDEMO演示/data/订单班文档资料")
BACKUP_DIR = BASE_PATH.parent.parent / "backups" / f"backup_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
def main():
"""主函数"""
# 解析参数
if len(sys.argv) > 1:
if sys.argv[1] == '--restore':
restore_from_backup()
return
elif sys.argv[1] == '--test':
# 只处理文旅订单班作为测试
process_test_order()
return
elif sys.argv[1] == '--help':
print_help()
return
# 默认执行全部整理
organize_all_orders()
def print_help():
"""打印帮助信息"""
print("""
图片整理脚本 - 使用说明
========================
命令选项:
python3 safe_organize_images.py # 处理所有订单班(会先备份)
python3 safe_organize_images.py --test # 只测试文旅订单班
python3 safe_organize_images.py --restore # 从最近的备份恢复
python3 safe_organize_images.py --help # 显示帮助
功能说明:
1. 统一所有图片格式为 .jpg
2. 规范化图片命名设计图_01.jpg、展示图_02.jpg 等)
3. 将散落的图片移动到 notion文稿/image/ 目录
4. 更新 Markdown 文件中的图片引用
5. 生成图片映射文件和索引
安全特性:
- 执行前自动备份所有文件
- 可以随时恢复到备份状态
- 先在文旅订单班测试,确认无误后再处理全部
""")
def process_test_order():
"""只处理文旅订单班作为测试"""
print("=" * 60)
print("🧪 测试模式:只处理文旅订单班")
print("=" * 60)
order_dir = BASE_PATH / "文旅"
if not order_dir.exists():
print("❌ 文旅订单班目录不存在")
return
# 创建测试备份
test_backup = BASE_PATH.parent.parent / "backups" / f"test_backup_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
backup_order(order_dir, test_backup / "文旅")
print(f"✅ 已备份到: {test_backup}")
print("")
# 处理文旅订单班
notion_dir = order_dir / "notion文稿"
if notion_dir.exists():
process_single_order("文旅", notion_dir)
print("\n" + "=" * 60)
print("✅ 测试完成!")
print("请检查 文旅/notion文稿/image/ 目录")
print("如果满意,运行完整脚本处理所有订单班")
print("如果有问题,运行 --restore 恢复")
def organize_all_orders():
"""处理所有订单班"""
print("=" * 60)
print("🔧 开始整理所有订单班图片")
print("=" * 60)
# 先创建完整备份
print("\n📦 创建备份...")
create_full_backup()
print(f"✅ 备份完成: {BACKUP_DIR}")
# 订单班列表
order_classes = [
"文旅", "财经商贸", "食品", "智能开发", "智能制造",
"视觉设计", "交通物流", "土木", "大健康", "能源",
"化工", "环保"
]
processed = 0
errors = []
for order_class in order_classes:
order_dir = BASE_PATH / order_class
if not order_dir.exists():
print(f"⚠️ 跳过 {order_class}:目录不存在")
continue
notion_dir = order_dir / "notion文稿"
if not notion_dir.exists():
print(f"⚠️ 跳过 {order_class}notion文稿目录不存在")
continue
try:
process_single_order(order_class, notion_dir)
processed += 1
except Exception as e:
errors.append(f"{order_class}: {str(e)}")
print(f"{order_class} 处理失败: {e}")
# 打印总结
print("\n" + "=" * 60)
print("📊 处理完成")
print("=" * 60)
print(f"成功处理: {processed} 个订单班")
if errors:
print(f"失败: {len(errors)}")
for err in errors:
print(f" - {err}")
print(f"\n如需恢复,运行: python3 safe_organize_images.py --restore")
else:
print("✅ 全部成功!")
def process_single_order(order_class: str, notion_dir: Path):
"""处理单个订单班"""
print(f"\n处理 {order_class}...")
# 确保image目录存在
image_dir = notion_dir / "image"
image_dir.mkdir(exist_ok=True)
# 收集所有图片
all_images = []
# 查找所有图片文件
for ext in ['jpg', 'jpeg', 'png', 'gif', 'bmp', 'webp', 'JPG', 'JPEG', 'PNG']:
all_images.extend(notion_dir.glob(f"*.{ext}"))
all_images.extend(notion_dir.glob(f"image/*.{ext}"))
# 去重
all_images = list(set(all_images))
if not all_images:
print(f" 没有找到图片")
return
print(f" 找到 {len(all_images)} 张图片")
# 记录映射关系
image_mapping = {}
# 分类计数器
counters = {
"设计图": 0,
"展示图": 0,
"效果图": 0,
"流程图": 0,
"场景图": 0,
"图片": 0
}
# 处理每张图片
for img_path in sorted(all_images):
old_name = img_path.name
new_name = generate_standard_name(old_name, order_class, counters)
# 目标路径
new_path = image_dir / new_name
# 如果需要移动或重命名
if img_path != new_path:
# 处理文件名冲突
if new_path.exists():
# 生成唯一名称
base_name = new_name.rsplit('.', 1)[0]
for i in range(2, 100):
new_name = f"{base_name}_{i}.jpg"
new_path = image_dir / new_name
if not new_path.exists():
break
# 移动并转换格式
try:
# 简单的移动或复制
if img_path.suffix.lower() == '.jpg' and img_path.parent == image_dir:
# 只是重命名
shutil.move(str(img_path), str(new_path))
else:
# 需要移动到image目录
shutil.copy2(str(img_path), str(new_path))
if img_path.parent != image_dir:
img_path.unlink() # 删除原位置的文件
image_mapping[old_name] = new_name
print(f"{old_name}{new_name}")
except Exception as e:
print(f" ✗ 处理失败 {old_name}: {e}")
# 更新Markdown文件
if image_mapping:
update_markdown_files(notion_dir, image_mapping)
save_mapping(notion_dir, image_mapping)
# 创建图片索引
create_index(order_class, image_dir)
print(f" 完成!处理了 {len(image_mapping)} 张图片")
def generate_standard_name(filename: str, order_class: str, counters: Dict[str, int]) -> str:
"""生成标准化的文件名"""
name = Path(filename).stem.lower()
# 特殊处理文旅订单班的Whisk图片
if order_class == "文旅" and 'whisk' in name:
counters["设计图"] += 1
return f"设计图_{counters['设计图']:02d}.jpg"
# 根据关键词分类
if any(kw in name for kw in ['设计', 'design', 'whisk', 'cad', '3d', '三维', '渲染']):
counters["设计图"] += 1
return f"设计图_{counters['设计图']:02d}.jpg"
elif any(kw in name for kw in ['展示', 'display', 'show', '展台', '展位']):
counters["展示图"] += 1
return f"展示图_{counters['展示图']:02d}.jpg"
elif any(kw in name for kw in ['效果', 'effect', 'render']):
counters["效果图"] += 1
return f"效果图_{counters['效果图']:02d}.jpg"
elif any(kw in name for kw in ['流程', 'flow', 'process', '步骤']):
counters["流程图"] += 1
return f"流程图_{counters['流程图']:02d}.jpg"
elif any(kw in name for kw in ['场景', 'scene', '展会', '博览', '会场', '签到', '试驾', '小米']):
counters["场景图"] += 1
return f"场景图_{counters['场景图']:02d}.jpg"
elif name[0:1].isdigit():
counters["展示图"] += 1
return f"展示图_{counters['展示图']:02d}.jpg"
else:
counters["图片"] += 1
return f"图片_{counters['图片']:02d}.jpg"
def update_markdown_files(notion_dir: Path, image_mapping: Dict[str, str]):
"""更新Markdown文件中的图片引用"""
md_files = list(notion_dir.glob("*.md"))
for md_file in md_files:
try:
content = md_file.read_text(encoding='utf-8')
original_content = content
for old_name, new_name in image_mapping.items():
# 替换各种可能的引用格式
patterns = [
(f"!\\[([^\\]]*)\\]\\({re.escape(old_name)}\\)", f"![\\1](./image/{new_name})"),
(f"!\\[([^\\]]*)\\]\\(\\./image/{re.escape(old_name)}\\)", f"![\\1](./image/{new_name})"),
(f"!\\[([^\\]]*)\\]\\(image/{re.escape(old_name)}\\)", f"![\\1](./image/{new_name})"),
(f'src="{re.escape(old_name)}"', f'src="./image/{new_name}"'),
(f"src='{re.escape(old_name)}'", f"src='./image/{new_name}'"),
]
for pattern, replacement in patterns:
content = re.sub(pattern, replacement, content, flags=re.IGNORECASE)
# 如果内容变化,保存文件
if content != original_content:
md_file.write_text(content, encoding='utf-8')
print(f" ✓ 更新MD: {md_file.name}")
except Exception as e:
print(f" ✗ 更新MD失败 {md_file.name}: {e}")
def save_mapping(notion_dir: Path, image_mapping: Dict[str, str]):
"""保存映射文件"""
mapping_file = notion_dir / "图片映射.json"
with open(mapping_file, 'w', encoding='utf-8') as f:
json.dump(image_mapping, f, ensure_ascii=False, indent=2)
def create_index(order_class: str, image_dir: Path):
"""创建图片索引"""
images = sorted(image_dir.glob("*.jpg"))
if not images:
return
# 分类
categories = {
"设计图": [],
"展示图": [],
"效果图": [],
"流程图": [],
"场景图": [],
"其他": []
}
for img in images:
name = img.name
categorized = False
for cat in ["设计图", "展示图", "效果图", "流程图", "场景图"]:
if name.startswith(cat):
categories[cat].append(name)
categorized = True
break
if not categorized:
categories["其他"].append(name)
# 生成索引内容
content = f"# {order_class}订单班 - 图片索引\n\n"
content += f"**图片总数**: {len(images)}\n"
content += f"**更新时间**: {datetime.now().strftime('%Y-%m-%d %H:%M')}\n\n"
for cat, files in categories.items():
if files:
content += f"## {cat} ({len(files)}张)\n"
for file in files:
content += f"- {file}\n"
content += "\n"
# 保存索引
index_file = image_dir.parent / "图片索引.md"
index_file.write_text(content, encoding='utf-8')
def create_full_backup():
"""创建完整备份"""
BACKUP_DIR.mkdir(parents=True, exist_ok=True)
# 备份所有订单班
for order_dir in BASE_PATH.iterdir():
if order_dir.is_dir() and order_dir.name != '.DS_Store':
backup_order(order_dir, BACKUP_DIR / order_dir.name)
def backup_order(source_dir: Path, backup_dir: Path):
"""备份单个订单班"""
if source_dir.exists():
shutil.copytree(source_dir, backup_dir, dirs_exist_ok=True)
def restore_from_backup():
"""从备份恢复"""
backups_dir = BASE_PATH.parent.parent / "backups"
if not backups_dir.exists():
print("❌ 没有找到备份目录")
return
# 列出所有备份
backups = sorted([d for d in backups_dir.iterdir() if d.is_dir()])
if not backups:
print("❌ 没有找到任何备份")
return
print("可用的备份:")
for i, backup in enumerate(backups[-5:], 1): # 只显示最近5个
print(f"{i}. {backup.name}")
# 选择备份
choice = input("\n请选择要恢复的备份 (输入编号): ")
try:
backup_dir = backups[-5:][int(choice) - 1]
except:
print("❌ 无效的选择")
return
print(f"\n准备从 {backup_dir.name} 恢复...")
confirm = input("确认恢复?这将覆盖当前文件 (y/n): ")
if confirm.lower() != 'y':
print("取消恢复")
return
# 执行恢复
for order_dir in backup_dir.iterdir():
if order_dir.is_dir():
target_dir = BASE_PATH / order_dir.name
if target_dir.exists():
shutil.rmtree(target_dir)
shutil.copytree(order_dir, target_dir)
print(f"✅ 恢复 {order_dir.name}")
print("\n✅ 恢复完成!")
if __name__ == "__main__":
main()