本文最后更新于117 天前,其中的信息可能已经过时,如有错误请发送邮件到1520903968@qq.com
学长让我完成一个爬虫项目,内容是提取一本书内的知识点。(作为计算机苦力的)我在下面分享流程和代码(以《Java面向对象程序设计》一书为例):
首先准备所提取的书目的电子版,优先为txt格式。但是一般获取的电子版书籍保存后都是pdf格式,故第一步用Python的pdfplumber库进行转换。
import pdfplumber
pdf_path = '电子书的pdf路径.pdf'
txt_path = '转换后保存的路径.txt'

# Convert a text-based PDF into a plain-text file, page by page.
with pdfplumber.open(pdf_path) as pdf:
    # extract_text() can yield None/empty for pages that have no text layer
    # (blank or image-only pages); fall back to "" so such a page cannot
    # abort the whole conversion with a TypeError.
    page_texts = [page.extract_text() or "" for page in pdf.pages]

# Same layout as before: each page's text followed by one newline.
text = "".join(page_text + "\n" for page_text in page_texts)

with open(txt_path, 'w', encoding='utf-8') as f:
    f.write(text)
注意!若你的PDF阅读器打开文件时无法选中文字,说明该PDF为扫描件/图片型PDF,处理起来会更麻烦一些,需要使用以下OCR(光学字符识别)技术提取文字:
先安装Tesseract OCR引擎:
https://digi.bib.uni-mannheim.de/tesseract/tesseract-ocr-w64-setup-5.3.3.20231005.exe
注意安装包是否包含中文语言包!安装过程中注意勾选!!否则无法识别汉字。
就绪后运行以下代码:
import pdfplumber
import pytesseract
from PIL import Image
import logging
# Point pytesseract at the Tesseract executable (important!)
pytesseract.pytesseract.tesseract_cmd = r'C:\Program Files\Tesseract-OCR\tesseract.exe' # change this to your actual install path
# Raise the "pdfplumber" logger threshold to ERROR; per the original note this is meant to hide CropBox warnings
logging.getLogger("pdfplumber").setLevel(logging.ERROR)
def pdf_to_txt_with_ocr(pdf_path, txt_path, resolution=300):
    """OCR every page of a PDF and write the recognized text to a file.

    Each page is rendered to an image at ``resolution``, converted to
    grayscale and passed to Tesseract with the ``chi_sim+eng`` languages.
    The stripped text of each page is written under a ``=== 第 N 页 ===``
    header. Progress is reported with ``print``.

    Any exception is caught and reported with ``print`` instead of being
    re-raised; errors whose message mentions "password" are reported as an
    encrypted PDF.

    Args:
        pdf_path: Path of the PDF to read.
        txt_path: Path of the UTF-8 text file to write (overwritten).
        resolution: Rendering resolution handed to ``page.to_image``.
    """
    try:
        with pdfplumber.open(pdf_path) as pdf:
            with open(txt_path, 'w', encoding='utf-8') as out:
                print(f"正在处理PDF,共{len(pdf.pages)}页...")
                for i, page in enumerate(pdf.pages, start=1):
                    # Render the page, then convert to grayscale before OCR.
                    img_pil = page.to_image(resolution=resolution).original.convert('L')
                    text = pytesseract.image_to_string(img_pil, lang='chi_sim+eng')
                    out.write(f"=== 第 {i} 页 ===\n{text.strip()}\n\n")
                    print(f"已完成第 {i}/{len(pdf.pages)} 页")
        print(f"转换完成!结果保存至:{txt_path}")
    except Exception as e:
        # Heuristic: an error message mentioning "password" is treated as
        # an encrypted PDF.
        looks_encrypted = "password" in str(e).lower()
        if not looks_encrypted:
            print(f"发生错误:{str(e)}")
        else:
            print("错误:PDF文件已加密,需要密码")
if __name__ == "__main__":
    # Change these two paths to your own input PDF and output text file.
    pdf_path = "example.pdf"
    txt_path = "output.txt"
    # Higher resolution gives sharper page images but slower processing.
    resolution = 300
    pdf_to_txt_with_ocr(
        pdf_path=pdf_path,
        txt_path=txt_path,
        resolution=resolution,
    )
第二步,文本文件按照章节分割,并将每个章节保存到单独的文件中。
(此步之前,需要对txt文件进行预处理,可手动修改或编写代码,使文件中的章节标题为“第 1 章”格式,并且文档内容只能包含一个“第 1 章”)
import os
import re # 导入正则表达式模块
def _write_chapter(output_folder, chapter_number, lines):
    """Write one chapter's lines to ``chapter_<chapter_number>.txt``."""
    output_path = os.path.join(output_folder, f'chapter_{chapter_number}.txt')
    with open(output_path, 'w', encoding='utf-8') as f:
        f.writelines(lines)


def split_chapters(file_path, output_folder):
    """Split a book's text file into one file per chapter.

    A line is a chapter heading when, after stripping, it starts with
    ``第 <digits> 章`` (whitespace around the number is optional). The lines
    that follow a heading, up to the next heading, are written to
    ``chapter_<number>.txt`` inside ``output_folder``. The heading line
    itself is not written; its number and title are printed.

    Text before the first heading (cover, preface, ...) belongs to no
    chapter and is discarded instead of leaking into the first chapter file.

    If the same chapter number occurs more than once, the later section
    overwrites the earlier file, so the input should contain each heading
    only once.

    Args:
        file_path: Path of the UTF-8 text file to split.
        output_folder: Directory for the chapter files; created if missing.
    """
    os.makedirs(output_folder, exist_ok=True)
    chapter_pattern = re.compile(r'^第\s*(\d+)\s*章\s*(.*)')

    current_chapter = None
    chapter_content = []
    with open(file_path, 'r', encoding='utf-8') as file:
        # Stream the file line by line instead of loading it all at once.
        for line in file:
            match = chapter_pattern.match(line.strip())
            if match is None:
                # Only collect body lines once a chapter has started.
                if current_chapter is not None:
                    chapter_content.append(line)
                continue

            # A new heading closes the chapter collected so far.
            if current_chapter is not None:
                _write_chapter(output_folder, current_chapter, chapter_content)
            chapter_content = []

            current_chapter = int(match.group(1))
            chapter_title = match.group(2)
            print(f"检测到章节:第{current_chapter}章 - {chapter_title}")

    # Flush the final chapter, which has no following heading to close it.
    if current_chapter is not None:
        _write_chapter(output_folder, current_chapter, chapter_content)
if __name__ == "__main__":
    # Book text to split and the folder that receives the per-chapter
    # files; make sure both paths are correct.
    file_path, output_folder = 'source/Java_bookZ-library.txt', 'source/chapters'
    split_chapters(file_path=file_path, output_folder=output_folder)
处理后的文档:
第三步,处理指定文件夹中的章节文件,通过OpenAI的Python SDK调用Moonshot(Kimi)的API生成每个章节的核心知识点,并将结果保存到另一个文件夹中。
from pathlib import Path
from openai import OpenAI
import time
import os
from tenacity import (
retry,
wait_exponential,
stop_after_attempt,
retry_if_exception_type,
before_sleep_log
)
import logging
# Configure logging: INFO level via basicConfig, plus a logger named after this module
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Configure the OpenAI-compatible client, pointed at the Moonshot endpoint.
# The API key is read from the MOONSHOT_API_KEY environment variable rather
# than hard-coded: a key embedded in published source is exposed to every
# reader. Any key that was previously committed here should be revoked.
_api_key = os.environ.get("MOONSHOT_API_KEY")
if not _api_key:
    raise RuntimeError("未设置环境变量 MOONSHOT_API_KEY,请先配置 API 密钥")
client = OpenAI(
    api_key=_api_key,
    base_url="https://api.moonshot.cn/v1",
)
# Path configuration: input chapters and generated answers both live under
# the "source" directory next to this script.
current_dir = Path(__file__).parent
chapters_folder = current_dir / "source" / "chapters"
answer_folder = current_dir / "source" / "answer"
# Create the answer directory (and any missing parents) up front.
answer_folder.mkdir(exist_ok=True, parents=True)
# Custom retry predicate
def is_rate_limit_error(exception):
    """Return True when ``exception`` is an Exception whose text contains "rate_limit"."""
    if not isinstance(exception, Exception):
        return False
    return "rate_limit" in str(exception)
# API call wrapped in a retrying decorator
@retry(
    retry=retry_if_exception_type(Exception),  # retry on any Exception
    wait=wait_exponential(multiplier=1, min=10, max=60),  # exponential backoff
    stop=stop_after_attempt(5),  # give up after 5 attempts
    before_sleep=before_sleep_log(logger, logging.WARNING),
)
def safe_api_call(messages):
    """Send ``messages`` to the chat-completions endpoint and return the response.

    When the call raises an exception carrying ``status_code == 429``, this
    function sleeps before re-raising: for ``X-RateLimit-Reset + 1`` seconds
    if the response has that header, otherwise for 20 seconds. Every
    exception is re-raised so the surrounding ``@retry`` decorator can decide
    whether to try again.
    """
    try:
        return client.chat.completions.create(
            model="moonshot-v1-32k",
            messages=messages,
            temperature=0.3,
        )
    except Exception as err:
        # Anything other than a rate-limit response goes straight to the
        # retry decorator.
        if getattr(err, 'status_code', None) != 429:
            raise
        failed_response = err.response
        if failed_response and 'X-RateLimit-Reset' in failed_response.headers:
            reset_time = int(failed_response.headers['X-RateLimit-Reset'])
            logger.warning(f"速率限制触发,等待 {reset_time} 秒")
            time.sleep(reset_time + 1)  # one extra second as a safety margin
        else:
            logger.warning("默认等待20秒")
            time.sleep(20)
        raise  # re-raise so the decorator triggers a retry
def process_chapters():
    """Summarize every chapter file and save the model's answer.

    For each ``*.txt`` in ``chapters_folder`` (sorted by path), the first
    30000 characters are sent through ``safe_api_call`` and the answer is
    written to a file of the same name in ``answer_folder``. Chapters that
    already have an answer file are skipped, so the function can be re-run
    to resume. A failure on one chapter is logged and the loop moves on.

    The answer text is extracted and validated BEFORE the answer file is
    created. Otherwise a missing or empty answer would leave an empty file
    behind, and later runs would skip that chapter as already processed.
    """
    for file_path in sorted(chapters_folder.glob("*.txt")):
        chapter_number = file_path.stem
        answer_file = answer_folder / f"{chapter_number}.txt"

        # Skip chapters that already have an answer.
        if answer_file.exists():
            logger.info(f"跳过已处理章节 {chapter_number}")
            continue

        try:
            content = file_path.read_text(encoding='utf-8')

            messages = [
                {"role": "system", "content": "你是 Kimi...(保持原有系统提示)"},
                {"role": "system", "content": f"文本内容:\n{content[:30000]}"},  # truncated for safety
                {"role": "user", "content": "请用中文整理以下文字材料的核心知识点,要求结构清晰、重点突出:"}
            ]

            start_time = time.time()
            response = safe_api_call(messages)

            # Validate first, write second: the output file must only exist
            # once there is real content to put in it.
            answer = response.choices[0].message.content
            if not answer:
                raise ValueError("API 返回内容为空")
            answer_file.write_text(answer, encoding='utf-8')

            # Enforce a minimum 20-second spacing between requests, even
            # after a success (at most 3 requests per minute).
            elapsed = time.time() - start_time
            if elapsed < 20:
                sleep_time = 20 - elapsed
                logger.info(f"强制等待 {sleep_time:.1f} 秒")
                time.sleep(sleep_time)
            logger.info(f"成功处理 {chapter_number}")
        except Exception as e:
            # Best effort: log the failure and carry on with the next chapter.
            logger.error(f"处理 {chapter_number} 失败: {str(e)}")
if __name__ == "__main__":
    # Script entry point: summarize all chapter files.
    process_chapters()
经openai处理后的文档:
得到了总计17个txt文件,最后一步可以将其合并到一个txt文件内,并根据txt文件名(如chapter_1.txt)生成对应的“第*章”标题,将各章内容隔开。
import os
def _chapter_index_from_name(filename):
    """Return the last integer in the file name's stem, or None if it has none."""
    import re  # local import: this script's import block only brings in os

    numbers = re.findall(r'\d+', os.path.splitext(filename)[0])
    return int(numbers[-1]) if numbers else None


def merge_chapters(input_folder, output_file):
    """Merge all chapter TXT files of a folder into one file.

    Files are ordered by the chapter number in their name
    (``chapter_2.txt`` before ``chapter_10.txt``). A plain lexicographic
    sort would place chapter 10 before chapter 2. Each chapter is preceded
    by a ``第N章`` line, where N comes from the file name so the label
    always matches the content. Files without a number are placed last and
    labelled by their position in the merged sequence.

    The output file itself is never treated as an input, even if it lives
    in ``input_folder``. A chapter that cannot be read is reported with
    ``print`` and skipped.

    Args:
        input_folder: Folder containing the chapter TXT files.
        output_file: Path of the merged output file (overwritten).
    """
    output_abs = os.path.abspath(output_file)
    chapter_files = [
        f for f in os.listdir(input_folder)
        if f.endswith('.txt')
        and os.path.abspath(os.path.join(input_folder, f)) != output_abs
    ]

    def sort_key(name):
        number = _chapter_index_from_name(name)
        # Numbered files first, in numeric order; name breaks ties.
        return (number is None, number if number is not None else 0, name)

    chapter_files.sort(key=sort_key)

    with open(output_file, 'w', encoding='utf-8') as outfile:
        for i, filename in enumerate(chapter_files, 1):
            file_path = os.path.join(input_folder, filename)
            try:
                with open(file_path, 'r', encoding='utf-8') as infile:
                    content = infile.read()
                number = _chapter_index_from_name(filename)
                chapter_header = f"第{i if number is None else number}章\n"
                outfile.write(chapter_header + content)
                # Blank lines separate consecutive chapters.
                outfile.write("\n\n")
                print(f"已合并: {filename}")
            except (OSError, UnicodeError) as e:
                print(f"合并 {filename} 时出错: {e}")
    print(f"所有章节已合并到: {output_file}")
# Example usage
if __name__ == "__main__":
    # Folder holding the per-chapter TXT files; replace with your own path.
    input_folder = "source/answer"
    # Path of the merged output file.
    output_file = "source/merged_chapters.txt"
    merge_chapters(input_folder=input_folder, output_file=output_file)
完成!
$E=mc^2$
加油!小尚同学