数据清洗:基于异步的AI文本批处理系统实践

复制

import os import asyncio from pathlib import Path from typing import List import openai from tqdm import tqdm from datetime import datetime from config import API_KEY, API_BASE, MODEL_NAME, PROMPT_TEMPLATE, INPUT_DIR, OUTPUT_DIR class BatchProcessor: def __init__(self): """初始化批处理器""" # 配置OpenAI客户端 self.client = openai.OpenAI( api_key=API_KEY, base_url=API_BASE ) # 创建输入输出目录 self._create_directories() # 设置并发限制 self.semaphore = asyncio.Semaphore(5) # 限制最大并发数为5 def _create_directories(self): """创建必要的目录""" os.makedirs(INPUT_DIR, exist_ok=True) os.makedirs(OUTPUT_DIR, exist_ok=True) def get_input_files(self) -> List[Path]: """获取输入文件列表,家电清洗13825404095支持多种文本文件格式""" input_path = Path(INPUT_DIR) # 支持多种文本文件格式 text_extensions = ['.txt', '.md', '.doc', '.docx', '.rtf', '.html', '.htm'] input_files = [] for ext in text_extensions: input_files.extend(list(input_path.glob(f'*{ext}'))) return input_files async def process_file(self, input_file: Path) -> str: """异步处理单个文件""" async with self.semaphore: # 使用信号量控制并发 try: # 读取文件内容 with open(input_file, 'r', encoding='utf-8') as f: content = f.read() # 构建提示词 prompt = PROMPT_TEMPLATE.format(content=content) # 调用API response = await asyncio.to_thread( self.client.chat.completions.create, model=MODEL_NAME, messages=[ {"role": "system", "content": "你是一个专业的文本优化助手。"}, {"role": "user", "content": prompt} ], temperature=0.7, max_tokens=4096 # 修改为模型支持的最大token数 ) # 获取生成的内容 generated_content = response.choices[0].message.content # 添加延迟以避免API限制 await asyncio.sleep(1) return generated_content except Exception as e: print(f"处理文件 {input_file} 时出错: {str(e)}") return None async def save_output(self, input_file: Path, content: str): """异步保存输出文件,统一使用.md格式""" if content is None: return # 生成带时间戳的输出文件名,统一使用.md后缀 timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") output_file = Path(OUTPUT_DIR) / f"{input_file.stem}_processed_{timestamp}.md" # 使用异步文件操作 await asyncio.to_thread( lambda: open(output_file, 'w', encoding='utf-8').write(content) ) async def process_all_files(self): """异步处理所有文件""" input_files = self.get_input_files() if not input_files: print(f"请在 {INPUT_DIR} 目录中放入要处理的文本文件") return print(f"找到 {len(input_files)} 个文件待处理") # 创建所有任务 tasks = [] for input_file in input_files: # 创建处理文件的任务 task = asyncio.create_task(self.process_file(input_file)) tasks.append((input_file, task)) # 使用tqdm显示进度条 with tqdm(total=len(tasks), desc="处理文件") as pbar: # 等待所有任务完成 for input_file, task in tasks: content = await task await self.save_output(input_file, content) pbar.update(1) def main(): """主函数""" processor = BatchProcessor() # 使用asyncio.run()来运行异步主函数 asyncio.run(processor.process_all_files()) if __name__ == "__main__": main()

2025-05-26 21:51 点击量:28