298 lines
11 KiB
Python
298 lines
11 KiB
Python
|
|
#!/usr/bin/env python3
|
|||
|
|
"""
|
|||
|
|
恢复图片映射关系脚本
|
|||
|
|
通过分析备份文件和当前文件,建立原始文件名到新文件名的映射
|
|||
|
|
"""
|
|||
|
|
|
|||
|
|
import os
|
|||
|
|
import re
|
|||
|
|
import json
|
|||
|
|
from pathlib import Path
|
|||
|
|
from urllib.parse import unquote, quote
|
|||
|
|
from typing import Dict, List, Tuple
|
|||
|
|
|
|||
|
|
class ImageMappingRestorer:
    """Rebuild and apply "original image name -> current image file" mappings.

    For every order-class directory under ``<base_path>/data/订单班文档资料`` the
    restorer looks at the ``notion文稿`` sub-directory, collects the image
    references found in Markdown backups (``*.md.bak*``), compares them with
    the files present in ``notion文稿/image`` and guesses which current file
    each original reference now corresponds to.  The mapping can be saved to
    ``image_mapping.json`` and applied to the live ``*.md`` files.
    """

    # Lower-case file extensions that count as images inside ``image/``.
    IMAGE_SUFFIXES = ('.jpg', '.jpeg', '.png', '.gif', '.bmp')

    # Markdown image reference ``![alt](path)``: group 1 = alt text, group 2 = path.
    _IMAGE_REF_RE = re.compile(r'!\[(.*?)\]\((.*?)\)')

    # Chapter-number markers; originals carrying one are routed to the "图片" pool.
    _SECTION_MARKERS = ("一、", "二、", "三、", "四、", "五、", "六、")

    def __init__(self, base_path: Path):
        """Remember the project root and derive the order-class data directory."""
        self.base_path = base_path
        self.data_path = base_path / "data/订单班文档资料"

    def extract_image_refs(self, content: str) -> List[str]:
        """Return the path of every local Markdown image reference in *content*.

        References look like ``![alt](path)``.  Paths starting with ``http``
        are treated as external links and skipped.  Paths are returned exactly
        as written (still URL-encoded, prefixes intact), in document order.
        """
        return [
            match.group(2)
            for match in self._IMAGE_REF_RE.finditer(content)
            if not match.group(2).startswith('http')
        ]

    def normalize_path(self, path: str) -> str:
        """URL-decode *path* and strip the leading image-directory prefix.

        ``./image/x`` and ``image/x`` become ``x``; ``./x`` becomes ``x``;
        anything else is reduced to its final path component.
        """
        path = unquote(path)
        # Order matters: the longest prefix has to be tried first.
        for prefix in ('./image/', 'image/', './'):
            if path.startswith(prefix):
                return path[len(prefix):]
        return Path(path).name

    @classmethod
    def _preferred_category(cls, orig_name: str):
        """Return the pool an original name hints at, or ``None`` if it has no hint."""
        lowered = orig_name.lower()
        if "展示" in orig_name or "display" in lowered:
            return "展示图"
        # "分镜设计" contains "设计", so the storyboard check must come before
        # the generic design check or the storyboard pool is never reached.
        if "分镜" in orig_name:
            return "分镜设计"
        if "设计" in orig_name or "design" in lowered:
            return "设计图"
        if "场景" in orig_name or "scene" in lowered:
            return "场景图"
        if any(marker in orig_name for marker in cls._SECTION_MARKERS):
            return "图片"
        return None

    @staticmethod
    def _categorize(image_names) -> Dict:
        """Group image file names into sorted per-category lists by naming pattern."""
        categorized = {
            "展示图": [],
            "设计图": [],
            "场景图": [],
            "图片": [],
            "分镜设计": [],
            "其他": [],
        }
        for img in sorted(image_names):
            if img.startswith("展示图_"):
                key = "展示图"
            elif img.startswith("设计图_"):
                key = "设计图"
            elif img.startswith("场景图_"):
                key = "场景图"
            elif img.startswith("图片_"):
                key = "图片"
            elif "分镜设计" in img:
                key = "分镜设计"
            else:
                key = "其他"
            categorized[key].append(img)
        return categorized

    def analyze_order_class(self, order_dir: Path) -> Dict:
        """Analyse one order class and guess its original -> current name mapping.

        Returns a dict with keys ``name`` (directory name), ``mappings``
        (original file name -> current file name) and ``stats``
        (``total_images``, ``mapped``, ``unmapped``, ``conflicts``).  When the
        ``notion文稿`` directory is missing the empty result is returned.

        Matching rules:
        * an original name that still exists in ``image/`` maps to itself;
        * otherwise the name's keywords select a category pool and the next
          unused file (in sorted order) from that pool is taken;
        * if that yields nothing, the generic ``图片`` pool is used;
        * if that is empty too, the original is counted as unmapped.

        Files claimed by an identity match are never handed out again, so no
        two originals end up pointing at the same file.
        """
        result = {
            "name": order_dir.name,
            "mappings": {},
            "stats": {
                "total_images": 0,
                "mapped": 0,
                "unmapped": 0,
                "conflicts": []
            }
        }

        notion_dir = order_dir / "notion文稿"
        if not notion_dir.exists():
            return result

        # 1. Image files that actually exist on disk.
        image_dir = notion_dir / "image"
        actual_images = set()
        if image_dir.exists():
            actual_images = {
                img.name
                for img in image_dir.iterdir()
                if img.suffix.lower() in self.IMAGE_SUFFIXES
            }
        result["stats"]["total_images"] = len(actual_images)

        # 2. Original references, taken from the Markdown backups.
        original_refs = set()
        for backup_file in notion_dir.glob("*.md.bak*"):
            try:
                content = backup_file.read_text(encoding='utf-8')
            except (OSError, UnicodeDecodeError):
                # Best effort: an unreadable backup is skipped, not fatal.
                continue
            original_refs.update(
                self.normalize_path(ref)
                for ref in self.extract_image_refs(content)
            )

        # 3. Candidate pools: everything on disk that is NOT already claimed
        #    by an original reference with an unchanged name.
        unclaimed = actual_images - original_refs
        pools = {
            category: iter(names)
            for category, names in self._categorize(unclaimed).items()
        }

        # 4. Assign each original name, in sorted order for determinism.
        for orig_name in sorted(original_refs):
            if orig_name in actual_images:
                # File name did not change.
                result["mappings"][orig_name] = orig_name
                result["stats"]["mapped"] += 1
                continue

            category = self._preferred_category(orig_name)
            mapped = next(pools[category], None) if category else None
            if mapped is None:
                # No hint, or the hinted pool is exhausted: use generic images.
                mapped = next(pools["图片"], None)

            if mapped is None:
                result["stats"]["unmapped"] += 1
            else:
                result["mappings"][orig_name] = mapped
                result["stats"]["mapped"] += 1

        return result

    def save_mapping(self, order_dir: Path, mappings: Dict):
        """Write *mappings* to ``notion文稿/image_mapping.json``.

        An existing mapping file is merged in first; entries in *mappings*
        win on key collisions.  An existing file that is not a valid JSON
        object is ignored (with a warning) instead of aborting the run.  The
        caller's dict is not modified.
        """
        mapping_file = order_dir / "notion文稿" / "image_mapping.json"

        merged = {}
        if mapping_file.exists():
            try:
                with open(mapping_file, 'r', encoding='utf-8') as f:
                    existing = json.load(f)
            except ValueError:
                # Covers json.JSONDecodeError and UnicodeDecodeError.
                existing = None
            if isinstance(existing, dict):
                merged.update(existing)
            else:
                print(" ⚠️ 现有 image_mapping.json 无法解析，已忽略其内容")
        merged.update(mappings)

        with open(mapping_file, 'w', encoding='utf-8') as f:
            json.dump(merged, f, ensure_ascii=False, indent=2)

        print(f" 💾 保存了 {len(merged)} 个映射关系到 image_mapping.json")

    def apply_mapping(self, order_dir: Path, mappings: Dict) -> int:
        """Rewrite image references in the order class's Markdown files.

        Every ``*.md`` file in ``notion文稿`` except ``图片索引.md`` is
        processed.  A local reference whose normalized name maps to a
        different name is rewritten to ``image/<mapped name>``; any other
        local reference not already starting with ``image/`` is rewritten to
        ``image/<normalized name>``.  External (``http``) references are left
        alone.  Before a changed file is written, the original is moved to
        ``<stem>.md.restored_backup`` unless that backup already exists.

        Returns the number of references that were redirected to a different
        file name (pure path normalizations are not counted).
        """
        fixed_count = 0
        notion_dir = order_dir / "notion文稿"

        def replace_ref(match):
            nonlocal fixed_count
            alt_text = match.group(1)
            img_path = match.group(2)

            # External links are never touched.
            if img_path.startswith('http'):
                return match.group(0)

            img_name = self.normalize_path(img_path)
            mapped_name = mappings.get(img_name, img_name)

            if mapped_name != img_name:
                fixed_count += 1
                print(f" {img_name} → {mapped_name}")
                return f"![{alt_text}](image/{mapped_name})"

            # Same file, but make the path use the canonical image/ prefix.
            if not img_path.startswith('image/'):
                return f"![{alt_text}](image/{img_name})"

            return match.group(0)

        for md_file in notion_dir.glob("*.md"):
            if md_file.name.endswith('.bak') or md_file.name == "图片索引.md":
                continue

            original_content = md_file.read_text(encoding='utf-8')
            content = self._IMAGE_REF_RE.sub(replace_ref, original_content)
            if content == original_content:
                continue

            # Keep the first pre-restore version; later runs do not overwrite it.
            backup_path = md_file.with_suffix('.md.restored_backup')
            if not backup_path.exists():
                md_file.rename(backup_path)

            md_file.write_text(content, encoding='utf-8')
            print(f" ✅ 修复了 {md_file.name}")

        return fixed_count

    def restore_all(self, order_classes: List[str] = None):
        """Analyse, save and apply mappings for the given order classes.

        *order_classes* is a list of directory names under the data path.
        When it is ``None`` or empty, every non-hidden sub-directory of the
        data path is processed (in sorted order).  Names that do not exist
        are reported and skipped.
        """
        dirs_to_process = []
        missing = []

        if order_classes:
            for name in order_classes:
                order_dir = self.data_path / name
                if order_dir.exists():
                    dirs_to_process.append(order_dir)
                else:
                    missing.append(name)
        elif self.data_path.is_dir():
            dirs_to_process = sorted(
                d for d in self.data_path.iterdir()
                if d.is_dir() and not d.name.startswith('.')
            )
        else:
            missing.append(str(self.data_path))

        print("=" * 60)
        print("图片映射关系恢复工具")
        print("=" * 60)

        for name in missing:
            print(f" ⚠️ 目录不存在，已跳过: {name}")

        for order_dir in dirs_to_process:
            print(f"\n处理 {order_dir.name}...")

            # Analyse and build the mapping.
            result = self.analyze_order_class(order_dir)

            if not result["mappings"]:
                print(" ℹ️ 无需建立映射")
                continue

            print(f" 📊 找到 {len(result['mappings'])} 个映射关系")
            print(f" 成功映射: {result['stats']['mapped']}")
            print(f" 未映射: {result['stats']['unmapped']}")

            # Persist, then apply.
            self.save_mapping(order_dir, result["mappings"])
            fixed = self.apply_mapping(order_dir, result["mappings"])
            print(f" ✅ 修复了 {fixed} 个引用")
|
|||
|
|
|
|||
|
|
def main():
    """Script entry point.

    Builds a restorer rooted at the fixed project directory.  If a first
    command-line argument is given it is split on commas and used as the list
    of order classes to process; otherwise only the "视觉设计" class, the one
    currently known to have broken references, is processed.
    """
    import sys

    project_root = Path("/Users/xiaoqi/Documents/Dev/Project/2025-09-08_n8nDEMO演示")
    restorer = ImageMappingRestorer(project_root)

    cli_args = sys.argv[1:]
    if cli_args:
        # Explicit, comma-separated list of order classes.
        targets = cli_args[0].split(',')
    else:
        # Default: only the order class known to be affected.
        targets = ["视觉设计"]

    restorer.restore_all(targets)


if __name__ == "__main__":
    main()
|