多轮对话长上下文-增量摘要和结构化摘要示例

张开发
2026/4/16 23:55:49 15 分钟阅读

分享文章

多轮对话长上下文-增量摘要和结构化摘要示例
在处理多轮对话的上下文管理时理论往往很美但工程落地全是坑。之前探索了多轮对话长上下文截断技巧示例https://blog.csdn.net/liliang199/article/details/160157872这里进一步探索增量摘要和结构化摘要所用示例参考和修改自网络资料。1 摘要技巧摘要能极大节省 Token但会丢失细节。这里的建议是分阶段实施。具体为增量摘要、结构化摘要、反思式摘要。1.1 增量摘要增量摘要是指维护一个summary变量。比如每满 5 轮对话调用轻量级模型如 GPT-4o-mini 或本地 7B 模型将新产生的“5 轮对话”和“旧摘要” 合并生成一个新摘要。如此对话上下文不会随着总轮数的增加而线性增加能控制在一个相对合理的范围内。1.2 结构化摘要在摘要过程中LLM为节约字数有可能仅仅是总结下历史对话。对于编程类对话总结历史对话可能丢失很多编程实现细节而这些细节往往是更重要的。在这种情况下更有效的做法就是强制输出 JSON 格式也就是结构化摘要。示例如下{user_personal_info: {name: null, preference: like coffee},task_progress: 选定了酒店正在对比机票,pending_questions: [需要确认返程日期],key_facts: [预算 5000 元, 不去海边]}这种填充式摘要比纯文本摘要的信噪比高得多后续程序可以直接读取 JSON 做逻辑判断。1.3 反思式摘要在生成摘要时增加一个 Prompt 指令指出对话中用户尚未得到解答的悬而未决的问题。这就是反思式摘要反思式摘要解决长对话中经常被冗余文本淹没的重要的未回答的问题。比如LLM说了一半问题时突然岔开话题最后LLM忘记回答该问题的场景。2 代码示例2.1 场景说明这是一个编程助手场景示例如何通过增量摘要和结构化摘要处理长达数十轮的技术讨论。此场景演示编程助手对话中如何压缩历史同时保留最近几轮的精确代码上下文。2.2 代码示例编程助手场景中增量摘要 结构化摘要的实现代码示例如下。1环境配置由于涉及大模型调用这里选用openai格式需要配置api key和base url示例代码如下所示。import os os.environ[HF_ENDPOINT] https://hf-mirror.com model_name gpt_model_name # LLM名称,比如deepseek-r1, qwen3.5-8b os.environ[OPENAI_API_KEY] gpt_api_key # LLM供应商提供的api key os.environ[OPENAI_BASE_URL] gpt_api_url # LLM供应商提供llm访问api的url2增量结构化摘要增量摘要 结构化摘要的具体内容示例如下场景二编程助手 - 增量摘要 结构化摘要演示- 每 4 轮对话触发一次摘要- 摘要以结构化 JSON 格式存储包含代码上下文、未解决问题等- 最近 4 轮完整对话保留原文- 历史部分以摘要形式注入代码示例如下import os import json from openai import OpenAI class ProgrammingAssistant: def __init__(self, model: str gpt-4o-mini): self.client OpenAI() self.model model # 完整对话历史用于触发摘要前的积累 self.full_history [] # 结构化摘要 self.structured_summary { topic: None, code_snippets: [], decisions_made: [], pending_questions: [], key_facts: [] } # 最近保留的完整对话轮数 self.recent_window [] self.summary_threshold 4 # 每 4 轮触发一次摘要 def _generate_structured_summary(self, messages_to_summarize: list) - dict: 调用 LLM 生成结构化摘要 conversation_text for msg in messages_to_summarize: conversation_text f{msg[role]}: {msg[content]}\n prompt f请将以下编程对话提炼为结构化 JSON 摘要 对话内容 {conversation_text} 请输出如下格式的 JSON只输出 JSON不要其他文字 {{ topic: 讨论的核心主题一句话, code_snippets: [用户或助手分享的重要代码片段], decisions_made: [已确定的决策或方案], pending_questions: [尚未解决的问题], key_facts: [重要事实如用户的环境、版本等] }} response self.client.chat.completions.create( modelself.model, messages[ {role: system, content: 你是一个专业的技术摘要助手只输出合法的 JSON。}, {role: user, content: prompt} ], temperature0.3, ) try: return json.loads(response.choices[0].message.content.strip()) except json.JSONDecodeError: # 容错返回基础结构 return { topic: 编程讨论, code_snippets: [], decisions_made: [], pending_questions: [], key_facts: [] } def _merge_summaries(self, old_summary: dict, new_summary: dict) - dict: 合并新旧摘要保留关键信息 merged { topic: new_summary.get(topic) or old_summary.get(topic), code_snippets: list(set(old_summary.get(code_snippets, []) new_summary.get(code_snippets, [])))[-5:], # 保留最多5个代码片段 decisions_made: old_summary.get(decisions_made, []) new_summary.get(decisions_made, []), pending_questions: new_summary.get(pending_questions, []), # 新摘要覆盖未解决问题 key_facts: list(set(old_summary.get(key_facts, []) new_summary.get(key_facts, []))) } return merged def _build_context(self, current_user_input: str) - list: 构建包含摘要和近期原文的上下文 self.full_history.append({role: user, content: current_user_input}) self.recent_window.append({role: user, content: current_user_input}) # 检查是否需要生成摘要 if len(self.full_history) self.summary_threshold * 2: # userassistant 成对计算 messages_to_summarize self.full_history[:-self.summary_threshold * 2] if messages_to_summarize: new_summary self._generate_structured_summary(messages_to_summarize) self.structured_summary self._merge_summaries(self.structured_summary, new_summary) # 清空已摘要的历史只保留最近窗口 self.full_history self.full_history[-self.summary_threshold * 2:] # 构建上下文 system_prompt { role: system, content: 你是一位经验丰富的编程导师擅长解答 Python、算法和系统设计问题。 } # 将摘要作为 system 消息的一部分注入 summary_text if self.structured_summary.get(topic): summary_text f[历史对话摘要]\n summary_text f主题: {self.structured_summary[topic]}\n if self.structured_summary[code_snippets]: summary_text f重要代码片段: {self.structured_summary[code_snippets][:3]}\n if self.structured_summary[decisions_made]: summary_text f已定方案: {self.structured_summary[decisions_made]}\n if self.structured_summary[pending_questions]: summary_text f待解决问题: {self.structured_summary[pending_questions]}\n if self.structured_summary[key_facts]: summary_text f关键事实: {self.structured_summary[key_facts]}\n context [system_prompt] if summary_text: context.append({role: system, content: summary_text}) # 添加最近的完整对话 context.extend(self.recent_window[-self.summary_threshold * 2:]) return context def chat(self, user_input: str) - str: context self._build_context(user_input) print(f\n[调试] 结构化摘要: {json.dumps(self.structured_summary, ensure_asciiFalse, indent2)[:200]}...) response self.client.chat.completions.create( modelself.model, messagescontext, temperature0.5, ) ai_response response.choices[0].message.content self.full_history.append({role: assistant, content: ai_response}) self.recent_window.append({role: assistant, content: ai_response}) # 保持 recent_window 不超过窗口大小 if len(self.recent_window) self.summary_threshold * 4: self.recent_window self.recent_window[-self.summary_threshold * 2:] return ai_response3运行测试以下是运行测试的示例# 测试运行 if __name__ __main__: assistant ProgrammingAssistant(modelmodel_name) print( 编程助手测试 (增量摘要) ) test_queries [ 我想用 Python 写一个并发下载器有什么建议, asyncio 和 threading 哪个更适合, 我决定用 asyncio代码大概怎么写, import asyncio\nimport aiohttp\n\nasync def download(url):\n ...\n这样开始对吗, 如果我要限制并发数用 Semaphore 对吗, 好的那超时和重试怎么处理, 如果下载的文件很大内存会不会爆, 我明白了用流式写入文件。还有什么需要注意的吗, ] for i, query in enumerate(test_queries): print(f\n 用户: {query}) response assistant.chat(query) print(f 助手: {response[:600]}... if len(response) 150 else f 助手: {response}) if (i 1) % 4 0: print(f\n [第 {i1} 轮后触发了摘要生成])输出如下所示 编程助手测试 (增量摘要) 用户: 我想用 Python 写一个并发下载器有什么建议[调试] 结构化摘要: {topic: null,code_snippets: [],decisions_made: [],pending_questions: [],key_facts: []}... 助手: 要用 Python 编写一个高效的并发下载器你可以从以下几个方面入手构建一个稳定、可扩展且高性能的系统---## 一、并发方案选择根据任务类型选择适合的并发模型| 方案 | 适用场景 | 优点 | 缺点 ||--------------|--------------------|--------------------------|--------------------------|| threading | I/O 密集型任务 | 实现简单资源开销小 | 受 GIL 限制不适合计算密集 || multiprocessing | CPU 密集型任务 | 绕过 GIL真正并行 | 进程开销大通信复杂 || asyncio aiohttp | 高并发网络任务 | 性能最佳资源利用率高 | 学习曲线较陡需异步编程 |✅ 推荐网络下载属于 I/O 密集型优先使用 asyncio aiohttp其次可用 ThreadPoolExecutor。---## 二、核心库推荐- aiohttp异步 HTTP 客户端适合高并发下载- r... 用户: asyncio 和 threading 哪个更适合[调试] 结构化摘要: {topic: null,code_snippets: [],decisions_made: [],pending_questions: [],key_facts: []}... 助手: 对于 **并发下载器** 这个特定场景结论如下* **追求高性能、高并发100 个任务**选 **asyncio**。* **追求开发简单、快速脚本50 个任务**选 **threading**。下面从 5 个核心维度进行深度对比帮助你做出决定### 1. 核心对比表| 维度 | asyncio (异步 IO) | threading (多线程) || :--- | :--- | :--- || **并发能力** | ⭐⭐⭐⭐⭐ (轻松支持 1000 连接) | ⭐⭐⭐ (通常建议 200 线程否则开销大) || **内存占用** | ⭐⭐⭐⭐⭐ (极低协程开销小) | ⭐⭐ (每个线程需独立栈空间开销大) || **CPU 利用** | 单核 (事件循环)无锁竞争 | 多核 (受 GIL 限制但 I/O 等待时会释放) || **代码难度** | ⭐⭐⭐⭐ (需理解 async/await所有库需异步) | ⭐⭐ (同步代码思维直观库丰富) || **依赖库** | aiohttp, aiomysql 等异步库 | requests, urllib 等同步库 || **调试难度** | 较高 (报错堆栈复杂易阻塞事件循环) | 较低 (标准同步报错但需注意线程安... 用户: 我决定用 asyncio代码大概怎么写[调试] 结构化摘要: {topic: null,code_snippets: [],decisions_made: [],pending_questions: [],key_facts: []}... 助手: 太好了asyncio 是构建高性能并发下载器的最佳选择。下面我为你提供一个**生产级别的完整示例**包含并发控制、进度条、重试机制、超时处理等核心功能并附带使用说明和扩展建议。---## 完整代码示例pythonimport asyncioimport aiohttpfrom pathlib import Pathfrom tqdm import tqdmfrom typing import List, Optionalimport timeclass AsyncDownloader:def __init__(self,max_concurrent: int 10,timeout: int 30,retry_times: int 3,chunk_size: int 8192):self.max_concurrent max_concurrentself.timeout timeoutself.retry_times retry_timesself.chunk_size chunk_sizeself.sema... 用户: import asyncioimport aiohttpasync def download(url):...这样开始对吗[调试] 结构化摘要: {topic: null,code_snippets: [],decisions_made: [],pending_questions: [],key_facts: []}... 助手: 你的开头**基本正确**但还**不完整**。async def 只是定义了异步函数要真正运行它还需要正确的**入口点**和**Session 管理**。---## ✅ 正确的最小可用结构pythonimport asyncioimport aiohttpasync def download(session, url):下载单个文件async with session.get(url) as resp:content await resp.read()print(f✓ 已下载{url})return contentasync def main():程序入口urls [https://example.com/file1.zip, https://example.com/file2.zip]# 必须创建 ClientSession 并正确关闭async with aiohttp.ClientSession() as session:tasks [download(session, url) for url in urls]await asyn... [第 4 轮后触发了摘要生成] 用户: 如果我要限制并发数用 Semaphore 对吗[调试] 结构化摘要: {topic: 编程讨论,code_snippets: [],decisions_made: [],pending_questions: [],key_facts: []}... 助手: **对的完全正确** 在 asyncio 中asyncio.Semaphore 是控制并发数的**标准做法**。它的作用就像是一个“令牌”只有拿到令牌的协程才能执行下载任务从而防止同时发起过多请求导致服务器拒绝或本地资源耗尽。---### 1. 核心用法示例pythonimport asyncioimport aiohttpasync def download(session, url, semaphore):# ✅ 关键使用 async with 获取锁async with semaphore:print(f开始下载{url})async with session.get(url) as resp:await resp.read()print(f完成下载{url})async def main():urls [http://example.com/file1, http://example.com/file2, ...] * 100# ✅ 关键在外部创建 Semaphore所有任务共享semaphore asyncio.Semaphore(10) # 限... 用户: 好的那超时和重试怎么处理[调试] 结构化摘要: {topic: Python 高效并发下载器的实现方案选择与代码示例,code_snippets: [async def download_file(session, url, save_path, semaphore):\n async with semaphore:\n async with session.get(url, timeou... 助手: 非常好的问题**超时**和**重试**是生产级下载器的必备功能。下面我为你展示完整的实现方案。---## 1. 超时控制Timeoutaiohttp 支持多种超时设置推荐使用 ClientTimeoutpythonimport aiohttp# ✅ 推荐使用 ClientTimeout 对象timeout aiohttp.ClientTimeout(total60, # 整个请求最大时间秒connect10, # 连接建立超时sock_read30 # 读取数据超时)async with session.get(url, timeouttimeout) as resp:...| 参数 | 说明 | 推荐值 ||------|------|--------|| total | 整个请求的最大时间 | 60~300 秒大文件增加 || connect | 建立连接超时 | 5~10 秒 || sock_read | 读取数据超时 | 30~60 秒 || sock_connect | TCP 握手超时 | 5~10 秒 |---## 2. 重试机制Retry推荐使用 **指数退避** 策略避免频繁... 用户: 如果下载的文件很大内存会不会爆[调试] 结构化摘要: {topic: Python 并发下载器中 asyncio 与 threading 的技术选型及实现讨论,code_snippets: [Asyncio 异步思维async def download(url): data await session.get(url); save(data),Threading 同步思维def downloa... 助手: **非常好的问题** 这正是很多人踩过的坑。**答案如果用错方法内存会爆如果用对方法100GB 的文件也没问题。**---## ❌ 错误写法一次性加载到内存python# ⚠️ 危险大文件会撑爆内存async def download_wrong(session, url, save_path):async with session.get(url) as resp:content await resp.read() # ❌ 整个文件加载到内存with open(save_path, wb) as f:f.write(content) # 再写入磁盘| 文件大小 | 内存占用 | 结果 ||---------|---------|------|| 10 MB | 10 MB | ✅ 正常 || 500 MB | 500 MB | ⚠️ 危险 || 2 GB | 2 GB | ❌ 可能 OOM || 10 GB | 10 GB | ❌ 必然崩溃 |---## ✅ 正确写法流式分块读取python# ✅ 安全内存占用恒定与文件大小无关async def download_correct(session, url... 用户: 我明白了用流式写入文件。还有什么需要注意的吗[调试] 结构化摘要: {topic: 编程讨论,code_snippets: [Threading 同步思维def download(url): data requests.get(url); save(data),async def download_file(session, url, save_path, semaphore):\n async wit... 助手: 你理解得很到位流式写入是核心但**生产级下载器**还有很多细节需要注意。我整理了最关键的 6 个要点---## 1️⃣ 临时文件 原子操作避免半成品**问题**下载中断会留下损坏的文件下次无法判断是否完整。**方案**先下载到 .tmp 文件完成后再重命名。pythonasync def download(self, session, url, save_path):# ✅ 使用临时文件temp_path save_path.with_suffix(f{save_path.suffix}.tmp)try:async with session.get(url, timeoutself.timeout) as resp:resp.raise_for_status()with open(temp_path, wb) as f:async for chunk in resp.content.iter_chunked(8192):f.write(chunk)# ✅ 下载完成再重命名... [第 8 轮后触发了摘要生成]增量摘要结构化摘要 采用JSON格式存储代码片段、决策和待办并对新旧摘要智能合并。每 4 轮对话后早期对话被压缩为结构化 JSON 摘要包含代码片段、决策、待办事项。后续对话模型既能通过摘要了解历史背景又能通过保留最近4轮精确引用刚讨论过的代码细节。reference---多轮对话长上下文截断技巧示例https://blog.csdn.net/liliang199/article/details/160157872LLM上下文管理探索-滑动窗口摘要压缩优先级丢弃https://blog.csdn.net/liliang199/article/details/159986000DeepSeek 如果对话过长如何处理上下文长度限制问题https://bbs.itying.com/topic/67a6875f55a429007d7d3b26

更多文章