【AI】十三.Runnable解析和多实现类案例实战 小滴课堂讲师 2025年09月19日 ai大模型, aigc 预计阅读 16 分钟 #### LangChain核心之Runnable接口底层实现 ##### 什么是`Runnable`接口 * 是LangChain框架中所有组件的核心抽象接口,用于封装可执行的逻辑单元(如模型调用、数据处理、API集成等)。 * 通过实现统一的`invoke`、`batch`、`stream`等方法,支持模块化构建链式任务,允许开发者以声明式编程LCEL串联不同组件 ``` from langchain_core.runnables ``` * 为什么要使用`Runnable` - **统一接口**:所有组件(如Prompt模板、模型、解析器)均实现Runnable接口,确保类型兼容和链式调用的无缝衔接 - **灵活组合**:通过管道符`|`将多个Runnable串联成链,简化复杂逻辑的编排,类似数据流处理 - **动态配置**:支持运行时参数绑定、组件替换和错误恢复机制(如`with_retry()`),提升系统灵活性和鲁棒性 - **异步与性能优化**:内置异步方法(如`ainvoke`)和并行处理(如`RunnableParallel`),适用于高并发场景 ##### 什么是`RunnableSequence` * 是LangChain中用于构建**顺序执行链**的核心组件,通过管道符`|`将多个Runnable串联,形成线性执行流程,是`Runnable`子类 ``` from langchain_core.runnables import RunnableSequence ``` * 执行 LCEL链调用的方法(invoke/stream/batch),链中的每个组件也调用对应的方法,将输出作为下一个组件的输入 ```python #RunnableSequence.invoke 的源码解读 def invoke( self, input: Input, config: Optional[RunnableConfig] = None, **kwargs: Any ) -> Output: # invoke all steps in sequence try: for i, step in enumerate(self.steps): # mark each step as a child run config = patch_config( config, callbacks=run_manager.get_child(f"seq:step:{i + 1}") ) with set_config_context(config) as context: if i == 0: input = context.run(step.invoke, input, config, **kwargs) else: input = context.run(step.invoke, input, config) ``` ##### 从LECL表达式开始理解 ``` chain = prompt | model | output_parser # 通过|直接连接 ``` * **数据流传递** * 每个Runnable的输出作为下一个Runnable的输入,形成单向数据流。 * 例如,若链为`A | B | C`,则执行流程为`A的输出 → B的输入 → B的输出 → C的输入` * **统一接口**: * 所有组件(如Prompt模板、模型、输出解析器)均实现`Runnable`接口,确保类型兼容性和链式调用的无缝衔接 * **延迟执行**: * 链的构建仅定义逻辑关系,实际执行在调用`invoke`或`stream`时触发,支持动态参数绑定和运行时配置 * **底层实现**: * 管道符`|`在Python中被重写为`__or__`方法,实际调用`RunnableSequence`构造函数, * 将多个Runnable存入内部列表`steps`中, 执行时按顺序遍历列表并调用每个Runnable的`invoke`方法 ##### Runnable接口定义了以下核心方法,支持多种执行模式 ```python class Runnable(Generic[Input, Output]): #处理单个输入,返回输出。 def invoke(self, input: Input) -> Output: ... #异步处理单个输入。 async def ainvoke(self, input: Input) -> Output: ... #逐块生成输出,适用于实时响应。 def stream(self, input: Input) -> Iterator[Output]: ... #批量处理输入列表,提升吞吐量。 def batch(self, inputs: List[Input]) -> List[Output]: ... ``` | 方法 | 说明 | 使用场景 | | :---------: | :----------: | :----------: | | `invoke()` | 同步执行 | 单次调用 | | `batch()` | 批量同步执行 | 处理数据集 | | `stream()` | 流式输出 | 实时生成文本 | | `ainvoke()` | 异步执行 | Web服务集成 | * 具有多个子类实现 | 组件 | 特点 | 适用场景 | | :-------------------: | :------: | :------------: | | `RunnableSequence` | 顺序执行 | 线性处理流水线 | | `RunnableBranch` | 条件路由 | 分支选择逻辑 | | `RunnableParallel` | 并行执行 | 多任务独立处理 | | `RunnablePassthrough` | 数据透传 | 保留原始输入 | #### RunnablePassthrough透传参数实战 ##### RunnablePassthrough * 核心功能:用于在链中直接传递输入数据,不进行任何修改,或通过 `.assign()` 扩展上下文字段  * 应用场景: - 保留原始输入数据供后续步骤使用。 - 动态添加新字段到上下文中(如结合检索结果与用户问题) * 基础用法 ```python from langchain_core.runnables import RunnablePassthrough # 直接传递输入 chain = RunnablePassthrough() | model output = chain.invoke("Hello") ``` ##### 扩展字段案例 * 案例一 ```python # 使用 assign() 添加新字段 from langchain_core.runnables import RunnablePassthrough # 使用 assign() 方法添加新字段,该方法接收一个关键字参数,其值是一个函数 # 这个函数定义了如何处理输入数据以生成新字段 # 在这个例子中,lambda 函数接收一个输入 x,并返回 x["num"] * 2 的结果 # 这将创建一个新的字段 'processed',其值是输入字段 'num' 的两倍 chain = RunnablePassthrough.assign(processed=lambda x: x["num"] * 2) # 调用 chain 对象的 invoke 方法,传入一个包含 'num' 字段的字典 # 这将执行之前定义的 lambda 函数,并在输入字典的基础上添加 'processed' 字段 # 最后输出处理后的字典 output = chain.invoke({"num": 3}) # 输出 {'num':3, 'processed':6} print(output) ``` * 案例二(伪代码) ```python # 构建包含原始问题和处理上下文的链 chain = ( RunnablePassthrough.assign( context=lambda x: retrieve_documents(x["question"]) ) | prompt | llm ) # 输入结构 input_data = {"question": "LangChain是什么?"} response = chain.invoke(input_data) ``` ##### 透传参数LLM案例实战 * 用户输入的问题会同时传给`retriever`和`RunnablePassthrough()` * `retriever`完成检索后,会自动把结果赋值给`context`。 * 检索结果`context`和用户输入`question`一并传给提示模板`prompt_template`。 * **输出**:模型根据检索到的上下文生成答案 ```python from langchain_community.embeddings import DashScopeEmbeddings from langchain_milvus import Milvus from langchain_core.documents import Document from langchain_core.runnables import RunnablePassthrough from langchain_core.prompts import ChatPromptTemplate from langchain_openai import ChatOpenAI # 初始化模型 embeddings = DashScopeEmbeddings( model="text-embedding-v2", # 第二代通用模型 max_retries=3, dashscope_api_key="sk-005c3c25f6d042848b29d75f2f020f08" ) document_1 = Document( page_content="LangChain支持多种数据库集成,小滴课堂的AI大课", metadata={"source": "xdclass.net/doc1"}, ) document_2 = Document( page_content="Milvus擅长处理向量搜索,老王的课程不错", metadata={"source": "xdclass.net/doc2"}, ) documents = [document_1,document_2] vector_store = Milvus.from_documents( documents=documents, embedding=embeddings, collection_name="runnable_test", connection_args={"uri": "http://47.119.128.20:19530"} ) #默认是 similarity search retriever = vector_store.as_retriever(search_kwargs={"k": 2}) prompt = ChatPromptTemplate.from_template("基于上下文回答:{context}\n问题:{question}") #定义模型 model = ChatOpenAI( model_name = "qwen-plus", base_url="https://dashscope.aliyuncs.com/compatible-mode/v1", api_key="sk-005c3c25f6d042848b29d75f2f020f08", temperature=0.7 ) chain = { "context": retriever, "question": RunnablePassthrough() # 直接传递用户问题 } | prompt | model result = chain.invoke("LangChain支持数据库吗") print(result) ``` #### AI智能推荐实战之RunnableParallel并行链 ##### RunnableParallel介绍 * 并行执行多个 Runnable,合并结果为一个字典,键为子链名称,值为对应输出 ``` class RunnableParallel(Runnable[Input, Dict[str, Any]]): """ 并行执行多个Runnable的容器类 输出结果为字典结构:{key1: result1, key2: result2...} """ ``` * 在 LCEL 链上,会将字典隐形转换为`RunnableParallel` ```python multi_retrieval_chain = ( RunnableParallel({ "context1": retriever1, #数据源一 "context2": retriever2, #数据源二 "question": RunnablePassthrough() }) | prompt_template | llm | outputParser ) ======= 自动化转换为下面,写法一样 ======== multi_retrieval_chain = ( { "context1": retriever1, #数据源一 "context2": retriever2, #数据源二 "question": RunnablePassthrough() } | prompt_template | llm | outputParser ) ``` ##### 特点 | 特性 | 说明 | 示例 | | :----------: | :--------------------: | :------------------------: | | **并行执行** | 所有子Runnable同时运行 | 3个任务耗时2秒(而非累加) | | **类型安全** | 强制校验输入输出类型 | 自动检测字典字段类型 | * API 与用法, 构造函数所有子链接收相同的输入 ```python from langchain_core.runnables import RunnableParallel runnable = RunnableParallel( key1=chain1, key2=chain2 ) ``` ##### 应用场景 * **数据并行处理器**:同时处理多个数据流 * **结构化数据装配器**:构建标准化的输出格式 * **流水线分叉合并器**:实现Map-Reduce模式中的Map阶段 * 举例 * 多维度数据分析 ``` analysis_chain = RunnableParallel({ "sentiment": sentiment_analyzer, "keywords": keyword_extractor, "entities": ner_recognizer }) ``` * 多模型对比系统 ```python model_comparison = RunnableParallel({ "gpt4": gpt4_chain, "claude": claude_chain, "gemini": gemini_chain }) ``` * 智能文档处理系统 ```python document_analyzer = RunnableParallel({ "summary": summary_chain, # 摘要生成 "toc": toc_generator, # 目录提取 "stats": RunnableLambda(lambda doc: { "char_count": len(doc), "page_count": doc.count("PAGE_BREAK") + 1 }) }) # 处理200页PDF文本 analysis_result = document_analyzer.invoke(pdf_text) ``` ##### 案例实战 * 场景:并行生成景点与书籍推荐 ```python from langchain_core.runnables import RunnableParallel from langchain_core.prompts import ChatPromptTemplate from langchain_openai import ChatOpenAI from langchain_core.output_parsers import JsonOutputParser #定义模型 model = ChatOpenAI( model_name = "qwen-plus", base_url="https://dashscope.aliyuncs.com/compatible-mode/v1", api_key="sk-005c3c25f6d042848b29d75f2f020f08", temperature=0.7 ) #构建解析器 parser = JsonOutputParser() prompt_attractions = ChatPromptTemplate.from_template("""列出{city}的{num}个景点。返回 JSON 格式: {{ "num": "编号", "city": "城市", "introduce": "景点介绍", }} """) prompt_books = ChatPromptTemplate.from_template("""列出{city}相关的{num}本书返回 JSON 格式: {{ "num": "编号", "city": "城市", "introduce": "书籍介绍", }} """) chain1 = prompt_attractions | model | parser chain2 = prompt_books | model | parser chain = RunnableParallel( attractions = chain1 , books = chain2 ) output = chain.invoke({"city": "南京", "num": 3}) print(output) ``` #### RunnableLambda介绍和包装链式函数实战 ##### RunnableLambda * 核心功能 * 将任意 Python 函数转换为 Runnable,将普通的 Python 函数或可调用对象转换为 `Runnable`对象,无缝集成到链中 * 把自己需要的功能通过自定义函数 + RunnableLambda的方式包装,可以跟任何外部系统打通,集成到 LCEL 链 ``` class RunnableLambda(Runnable[Input, Output]): """ 将任意Python函数转换为符合Runnable协议的对象 实现自定义逻辑与LangChain生态的无缝集成 """ ``` * 与普通函数的区别 | 特性 | 普通函数 | RunnableLambda | | :------: | :-----------------: | :-------------------: | | 可组合性 | ❌ 无法直接接入Chain | ✅ 支持` | | 类型校验 | ❌ 动态类型 | ✅ 静态类型检查 | | 异步支持 | ❌ 需手动实现 | ✅ 原生支持async/await | | 批量处理 | ❌ 需循环调用 | ✅ 自动批量优化 | ##### 适合场景 * 插入自定义逻辑(如日志记录、数据清洗) * 转换数据格式(如 JSON 解析)。 * API 与用法 ```python from langchain_core.runnables import RunnableLambda def log_input(x): print(f"Input: {x}") return x chain = prompt | RunnableLambda(log_input) | model ``` ##### 案例实战 * 基础文本处理链 ```python from langchain_core.runnables import RunnableLambda text_clean_chain = ( RunnableLambda(lambda x: x.strip()) | RunnableLambda(str.lower) ) result = text_clean_chain.invoke(" Hello123World ") print(result) # 输出 "helloworld" ``` * 打印中间结果并过滤敏感词(在链中插入自定义处理逻辑) ```python from langchain_core.runnables import RunnableLambda from langchain_openai import ChatOpenAI def filter_content(text: str) -> str: return text.replace("暴力", "***") #定义模型 model = ChatOpenAI( model_name = "qwen-plus", base_url="https://dashscope.aliyuncs.com/compatible-mode/v1", api_key="sk-005c3c25f6d042848b29d75f2f020f08", temperature=0.7 ) chain = ( RunnableLambda(lambda x: x["user_input"]) | RunnableLambda(filter_content) | model ) result = chain.invoke({"user_input": "暴力内容"}) print(result) # 输出过滤后的结果 ``` #### 智能客服路由实战之RunnableBranch条件分支 ##### RunnableBranch * 核心功能:根据条件选择执行不同的子链,类似 if-else 路由 * API 与用法 ```python from langchain_core.runnables import RunnableBranch #条件函数:接收输入,返回布尔值。 branch = RunnableBranch( (condition1, chain1), (condition2, chain2), default_chain ) """ 参数说明: - Condition: 返回bool的可调用对象 - Runnable: 条件满足时执行的分支 - default: 所有条件不满足时执行的默认分支 技术细节: 1. 条件按声明顺序 2. 第一个满足条件的分支会被执行 3. 无默认分支且所有条件不满足时抛出异常 """ ``` ##### 适合场景 * 多任务分类(如区分数学问题与物理问题) * 错误处理分支(如主链失败时调用备用链) * 多轮对话路由(根据对话历史选择回复策略) ``` # 根据对话历史选择回复策略 branch = RunnableBranch( (lambda x: "投诉" in x["history"], complaint_handler), (lambda x: "咨询" in x["history"], inquiry_handler), default_responder ) ``` * 智能路由系统(根据输入类型路由处理方式) ```python # 定义分类函数 def detect_topic(input_text): if "天气" in input_text: return "weather" elif "新闻" in input_text: return "news" else: return "general" # 构建分支链 branch_chain = RunnableBranch( (lambda x: detect_topic(x["input"]) == "weather", weather_chain), (lambda x: detect_topic(x["input"]) == "news", news_chain), general_chain ) # 执行示例 branch_chain.invoke({"input": "北京今天天气怎么样?"}) ``` ##### 案例实战 需要构建一个 **智能客服系统**,根据用户输入的请求类型自动路由到不同的处理流程: * **技术问题**:路由到技术支持链。 * **账单问题**:路由到财务链。 * **默认问题**:路由到通用问答链。 * 步骤 * 导入依赖 ```python from langchain_core.runnables import RunnableBranch, RunnableLambda from langchain_core.prompts import ChatPromptTemplate from langchain_openai import ChatOpenAI from langchain_core.output_parsers import StrOutputParser ``` * 定义模型 ```python #定义模型 model = ChatOpenAI( model_name = "qwen-plus", base_url="https://dashscope.aliyuncs.com/compatible-mode/v1", api_key="sk-005c3c25f6d042848b29d75f2f020f08", temperature=0.7 ) ``` * 定义子链 ```python # 技术支持链 tech_prompt = ChatPromptTemplate.from_template( "你是一名技术支持专家,请回答以下技术问题:{input}" ) tech_chain = tech_prompt | model | StrOutputParser() # 财务链 billing_prompt = ChatPromptTemplate.from_template( "你是一名财务专员,请处理以下账单问题:{input}" ) billing_chain = billing_prompt | model | StrOutputParser() # 默认通用链 default_prompt = ChatPromptTemplate.from_template( "你是一名客服,请回答以下问题:{input}" ) default_chain = default_prompt | model | StrOutputParser() ``` * 定义路由条件函数 ```python def is_tech_question(input: dict) -> bool: # 获取 "input" 键对应的值 input_value = input.get("input", "") # 检查是否包含关键词 return "技术" in input_value or "故障" in input_value def is_billing_question(input: dict) -> bool: # 获取 "input" 键对应的值 input_value = input.get("input", "") # 检查是否包含关键词 return "账单" in input_value or "支付" in input_value ``` * 构建 RunnableBranch ```python branch = RunnableBranch( (is_tech_question, tech_chain), # 技术问题 → tech_chain (is_billing_question, billing_chain), # 账单问题 → billing_chain default_chain # 默认问题 → default_chain ) full_chain = RunnableLambda(lambda x: {"input": x}) | branch ``` * 测试案例 ```python # 测试技术问题 tech_response = full_chain.invoke("我的账号登录失败,提示技术故障") print("技术问题响应:", tech_response) # 测试账单问题 billing_response = full_chain.invoke("我的账单金额有误,请核对") print("账单问题响应:", billing_response) # 测试默认问题 default_response = full_chain.invoke("你们公司的地址在哪里?") print("默认问题响应:", default_response) #输出示例 #技术问题响应: 建议您尝试清除浏览器缓存或重置密码。若问题持续,请联系我们的技术支持团队。 #账单问题响应: 已记录您的账单问题,财务部门将在24小时内与您联系核实。 #默认问题响应: 我们的公司地址是北京市海淀区中关村大街1号。 ``` ##### 关键原理解析 * **条件路由逻辑** * `RunnableBranch` 接收一个由 `(条件函数, Runnable)` 组成的列表。 * 按顺序检查条件,第一个满足条件的分支会被执行,若均不满足则执行默认分支 * **输入处理**: * 输入需为字典格式(如 `{"input": "问题内容"}`),通过 `RunnableLambda` 包装原始输入为字典 * **链式组合**: * 每个分支链(如 `tech_chain`)独立处理输入,输出结果直接返回给调用方 * **调试技巧**: - 添加日志中间件(通过 `RunnableLambda`)记录路由决策过程 ```python def log_decision(input_data): print(f"路由检查输入:{input_data}") return input_data log_chain_branch = RunnableLambda(log_decision) | branch full_chain = RunnableLambda(lambda x: {"input": x}) | log_chain_branch ``` ##### 总结与最佳实践 * **组合使用**:通过 `|` 串联或嵌套 `Runnable` 类,构建复杂逻辑。 * **性能优化**:利用 `RunnableParallel` 减少 IO 等待时间。 * **调试技巧**:使用 `RunnableLambda` 插入日志或数据检查点。 * **容错设计**:结合 `RunnableBranch` 和 提升健壮性
评论区