链式调用
链(Chain)是 LangChain 的核心概念之一,它允许你将多个组件串联起来,形成一个完整的处理流程。本章将深入介绍 LangChain 的链式调用机制。
什么是链
链是将多个组件按顺序组合在一起的方式,上一个组件的输出作为下一个组件的输入:
输入 → 组件1 → 组件2 → 组件3 → 输出
LCEL:LangChain 表达式语言
LCEL(LangChain Expression Language) 是 LangChain 推荐的链构建方式,使用管道操作符 | 连接组件:
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
# 创建组件
prompt = ChatPromptTemplate.from_template("翻译成英文:{text}")
llm = ChatOpenAI(model="gpt-3.5-turbo")
parser = StrOutputParser()
# 使用 | 组装链
chain = prompt | llm | parser
# 调用链
result = chain.invoke({"text": "你好,世界"})
print(result) # "Hello, World"
LCEL 的优势
- 简洁直观 - 代码清晰易读
- 支持流式 - 自动支持流式输出
- 支持异步 - 自动支持异步调用
- 支持批处理 - 自动支持批量调用
- 可观测性 - 与 LangSmith 无缝集成
基础链类型
简单链
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
prompt = ChatPromptTemplate.from_template(
"你是一个{role}。请回答:{question}"
)
llm = ChatOpenAI()
parser = StrOutputParser()
chain = prompt | llm | parser
result = chain.invoke({
"role": "Python专家",
"question": "什么是列表推导式?"
})
多步骤链
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_openai import ChatOpenAI
llm = ChatOpenAI()
# 第一步:生成大纲
outline_prompt = ChatPromptTemplate.from_template(
"为主题'{topic}'生成一个简短的文章大纲(3-5个要点)"
)
outline_chain = outline_prompt | llm | StrOutputParser()
# 第二步:根据大纲写文章
article_prompt = ChatPromptTemplate.from_template(
"""根据以下大纲写一篇短文:
大纲:
{outline}
请写一篇300字左右的文章。"""
)
article_chain = article_prompt | llm | StrOutputParser()
# 组合
def write_article(topic: str) -> str:
outline = outline_chain.invoke({"topic": topic})
article = article_chain.invoke({"outline": outline})
return article
result = write_article("人工智能的未来")
print(result)
RunnablePassthrough 和 RunnableLambda
RunnablePassthrough
传递输入或添加额外数据:
from langchain_core.runnables import RunnablePassthrough
# 直接传递
chain = RunnablePassthrough() | llm
# 添加额外数据
chain = {
"question": RunnablePassthrough(),
"context": lambda _: "这是一些背景信息"
} | prompt | llm
RunnableLambda
将任意函数转换为 Runnable:
from langchain_core.runnables import RunnableLambda
def preprocess(text: str) -> str:
return text.strip().lower()
def postprocess(response) -> dict:
return {
"content": response.content,
"length": len(response.content)
}
chain = (
RunnableLambda(preprocess)
| prompt
| llm
| RunnableLambda(postprocess)
)
并行执行
RunnableParallel
同时执行多个链:
from langchain_core.runnables import RunnableParallel
# 定义多个子链
positive_chain = ChatPromptTemplate.from_template(
"列出{topic}的3个优点"
) | llm | StrOutputParser()
negative_chain = ChatPromptTemplate.from_template(
"列出{topic}的3个缺点"
) | llm | StrOutputParser()
summary_chain = ChatPromptTemplate.from_template(
"用一句话总结{topic}"
) | llm | StrOutputParser()
# 并行执行
parallel_chain = RunnableParallel(
pros=positive_chain,
cons=negative_chain,
summary=summary_chain
)
result = parallel_chain.invoke({"topic": "远程工作"})
print(result["pros"])
print(result["cons"])
print(result["summary"])
使用字典简写
# 更简洁的写法
chain = {
"pros": positive_chain,
"cons": negative_chain,
"summary": summary_chain
} | RunnableLambda(format_output)
条件分支
RunnableBranch
根据条件选择不同的执行路径:
from langchain_core.runnables import RunnableBranch
# 定义不同的处理链
code_chain = ChatPromptTemplate.from_template(
"你是编程专家。请回答这个编程问题:{input}"
) | llm | StrOutputParser()
math_chain = ChatPromptTemplate.from_template(
"你是数学专家。请解决这个数学问题:{input}"
) | llm | StrOutputParser()
general_chain = ChatPromptTemplate.from_template(
"请回答这个问题:{input}"
) | llm | StrOutputParser()
# 分类器
def classify(input_dict):
text = input_dict["input"].lower()
if any(word in text for word in ["代码", "编程", "python", "函数"]):
return "code"
elif any(word in text for word in ["数学", "计算", "方程", "求解"]):
return "math"
return "general"
# 创建分支
branch = RunnableBranch(
(lambda x: classify(x) == "code", code_chain),
(lambda x: classify(x) == "math", math_chain),
general_chain # 默认分支
)
# 测试
print(branch.invoke({"input": "写一个Python排序函数"}))
print(branch.invoke({"input": "求解方程 x^2 = 16"}))
print(branch.invoke({"input": "今天天气怎么样"}))
链的配置
bind 方法
为链绑定固定参数:
# 绑定模型参数
chain = prompt | llm.bind(temperature=0, max_tokens=100)
# 绑定输出格式
json_chain = prompt | llm.bind(
response_format={"type": "json_object"}
)
with_config 方法
# 添加配置
chain = prompt | llm
result = chain.with_config(
run_name="my_chain",
tags=["production"],
metadata={"version": "1.0"}
).invoke({"input": "测试"})
with_retry 方法
自动重试失败的调用:
chain_with_retry = (prompt | llm).with_retry(
stop_after_attempt=3,
wait_exponential_jitter=True
)
链的调用方式
同步调用
# 单次调用
result = chain.invoke({"input": "你好"})
# 批量调用
results = chain.batch([
{"input": "问题1"},
{"input": "问题2"},
{"input": "问题3"}
])
# 流式调用
for chunk in chain.stream({"input": "讲个故事"}):
print(chunk, end="", flush=True)
异步调用
import asyncio
async def async_example():
# 异步单次调用
result = await chain.ainvoke({"input": "你好"})
# 异步批量调用
results = await chain.abatch([
{"input": "问题1"},
{"input": "问题2"}
])
# 异步流式调用
async for chunk in chain.astream({"input": "讲个故事"}):
print(chunk, end="", flush=True)
asyncio.run(async_example())
输出解析器
StrOutputParser
解析为字符串:
from langchain_core.output_parsers import StrOutputParser
chain = prompt | llm | StrOutputParser()
result = chain.invoke({"input": "你好"})
print(type(result)) # <class 'str'>
JsonOutputParser
解析为 JSON:
from langchain_core.output_parsers import JsonOutputParser
from pydantic import BaseModel, Field
class Person(BaseModel):
name: str = Field(description="姓名")
age: int = Field(description="年龄")
parser = JsonOutputParser(pydantic_object=Person)
prompt = ChatPromptTemplate.from_template(
"""生成一个虚构人物的信息。
{format_instructions}
人物描述:{description}"""
).partial(format_instructions=parser.get_format_instructions())
chain = prompt | llm | parser
result = chain.invoke({"description": "一个年轻的程序员"})
print(result) # {"name": "张三", "age": 28}
CommaSeparatedListOutputParser
解析为列表:
from langchain_core.output_parsers import CommaSeparatedListOutputParser
parser = CommaSeparatedListOutputParser()
prompt = ChatPromptTemplate.from_template(
"列出5种常见的编程语言,用逗号分隔:\n{format_instructions}"
).partial(format_instructions=parser.get_format_instructions())
chain = prompt | llm | parser
result = chain.invoke({})
print(result) # ['Python', 'JavaScript', 'Java', 'C++', 'Go']
完整示例:文章处理流水线
"""
文章处理流水线 - 链式调用示例
"""
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser, JsonOutputParser
from langchain_core.runnables import RunnableParallel, RunnableLambda
from pydantic import BaseModel, Field
from dotenv import load_dotenv
load_dotenv()
# 定义输出结构
class ArticleAnalysis(BaseModel):
title: str = Field(description="建议的标题")
summary: str = Field(description="文章摘要")
keywords: list[str] = Field(description="关键词列表")
sentiment: str = Field(description="情感倾向:正面/负面/中性")
def create_article_pipeline():
"""创建文章处理流水线"""
llm = ChatOpenAI(model="gpt-3.5-turbo", temperature=0.3)
# 1. 内容分析链
analysis_parser = JsonOutputParser(pydantic_object=ArticleAnalysis)
analysis_prompt = ChatPromptTemplate.from_template(
"""分析以下文章内容:
{article}
请按以下格式输出分析结果:
{format_instructions}"""
).partial(format_instructions=analysis_parser.get_format_instructions())
analysis_chain = analysis_prompt | llm | analysis_parser
# 2. 改写链
rewrite_prompt = ChatPromptTemplate.from_template(
"""将以下文章改写得更加简洁易懂,保持核心信息:
原文:
{article}
改写后的文章(200字以内):"""
)
rewrite_chain = rewrite_prompt | llm | StrOutputParser()
# 3. 翻译链
translate_prompt = ChatPromptTemplate.from_template(
"""将以下中文文章翻译成英文:
{article}
英文翻译:"""
)
translate_chain = translate_prompt | llm | StrOutputParser()
# 并行处理
parallel_chain = RunnableParallel(
analysis=analysis_chain,
rewrite=rewrite_chain,
translation=translate_chain
)
# 格式化最终输出
def format_result(result: dict) -> dict:
return {
"原文分析": result["analysis"],
"简化版本": result["rewrite"],
"英文翻译": result["translation"]
}
return parallel_chain | RunnableLambda(format_result)
def main():
pipeline = create_article_pipeline()
article = """
人工智能(AI)正在深刻改变我们的生活方式。从智能手机中的语音助手,
到推荐系统为我们挑选内容,AI 已经无处不在。机器学习作为 AI 的核心技术,
使计算机能够从数据中学习并不断改进。随着大语言模型的出现,AI 的能力
更是达到了前所未有的高度,它们可以理解和生成人类语言,完成各种复杂任务。
"""
print("正在处理文章...\n")
result = pipeline.invoke({"article": article})
print("=" * 50)
print("【原文分析】")
print(f"建议标题:{result['原文分析']['title']}")
print(f"摘要:{result['原文分析']['summary']}")
print(f"关键词:{', '.join(result['原文分析']['keywords'])}")
print(f"情感倾向:{result['原文分析']['sentiment']}")
print("\n" + "=" * 50)
print("【简化版本】")
print(result["简化版本"])
print("\n" + "=" * 50)
print("【英文翻译】")
print(result["英文翻译"])
if __name__ == "__main__":
main()
小结
本章介绍了:
✅ LCEL 链式表达语言
✅ RunnablePassthrough 和 RunnableLambda
✅ RunnableParallel 并行执行
✅ RunnableBranch 条件分支
✅ 链的配置和重试机制
✅ 各种输出解析器
✅ 完整的链式调用示例
下一步
掌握链式调用后,让我们学习记忆功能,让 AI 应用能够记住对话历史。
练习
- 创建一个多步骤的内容创作流水线
- 实现一个带有条件分支的智能路由系统
- 使用 JsonOutputParser 解析结构化数据
- 比较同步和异步调用的性能差异