第8节:打造可配置,可扩展的自动化预处理流水线

张开发
2026/4/12 8:36:52 15 分钟阅读

分享文章

第8节:打造可配置,可扩展的自动化预处理流水线
RAG与Agent性能调优:第8节:打造可配置,可扩展的自动化预处理流水线

Gitee地址:https://gitee.com/agiforgagaplus/OptiRAGAgent

文章详情目录

RAG与Agent性能调优

上一节:第7节:图像切分不合理?文本矫正和版面区域检查保证信息完整且不冗余

下一节:待更新

## 关键步骤实现

### PDF文本提取

功能:调用PaddleOCR提取pdf的文本,支持多语言、复杂排版。

```python
import requests


class OCREngine:
    def __init__(self, config):
        self.api_url = config["ocr"]["api_url"]  # 从配置加载 OCR API 地址
        self.high_precision = config["ocr"].get("high_precision", False)

    def extract_text(self, file_path):
        """调用 PaddleOCR API 提取 PDF 文本"""
        with open(file_path, "rb") as f:
            files = {"file": f}
            response = requests.post(self.api_url, files=files)
            if response.status_code == 200:
                return response.json()["text"]
            else:
                raise Exception(f"OCR 提取失败: {response.text}")
```

### 文本预处理与分段

功能:清洗OCR文本,并按配置规则分段。

```python
import re


class TextProcessor:
    def __init__(self, config):
        self.config = config
        self.rules = config["dify"]["process_rules"]

    def preprocess(self, text):
        """应用预处理规则(如去空格、去 URL)"""
        if self.rules["pre_processing"][0]["enabled"]:
            text = re.sub(r"\s+", " ", text)  # 去除多余空格 [[7]]
        return text

    def segment(self, text):
        """按配置分段(如按标题分割)"""
        separator = self.rules["segmentation"]["separator"]
        return re.split(separator, text)  # 分段逻辑 [[7]]
```

### 上传Dify知识库

```python
import requests
import json


class DifyUploader:
    def __init__(self, config):
        self.api_key = config["dify"]["api_key"]
        self.dataset_id = config["dify"]["dataset_id"]
        self.base_url = f"https://api.dify.ai/v1/datasets/{self.dataset_id}"

    def upload_by_text(self, segments):
        """通过文本创建文档"""
        headers = {"Authorization": f"Bearer {self.api_key}", "Content-Type": "application/json"}
        for i, content in enumerate(segments):
            payload = {
                "name": f"segment_{i}.txt",
                "text": content,
                "indexing_technique": "high_quality",
                "process_rule": self.rules  # 从配置加载分段规则 [[7]]
            }
            response = requests.post(f"{self.base_url}/document/create_by_text", headers=headers, data=json.dumps(payload))
            print(f"Segment {i} uploaded: {response.status_code}")
        return response.json()["document"]["id"]
```

### 动态元数据绑定

```python
def bind_metadata(document_id, config, filename):
    """动态绑定元数据(如从文件名提取来源)"""
    url = f"https://api.dify.ai/v1/datasets/{config['dify']['dataset_id']}/documents/metadata"
    headers = {"Authorization": f"Bearer {config['dify']['api_key']}", "Content-Type": "application/json"}
    metadata_list = []
    for field in config["metadata"]["fields"]:
        value = field["value"]
        if field["value_from"] == "filename":
            value = filename  # 从文件名动态赋值 [[7]]
        metadata_list.append({
            "id": generate_metadata_id(field["name"]),  # 从 Dify API 获取字段 ID
            "value": value,
            "name": field["name"]
        })
    payload = {"operation_data": [{"document_id": document_id, "metadata_list": metadata_list}]}
    requests.post(url, headers=headers, data=json.dumps(payload))
```

## 可配置与可扩展设计

### 配置驱动:外部化参数管理

```yaml
ocr:
  api_url: "http://localhost:8080/ocr"  # PaddleOCR API 地址 [[4]]
  high_precision: true  # 启用高精度模式
dify:
  api_key: "your_api_key"  # Dify API 密钥 [[7]]
  dataset_id: "your_dataset_id"
  process_rules:
    pre_processing:
      - id: "remove_extra_spaces"
        enabled: true
    segmentation:
      separator: "###"  # 分段分隔符
      max_tokens: 500  # 每段最大 token 数
metadata:
  fields:
    - name: "author"
      value: "default_author"
    - name: "source"
      value_from: "filename"  # 从文件名提取来源 [[7]]
```

### 插件式扩展:新增预处理规则

```python
class RemoveSpecialChars:
    def __init__(self, enabled=True):
        self.enabled = enabled

    def apply(self, text):
        if self.enabled:
            return re.sub(r"[^\w\s]", "", text)  # 去除特殊字符
        return text


# 集成到 TextProcessor
class TextProcessor:
    def __init__(self, config):
        self.plugins = [
            RemoveSpecialChars(enabled=True),  # 动态加载插件 [[9]]
            # 可扩展新增插件
        ]

    def preprocess(self, text):
        for plugin in self.plugins:
            text = plugin.apply(text)
        return text
```

### 异步任务与扩展性支持

```python
from celery import Celery

app = Celery("tasks", broker="redis://localhost:6379/0")


@app.task
def process_pdf_async(pdf_path):
    config = load_config()
    ocr_engine = OCREngine(config)
    raw_text = ocr_engine.extract_text(pdf_path)
    processor = TextProcessor(config)
    cleaned_text = processor.preprocess(raw_text)
    segments = processor.segment(cleaned_text)
    uploader = DifyUploader(config)
    document_id = uploader.upload_by_text(segments)
    bind_metadata(document_id, config, pdf_path)
```

## 完整流程示例

```python
def main(pdf_path):
    config = load_config()  # 加载配置文件 [[4]]

    # 1. OCR 提取文本
    ocr_engine = OCREngine(config)
    raw_text = ocr_engine.extract_text(pdf_path)

    # 2. 文本预处理与分段
    processor = TextProcessor(config)
    cleaned_text = processor.preprocess(raw_text)
    segments = processor.segment(cleaned_text)

    # 3. 上传至 Dify
    uploader = DifyUploader(config)
    document_id = uploader.upload_by_text(segments)

    # 4. 绑定元数据
    bind_metadata(document_id, config, pdf_path)


if __name__ == "__main__":
    main("example.pdf")
```

## 总结

- 可配置性:通过config.yaml集中管理OCR模式、分段规则、索引策略等参数。
- 可扩展性:插件化设计,支持新增预处理插件。
- 异步支持:结合Celery和Redis实现高并发处理。
- 元数据动态绑定:通过配置定义字段来源(如文件名、时间戳)。

通过上述设置,系统可灵活应对多元异构数据处理场景,显著提升RAG构建效率与维护性。

更多文章